From 5f7278b9c4bbfb385b1f2b132ead741a7021ca4f Mon Sep 17 00:00:00 2001 From: jubb Date: Thu, 15 Jul 2021 12:00:30 +1000 Subject: [PATCH] refactor: OpenGroupPollerV2.kt no longer queues jobs and executes synchronously, BackgroundPollWorker.kt no longer replaces periodic tasks but keeps existing ones, removing unused references --- .../notifications/BackgroundPollWorker.kt | 4 ++-- .../messaging/jobs/MessageReceiveJob.kt | 4 +--- .../sending_receiving/MessageReceiver.kt | 3 +-- .../pollers/OpenGroupPollerV2.kt | 22 +++++-------------- 4 files changed, 10 insertions(+), 23 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/notifications/BackgroundPollWorker.kt b/app/src/main/java/org/thoughtcrime/securesms/notifications/BackgroundPollWorker.kt index 1674caf629..afd4ad74a8 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/notifications/BackgroundPollWorker.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/notifications/BackgroundPollWorker.kt @@ -25,12 +25,12 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor @JvmStatic fun schedulePeriodic(context: Context) { Log.v(TAG, "Scheduling periodic work.") - val builder = PeriodicWorkRequestBuilder(5, TimeUnit.MINUTES) + val builder = PeriodicWorkRequestBuilder(15, TimeUnit.MINUTES) builder.setConstraints(Constraints.Builder().setRequiredNetworkType(NetworkType.CONNECTED).build()) val workRequest = builder.build() WorkManager.getInstance(context).enqueueUniquePeriodicWork( TAG, - ExistingPeriodicWorkPolicy.REPLACE, + ExistingPeriodicWorkPolicy.KEEP, workRequest ) } diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt index 9a2692a97f..923e48a83f 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt @@ -21,8 +21,6 @@ class MessageReceiveJob(val data: ByteArray, val openGroupMessageServerID: Long? // Keys used for database storage private val DATA_KEY = "data" - // FIXME: We probably shouldn't be using this job when background polling - private val IS_BACKGROUND_POLL_KEY = "is_background_poll" private val OPEN_GROUP_MESSAGE_SERVER_ID_KEY = "openGroupMessageServerID" private val OPEN_GROUP_ID_KEY = "open_group_id" } @@ -35,7 +33,7 @@ class MessageReceiveJob(val data: ByteArray, val openGroupMessageServerID: Long? val deferred = deferred() try { val isRetry: Boolean = failureCount != 0 - val (message, proto) = MessageReceiver.parse(this.data, this.openGroupMessageServerID, isRetry) + val (message, proto) = MessageReceiver.parse(this.data, this.openGroupMessageServerID) synchronized(RECEIVE_LOCK) { // FIXME: Do we need this? MessageReceiver.handle(message, proto, this.openGroupID) } diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageReceiver.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageReceiver.kt index 29bcfc36c2..99ad38bd5c 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageReceiver.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageReceiver.kt @@ -4,7 +4,6 @@ import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.messages.Message import org.session.libsession.messaging.messages.control.* import org.session.libsession.messaging.messages.visible.VisibleMessage -import org.session.libsession.utilities.GroupUtil import org.session.libsignal.crypto.PushTransportDetails import org.session.libsignal.protos.SignalServiceProtos @@ -32,7 +31,7 @@ object MessageReceiver { } } - internal fun parse(data: ByteArray, openGroupServerID: Long?, isRetry: Boolean = false): Pair { + internal fun parse(data: ByteArray, openGroupServerID: Long?): Pair { val storage = MessagingModuleConfiguration.shared.storage val userPublicKey = storage.getUserPublicKey() val isOpenGroupMessage = (openGroupServerID != null) diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerV2.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerV2.kt index b08fa66f34..aadb73b9e1 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerV2.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerV2.kt @@ -8,6 +8,8 @@ import org.session.libsession.messaging.jobs.MessageReceiveJob import org.session.libsession.messaging.jobs.TrimThreadJob import org.session.libsession.messaging.open_groups.OpenGroupAPIV2 import org.session.libsession.messaging.open_groups.OpenGroupMessageV2 +import org.session.libsession.messaging.sending_receiving.MessageReceiver +import org.session.libsession.messaging.sending_receiving.handle import org.session.libsession.utilities.Address import org.session.libsession.utilities.GroupUtil import org.session.libsignal.protos.SignalServiceProtos @@ -64,7 +66,6 @@ class OpenGroupPollerV2(private val server: String, private val executorService: val threadId = storage.getThreadId(Address.fromSerialized(groupID)) ?: -1 val threadExists = threadId >= 0 if (!hasStarted || !threadExists) { return } - var latestJob: MessageReceiveJob? = null messages.sortedBy { it.serverID!! }.forEach { message -> try { val senderPublicKey = message.sender!! @@ -75,20 +76,13 @@ class OpenGroupPollerV2(private val server: String, private val executorService: builder.content = message.toProto().toByteString() builder.timestamp = message.sentTimestamp val envelope = builder.build() - val job = MessageReceiveJob(envelope.toByteArray(), message.serverID, openGroupID) - if (isBackgroundPoll) { - job.executeAsync() - } else { - JobQueue.shared.add(job) - if (!isCaughtUp) { - secondToLastJob = latestJob - } - latestJob = job - } + val (parsedMessage, content) = MessageReceiver.parse(envelope.toByteArray(), message.serverID) + MessageReceiver.handle(parsedMessage, content, openGroupID) } catch (e: Exception) { Log.e("Loki", "Exception parsing message", e) } } + val currentLastMessageServerID = storage.getLastMessageServerID(room, server) ?: 0 val actualMax = max(messages.mapNotNull { it.serverID }.maxOrNull() ?: 0, currentLastMessageServerID) if (actualMax > 0) { @@ -105,11 +99,7 @@ class OpenGroupPollerV2(private val server: String, private val executorService: val groupID = GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray()) val threadID = storage.getThreadId(Address.fromSerialized(groupID)) ?: return val deletedMessageIDs = deletions.mapNotNull { deletion -> - val messageID = dataProvider.getMessageID(deletion.deletedMessageServerID, threadID) - if (messageID == null) { - Log.d("Loki", "Couldn't find message ID for message with serverID: ${deletion.deletedMessageServerID}.") - } - messageID + dataProvider.getMessageID(deletion.deletedMessageServerID, threadID) } deletedMessageIDs.forEach { (messageId, isSms) -> MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(messageId, isSms)