From ad7792f6598b74a89c1679d2a83b171d52eafc57 Mon Sep 17 00:00:00 2001 From: SessionHero01 <180888785+SessionHero01@users.noreply.github.com> Date: Mon, 24 Feb 2025 11:33:59 +1100 Subject: [PATCH] Rework group poller lifecycle so that it supports polling once (#967) --- app/build.gradle | 1 + app/src/main/AndroidManifest.xml | 18 +- .../securesms/ApplicationContext.java | 21 +- .../securesms/database/Storage.kt | 2 +- .../dependencies/SessionUtilModule.kt | 1 - .../securesms/groups/ExpiredGroupManager.kt | 3 +- .../securesms/groups/GroupManagerV2Impl.kt | 9 +- .../securesms/groups/GroupPoller.kt | 467 ++++++++++++++++++ .../securesms/groups/GroupPollerManager.kt | 104 ++-- .../notifications/BackgroundPollManager.kt | 65 +++ .../notifications/BackgroundPollWorker.kt | 205 ++++---- .../securesms/service/WebRtcCallService.kt | 2 +- .../libsession/database/StorageProtocol.kt | 2 +- .../ReceivedMessageHandler.kt | 2 +- .../notifications/PushRegistryV1.kt | 2 +- .../pollers/ClosedGroupPoller.kt | 398 --------------- .../pollers/LegacyClosedGroupPollerV2.kt | 2 +- 17 files changed, 743 insertions(+), 561 deletions(-) create mode 100644 app/src/main/java/org/thoughtcrime/securesms/groups/GroupPoller.kt create mode 100644 app/src/main/java/org/thoughtcrime/securesms/notifications/BackgroundPollManager.kt delete mode 100644 libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt diff --git a/app/build.gradle b/app/build.gradle index d7925dcec3..d29be856c5 100644 --- a/app/build.gradle +++ b/app/build.gradle @@ -266,6 +266,7 @@ dependencies { ksp("com.google.dagger:hilt-compiler:$daggerHiltVersion") ksp("com.github.bumptech.glide:ksp:$glideVersion") implementation("androidx.hilt:hilt-navigation-compose:$androidxHiltVersion") + implementation("androidx.hilt:hilt-work:$androidxHiltVersion") implementation("com.google.dagger:hilt-android:$daggerHiltVersion") implementation "androidx.appcompat:appcompat:$appcompatVersion" diff --git a/app/src/main/AndroidManifest.xml b/app/src/main/AndroidManifest.xml index ff8031044f..295d20ad21 100644 --- a/app/src/main/AndroidManifest.xml +++ b/app/src/main/AndroidManifest.xml @@ -447,7 +447,7 @@ @@ -473,6 +473,22 @@ + + + + + + + + + \ No newline at end of file diff --git a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java index 683c4d943a..270b30fe43 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java +++ b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java @@ -34,9 +34,11 @@ import androidx.annotation.StringRes; import androidx.core.content.pm.ShortcutInfoCompat; import androidx.core.content.pm.ShortcutManagerCompat; import androidx.core.graphics.drawable.IconCompat; +import androidx.hilt.work.HiltWorkerFactory; import androidx.lifecycle.DefaultLifecycleObserver; import androidx.lifecycle.LifecycleOwner; import androidx.lifecycle.ProcessLifecycleOwner; +import androidx.work.Configuration; import com.squareup.phrase.Phrase; @@ -91,6 +93,7 @@ import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint; import org.thoughtcrime.securesms.logging.AndroidLogger; import org.thoughtcrime.securesms.logging.PersistentLogger; import org.thoughtcrime.securesms.logging.UncaughtExceptionLogger; +import org.thoughtcrime.securesms.notifications.BackgroundPollManager; import org.thoughtcrime.securesms.notifications.BackgroundPollWorker; import org.thoughtcrime.securesms.notifications.NotificationChannels; import org.thoughtcrime.securesms.notifications.PushRegistrationHandler; @@ -116,6 +119,7 @@ import java.util.Timer; import java.util.concurrent.Executors; import javax.inject.Inject; +import javax.inject.Provider; import dagger.Lazy; import dagger.hilt.EntryPoints; @@ -134,7 +138,7 @@ import network.loki.messenger.R; * @author Moxie Marlinspike */ @HiltAndroidApp -public class ApplicationContext extends Application implements DefaultLifecycleObserver, Toaster { +public class ApplicationContext extends Application implements DefaultLifecycleObserver, Toaster, Configuration.Provider { public static final String PREFERENCES_NAME = "SecureSMS-Preferences"; @@ -147,6 +151,7 @@ public class ApplicationContext extends Application implements DefaultLifecycleO private Handler conversationListHandler; private PersistentLogger persistentLogger; + @Inject HiltWorkerFactory workerFactory; @Inject LokiAPIDatabase lokiAPIDatabase; @Inject public Storage storage; @Inject Device device; @@ -176,6 +181,7 @@ public class ApplicationContext extends Application implements DefaultLifecycleO @Inject LegacyClosedGroupPollerV2 legacyClosedGroupPollerV2; @Inject LegacyGroupDeprecationManager legacyGroupDeprecationManager; @Inject CleanupInvitationHandler cleanupInvitationHandler; + @Inject BackgroundPollManager backgroundPollManager; // Exists here only to start upon app starts @Inject AppVisibilityManager appVisibilityManager; // Exists here only to start upon app starts @Inject GroupPollerManager groupPollerManager; // Exists here only to start upon app starts @Inject ExpiredGroupManager expiredGroupManager; // Exists here only to start upon app starts @@ -282,7 +288,6 @@ public class ApplicationContext extends Application implements DefaultLifecycleO broadcaster = new Broadcaster(this); boolean useTestNet = textSecurePreferences.getEnvironment() == Environment.TEST_NET; SnodeModule.Companion.configure(apiDB, broadcaster, useTestNet); - initializePeriodicTasks(); SSKEnvironment.Companion.configure(typingStatusRepository, readReceiptManager, profileManager, getMessageNotifier(), expiringMessageManager); initializeWebRtc(); initializeBlobProvider(); @@ -318,6 +323,14 @@ public class ApplicationContext extends Application implements DefaultLifecycleO } } + @NonNull + @Override + public Configuration getWorkManagerConfiguration() { + return new Configuration.Builder() + .setWorkerFactory(workerFactory) + .build(); + } + @Override public void onStart(@NonNull LifecycleOwner owner) { isAppVisible = true; @@ -434,10 +447,6 @@ public class ApplicationContext extends Application implements DefaultLifecycleO Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionLogger(originalHandler)); } - private void initializePeriodicTasks() { - BackgroundPollWorker.schedulePeriodic(this); - } - private void initializeWebRtc() { try { PeerConnectionFactory.initialize(InitializationOptions.builder(this).createInitializationOptions()); diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt index 1cb6590369..54466c5e90 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt @@ -973,7 +973,7 @@ open class Storage @Inject constructor( return lokiAPIDatabase.getLatestClosedGroupEncryptionKeyPair(groupPublicKey) } - override fun getAllClosedGroupPublicKeys(): Set { + override fun getAllLegacyGroupPublicKeys(): Set { return lokiAPIDatabase.getAllClosedGroupPublicKeys() } diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/SessionUtilModule.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/SessionUtilModule.kt index 9be41c1191..521a3a621d 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/SessionUtilModule.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/SessionUtilModule.kt @@ -13,7 +13,6 @@ import kotlinx.coroutines.GlobalScope import org.session.libsession.database.StorageProtocol import org.session.libsession.messaging.groups.GroupScope import org.session.libsession.messaging.groups.LegacyGroupDeprecationManager -import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPoller import org.session.libsession.messaging.sending_receiving.pollers.LegacyClosedGroupPollerV2 import org.session.libsession.snode.SnodeClock import org.session.libsession.utilities.TextSecurePreferences diff --git a/app/src/main/java/org/thoughtcrime/securesms/groups/ExpiredGroupManager.kt b/app/src/main/java/org/thoughtcrime/securesms/groups/ExpiredGroupManager.kt index 0483d748b2..f2ea8f09cb 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/groups/ExpiredGroupManager.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/groups/ExpiredGroupManager.kt @@ -6,7 +6,6 @@ import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.mapNotNull import kotlinx.coroutines.flow.scan import kotlinx.coroutines.flow.stateIn -import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPoller import org.session.libsignal.utilities.AccountId import org.session.libsignal.utilities.Log import javax.inject.Inject @@ -27,7 +26,7 @@ class ExpiredGroupManager @Inject constructor( @Suppress("OPT_IN_USAGE") val expiredGroups: StateFlow> = pollerManager.watchAllGroupPollingState() .mapNotNull { (groupId, state) -> - val expired = (state as? ClosedGroupPoller.StartedState)?.expired + val expired = state.lastPoll?.groupExpired if (expired == null) { // Poller doesn't know about the expiration state yet, so we skip diff --git a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt index 01ac480971..3025bd3965 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt @@ -7,7 +7,6 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.async import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.filterIsInstance import kotlinx.coroutines.flow.first import kotlinx.coroutines.withContext import kotlinx.coroutines.withTimeout @@ -30,7 +29,6 @@ import org.session.libsession.messaging.messages.Destination import org.session.libsession.messaging.messages.control.GroupUpdated import org.session.libsession.messaging.messages.visible.Profile import org.session.libsession.messaging.sending_receiving.MessageSender -import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPoller import org.session.libsession.messaging.utilities.MessageAuthentication.buildDeleteMemberContentSignature import org.session.libsession.messaging.utilities.MessageAuthentication.buildInfoChangeSignature import org.session.libsession.messaging.utilities.MessageAuthentication.buildMemberChangeSignature @@ -676,10 +674,13 @@ class GroupManagerV2Impl @Inject constructor( // We need to wait until we have the first data polled from the poller, otherwise // we won't have the necessary configs to send invite response/or do anything else. - // We can't hang on here forever if things don't work out, bail out if it's the camse + // We can't hang on here forever if things don't work out, bail out if it's the case. withTimeout(20_000L) { + // We must tell the poller to poll once, as we could have received this invitation + // in the background where the poller isn't running + groupPollerManager.pollOnce(group.groupAccountId) + groupPollerManager.watchGroupPollingState(group.groupAccountId) - .filterIsInstance() .filter { it.hadAtLeastOneSuccessfulPoll } .first() } diff --git a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPoller.kt b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPoller.kt new file mode 100644 index 0000000000..7103779405 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPoller.kt @@ -0,0 +1,467 @@ +package org.thoughtcrime.securesms.groups + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.async +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.SharingStarted +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.stateIn +import kotlinx.coroutines.launch +import kotlinx.coroutines.supervisorScope +import network.loki.messenger.libsession_util.util.Sodium +import org.session.libsession.database.StorageProtocol +import org.session.libsession.messaging.groups.GroupManagerV2 +import org.session.libsession.messaging.jobs.BatchMessageReceiveJob +import org.session.libsession.messaging.jobs.JobQueue +import org.session.libsession.messaging.jobs.MessageReceiveParameters +import org.session.libsession.messaging.messages.Destination +import org.session.libsession.snode.RawResponse +import org.session.libsession.snode.SnodeAPI +import org.session.libsession.snode.SnodeClock +import org.session.libsession.snode.model.RetrieveMessageResponse +import org.session.libsession.utilities.ConfigFactoryProtocol +import org.session.libsession.utilities.ConfigMessage +import org.session.libsession.utilities.getGroup +import org.session.libsignal.database.LokiAPIDatabaseProtocol +import org.session.libsignal.exceptions.NonRetryableException +import org.session.libsignal.utilities.AccountId +import org.session.libsignal.utilities.IdPrefix +import org.session.libsignal.utilities.Log +import org.session.libsignal.utilities.Namespace +import org.session.libsignal.utilities.Snode +import org.thoughtcrime.securesms.util.AppVisibilityManager +import java.time.Instant +import kotlin.coroutines.cancellation.CancellationException +import kotlin.time.Duration.Companion.days + +class GroupPoller( + scope: CoroutineScope, + private val groupId: AccountId, + private val configFactoryProtocol: ConfigFactoryProtocol, + private val groupManagerV2: GroupManagerV2, + private val storage: StorageProtocol, + private val lokiApiDatabase: LokiAPIDatabaseProtocol, + private val clock: SnodeClock, + private val appVisibilityManager: AppVisibilityManager, +) { + companion object { + private const val POLL_INTERVAL = 3_000L + + private const val TAG = "GroupPoller" + } + + data class State( + val hadAtLeastOneSuccessfulPoll: Boolean = false, + val lastPoll: PollResult? = null, + val inProgress: Boolean = false, + ) + + data class PollResult( + val startedAt: Instant, + val finishedAt: Instant, + val result: Result, + val groupExpired: Boolean? + ) { + fun hasNonRetryableError(): Boolean { + val e = result.exceptionOrNull() + return e != null && (e is NonRetryableException || e is CancellationException) + } + } + + private class InternalPollState( + var swarmNodes: MutableSet = mutableSetOf(), + var currentSnode: Snode? = null, + ) + + // A channel to send tokens to trigger a poll + private val pollOnceTokens = Channel() + + // A flow that represents the state of the poller. + val state: StateFlow = flow { + var lastState = State() + val pendingTokens = mutableListOf() + val internalPollState = InternalPollState() + + while (true) { + pendingTokens.add(pollOnceTokens.receive()) + + // Drain all the tokens we've received up to this point, so we can reply them all at once + while (true) { + val result = pollOnceTokens.tryReceive() + result.getOrNull()?.let(pendingTokens::add) ?: break + } + + lastState = lastState.copy(inProgress = true).also { emit(it) } + + val pollResult = doPollOnce(internalPollState) + + lastState = lastState.copy( + hadAtLeastOneSuccessfulPoll = lastState.hadAtLeastOneSuccessfulPoll || pollResult.result.isSuccess, + lastPoll = pollResult, + inProgress = false + ).also { emit(it) } + + // Notify all pending tokens + pendingTokens.forEach { it.resultCallback.send(pollResult) } + pendingTokens.clear() + } + }.stateIn(scope, SharingStarted.Eagerly, State()) + + init { + // This coroutine is here to periodically request polling the group when the app + // becomes visible + scope.launch { + while (true) { + // Wait for the app becomes visible + appVisibilityManager.isAppVisible.first { visible -> visible } + + // As soon as the app becomes visible, start polling + if (requestPollOnce().hasNonRetryableError()) { + Log.v(TAG, "Error polling group $groupId and stopped polling") + break + } + + // As long as the app is visible, keep polling + while (true) { + // Wait POLL_INTERVAL + delay(POLL_INTERVAL) + + val appInBackground = !appVisibilityManager.isAppVisible.value + + if (appInBackground) { + Log.d(TAG, "App became invisible, stopping polling group $groupId") + break + } + + if (requestPollOnce().hasNonRetryableError()) { + Log.v(TAG, "Error polling group $groupId and stopped polling") + return@launch + } + } + } + } + } + + /** + * Request to poll the group once and return the result. It's guaranteed that + * the poll will be run AT LEAST once after the request is sent, but it's not guaranteed + * that one request will result in one poll, as the poller may choose to batch multiple requests + * together. + */ + suspend fun requestPollOnce(): PollResult { + val resultChannel = Channel() + pollOnceTokens.send(PollOnceToken(resultChannel)) + return resultChannel.receive() + } + + private suspend fun doPollOnce(pollState: InternalPollState): PollResult { + val pollStartedAt = Instant.now() + var groupExpired: Boolean? = null + + val result = runCatching { + supervisorScope { + // Grab current snode or pick (and remove) a random one from the pool + val snode = pollState.currentSnode ?: run { + if (pollState.swarmNodes.isEmpty()) { + Log.d(TAG, "Fetching swarm nodes for $groupId") + pollState.swarmNodes.addAll(SnodeAPI.fetchSwarmNodes(groupId.hexString)) + } + + check(pollState.swarmNodes.isNotEmpty()) { "No swarm nodes found" } + pollState.swarmNodes.random().also { + pollState.currentSnode = it + pollState.swarmNodes.remove(it) + } + } + + val groupAuth = + configFactoryProtocol.getGroupAuth(groupId) ?: return@supervisorScope + val configHashesToExtends = configFactoryProtocol.withGroupConfigs(groupId) { + buildSet { + addAll(it.groupKeys.currentHashes()) + addAll(it.groupInfo.currentHashes()) + addAll(it.groupMembers.currentHashes()) + } + } + + val group = configFactoryProtocol.getGroup(groupId) + if (group == null) { + throw NonRetryableException("Group doesn't exist") + } + + if (group.kicked) { + throw NonRetryableException("Group has been kicked") + } + + val adminKey = group.adminKey + + val pollingTasks = mutableListOf>>() + + val receiveRevokeMessage = async { + SnodeAPI.sendBatchRequest( + snode, + groupId.hexString, + SnodeAPI.buildAuthenticatedRetrieveBatchRequest( + lastHash = lokiApiDatabase.getLastMessageHashValue( + snode, + groupId.hexString, + Namespace.REVOKED_GROUP_MESSAGES() + ).orEmpty(), + auth = groupAuth, + namespace = Namespace.REVOKED_GROUP_MESSAGES(), + maxSize = null, + ), + RetrieveMessageResponse::class.java + ).messages.filterNotNull() + } + + if (configHashesToExtends.isNotEmpty() && adminKey != null) { + pollingTasks += "extending group config TTL" to async { + SnodeAPI.sendBatchRequest( + snode, + groupId.hexString, + SnodeAPI.buildAuthenticatedAlterTtlBatchRequest( + messageHashes = configHashesToExtends.toList(), + auth = groupAuth, + newExpiry = clock.currentTimeMills() + 14.days.inWholeMilliseconds, + extend = true + ), + ) + } + } + + val groupMessageRetrieval = async { + val lastHash = lokiApiDatabase.getLastMessageHashValue( + snode, + groupId.hexString, + Namespace.CLOSED_GROUP_MESSAGES() + ).orEmpty() + + Log.d(TAG, "Retrieving group message since lastHash = $lastHash") + + SnodeAPI.sendBatchRequest( + snode = snode, + publicKey = groupId.hexString, + request = SnodeAPI.buildAuthenticatedRetrieveBatchRequest( + lastHash = lastHash, + auth = groupAuth, + namespace = Namespace.CLOSED_GROUP_MESSAGES(), + maxSize = null, + ), + responseType = Map::class.java + ) + } + + val groupConfigRetrieval = listOf( + Namespace.ENCRYPTION_KEYS(), + Namespace.CLOSED_GROUP_INFO(), + Namespace.CLOSED_GROUP_MEMBERS() + ).map { ns -> + async { + SnodeAPI.sendBatchRequest( + snode = snode, + publicKey = groupId.hexString, + request = SnodeAPI.buildAuthenticatedRetrieveBatchRequest( + lastHash = lokiApiDatabase.getLastMessageHashValue( + snode, + groupId.hexString, + ns + ).orEmpty(), + auth = groupAuth, + namespace = ns, + maxSize = null, + ), + responseType = RetrieveMessageResponse::class.java + ).messages.filterNotNull() + } + } + + // The retrieval of the all group messages can be done concurrently, + // however, in order for the messages to be able to be decrypted, the config messages + // must be processed first. + pollingTasks += "polling and handling group config keys and messages" to async { + val result = runCatching { + val (keysMessage, infoMessage, membersMessage) = groupConfigRetrieval.map { it.await() } + handleGroupConfigMessages(keysMessage, infoMessage, membersMessage) + saveLastMessageHash(snode, keysMessage, Namespace.ENCRYPTION_KEYS()) + saveLastMessageHash(snode, infoMessage, Namespace.CLOSED_GROUP_INFO()) + saveLastMessageHash(snode, membersMessage, Namespace.CLOSED_GROUP_MEMBERS()) + + groupExpired = configFactoryProtocol.withGroupConfigs(groupId) { + it.groupKeys.size() == 0 + } + + val regularMessages = groupMessageRetrieval.await() + handleMessages(regularMessages, snode) + } + + // Revoke message must be handled regardless, and at the end + val revokedMessages = receiveRevokeMessage.await() + handleRevoked(revokedMessages) + saveLastMessageHash(snode, revokedMessages, Namespace.REVOKED_GROUP_MESSAGES()) + + // Propagate any prior exceptions + result.getOrThrow() + } + + // Wait for all tasks to complete, gather any exceptions happened during polling + val errors = pollingTasks.mapNotNull { (name, task) -> + runCatching { task.await() } + .exceptionOrNull() + ?.takeIf { it !is CancellationException } + ?.let { RuntimeException("Error $name", it) } + } + + // If there were any errors, throw the first one and add the rest as "suppressed" exceptions + if (errors.isNotEmpty()) { + throw errors.first().apply { + for (index in 1 until errors.size) { + addSuppressed(errors[index]) + } + } + } + } + } + + if (result.isFailure) { + val error = result.exceptionOrNull() + Log.e(TAG, "Error polling group", error) + + if (error !is NonRetryableException && error !is CancellationException) { + // If the error can be retried, reset the current snode so we use another one + pollState.currentSnode = null + } + } + + val pollResult = PollResult( + startedAt = pollStartedAt, + finishedAt = Instant.now(), + result = result, + groupExpired = groupExpired + ) + + Log.d(TAG, "Polling group $groupId result = $pollResult") + + return pollResult + } + + private fun RetrieveMessageResponse.Message.toConfigMessage(): ConfigMessage { + return ConfigMessage(hash, data, timestamp ?: clock.currentTimeMills()) + } + + private fun saveLastMessageHash( + snode: Snode, + messages: List, + namespace: Int + ) { + if (messages.isNotEmpty()) { + lokiApiDatabase.setLastMessageHashValue( + snode = snode, + publicKey = groupId.hexString, + newValue = messages.last().hash, + namespace = namespace + ) + } + } + + private suspend fun handleRevoked(messages: List) { + messages.forEach { msg -> + val decoded = configFactoryProtocol.decryptForUser( + msg.data, + Sodium.KICKED_DOMAIN, + groupId, + ) + + if (decoded != null) { + // The message should be in the format of "", + // where the pub key is 32 bytes, so we need to have at least 33 bytes of data + if (decoded.size < 33) { + Log.w(TAG, "Received an invalid kicked message, expecting at least 33 bytes, got ${decoded.size}") + return@forEach + } + + val sessionId = AccountId(IdPrefix.STANDARD, decoded.copyOfRange(0, 32)) + val messageGeneration = decoded.copyOfRange(32, decoded.size).decodeToString().toIntOrNull() + if (messageGeneration == null) { + Log.w(TAG, "Received an invalid kicked message: missing message generation") + return@forEach + } + + val currentKeysGeneration = configFactoryProtocol.withGroupConfigs(groupId) { + it.groupKeys.currentGeneration() + } + + val isForMe = sessionId.hexString == storage.getUserPublicKey() + Log.d(TAG, "Received kicked message, for us? ${isForMe}, message key generation = $messageGeneration, our key generation = $currentKeysGeneration") + + if (isForMe && messageGeneration >= currentKeysGeneration) { + groupManagerV2.handleKicked(groupId) + } + } + } + } + + private fun handleGroupConfigMessages( + keysResponse: List, + infoResponse: List, + membersResponse: List + ) { + if (keysResponse.isEmpty() && infoResponse.isEmpty() && membersResponse.isEmpty()) { + return + } + + Log.d( + TAG, "Handling group config messages(" + + "info = ${infoResponse.size}, " + + "keys = ${keysResponse.size}, " + + "members = ${membersResponse.size})" + ) + + configFactoryProtocol.mergeGroupConfigMessages( + groupId = groupId, + keys = keysResponse.map { it.toConfigMessage() }, + info = infoResponse.map { it.toConfigMessage() }, + members = membersResponse.map { it.toConfigMessage() }, + ) + } + + private fun handleMessages(body: RawResponse, snode: Snode) { + val messages = configFactoryProtocol.withGroupConfigs(groupId) { + SnodeAPI.parseRawMessagesResponse( + rawResponse = body, + snode = snode, + publicKey = groupId.hexString, + decrypt = it.groupKeys::decrypt, + namespace = Namespace.CLOSED_GROUP_MESSAGES(), + ) + } + + val parameters = messages.map { (envelope, serverHash) -> + MessageReceiveParameters( + envelope.toByteArray(), + serverHash = serverHash, + closedGroup = Destination.ClosedGroup(groupId.hexString) + ) + } + + parameters.chunked(BatchMessageReceiveJob.BATCH_DEFAULT_NUMBER).forEach { chunk -> + val job = BatchMessageReceiveJob(chunk) + JobQueue.shared.add(job) + } + + if (messages.isNotEmpty()) { + Log.d(TAG, "Received and handled ${messages.size} group messages") + } + } + + /** + * A token to poll a group once and receive the result. Note that it's not guaranteed that + * one token will trigger one poll, as the poller may batch multiple requests together. + */ + private data class PollOnceToken(val resultCallback: SendChannel) +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPollerManager.kt b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPollerManager.kt index 02080c70bd..3c48cdf30f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPollerManager.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPollerManager.kt @@ -1,26 +1,31 @@ package org.thoughtcrime.securesms.groups import dagger.Lazy -import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.combine import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flatMapLatest import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.mapNotNull import kotlinx.coroutines.flow.merge import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.scan import kotlinx.coroutines.flow.stateIn +import kotlinx.coroutines.supervisorScope import org.session.libsession.database.StorageProtocol import org.session.libsession.messaging.groups.GroupManagerV2 -import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPoller import org.session.libsession.snode.SnodeClock import org.session.libsession.utilities.ConfigUpdateNotification import org.session.libsession.utilities.TextSecurePreferences @@ -28,26 +33,26 @@ import org.session.libsignal.database.LokiAPIDatabaseProtocol import org.session.libsignal.utilities.AccountId import org.session.libsignal.utilities.Log import org.thoughtcrime.securesms.dependencies.ConfigFactory -import org.thoughtcrime.securesms.dependencies.POLLER_SCOPE import org.thoughtcrime.securesms.util.AppVisibilityManager +import org.thoughtcrime.securesms.util.InternetConnectivity import javax.inject.Inject -import javax.inject.Named import javax.inject.Singleton /** * This class manages the lifecycle of group pollers. * - * It listens for changes in the user's groups config and starts or stops pollers as needed. It - * also considers the app's visibility state to decide whether to start or stop pollers. + * It listens for changes in the user's groups config and create or destroyed pollers as needed. Other + * factors like network availability and user's local number are also taken into account. * - * All the processes here are automatic and you don't need to do anything to start or stop pollers. + * All the processes here are automatic and you don't need to do anything to create or destroy pollers. * * This class also provide state monitoring facilities to check the state of a group poller. + * + * Note that whether a [GroupPoller] is polling things or not is determined by itself. The manager + * class is only responsible for the overall lifecycle of the pollers. */ @Singleton class GroupPollerManager @Inject constructor( - @Named(POLLER_SCOPE) scope: CoroutineScope, - @Named(POLLER_SCOPE) executor: CoroutineDispatcher, configFactory: ConfigFactory, groupManagerV2: Lazy, storage: StorageProtocol, @@ -55,15 +60,14 @@ class GroupPollerManager @Inject constructor( clock: SnodeClock, preferences: TextSecurePreferences, appVisibilityManager: AppVisibilityManager, + connectivity: InternetConnectivity, ) { @Suppress("OPT_IN_USAGE") - private val activeGroupPollers: StateFlow> = + private val groupPollers: StateFlow> = combine( - preferences.watchLocalNumber(), - appVisibilityManager.isAppVisible - ) { localNumber, visible -> localNumber != null && visible } - .distinctUntilChanged() - + connectivity.networkAvailable, + preferences.watchLocalNumber() + ) { networkAvailable, localNumber -> networkAvailable && localNumber != null } // This flatMap produces a flow of groups that should be polled now .flatMapLatest { shouldPoll -> if (shouldPoll) { @@ -86,12 +90,12 @@ class GroupPollerManager @Inject constructor( // This scan compares the previous active group pollers with the incoming set of groups // that should be polled now, to work out which pollers should be started or stopped, // and finally emits the new state - .scan(emptyMap()) { previous, newActiveGroupIDs -> + .scan(emptyMap()) { previous, newActiveGroupIDs -> // Go through previous pollers and stop those that are not in the new set for ((groupId, poller) in previous) { if (groupId !in newActiveGroupIDs) { Log.d(TAG, "Stopping poller for $groupId") - poller.stop() + poller.scope.cancel() } } @@ -102,16 +106,20 @@ class GroupPollerManager @Inject constructor( if (poller == null) { Log.d(TAG, "Starting poller for $groupId") - poller = ClosedGroupPoller( - scope = scope, - executor = executor, - closedGroupSessionId = groupId, - configFactoryProtocol = configFactory, - groupManagerV2 = groupManagerV2.get(), - storage = storage, - lokiApiDatabase = lokiApiDatabase, - clock = clock, - ).also { it.start() } + val scope = CoroutineScope(Dispatchers.Default) + poller = GroupPollerHandle( + poller = GroupPoller( + scope = scope, + groupId = groupId, + configFactoryProtocol = configFactory, + groupManagerV2 = groupManagerV2.get(), + storage = storage, + lokiApiDatabase = lokiApiDatabase, + clock = clock, + appVisibilityManager = appVisibilityManager, + ), + scope = scope + ) } poller @@ -122,27 +130,55 @@ class GroupPollerManager @Inject constructor( @Suppress("OPT_IN_USAGE") - fun watchGroupPollingState(groupId: AccountId): Flow { - return activeGroupPollers + fun watchGroupPollingState(groupId: AccountId): Flow { + return groupPollers .flatMapLatest { pollers -> - pollers[groupId]?.state ?: flowOf(ClosedGroupPoller.IdleState) + pollers[groupId]?.poller?.state ?: flowOf(GroupPoller.State()) } .distinctUntilChanged() } @OptIn(ExperimentalCoroutinesApi::class) - fun watchAllGroupPollingState(): Flow> { - return activeGroupPollers + fun watchAllGroupPollingState(): Flow> { + return groupPollers .flatMapLatest { pollers -> // Merge all poller states into a single flow of (groupId, state) pairs merge( *pollers - .map { (id, poller) -> poller.state.map { state -> id to state } } - .toTypedArray() + .map { (id, poller) -> poller.poller.state.map { state -> id to state } } + .toTypedArray() ) } } + suspend fun pollAllGroupsOnce() { + supervisorScope { + groupPollers.value.values.map { + async { + it.poller.requestPollOnce() + } + }.awaitAll() + } + } + + /** + * Wait for a group to be polled once and return the poll result + * + * Note that if the group is not supposed to be polled (kicked, destroyed, etc) then + * this function will hang forever. It's your responsibility to set a timeout if needed. + */ + suspend fun pollOnce(groupId: AccountId): GroupPoller.PollResult { + return groupPollers.mapNotNull { it[groupId] } + .first() + .poller + .requestPollOnce() + } + + data class GroupPollerHandle( + val poller: GroupPoller, + val scope: CoroutineScope, + ) + companion object { private const val TAG = "GroupPollerHandler" } diff --git a/app/src/main/java/org/thoughtcrime/securesms/notifications/BackgroundPollManager.kt b/app/src/main/java/org/thoughtcrime/securesms/notifications/BackgroundPollManager.kt new file mode 100644 index 0000000000..75d15eedfc --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/notifications/BackgroundPollManager.kt @@ -0,0 +1,65 @@ +package org.thoughtcrime.securesms.notifications + +import android.annotation.SuppressLint +import android.app.Application +import android.content.BroadcastReceiver +import android.content.Context +import android.content.Intent +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.flow.collectLatest +import kotlinx.coroutines.flow.combine +import kotlinx.coroutines.flow.debounce +import kotlinx.coroutines.flow.distinctUntilChanged +import kotlinx.coroutines.launch +import org.session.libsession.utilities.TextSecurePreferences +import org.session.libsignal.utilities.Log +import org.thoughtcrime.securesms.util.AppVisibilityManager +import javax.inject.Inject +import javax.inject.Singleton + +/** + * This class automatically schedules and cancels the background polling work based on the + * visibility of the app and the availability of the logged in user. + */ +@OptIn(FlowPreview::class) +@Singleton +class BackgroundPollManager @Inject constructor( + application: Application, + appVisibilityManager: AppVisibilityManager, + textSecurePreferences: TextSecurePreferences, +) { + init { + @Suppress("OPT_IN_USAGE") + GlobalScope.launch { + combine( + textSecurePreferences.watchLocalNumber(), + // Debounce to avoid rapid toggling on visible app starts + appVisibilityManager.isAppVisible.debounce(1_000L) + ) { localNumber, appVisible -> localNumber != null && !appVisible } + .distinctUntilChanged() + .collectLatest { shouldSchedule -> + if (shouldSchedule) { + Log.i(TAG, "Scheduling background polling work.") + BackgroundPollWorker.schedulePeriodic(application) + } else { + Log.i(TAG, "Cancelling background polling work.") + BackgroundPollWorker.cancelPeriodic(application) + } + } + } + } + + class BootBroadcastReceiver : BroadcastReceiver() { + @SuppressLint("UnsafeProtectedBroadcastReceiver") + override fun onReceive(context: Context, intent: Intent) { + // This broadcast receiver does nothing but to bring up the app, + // once the app is up, the `BackgroundPollWorker` will have the chance to + // schedule any background polling work accordingly + } + } + + companion object { + private const val TAG = "BackgroundPollManager" + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/notifications/BackgroundPollWorker.kt b/app/src/main/java/org/thoughtcrime/securesms/notifications/BackgroundPollWorker.kt index 5cdc594d2c..208c05c955 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/notifications/BackgroundPollWorker.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/notifications/BackgroundPollWorker.kt @@ -1,58 +1,64 @@ package org.thoughtcrime.securesms.notifications -import android.content.BroadcastReceiver import android.content.Context -import android.content.Intent +import androidx.hilt.work.HiltWorker import androidx.work.Constraints +import androidx.work.CoroutineWorker import androidx.work.Data import androidx.work.ExistingPeriodicWorkPolicy import androidx.work.NetworkType import androidx.work.OneTimeWorkRequestBuilder import androidx.work.PeriodicWorkRequestBuilder import androidx.work.WorkManager -import androidx.work.Worker import androidx.work.WorkerParameters -import kotlinx.coroutines.GlobalScope -import nl.komponents.kovenant.Promise -import nl.komponents.kovenant.all -import nl.komponents.kovenant.functional.bind +import dagger.assisted.Assisted +import dagger.assisted.AssistedInject +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.async +import kotlinx.coroutines.supervisorScope +import org.session.libsession.database.StorageProtocol import org.session.libsession.database.userAuth -import org.session.libsession.messaging.MessagingModuleConfiguration +import org.session.libsession.messaging.groups.LegacyGroupDeprecationManager import org.session.libsession.messaging.jobs.BatchMessageReceiveJob import org.session.libsession.messaging.jobs.MessageReceiveParameters import org.session.libsession.messaging.sending_receiving.pollers.LegacyClosedGroupPollerV2 import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupPoller import org.session.libsession.snode.SnodeAPI -import org.session.libsession.snode.utilities.asyncPromise import org.session.libsession.snode.utilities.await -import org.session.libsession.utilities.TextSecurePreferences import org.session.libsignal.utilities.Log -import org.session.libsignal.utilities.recover -import org.thoughtcrime.securesms.dependencies.DatabaseComponent +import org.thoughtcrime.securesms.database.LokiThreadDatabase +import org.thoughtcrime.securesms.groups.GroupPollerManager import java.util.concurrent.TimeUnit - -class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Worker(context, params) { - enum class Targets { - DMS, CLOSED_GROUPS, OPEN_GROUPS +import kotlin.time.Duration.Companion.minutes + +@HiltWorker +class BackgroundPollWorker @AssistedInject constructor( + @Assisted val context: Context, + @Assisted params: WorkerParameters, + private val storage: StorageProtocol, + private val deprecationManager: LegacyGroupDeprecationManager, + private val lokiThreadDatabase: LokiThreadDatabase, + private val groupPollerManager: GroupPollerManager, +) : CoroutineWorker(context, params) { + enum class Target { + ONE_TO_ONE, + LEGACY_GROUPS, + GROUPS, + OPEN_GROUPS } companion object { - const val TAG = "BackgroundPollWorker" - const val INITIAL_SCHEDULE_TIME = "INITIAL_SCHEDULE_TIME" - const val REQUEST_TARGETS = "REQUEST_TARGETS" - - @JvmStatic - fun schedulePeriodic(context: Context) = schedulePeriodic(context, targets = Targets.values()) + private const val TAG = "BackgroundPollWorker" + private const val REQUEST_TARGETS = "REQUEST_TARGETS" - @JvmStatic - fun schedulePeriodic(context: Context, targets: Array) { + fun schedulePeriodic(context: Context, targets: Collection = Target.entries) { Log.v(TAG, "Scheduling periodic work.") - val durationMinutes: Long = 15 - val builder = PeriodicWorkRequestBuilder(durationMinutes, TimeUnit.MINUTES) + val interval = 15.minutes + val builder = PeriodicWorkRequestBuilder(interval.inWholeSeconds, TimeUnit.SECONDS) builder.setConstraints(Constraints.Builder().setRequiredNetworkType(NetworkType.CONNECTED).build()) + .setInitialDelay(interval.inWholeSeconds, TimeUnit.SECONDS) val dataBuilder = Data.Builder() - dataBuilder.putLong(INITIAL_SCHEDULE_TIME, System.currentTimeMillis() + (durationMinutes * 60 * 1000)) dataBuilder.putStringArray(REQUEST_TARGETS, targets.map { it.name }.toTypedArray()) builder.setInputData(dataBuilder.build()) @@ -64,8 +70,12 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor ) } - @JvmStatic - fun scheduleOnce(context: Context, targets: Array = Targets.values()) { + fun cancelPeriodic(context: Context) { + Log.v(TAG, "Cancelling periodic work.") + WorkManager.getInstance(context).cancelUniqueWork(TAG) + } + + fun scheduleOnce(context: Context, targets: Collection = Target.entries) { Log.v(TAG, "Scheduling single run.") val builder = OneTimeWorkRequestBuilder() builder.setConstraints(Constraints.Builder().setRequiredNetworkType(NetworkType.CONNECTED).build()) @@ -79,99 +89,85 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor } } - override fun doWork(): Result { - if (TextSecurePreferences.getLocalNumber(context) == null) { + override suspend fun doWork(): Result { + val userAuth = storage.userAuth + if (userAuth == null) { Log.v(TAG, "User not registered yet.") return Result.failure() } - // If this is a scheduled run and it is happening before the initial scheduled time (as - // periodic background tasks run immediately when scheduled) then don't actually do anything - // because this might slow requests on initial startup or triggered by PNs - val initialScheduleTime = inputData.getLong(INITIAL_SCHEDULE_TIME, -1) - - if (initialScheduleTime != -1L && System.currentTimeMillis() < (initialScheduleTime - (60 * 1000))) { - Log.v(TAG, "Skipping initial run.") - return Result.success() - } - // Retrieve the desired targets (defaulting to all if not provided or empty) - val requestTargets: List = (inputData.getStringArray(REQUEST_TARGETS) ?: emptyArray()) - .map { - try { Targets.valueOf(it) } - catch(e: Exception) { null } - } - .filterNotNull() - .ifEmpty { Targets.values().toList() } + val requestTargets: List = (inputData.getStringArray(REQUEST_TARGETS) ?: emptyArray()) + .map { enumValueOf(it) } try { Log.v(TAG, "Performing background poll for ${requestTargets.joinToString { it.name }}.") - val promises = mutableListOf>() - - // DMs - var dmsPromise: Promise = Promise.ofSuccess(Unit) + supervisorScope { + val tasks = mutableListOf>() + + // DMs + if (requestTargets.contains(Target.ONE_TO_ONE)) { + tasks += async { + Log.d(TAG, "Polling messages.") + val params = SnodeAPI.getMessages(userAuth).await().map { (envelope, serverHash) -> + MessageReceiveParameters(envelope.toByteArray(), serverHash, null) + } - if (requestTargets.contains(Targets.DMS)) { - val userAuth = requireNotNull(MessagingModuleConfiguration.shared.storage.userAuth) - dmsPromise = SnodeAPI.getMessages(userAuth).bind { envelopes -> - val params = envelopes.map { (envelope, serverHash) -> // FIXME: Using a job here seems like a bad idea... - MessageReceiveParameters(envelope.toByteArray(), serverHash, null) - } - - GlobalScope.asyncPromise { BatchMessageReceiveJob(params).executeAsync("background") } } - promises.add(dmsPromise) - } - - // Closed groups - if (requestTargets.contains(Targets.CLOSED_GROUPS)) { - val closedGroupPoller = LegacyClosedGroupPollerV2( - MessagingModuleConfiguration.shared.storage, - MessagingModuleConfiguration.shared.legacyClosedGroupPollerV2.deprecationManager - ) // Intentionally don't use shared - val storage = MessagingModuleConfiguration.shared.storage - val allGroupPublicKeys = storage.getAllClosedGroupPublicKeys() - allGroupPublicKeys.iterator().forEach { closedGroupPoller.poll(it) } - } - // Open Groups - var ogPollError: Exception? = null + // Legacy groups + if (requestTargets.contains(Target.LEGACY_GROUPS)) { + val poller = LegacyClosedGroupPollerV2(storage, deprecationManager) - if (requestTargets.contains(Targets.OPEN_GROUPS)) { - val threadDB = DatabaseComponent.get(context).lokiThreadDatabase() - val openGroups = threadDB.getAllOpenGroups() - val openGroupServers = openGroups.map { it.value.server }.toSet() - - for (server in openGroupServers) { - val poller = OpenGroupPoller(server, null) - poller.hasStarted = true - - // If one of the open group pollers fails we don't want it to cancel the DM - // poller so just hold on to the error for later - promises.add( - poller.poll().recover { - if (dmsPromise.isDone()) { - throw it + storage.getAllLegacyGroupPublicKeys() + .mapTo(tasks) { key -> + async { + Log.d(TAG, "Polling legacy group ${key.substring(0, 8)}...") + poller.poll(key) } + } + } - ogPollError = it + // Open groups + if (requestTargets.contains(Target.OPEN_GROUPS)) { + lokiThreadDatabase.getAllOpenGroups() + .mapTo(hashSetOf()) { it.value.server } + .mapTo(tasks) { server -> + async { + Log.d(TAG, "Polling open group server $server.") + OpenGroupPoller(server, null) + .apply { hasStarted = true } + .poll() + .await() + } } - ) } - } - // Wait until all the promises are resolved - all(promises).get() + // Close group + if (requestTargets.contains(Target.GROUPS)) { + tasks += async { + Log.d(TAG, "Polling all groups.") + groupPollerManager.pollAllGroupsOnce() + } + } - // If the Open Group pollers threw an exception then re-throw it here (now that - // the DM promise has completed) - val localOgPollException = ogPollError + val caughtException = tasks + .fold(null) { acc: Throwable?, result -> + try { + result.await() + acc + } catch (ec: Exception) { + Log.e(TAG, "Failed to poll group due to error.", ec) + acc?.also { it.addSuppressed(ec) } ?: ec + } + } - if (localOgPollException != null) { - throw localOgPollException + if (caughtException != null) { + throw caughtException + } } return Result.success() @@ -181,13 +177,4 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor } } - class BootBroadcastReceiver: BroadcastReceiver() { - - override fun onReceive(context: Context, intent: Intent) { - if (intent.action == Intent.ACTION_BOOT_COMPLETED) { - Log.v(TAG, "Boot broadcast caught.") - schedulePeriodic(context) - } - } - } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/service/WebRtcCallService.kt b/app/src/main/java/org/thoughtcrime/securesms/service/WebRtcCallService.kt index 9622777af0..8ae08c9c47 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/service/WebRtcCallService.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/service/WebRtcCallService.kt @@ -426,7 +426,7 @@ class WebRtcCallService : LifecycleService(), CallManager.WebRtcListener { BackgroundPollWorker.scheduleOnce( this, - arrayOf(BackgroundPollWorker.Targets.DMS) + listOf(BackgroundPollWorker.Target.ONE_TO_ONE) ) } } diff --git a/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt b/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt index 8bb02adc2a..bd545cfebf 100644 --- a/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt +++ b/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt @@ -144,7 +144,7 @@ interface StorageProtocol { fun removeMember(groupID: String, member: Address) fun updateMembers(groupID: String, members: List
) fun setZombieMembers(groupID: String, members: List
) - fun getAllClosedGroupPublicKeys(): Set + fun getAllLegacyGroupPublicKeys(): Set fun getAllActiveClosedGroupPublicKeys(): Set fun addClosedGroupPublicKey(groupPublicKey: String) fun removeClosedGroupPublicKey(groupPublicKey: String) diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt index 1ee331fed4..c5ae961b2f 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt @@ -227,7 +227,7 @@ private fun handleConfigurationMessage(message: ConfigurationMessage) { TextSecurePreferences.setHasLegacyConfig(context, true) if (!firstTimeSync) return - val allClosedGroupPublicKeys = storage.getAllClosedGroupPublicKeys() + val allClosedGroupPublicKeys = storage.getAllLegacyGroupPublicKeys() for (closedGroup in message.closedGroups) { if (allClosedGroupPublicKeys.contains(closedGroup.publicKey)) { // just handle the closed group encryption key pairs to avoid sync'd devices getting out of sync diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/notifications/PushRegistryV1.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/notifications/PushRegistryV1.kt index 24688a94ee..827731450d 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/notifications/PushRegistryV1.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/notifications/PushRegistryV1.kt @@ -36,7 +36,7 @@ object PushRegistryV1 { device: Device, isPushEnabled: Boolean = TextSecurePreferences.isPushEnabled(context), publicKey: String? = TextSecurePreferences.getLocalNumber(context), - legacyGroupPublicKeys: Collection = MessagingModuleConfiguration.shared.storage.getAllClosedGroupPublicKeys() + legacyGroupPublicKeys: Collection = MessagingModuleConfiguration.shared.storage.getAllLegacyGroupPublicKeys() ): Promise<*, Exception> = scope.asyncPromise { if (isPushEnabled) { retryWithUniformInterval(maxRetryCount = MAX_RETRY_COUNT) { doRegister(publicKey, device, legacyGroupPublicKeys) } diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt deleted file mode 100644 index b4b0328189..0000000000 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt +++ /dev/null @@ -1,398 +0,0 @@ -package org.session.libsession.messaging.sending_receiving.pollers - -import kotlinx.coroutines.CoroutineDispatcher -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Deferred -import kotlinx.coroutines.Job -import kotlinx.coroutines.async -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.flow.update -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch -import kotlinx.coroutines.supervisorScope -import network.loki.messenger.libsession_util.util.Sodium -import org.session.libsession.database.StorageProtocol -import org.session.libsession.messaging.groups.GroupManagerV2 -import org.session.libsession.messaging.jobs.BatchMessageReceiveJob -import org.session.libsession.messaging.jobs.JobQueue -import org.session.libsession.messaging.jobs.MessageReceiveParameters -import org.session.libsession.messaging.messages.Destination -import org.session.libsession.snode.RawResponse -import org.session.libsession.snode.SnodeAPI -import org.session.libsession.snode.SnodeClock -import org.session.libsession.snode.model.RetrieveMessageResponse -import org.session.libsession.utilities.ConfigFactoryProtocol -import org.session.libsession.utilities.ConfigMessage -import org.session.libsession.utilities.getGroup -import org.session.libsignal.database.LokiAPIDatabaseProtocol -import org.session.libsignal.exceptions.NonRetryableException -import org.session.libsignal.utilities.AccountId -import org.session.libsignal.utilities.IdPrefix -import org.session.libsignal.utilities.Log -import org.session.libsignal.utilities.Namespace -import org.session.libsignal.utilities.Snode -import kotlin.coroutines.cancellation.CancellationException -import kotlin.time.Duration.Companion.days - -class ClosedGroupPoller( - private val scope: CoroutineScope, - private val executor: CoroutineDispatcher, - private val closedGroupSessionId: AccountId, - private val configFactoryProtocol: ConfigFactoryProtocol, - private val groupManagerV2: GroupManagerV2, - private val storage: StorageProtocol, - private val lokiApiDatabase: LokiAPIDatabaseProtocol, - private val clock: SnodeClock, -) { - companion object { - private const val POLL_INTERVAL = 3_000L - private const val POLL_ERROR_RETRY_DELAY = 10_000L - - private const val TAG = "ClosedGroupPoller" - } - - sealed interface State - data object IdleState : State - data class StartedState( - internal val job: Job, - val expired: Boolean? = null, - val hadAtLeastOneSuccessfulPoll: Boolean = false, - ) : State - - private val mutableState = MutableStateFlow(IdleState) - val state: StateFlow get() = mutableState - - fun start() { - if ((state.value as? StartedState)?.job?.isActive == true) return // already started, don't restart - - Log.d(TAG, "Starting closed group poller for ${closedGroupSessionId.hexString.take(4)}") - val job = scope.launch(executor) { - while (isActive) { - try { - val swarmNodes = - SnodeAPI.fetchSwarmNodes(closedGroupSessionId.hexString).toMutableSet() - var currentSnode: Snode? = null - - while (isActive) { - if (currentSnode == null) { - check(swarmNodes.isNotEmpty()) { "No more swarm nodes found" } - Log.d( - TAG, - "No current snode, getting a new one. Remaining in pool = ${swarmNodes.size - 1}" - ) - currentSnode = swarmNodes.random() - swarmNodes.remove(currentSnode) - } - - val result = runCatching { poll(currentSnode!!) } - when { - result.isSuccess -> { - delay(POLL_INTERVAL) - } - - result.isFailure -> { - val error = result.exceptionOrNull()!! - if (error is CancellationException || error is NonRetryableException) { - throw error - } - - Log.e(TAG, "Error polling closed group", error) - // Clearing snode so we get a new one next time - currentSnode = null - delay(POLL_INTERVAL) - } - } - } - } catch (e: CancellationException) { - throw e - } catch (e: NonRetryableException) { - Log.e(TAG, "Non-retryable error during group poller", e) - throw e - } catch (e: Exception) { - Log.e(TAG, "Error during group poller", e) - delay(POLL_ERROR_RETRY_DELAY) - } - } - } - - mutableState.value = StartedState(job = job) - - job.invokeOnCompletion { - mutableState.value = IdleState - } - } - - fun stop() { - Log.d(TAG, "Stopping closed group poller for $closedGroupSessionId") - (state.value as? StartedState)?.job?.cancel() - } - - private suspend fun poll(snode: Snode): Unit = supervisorScope { - val groupAuth = - configFactoryProtocol.getGroupAuth(closedGroupSessionId) ?: return@supervisorScope - val configHashesToExtends = configFactoryProtocol.withGroupConfigs(closedGroupSessionId) { - buildSet { - addAll(it.groupKeys.currentHashes()) - addAll(it.groupInfo.currentHashes()) - addAll(it.groupMembers.currentHashes()) - } - } - - val group = configFactoryProtocol.getGroup(closedGroupSessionId) - if (group == null) { - throw NonRetryableException("Group doesn't exist") - } - - if (group.kicked) { - throw NonRetryableException("Group has been kicked") - } - - val adminKey = group.adminKey - - val pollingTasks = mutableListOf>>() - - val receiveRevokeMessage = async { - SnodeAPI.sendBatchRequest( - snode, - closedGroupSessionId.hexString, - SnodeAPI.buildAuthenticatedRetrieveBatchRequest( - lastHash = lokiApiDatabase.getLastMessageHashValue( - snode, - closedGroupSessionId.hexString, - Namespace.REVOKED_GROUP_MESSAGES() - ).orEmpty(), - auth = groupAuth, - namespace = Namespace.REVOKED_GROUP_MESSAGES(), - maxSize = null, - ), - RetrieveMessageResponse::class.java - ).messages.filterNotNull() - } - - if (configHashesToExtends.isNotEmpty() && adminKey != null) { - pollingTasks += "extending group config TTL" to async { - SnodeAPI.sendBatchRequest( - snode, - closedGroupSessionId.hexString, - SnodeAPI.buildAuthenticatedAlterTtlBatchRequest( - messageHashes = configHashesToExtends.toList(), - auth = groupAuth, - newExpiry = clock.currentTimeMills() + 14.days.inWholeMilliseconds, - extend = true - ), - ) - } - } - - val groupMessageRetrieval = async { - val lastHash = lokiApiDatabase.getLastMessageHashValue( - snode, - closedGroupSessionId.hexString, - Namespace.CLOSED_GROUP_MESSAGES() - ).orEmpty() - - Log.d(TAG, "Retrieving group message since lastHash = $lastHash") - - SnodeAPI.sendBatchRequest( - snode = snode, - publicKey = closedGroupSessionId.hexString, - request = SnodeAPI.buildAuthenticatedRetrieveBatchRequest( - lastHash = lastHash, - auth = groupAuth, - namespace = Namespace.CLOSED_GROUP_MESSAGES(), - maxSize = null, - ), - responseType = Map::class.java - ) - } - - val groupConfigRetrieval = listOf( - Namespace.ENCRYPTION_KEYS(), - Namespace.CLOSED_GROUP_INFO(), - Namespace.CLOSED_GROUP_MEMBERS() - ).map { ns -> - async { - SnodeAPI.sendBatchRequest( - snode = snode, - publicKey = closedGroupSessionId.hexString, - request = SnodeAPI.buildAuthenticatedRetrieveBatchRequest( - lastHash = lokiApiDatabase.getLastMessageHashValue( - snode, - closedGroupSessionId.hexString, - ns - ).orEmpty(), - auth = groupAuth, - namespace = ns, - maxSize = null, - ), - responseType = RetrieveMessageResponse::class.java - ).messages.filterNotNull() - } - } - - // The retrieval of the all group messages can be done concurrently, - // however, in order for the messages to be able to be decrypted, the config messages - // must be processed first. - pollingTasks += "polling and handling group config keys and messages" to async { - val result = runCatching { - val (keysMessage, infoMessage, membersMessage) = groupConfigRetrieval.map { it.await() } - handleGroupConfigMessages(keysMessage, infoMessage, membersMessage) - saveLastMessageHash(snode, keysMessage, Namespace.ENCRYPTION_KEYS()) - saveLastMessageHash(snode, infoMessage, Namespace.CLOSED_GROUP_INFO()) - saveLastMessageHash(snode, membersMessage, Namespace.CLOSED_GROUP_MEMBERS()) - - val isGroupExpired = configFactoryProtocol.withGroupConfigs(closedGroupSessionId) { - it.groupKeys.size() == 0 - } - - // As soon as we have handled config messages, the polling count as successful, - // as normally the outside world really only cares about configs. - mutableState.update { - (it as? StartedState)?.copy( - hadAtLeastOneSuccessfulPoll = true, - expired = isGroupExpired, - ) ?: it - } - - val regularMessages = groupMessageRetrieval.await() - handleMessages(regularMessages, snode) - } - - // Revoke message must be handled regardless, and at the end - val revokedMessages = receiveRevokeMessage.await() - handleRevoked(revokedMessages) - saveLastMessageHash(snode, revokedMessages, Namespace.REVOKED_GROUP_MESSAGES()) - - // Propagate any prior exceptions - result.getOrThrow() - } - - // Wait for all tasks to complete, gather any exceptions happened during polling - val errors = pollingTasks.mapNotNull { (name, task) -> - runCatching { task.await() } - .exceptionOrNull() - ?.takeIf { it !is CancellationException } - ?.let { RuntimeException("Error $name", it) } - } - - // If there were any errors, throw the first one and add the rest as "suppressed" exceptions - if (errors.isNotEmpty()) { - throw errors.first().apply { - for (index in 1 until errors.size) { - addSuppressed(errors[index]) - } - } - } - } - - private fun RetrieveMessageResponse.Message.toConfigMessage(): ConfigMessage { - return ConfigMessage(hash, data, timestamp ?: clock.currentTimeMills()) - } - - private fun saveLastMessageHash( - snode: Snode, - messages: List, - namespace: Int - ) { - if (messages.isNotEmpty()) { - lokiApiDatabase.setLastMessageHashValue( - snode = snode, - publicKey = closedGroupSessionId.hexString, - newValue = messages.last().hash, - namespace = namespace - ) - } - } - - private suspend fun handleRevoked(messages: List) { - messages.forEach { msg -> - val decoded = configFactoryProtocol.decryptForUser( - msg.data, - Sodium.KICKED_DOMAIN, - closedGroupSessionId, - ) - - if (decoded != null) { - // The message should be in the format of "", - // where the pub key is 32 bytes, so we need to have at least 33 bytes of data - if (decoded.size < 33) { - Log.w(TAG, "Received an invalid kicked message, expecting at least 33 bytes, got ${decoded.size}") - return@forEach - } - - val sessionId = AccountId(IdPrefix.STANDARD, decoded.copyOfRange(0, 32)) - val messageGeneration = decoded.copyOfRange(32, decoded.size).decodeToString().toIntOrNull() - if (messageGeneration == null) { - Log.w(TAG, "Received an invalid kicked message: missing message generation") - return@forEach - } - - val currentKeysGeneration = configFactoryProtocol.withGroupConfigs(closedGroupSessionId) { - it.groupKeys.currentGeneration() - } - - val isForMe = sessionId.hexString == storage.getUserPublicKey() - Log.d(TAG, "Received kicked message, for us? ${isForMe}, message key generation = $messageGeneration, our key generation = $currentKeysGeneration") - - if (isForMe && messageGeneration >= currentKeysGeneration) { - groupManagerV2.handleKicked(closedGroupSessionId) - } - } - } - } - - private fun handleGroupConfigMessages( - keysResponse: List, - infoResponse: List, - membersResponse: List - ) { - if (keysResponse.isEmpty() && infoResponse.isEmpty() && membersResponse.isEmpty()) { - return - } - - Log.d( - TAG, "Handling group config messages(" + - "info = ${infoResponse.size}, " + - "keys = ${keysResponse.size}, " + - "members = ${membersResponse.size})" - ) - - configFactoryProtocol.mergeGroupConfigMessages( - groupId = closedGroupSessionId, - keys = keysResponse.map { it.toConfigMessage() }, - info = infoResponse.map { it.toConfigMessage() }, - members = membersResponse.map { it.toConfigMessage() }, - ) - } - - private fun handleMessages(body: RawResponse, snode: Snode) { - val messages = configFactoryProtocol.withGroupConfigs(closedGroupSessionId) { - SnodeAPI.parseRawMessagesResponse( - rawResponse = body, - snode = snode, - publicKey = closedGroupSessionId.hexString, - decrypt = it.groupKeys::decrypt, - namespace = Namespace.CLOSED_GROUP_MESSAGES(), - ) - } - - val parameters = messages.map { (envelope, serverHash) -> - MessageReceiveParameters( - envelope.toByteArray(), - serverHash = serverHash, - closedGroup = Destination.ClosedGroup(closedGroupSessionId.hexString) - ) - } - - parameters.chunked(BatchMessageReceiveJob.BATCH_DEFAULT_NUMBER).forEach { chunk -> - val job = BatchMessageReceiveJob(chunk) - JobQueue.shared.add(job) - } - - if (messages.isNotEmpty()) { - Log.d(TAG, "Received and handled ${messages.size} group messages") - } - } -} \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/LegacyClosedGroupPollerV2.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/LegacyClosedGroupPollerV2.kt index 9697b2a08b..2e05051442 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/LegacyClosedGroupPollerV2.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/LegacyClosedGroupPollerV2.kt @@ -48,7 +48,7 @@ class LegacyClosedGroupPollerV2( class PollingCanceledException() : Exception("Polling canceled.") fun start() { - val allGroupPublicKeys = storage.getAllClosedGroupPublicKeys() + val allGroupPublicKeys = storage.getAllLegacyGroupPublicKeys() allGroupPublicKeys.iterator().forEach { startPolling(it) } }