diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/ThreadDatabase.java b/app/src/main/java/org/thoughtcrime/securesms/database/ThreadDatabase.java index 8adbe17308..0a295c260e 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/ThreadDatabase.java +++ b/app/src/main/java/org/thoughtcrime/securesms/database/ThreadDatabase.java @@ -62,6 +62,7 @@ import org.thoughtcrime.securesms.mms.SlideDeck; import org.thoughtcrime.securesms.notifications.MarkReadReceiver; import org.thoughtcrime.securesms.util.SessionMetaProtocol; import java.io.Closeable; +import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; @@ -861,15 +862,28 @@ public class ThreadDatabase extends Database { } public Reader readerFor(Cursor cursor) { - return new Reader(cursor); + return readerFor(cursor, true); + } + + /** + * Create a reader to conveniently access the thread cursor + * + * @param retrieveGroupStatus Whether group status should be calculated based on the config data. + * Normally you always want it, but if you don't want the reader + * to access the config system, this is the flag to turn it off. + */ + public Reader readerFor(Cursor cursor, boolean retrieveGroupStatus) { + return new Reader(cursor, retrieveGroupStatus); } public class Reader implements Closeable { private final Cursor cursor; + private final boolean retrieveGroupStatus; - public Reader(Cursor cursor) { + public Reader(Cursor cursor, boolean retrieveGroupStatus) { this.cursor = cursor; + this.retrieveGroupStatus = retrieveGroupStatus; } public int getCount() { @@ -931,7 +945,7 @@ public class ThreadDatabase extends Database { } final GroupThreadStatus groupThreadStatus; - if (recipient.isGroupV2Recipient()) { + if (recipient.isGroupV2Recipient() && retrieveGroupStatus) { GroupInfo.ClosedGroupInfo group = ConfigFactoryProtocolKt.getGroup( MessagingModuleConfiguration.getShared().getConfigFactory(), new AccountId(recipient.getAddress().serialize()) diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt index 2180d4a8ea..696c98898a 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ConfigFactory.kt @@ -554,7 +554,7 @@ private fun MutableUserGroupsConfig.initFrom(storage: StorageProtocol) { private fun MutableConversationVolatileConfig.initFrom(storage: StorageProtocol, threadDb: ThreadDatabase) { threadDb.approvedConversationList.use { cursor -> - val reader = threadDb.readerFor(cursor) + val reader = threadDb.readerFor(cursor, false) var current = reader.next while (current != null) { val recipient = current.recipient diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/SessionUtilModule.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/SessionUtilModule.kt index 56e0012da8..9acdc8e172 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/SessionUtilModule.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/SessionUtilModule.kt @@ -14,6 +14,7 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.GlobalScope import org.session.libsession.database.StorageProtocol import org.session.libsession.messaging.groups.GroupManagerV2 +import org.session.libsession.messaging.groups.GroupScope import org.session.libsession.snode.SnodeClock import org.session.libsession.utilities.ConfigFactoryProtocol import org.session.libsignal.database.LokiAPIDatabaseProtocol @@ -59,4 +60,8 @@ object SessionUtilModule { @Provides @Singleton fun provideSnodeClock() = SnodeClock() + + @Provides + @Singleton + fun provideGroupScope() = GroupScope() } \ No newline at end of file diff --git a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt index 5b0e94abcd..c4c562da6e 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt @@ -5,15 +5,14 @@ import com.google.protobuf.ByteString import dagger.hilt.android.qualifiers.ApplicationContext import kotlinx.coroutines.Deferred import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.filterIsInstance import kotlinx.coroutines.flow.first -import kotlinx.coroutines.launch import kotlinx.coroutines.withContext +import kotlinx.coroutines.withTimeout import network.loki.messenger.R import network.loki.messenger.libsession_util.ConfigBase.Companion.PRIORITY_VISIBLE import network.loki.messenger.libsession_util.util.Conversation @@ -25,6 +24,7 @@ import org.session.libsession.database.MessageDataProvider import org.session.libsession.database.StorageProtocol import org.session.libsession.database.userAuth import org.session.libsession.messaging.groups.GroupManagerV2 +import org.session.libsession.messaging.groups.GroupScope import org.session.libsession.messaging.jobs.InviteContactsJob import org.session.libsession.messaging.jobs.JobQueue import org.session.libsession.messaging.messages.Destination @@ -83,6 +83,7 @@ class GroupManagerV2Impl @Inject constructor( private val clock: SnodeClock, private val messageDataProvider: MessageDataProvider, private val lokiAPIDatabase: LokiAPIDatabase, + private val scope: GroupScope, ) : GroupManagerV2 { private val dispatcher = Dispatchers.Default @@ -206,7 +207,7 @@ class GroupManagerV2Impl @Inject constructor( group: AccountId, newMembers: List, shareHistory: Boolean - ): Unit = withContext(dispatcher) { + ): Unit = scope.launchAndWait(group, "Invite members") { val adminKey = requireAdminAccess(group) val groupAuth = OwnedSwarmAuth.ofClosedGroup(group, adminKey) @@ -376,7 +377,7 @@ class GroupManagerV2Impl @Inject constructor( override suspend fun removeMemberMessages( groupAccountId: AccountId, members: List - ): Unit = withContext(dispatcher) { + ): Unit = scope.launchAndWait(groupAccountId, "Remove member messages") { val messagesToDelete = mutableListOf() val threadId = storage.getThreadId(Address.fromSerialized(groupAccountId.hexString)) @@ -394,18 +395,18 @@ class GroupManagerV2Impl @Inject constructor( } if (messagesToDelete.isEmpty()) { - return@withContext + return@launchAndWait } val groupAdminAuth = configFactory.getGroup(groupAccountId)?.adminKey?.let { OwnedSwarmAuth.ofClosedGroup(groupAccountId, it) - } ?: return@withContext + } ?: return@launchAndWait SnodeAPI.deleteMessage(groupAccountId.hexString, groupAdminAuth, messagesToDelete) } - override suspend fun handleMemberLeftMessage(memberId: AccountId, group: AccountId) { - val closedGroup = configFactory.getGroup(group) ?: return + override suspend fun handleMemberLeftMessage(memberId: AccountId, group: AccountId) = scope.launchAndWait(group, "Handle member left message") { + val closedGroup = configFactory.getGroup(group) ?: return@launchAndWait val groupAdminKey = closedGroup.adminKey if (groupAdminKey != null) { @@ -418,155 +419,159 @@ class GroupManagerV2Impl @Inject constructor( } } - override suspend fun leaveGroup(groupId: AccountId) = withContext(dispatcher + SupervisorJob()) { - val group = configFactory.getGroup(groupId) + override suspend fun leaveGroup(groupId: AccountId) { + scope.launchAndWait(groupId, "Leave group") { + withContext(SupervisorJob()) { + val group = configFactory.getGroup(groupId) + + if (group?.destroyed != true) { + // Only send the left/left notification group message when we are not kicked and we are not the only admin (only admin has a special treatment) + val weAreTheOnlyAdmin = configFactory.withGroupConfigs(groupId) { config -> + val allMembers = config.groupMembers.all() + allMembers.count { it.admin } == 1 && + allMembers.first { it.admin } + .accountIdString() == storage.getUserPublicKey() + } - if (group?.destroyed != true) { - // Only send the left/left notification group message when we are not kicked and we are not the only admin (only admin has a special treatment) - val weAreTheOnlyAdmin = configFactory.withGroupConfigs(groupId) { config -> - val allMembers = config.groupMembers.all() - allMembers.count { it.admin } == 1 && - allMembers.first { it.admin } - .accountIdString() == storage.getUserPublicKey() - } + if (group != null && !group.kicked && !weAreTheOnlyAdmin) { + val destination = Destination.ClosedGroup(groupId.hexString) + val sendMessageTasks = mutableListOf>() + + // Always send a "XXX left" message to the group if we can + sendMessageTasks += async { + MessageSender.send( + GroupUpdated( + GroupUpdateMessage.newBuilder() + .setMemberLeftNotificationMessage(DataMessage.GroupUpdateMemberLeftNotificationMessage.getDefaultInstance()) + .build() + ), + destination, + isSyncMessage = false + ).await() + } - if (group != null && !group.kicked && !weAreTheOnlyAdmin) { - val destination = Destination.ClosedGroup(groupId.hexString) - val sendMessageTasks = mutableListOf>() - - // Always send a "XXX left" message to the group if we can - sendMessageTasks += async { - MessageSender.send( - GroupUpdated( - GroupUpdateMessage.newBuilder() - .setMemberLeftNotificationMessage(DataMessage.GroupUpdateMemberLeftNotificationMessage.getDefaultInstance()) - .build() - ), - destination, - isSyncMessage = false - ).await() - } + // If we are not the only admin, send a left message for other admin to handle the member removal + sendMessageTasks += async { + MessageSender.send( + GroupUpdated( + GroupUpdateMessage.newBuilder() + .setMemberLeftMessage(DataMessage.GroupUpdateMemberLeftMessage.getDefaultInstance()) + .build() + ), + destination, + isSyncMessage = false + ).await() + } - // If we are not the only admin, send a left message for other admin to handle the member removal - sendMessageTasks += async { - MessageSender.send( - GroupUpdated( - GroupUpdateMessage.newBuilder() - .setMemberLeftMessage(DataMessage.GroupUpdateMemberLeftMessage.getDefaultInstance()) - .build() - ), - destination, - isSyncMessage = false - ).await() - } + sendMessageTasks.awaitAll() + } - sendMessageTasks.awaitAll() - } + // If we are the only admin, leaving this group will destroy the group + if (weAreTheOnlyAdmin) { + configFactory.withMutableGroupConfigs(groupId) { configs -> + configs.groupInfo.destroyGroup() + } - // If we are the only admin, leaving this group will destroy the group - if (weAreTheOnlyAdmin) { - configFactory.withMutableGroupConfigs(groupId) { configs -> - configs.groupInfo.destroyGroup() + // Must wait until the config is pushed, otherwise if we go through the rest + // of the code it will destroy the conversation, destroying the necessary configs + // along the way, we won't be able to push the "destroyed" state anymore. + configFactory.waitUntilGroupConfigsPushed(groupId) + } } - // Must wait until the config is pushed, otherwise if we go through the rest - // of the code it will destroy the conversation, destroying the necessary configs - // along the way, we won't be able to push the "destroyed" state anymore. - configFactory.waitUntilGroupConfigsPushed(groupId) - } - } + pollerFactory.pollerFor(groupId)?.stop() - pollerFactory.pollerFor(groupId)?.stop() - - // Delete conversation and group configs - storage.getThreadId(Address.fromSerialized(groupId.hexString)) - ?.let(storage::deleteConversation) - configFactory.removeGroup(groupId) - lokiAPIDatabase.clearLastMessageHashes(groupId.hexString) - lokiAPIDatabase.clearReceivedMessageHashValues(groupId.hexString) - } + // Delete conversation and group configs + storage.getThreadId(Address.fromSerialized(groupId.hexString)) + ?.let(storage::deleteConversation) + configFactory.removeGroup(groupId) + lokiAPIDatabase.clearLastMessageHashes(groupId.hexString) + lokiAPIDatabase.clearReceivedMessageHashValues(groupId.hexString) + } + } } override suspend fun promoteMember( group: AccountId, members: List - ): Unit = withContext(dispatcher + SupervisorJob()) { - val adminKey = requireAdminAccess(group) - val groupName = configFactory.withMutableGroupConfigs(group) { configs -> - // Update the group member's promotion status - members.asSequence() - .mapNotNull { configs.groupMembers.get(it.hexString) } - .onEach(GroupMember::setPromoted) - .forEach(configs.groupMembers::set) - - configs.groupInfo.getName() - } + ): Unit = scope.launchAndWait(group, "Promote member") { + withContext(SupervisorJob()) { + val adminKey = requireAdminAccess(group) + val groupName = configFactory.withMutableGroupConfigs(group) { configs -> + // Update the group member's promotion status + members.asSequence() + .mapNotNull { configs.groupMembers.get(it.hexString) } + .onEach(GroupMember::setPromoted) + .forEach(configs.groupMembers::set) + + configs.groupInfo.getName() + } - // Send out the promote message to the members concurrently - val promoteMessage = GroupUpdated( - GroupUpdateMessage.newBuilder() - .setPromoteMessage( - DataMessage.GroupUpdatePromoteMessage.newBuilder() - .setGroupIdentitySeed(ByteString.copyFrom(adminKey).substring(0, 32)) - .setName(groupName) - ) - .build() - ) + // Send out the promote message to the members concurrently + val promoteMessage = GroupUpdated( + GroupUpdateMessage.newBuilder() + .setPromoteMessage( + DataMessage.GroupUpdatePromoteMessage.newBuilder() + .setGroupIdentitySeed(ByteString.copyFrom(adminKey).substring(0, 32)) + .setName(groupName) + ) + .build() + ) - val promotionDeferred = members.associateWith { member -> - async { - MessageSender.sendNonDurably( - message = promoteMessage, - address = Address.fromSerialized(member.hexString), - isSyncMessage = false - ).await() + val promotionDeferred = members.associateWith { member -> + async { + MessageSender.sendNonDurably( + message = promoteMessage, + address = Address.fromSerialized(member.hexString), + isSyncMessage = false + ).await() + } } - } - // Wait and gather all the promote message sending result into a result map - val promotedByMemberIDs = promotionDeferred - .mapValues { - runCatching { it.value.await() }.isSuccess - } + // Wait and gather all the promote message sending result into a result map + val promotedByMemberIDs = promotionDeferred + .mapValues { + runCatching { it.value.await() }.isSuccess + } - // Update each member's status - configFactory.withMutableGroupConfigs(group) { configs -> - promotedByMemberIDs.asSequence() - .mapNotNull { (member, success) -> - configs.groupMembers.get(member.hexString)?.apply { - if (success) { - setPromotionSent() - } else { - setPromotionFailed() + // Update each member's status + configFactory.withMutableGroupConfigs(group) { configs -> + promotedByMemberIDs.asSequence() + .mapNotNull { (member, success) -> + configs.groupMembers.get(member.hexString)?.apply { + if (success) { + setPromotionSent() + } else { + setPromotionFailed() + } } } - } - .forEach(configs.groupMembers::set) - } + .forEach(configs.groupMembers::set) + } - // Send a group update message to the group telling members someone has been promoted - val timestamp = clock.currentTimeMills() - val signature = SodiumUtilities.sign( - buildMemberChangeSignature(GroupUpdateMemberChangeMessage.Type.PROMOTED, timestamp), - adminKey - ) - val message = GroupUpdated( - GroupUpdateMessage.newBuilder() - .setMemberChangeMessage( - GroupUpdateMemberChangeMessage.newBuilder() - .addAllMemberSessionIds(members.map { it.hexString }) - .setType(GroupUpdateMemberChangeMessage.Type.PROMOTED) - .setAdminSignature(ByteString.copyFrom(signature)) - ) - .build() - ).apply { - sentTimestamp = timestamp - } + // Send a group update message to the group telling members someone has been promoted + val timestamp = clock.currentTimeMills() + val signature = SodiumUtilities.sign( + buildMemberChangeSignature(GroupUpdateMemberChangeMessage.Type.PROMOTED, timestamp), + adminKey + ) + val message = GroupUpdated( + GroupUpdateMessage.newBuilder() + .setMemberChangeMessage( + GroupUpdateMemberChangeMessage.newBuilder() + .addAllMemberSessionIds(members.map { it.hexString }) + .setType(GroupUpdateMemberChangeMessage.Type.PROMOTED) + .setAdminSignature(ByteString.copyFrom(signature)) + ) + .build() + ).apply { + sentTimestamp = timestamp + } - MessageSender.send(message, Destination.ClosedGroup(group.hexString), false).await() - storage.insertGroupInfoChange(message, group) + MessageSender.send(message, Destination.ClosedGroup(group.hexString), false).await() + storage.insertGroupInfoChange(message, group) + } } - /** * Mark this member as "removed" in the group config. * @@ -591,7 +596,7 @@ class GroupManagerV2Impl @Inject constructor( } override suspend fun respondToInvitation(groupId: AccountId, approved: Boolean) = - withContext(dispatcher) { + scope.launchAndWait(groupId, "Respond to invitation") { val group = requireNotNull( configFactory.withUserConfigs { it.userGroups.getClosedGroup(groupId.hexString) } ) { "User groups config is not available" } @@ -633,6 +638,17 @@ class GroupManagerV2Impl @Inject constructor( "Our account ID is not available" } + val poller = checkNotNull(pollerFactory.pollerFor(group.groupAccountId)) { "Unable to start a poller for groups " } + poller.start() + + // We need to wait until we have the first data polled from the poller, otherwise + // we won't have the necessary configs to send invite response/or do anything else. + // We can't hang on here forever if things don't work out, bail out if it's the camse + withTimeout(20_000L) { + poller.state.filterIsInstance() + .filter { it.hadAtLeastOneSuccessfulPoll } + .first() + } // Clear the invited flag of the group in the config configFactory.withMutableUserConfigs { configs -> configs.userGroups.set(group.copy( @@ -641,15 +657,6 @@ class GroupManagerV2Impl @Inject constructor( )) } - val poller = checkNotNull(pollerFactory.pollerFor(group.groupAccountId)) { "Unable to start a poller for groups " } - poller.start() - - // We need to wait until we have the first data polled from the poller, otherwise - // we won't have the necessary configs to send invite response/or do anything else - poller.state.filterIsInstance() - .filter { it.hadAtLeastOneSuccessfulPoll } - .first() - if (group.adminKey == null) { // Send an invite response to the group if we are invited as a regular member val inviteResponse = GroupUpdateInviteResponseMessage.newBuilder() @@ -695,7 +702,7 @@ class GroupManagerV2Impl @Inject constructor( inviterName: String?, inviteMessageHash: String, inviteMessageTimestamp: Long, - ): Unit = withContext(dispatcher) { + ): Unit = scope.launchAndWait(groupId, "Handle invitation") { handleInvitation( groupId = groupId, groupName = groupName, @@ -716,7 +723,7 @@ class GroupManagerV2Impl @Inject constructor( promoterName: String?, promoteMessageHash: String, promoteMessageTimestamp: Long, - ): Unit = withContext(dispatcher) { + ): Unit = scope.launchAndWait(groupId, "Handle promotion") { val userAuth = requireNotNull(storage.userAuth) { "No current user available" } val group = configFactory.getGroup(groupId) @@ -834,15 +841,15 @@ class GroupManagerV2Impl @Inject constructor( groupId: AccountId, sender: AccountId, approved: Boolean - ): Unit = withContext(dispatcher) { + ): Unit = scope.launchAndWait(groupId, "Handle invite response") { if (!approved) { // We should only see approved coming through - return@withContext + return@launchAndWait } val adminKey = configFactory.getGroup(groupId)?.adminKey if (adminKey == null || adminKey.isEmpty()) { - return@withContext // We don't have the admin key, we can't process the invite response + return@launchAndWait // We don't have the admin key, we can't process the invite response } configFactory.withMutableGroupConfigs(groupId) { configs -> @@ -857,14 +864,14 @@ class GroupManagerV2Impl @Inject constructor( } } - override suspend fun handleKicked(groupId: AccountId): Unit = withContext(dispatcher) { + override suspend fun handleKicked(groupId: AccountId): Unit = scope.launchAndWait(groupId, "Handle kicked") { Log.d(TAG, "We were kicked from the group, delete and stop polling") // Stop polling the group immediately pollerFactory.pollerFor(groupId)?.stop() val userId = requireNotNull(storage.getUserPublicKey()) { "No current user available" } - val group = configFactory.getGroup(groupId) ?: return@withContext + val group = configFactory.getGroup(groupId) ?: return@launchAndWait // Retrieve the group name one last time from the group info, // as we are going to clear the keys, we won't have the chance to @@ -897,7 +904,7 @@ class GroupManagerV2Impl @Inject constructor( } override suspend fun setName(groupId: AccountId, newName: String): Unit = - withContext(dispatcher) { + scope.launchAndWait(groupId, "Set group name") { val adminKey = requireAdminAccess(groupId) val nameChanged = configFactory.withMutableGroupConfigs(groupId) { configs -> @@ -910,7 +917,7 @@ class GroupManagerV2Impl @Inject constructor( } if (!nameChanged) { - return@withContext + return@launchAndWait } val timestamp = clock.currentTimeMills() @@ -940,7 +947,7 @@ class GroupManagerV2Impl @Inject constructor( override suspend fun requestMessageDeletion( groupId: AccountId, messageHashes: Set - ): Unit = withContext(dispatcher) { + ): Unit = scope.launchAndWait(groupId, "Request message deletion") { // To delete messages from a group, there are a few considerations: // 1. Messages are stored on every member's device, we need a way to ask them to delete their stored messages // 2. Messages are also stored on the group swarm, only the group admin can delete them @@ -1014,7 +1021,7 @@ class GroupManagerV2Impl @Inject constructor( timestamp: Long, sender: AccountId, senderIsVerifiedAdmin: Boolean, - ): Unit = withContext(dispatcher) { + ): Unit = scope.launchAndWait(groupId, "Handle delete member content") { val threadId = requireNotNull(storage.getThreadId(Address.fromSerialized(groupId.hexString))) { "No thread ID found for the group" @@ -1091,7 +1098,7 @@ class GroupManagerV2Impl @Inject constructor( } override fun onBlocked(groupAccountId: AccountId) { - GlobalScope.launch(dispatcher) { + scope.launch(groupAccountId, "On blocked") { respondToInvitation(groupAccountId, false) // Remove this group from config regardless diff --git a/app/src/main/java/org/thoughtcrime/securesms/groups/handler/DestroyedGroupSync.kt b/app/src/main/java/org/thoughtcrime/securesms/groups/handler/DestroyedGroupSync.kt index 39fa46d8f3..f750d48ef2 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/groups/handler/DestroyedGroupSync.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/groups/handler/DestroyedGroupSync.kt @@ -10,7 +10,7 @@ import org.session.libsession.utilities.ConfigFactoryProtocol import org.session.libsession.utilities.ConfigUpdateNotification import org.session.libsession.utilities.waitUntilGroupConfigsPushed import org.session.libsignal.utilities.Log -import org.thoughtcrime.securesms.database.Storage +import org.session.libsession.messaging.groups.GroupScope import javax.inject.Inject import javax.inject.Singleton @@ -21,6 +21,7 @@ import javax.inject.Singleton @Singleton class DestroyedGroupSync @Inject constructor( private val configFactory: ConfigFactoryProtocol, + private val groupScope: GroupScope, private val storage: StorageProtocol, ) { private var job: Job? = null @@ -39,15 +40,17 @@ class DestroyedGroupSync @Inject constructor( Log.d("DestroyedGroupSync", "Group is destroyed: $isDestroyed") if (isDestroyed) { - // If there's un-pushed group config updates, wait until they are pushed. - // This is important, as the pushing process might need to access the UserGroupConfig, - // if we delete the UserGroupConfig before the pushing process, the pushing - // process will fail. - configFactory.waitUntilGroupConfigsPushed(update.groupId) - - configFactory.withMutableUserConfigs { configs -> - configs.userGroups.getClosedGroup(update.groupId.hexString)?.let { group -> - configs.userGroups.set(group.copy(destroyed = true)) + groupScope.launch(update.groupId, "DestroyedGroupSync") { + // If there's un-pushed group config updates, wait until they are pushed. + // This is important, as the pushing process might need to access the UserGroupConfig, + // if we delete the UserGroupConfig before the pushing process, the pushing + // process will fail. + configFactory.waitUntilGroupConfigsPushed(update.groupId) + + configFactory.withMutableUserConfigs { configs -> + configs.userGroups.getClosedGroup(update.groupId.hexString)?.let { group -> + configs.userGroups.set(group.copy(destroyed = true)) + } } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/groups/handler/RemoveGroupMemberHandler.kt b/app/src/main/java/org/thoughtcrime/securesms/groups/handler/RemoveGroupMemberHandler.kt index f5fc97b0aa..a2d19af7e9 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/groups/handler/RemoveGroupMemberHandler.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/groups/handler/RemoveGroupMemberHandler.kt @@ -1,14 +1,14 @@ package org.thoughtcrime.securesms.groups.handler import android.content.Context -import android.os.SystemClock import com.google.protobuf.ByteString import dagger.hilt.android.qualifiers.ApplicationContext +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.Job -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.first -import kotlinx.coroutines.flow.firstOrNull +import kotlinx.coroutines.flow.emptyFlow +import kotlinx.coroutines.flow.filterIsInstance +import kotlinx.coroutines.flow.flatMapLatest import kotlinx.coroutines.launch import network.loki.messenger.R import network.loki.messenger.libsession_util.ReadableGroupKeysConfig @@ -17,7 +17,6 @@ import network.loki.messenger.libsession_util.util.GroupMember import network.loki.messenger.libsession_util.util.Sodium import org.session.libsession.database.MessageDataProvider import org.session.libsession.database.StorageProtocol -import org.session.libsession.messaging.groups.GroupManagerV2 import org.session.libsession.messaging.messages.Destination import org.session.libsession.messaging.messages.control.GroupUpdated import org.session.libsession.messaging.sending_receiving.MessageSender @@ -30,18 +29,20 @@ import org.session.libsession.snode.SnodeMessage import org.session.libsession.snode.utilities.await import org.session.libsession.utilities.Address import org.session.libsession.utilities.ConfigFactoryProtocol +import org.session.libsession.utilities.ConfigUpdateNotification import org.session.libsession.utilities.TextSecurePreferences +import org.session.libsession.utilities.getGroup import org.session.libsession.utilities.waitUntilGroupConfigsPushed import org.session.libsignal.protos.SignalServiceProtos import org.session.libsignal.utilities.AccountId import org.session.libsignal.utilities.Base64 import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Namespace +import org.session.libsession.messaging.groups.GroupScope import javax.inject.Inject import javax.inject.Singleton private const val TAG = "RemoveGroupMemberHandler" -private const val MIN_PROCESS_INTERVAL_MILLS = 1_000L /** * This handler is responsible for processing pending group member removals. @@ -56,55 +57,41 @@ class RemoveGroupMemberHandler @Inject constructor( private val clock: SnodeClock, private val messageDataProvider: MessageDataProvider, private val storage: StorageProtocol, + private val groupScope: GroupScope, ) { private var job: Job? = null + @OptIn(ExperimentalCoroutinesApi::class) fun start() { require(job == null) { "Already started" } job = GlobalScope.launch { - while (true) { - // Make sure we have a local number before we start processing - textSecurePreferences.watchLocalNumber().first { it != null } - - val processStartedAt = SystemClock.uptimeMillis() + textSecurePreferences + .watchLocalNumber() + .flatMapLatest { localNumber -> + if (localNumber == null) { + return@flatMapLatest emptyFlow() + } - try { - processPendingMemberRemoval() - } catch (e: Exception) { - Log.e(TAG, "Error processing pending member removal", e) + configFactory.configUpdateNotifications } - - configFactory.configUpdateNotifications.firstOrNull() - - // Make sure we don't process too often. As some of the config changes don't apply - // to us, but we have no way to tell if it does or not. The safest way is to process - // everytime any config changes, with a minimum interval. - val delayMills = - MIN_PROCESS_INTERVAL_MILLS - (SystemClock.uptimeMillis() - processStartedAt) - - if (delayMills > 0) { - delay(delayMills) + .filterIsInstance() + .collect { update -> + val adminKey = configFactory.getGroup(update.groupId)?.adminKey + if (adminKey != null) { + groupScope.launch(update.groupId, "Handle possible group removals") { + processPendingRemovalsForGroup(update.groupId, adminKey) + } + } } - } } } - private suspend fun processPendingMemberRemoval() { - configFactory.withUserConfigs { it.userGroups.allClosedGroupInfo() } - .asSequence() - .filter { it.hasAdminKey() } - .forEach { group -> - processPendingRemovalsForGroup(group.groupAccountId, group.adminKey!!) - } - } private suspend fun processPendingRemovalsForGroup( groupAccountId: AccountId, adminKey: ByteArray ) { - val groupAuth = OwnedSwarmAuth.ofClosedGroup(groupAccountId, adminKey) - val (pendingRemovals, batchCalls) = configFactory.withGroupConfigs(groupAccountId) { configs -> val pendingRemovals = configs.groupMembers.allWithStatus() .filter { (member, status) -> member.isRemoved(status) } @@ -124,6 +111,8 @@ class RemoveGroupMemberHandler @Inject constructor( // can be performed by everyone in the group. val calls = ArrayList(3) + val groupAuth = OwnedSwarmAuth.ofClosedGroup(groupAccountId, adminKey) + // Call No 1. Revoke sub-key. This call is crucial and must not fail for the rest of the operation to be successful. calls += checkNotNull( SnodeAPI.buildAuthenticatedRevokeSubKeyBatchRequest( @@ -171,7 +160,12 @@ class RemoveGroupMemberHandler @Inject constructor( val node = SnodeAPI.getSingleTargetSnode(groupAccountId.hexString).await() val response = - SnodeAPI.getBatchResponse(node, groupAccountId.hexString, batchCalls, sequence = true) + SnodeAPI.getBatchResponse( + node, + groupAccountId.hexString, + batchCalls, + sequence = true + ) val firstError = response.results.firstOrNull { !it.isSuccessful } check(firstError == null) { @@ -195,7 +189,8 @@ class RemoveGroupMemberHandler @Inject constructor( // Try to delete members' message. It's ok to fail as they will be re-tried in different // cases (a.k.a the GroupUpdateDeleteMemberContent message handling) and could be by different admins. - val deletingMessagesForMembers = pendingRemovals.filter { (member, status) -> member.shouldRemoveMessages(status) } + val deletingMessagesForMembers = + pendingRemovals.filter { (member, status) -> member.shouldRemoveMessages(status) } if (deletingMessagesForMembers.isNotEmpty()) { val threadId = storage.getThreadId(Address.fromSerialized(groupAccountId.hexString)) if (threadId != null) { diff --git a/libsession/src/main/java/org/session/libsession/messaging/MessagingModuleConfiguration.kt b/libsession/src/main/java/org/session/libsession/messaging/MessagingModuleConfiguration.kt index 7f6bd8b8f5..e74733cfde 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/MessagingModuleConfiguration.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/MessagingModuleConfiguration.kt @@ -5,6 +5,7 @@ import com.goterl.lazysodium.utils.KeyPair import org.session.libsession.database.MessageDataProvider import org.session.libsession.database.StorageProtocol import org.session.libsession.messaging.groups.GroupManagerV2 +import org.session.libsession.messaging.groups.GroupScope import org.session.libsession.messaging.notifications.TokenFetcher import org.session.libsession.snode.OwnedSwarmAuth import org.session.libsession.snode.SnodeClock diff --git a/libsession/src/main/java/org/session/libsession/messaging/groups/GroupScope.kt b/libsession/src/main/java/org/session/libsession/messaging/groups/GroupScope.kt new file mode 100644 index 0000000000..34fde55a03 --- /dev/null +++ b/libsession/src/main/java/org/session/libsession/messaging/groups/GroupScope.kt @@ -0,0 +1,109 @@ +package org.session.libsession.messaging.groups + +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.launch +import org.session.libsignal.utilities.AccountId +import org.session.libsignal.utilities.Log +import java.util.concurrent.atomic.AtomicLong + +private const val TAG = "GroupScope" + +/** + * A coroutine utility that limit the tasks into a group to be executed sequentially. + * + * This is useful for tasks that are related to group management, where the order of execution is important. + * It's probably harmful if you apply the scope on message retrieval, as normally the message retriveal + * doesn't have any order requirement and it will likly slow down usual group operations. + */ +class GroupScope(private val scope: CoroutineScope = GlobalScope) { + private val tasksByGroupId = hashMapOf>>() + + /** + * Launch a coroutine in a group context. The coroutine will be executed sequentially + * in the order they are launched, and the next coroutine will not be started until the previous one is completed. + * Each group has their own queue of tasks so they won't block each other. + * + * @groupId The group id that the coroutine belongs to. + * @debugName A debug name for the coroutine. + * @block The coroutine block. + */ + fun launch(groupId: AccountId, debugName: String, block: suspend () -> Unit) : Job { + return async(groupId, debugName) { block() } + } + + /** + * Launch a coroutine in the given group scope and wait for it to complete. + * + * See [launch] for more details. + */ + suspend fun launchAndWait(groupId: AccountId, debugName: String, block: suspend () -> T): T { + return async(groupId, debugName, block).await() + } + + /** + * Launch a coroutine in the given group scope and return a deferred result. + * + * See [launch] for more details. + */ + fun async(groupId: AccountId, debugName: String, block: suspend () -> T) : Deferred { + val completion = CompletableDeferred() + + synchronized(tasksByGroupId) { + val tasks = tasksByGroupId.getOrPut(groupId) { ArrayDeque() } + + val task = Task(groupId, debugName, block, completion) + tasks.addLast(task) + + Log.d(TAG, "Added $task to queue, queue size: ${tasks.size}") + + // If this is the first task in the queue, start it directly (otherwise the next will be started by the previous task) + if (tasks.size == 1) { + scope.launch { + task.run() + } + } + } + + return completion + } + + private val taskIdSeq = AtomicLong(0) + + private inner class Task(val groupId: AccountId, val debugName: String, val block: suspend () -> T, val completion: CompletableDeferred) { + private val id = taskIdSeq.getAndIncrement() + + suspend fun run() { + Log.d(TAG, "Task started: $this") + try { + completion.complete(block()) + } catch (e: Throwable) { + completion.completeExceptionally(e) + } finally { + Log.d(TAG, "Task completed: $this") + // Remove self from the queue and start the next task + synchronized(tasksByGroupId) { + val tasks = tasksByGroupId[groupId] + require(tasks != null && tasks.firstOrNull() == this) { "Task is not the first in the queue: $this" } + tasks.removeFirst() + + if (tasks.isEmpty()) { + tasksByGroupId.remove(groupId) + } else { + val nextTask = tasks.first() + scope.launch { + nextTask.run() + } + } + } + } + } + + override fun toString(): String { + return "Task($debugName, id=$id)" + } + } +} \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt index 693cbb8a8d..96c66054c3 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt @@ -260,7 +260,7 @@ object MessageSender { SnodeAPI.sendMessage( auth = groupAuth, message = snodeMessage, - namespace = namespace + namespace = namespace, ) } } else { diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt index e2b1158e33..fc7a1c2602 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt @@ -238,6 +238,13 @@ class ClosedGroupPoller( saveLastMessageHash(snode, infoMessage, Namespace.CLOSED_GROUP_INFO()) saveLastMessageHash(snode, membersMessage, Namespace.CLOSED_GROUP_MEMBERS()) + // As soon as we have handled config messages, the polling count as successful, + // as normally the outside world really only cares about configs. + val currentState = state.value as? StartedState + if (currentState != null && !currentState.hadAtLeastOneSuccessfulPoll) { + mutableState.value = currentState.copy(hadAtLeastOneSuccessfulPoll = true) + } + val regularMessages = groupMessageRetrieval.await() handleMessages(regularMessages, snode) } @@ -267,12 +274,6 @@ class ClosedGroupPoller( } } } - - // Update the state to indicate that we had at least one successful poll - val currentState = state.value as? StartedState - if (currentState != null && !currentState.hadAtLeastOneSuccessfulPoll) { - mutableState.value = currentState.copy(hadAtLeastOneSuccessfulPoll = true) - } } private fun RetrieveMessageResponse.Message.toConfigMessage(): ConfigMessage { diff --git a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt index fb92859642..68b5f7914f 100644 --- a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt +++ b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt @@ -644,9 +644,7 @@ object SnodeAPI { getBatchResponse( snode = snode, publicKey = batch.first().publicKey, - requests = batch.mapNotNull { info -> - info.request.takeIf { !info.callback.isClosedForSend } - }, + requests = batch.map { it.request }, sequence = false ) } catch (e: Exception) { @@ -691,9 +689,15 @@ object SnodeAPI { request: SnodeBatchRequestInfo, responseType: Class, ): T { - val callback = Channel>() + val callback = Channel>(capacity = 1) @Suppress("UNCHECKED_CAST") - batchedRequestsSender.send(RequestInfo(snode, publicKey, request, responseType, callback as SendChannel)) + batchedRequestsSender.send(RequestInfo( + snode = snode, + publicKey = publicKey, + request = request, + responseType = responseType, + callback = callback as SendChannel + )) try { return callback.receive().getOrThrow() } catch (e: CancellationException) {