From 3a10d8c1b40650185e8c62cf750311185226943a Mon Sep 17 00:00:00 2001 From: Brice Date: Tue, 15 Dec 2020 15:50:15 +1100 Subject: [PATCH] MessageReceive & Send Jobs implementations --- .../messaging/jobs/MessageReceiveJob.kt | 42 ++++++++++++++- .../messaging/jobs/MessageSendJob.kt | 52 ++++++++++++++++++- 2 files changed, 92 insertions(+), 2 deletions(-) diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt index 5f9ee29c6b..126a2a91ad 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt @@ -1,6 +1,13 @@ package org.session.libsession.messaging.jobs +import nl.komponents.kovenant.Promise +import nl.komponents.kovenant.deferred +import org.session.libsession.messaging.sending_receiving.MessageReceiver +import org.session.libsession.messaging.sending_receiving.handle +import org.session.libsignal.libsignal.logging.Log + class MessageReceiveJob(val data: ByteArray, val isBackgroundPoll: Boolean, val openGroupMessageServerID: Long? = null, val openGroupID: String? = null) : Job { + override var delegate: JobDelegate? = null override var id: String? = null override var failureCount: Int = 0 @@ -8,10 +15,43 @@ class MessageReceiveJob(val data: ByteArray, val isBackgroundPoll: Boolean, val // Settings override val maxFailureCount: Int = 10 companion object { + val TAG = MessageReceiveJob::class.qualifiedName + val collection: String = "MessageReceiveJobCollection" } override fun execute() { - TODO("Not yet implemented") + exec() + } + + fun exec(): Promise { + val deferred = deferred() + try { + val (message, proto) = MessageReceiver.parse(this.data, this.openGroupMessageServerID) + MessageReceiver.handle(message, proto, this.openGroupID) + this.handleSuccess() + deferred.resolve(Unit) + } catch (e: Exception) { + Log.d(TAG, "Couldn't receive message due to error: $e.") + val error = e as? MessageReceiver.Error + error?.let { + if (!error.isRetryable) this.handlePermanentFailure(error) + } + this.handleFailure(e) + deferred.resolve(Unit) // The promise is just used to keep track of when we're done + } + return deferred.promise + } + + private fun handleSuccess() { + delegate?.handleJobSucceeded(this) + } + + private fun handlePermanentFailure(e: Exception) { + delegate?.handleJobFailedPermanently(this, e) + } + + private fun handleFailure(e: Exception) { + delegate?.handleJobFailed(this, e) } } \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt index 71cab56c30..5d707d5e93 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt @@ -1,9 +1,15 @@ package org.session.libsession.messaging.jobs +import org.session.libsession.messaging.MessagingConfiguration import org.session.libsession.messaging.messages.Destination import org.session.libsession.messaging.messages.Message +import org.session.libsession.messaging.messages.visible.VisibleMessage +import org.session.libsession.messaging.sending_receiving.MessageSender +import org.session.libsignal.libsignal.logging.Log +import org.session.libsignal.service.internal.push.SignalServiceProtos class MessageSendJob(val message: Message, val destination: Destination) : Job { + override var delegate: JobDelegate? = null override var id: String? = null override var failureCount: Int = 0 @@ -11,10 +17,54 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { // Settings override val maxFailureCount: Int = 10 companion object { + val TAG = MessageSendJob::class.qualifiedName + val collection: String = "MessageSendJobCollection" } override fun execute() { - TODO("Not yet implemented") + val messageDataProvider = MessagingConfiguration.shared.messageDataProvider + val message = message as? VisibleMessage + message?.let { + if(!messageDataProvider.isOutgoingMessage(message.sentTimestamp!!)) return // The message has been deleted + val attachments = message.attachmentIDs.map { messageDataProvider.getAttachment(it) }.filterNotNull() + val attachmentsToUpload = attachments.filter { !it.isUploaded } + attachmentsToUpload.forEach { + if(MessagingConfiguration.shared.storage.getAttachmentUploadJob(it.attachmentId) != null) { + // Wait for it to finish + } else { + val job = AttachmentUploadJob(it.attachmentId, message.threadID!!, message, id!!) + JobQueue.shared.add(job) + } + } + if (attachmentsToUpload.isNotEmpty()) return // Wait for all attachments to upload before continuing + } + MessageSender.send(this.message, this.destination).success { + this.handleSuccess() + }.fail { exception -> + Log.e(TAG, "Couldn't send message due to error: $exception.") + val e = exception as? MessageSender.Error + e?.let { + if (!e.isRetryable) this.handlePermanentFailure(e) + } + this.handleFailure(exception) + } + } + + private fun handleSuccess() { + delegate?.handleJobSucceeded(this) + } + + private fun handlePermanentFailure(error: Exception) { + delegate?.handleJobFailedPermanently(this, error) + } + + private fun handleFailure(error: Exception) { + Log.w(TAG, "Failed to send $message::class.simpleName.") + val message = message as? VisibleMessage + message?.let { + if(!MessagingConfiguration.shared.messageDataProvider.isOutgoingMessage(message.sentTimestamp!!)) return // The message has been deleted + } + delegate?.handleJobFailed(this, error) } } \ No newline at end of file