Add a group scope to limit group work concurrency (#881)

pull/1709/head
SessionHero01 3 months ago committed by GitHub
parent 388a7a2f8c
commit 9ba8e11b66
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -62,6 +62,7 @@ import org.thoughtcrime.securesms.mms.SlideDeck;
import org.thoughtcrime.securesms.notifications.MarkReadReceiver;
import org.thoughtcrime.securesms.util.SessionMetaProtocol;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
@ -861,15 +862,28 @@ public class ThreadDatabase extends Database {
}
public Reader readerFor(Cursor cursor) {
return new Reader(cursor);
return readerFor(cursor, true);
}
/**
* Create a reader to conveniently access the thread cursor
*
* @param retrieveGroupStatus Whether group status should be calculated based on the config data.
* Normally you always want it, but if you don't want the reader
* to access the config system, this is the flag to turn it off.
*/
public Reader readerFor(Cursor cursor, boolean retrieveGroupStatus) {
return new Reader(cursor, retrieveGroupStatus);
}
public class Reader implements Closeable {
private final Cursor cursor;
private final boolean retrieveGroupStatus;
public Reader(Cursor cursor) {
public Reader(Cursor cursor, boolean retrieveGroupStatus) {
this.cursor = cursor;
this.retrieveGroupStatus = retrieveGroupStatus;
}
public int getCount() {
@ -931,7 +945,7 @@ public class ThreadDatabase extends Database {
}
final GroupThreadStatus groupThreadStatus;
if (recipient.isGroupV2Recipient()) {
if (recipient.isGroupV2Recipient() && retrieveGroupStatus) {
GroupInfo.ClosedGroupInfo group = ConfigFactoryProtocolKt.getGroup(
MessagingModuleConfiguration.getShared().getConfigFactory(),
new AccountId(recipient.getAddress().serialize())

@ -554,7 +554,7 @@ private fun MutableUserGroupsConfig.initFrom(storage: StorageProtocol) {
private fun MutableConversationVolatileConfig.initFrom(storage: StorageProtocol, threadDb: ThreadDatabase) {
threadDb.approvedConversationList.use { cursor ->
val reader = threadDb.readerFor(cursor)
val reader = threadDb.readerFor(cursor, false)
var current = reader.next
while (current != null) {
val recipient = current.recipient

@ -14,6 +14,7 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.GlobalScope
import org.session.libsession.database.StorageProtocol
import org.session.libsession.messaging.groups.GroupManagerV2
import org.session.libsession.messaging.groups.GroupScope
import org.session.libsession.snode.SnodeClock
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsignal.database.LokiAPIDatabaseProtocol
@ -59,4 +60,8 @@ object SessionUtilModule {
@Provides
@Singleton
fun provideSnodeClock() = SnodeClock()
@Provides
@Singleton
fun provideGroupScope() = GroupScope()
}

@ -5,15 +5,14 @@ import com.google.protobuf.ByteString
import dagger.hilt.android.qualifiers.ApplicationContext
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout
import network.loki.messenger.R
import network.loki.messenger.libsession_util.ConfigBase.Companion.PRIORITY_VISIBLE
import network.loki.messenger.libsession_util.util.Conversation
@ -25,6 +24,7 @@ import org.session.libsession.database.MessageDataProvider
import org.session.libsession.database.StorageProtocol
import org.session.libsession.database.userAuth
import org.session.libsession.messaging.groups.GroupManagerV2
import org.session.libsession.messaging.groups.GroupScope
import org.session.libsession.messaging.jobs.InviteContactsJob
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.messages.Destination
@ -83,6 +83,7 @@ class GroupManagerV2Impl @Inject constructor(
private val clock: SnodeClock,
private val messageDataProvider: MessageDataProvider,
private val lokiAPIDatabase: LokiAPIDatabase,
private val scope: GroupScope,
) : GroupManagerV2 {
private val dispatcher = Dispatchers.Default
@ -206,7 +207,7 @@ class GroupManagerV2Impl @Inject constructor(
group: AccountId,
newMembers: List<AccountId>,
shareHistory: Boolean
): Unit = withContext(dispatcher) {
): Unit = scope.launchAndWait(group, "Invite members") {
val adminKey = requireAdminAccess(group)
val groupAuth = OwnedSwarmAuth.ofClosedGroup(group, adminKey)
@ -376,7 +377,7 @@ class GroupManagerV2Impl @Inject constructor(
override suspend fun removeMemberMessages(
groupAccountId: AccountId,
members: List<AccountId>
): Unit = withContext(dispatcher) {
): Unit = scope.launchAndWait(groupAccountId, "Remove member messages") {
val messagesToDelete = mutableListOf<String>()
val threadId = storage.getThreadId(Address.fromSerialized(groupAccountId.hexString))
@ -394,18 +395,18 @@ class GroupManagerV2Impl @Inject constructor(
}
if (messagesToDelete.isEmpty()) {
return@withContext
return@launchAndWait
}
val groupAdminAuth = configFactory.getGroup(groupAccountId)?.adminKey?.let {
OwnedSwarmAuth.ofClosedGroup(groupAccountId, it)
} ?: return@withContext
} ?: return@launchAndWait
SnodeAPI.deleteMessage(groupAccountId.hexString, groupAdminAuth, messagesToDelete)
}
override suspend fun handleMemberLeftMessage(memberId: AccountId, group: AccountId) {
val closedGroup = configFactory.getGroup(group) ?: return
override suspend fun handleMemberLeftMessage(memberId: AccountId, group: AccountId) = scope.launchAndWait(group, "Handle member left message") {
val closedGroup = configFactory.getGroup(group) ?: return@launchAndWait
val groupAdminKey = closedGroup.adminKey
if (groupAdminKey != null) {
@ -418,155 +419,159 @@ class GroupManagerV2Impl @Inject constructor(
}
}
override suspend fun leaveGroup(groupId: AccountId) = withContext(dispatcher + SupervisorJob()) {
val group = configFactory.getGroup(groupId)
override suspend fun leaveGroup(groupId: AccountId) {
scope.launchAndWait(groupId, "Leave group") {
withContext(SupervisorJob()) {
val group = configFactory.getGroup(groupId)
if (group?.destroyed != true) {
// Only send the left/left notification group message when we are not kicked and we are not the only admin (only admin has a special treatment)
val weAreTheOnlyAdmin = configFactory.withGroupConfigs(groupId) { config ->
val allMembers = config.groupMembers.all()
allMembers.count { it.admin } == 1 &&
allMembers.first { it.admin }
.accountIdString() == storage.getUserPublicKey()
}
if (group?.destroyed != true) {
// Only send the left/left notification group message when we are not kicked and we are not the only admin (only admin has a special treatment)
val weAreTheOnlyAdmin = configFactory.withGroupConfigs(groupId) { config ->
val allMembers = config.groupMembers.all()
allMembers.count { it.admin } == 1 &&
allMembers.first { it.admin }
.accountIdString() == storage.getUserPublicKey()
}
if (group != null && !group.kicked && !weAreTheOnlyAdmin) {
val destination = Destination.ClosedGroup(groupId.hexString)
val sendMessageTasks = mutableListOf<Deferred<*>>()
// Always send a "XXX left" message to the group if we can
sendMessageTasks += async {
MessageSender.send(
GroupUpdated(
GroupUpdateMessage.newBuilder()
.setMemberLeftNotificationMessage(DataMessage.GroupUpdateMemberLeftNotificationMessage.getDefaultInstance())
.build()
),
destination,
isSyncMessage = false
).await()
}
if (group != null && !group.kicked && !weAreTheOnlyAdmin) {
val destination = Destination.ClosedGroup(groupId.hexString)
val sendMessageTasks = mutableListOf<Deferred<*>>()
// Always send a "XXX left" message to the group if we can
sendMessageTasks += async {
MessageSender.send(
GroupUpdated(
GroupUpdateMessage.newBuilder()
.setMemberLeftNotificationMessage(DataMessage.GroupUpdateMemberLeftNotificationMessage.getDefaultInstance())
.build()
),
destination,
isSyncMessage = false
).await()
}
// If we are not the only admin, send a left message for other admin to handle the member removal
sendMessageTasks += async {
MessageSender.send(
GroupUpdated(
GroupUpdateMessage.newBuilder()
.setMemberLeftMessage(DataMessage.GroupUpdateMemberLeftMessage.getDefaultInstance())
.build()
),
destination,
isSyncMessage = false
).await()
}
// If we are not the only admin, send a left message for other admin to handle the member removal
sendMessageTasks += async {
MessageSender.send(
GroupUpdated(
GroupUpdateMessage.newBuilder()
.setMemberLeftMessage(DataMessage.GroupUpdateMemberLeftMessage.getDefaultInstance())
.build()
),
destination,
isSyncMessage = false
).await()
}
sendMessageTasks.awaitAll()
}
sendMessageTasks.awaitAll()
}
// If we are the only admin, leaving this group will destroy the group
if (weAreTheOnlyAdmin) {
configFactory.withMutableGroupConfigs(groupId) { configs ->
configs.groupInfo.destroyGroup()
}
// If we are the only admin, leaving this group will destroy the group
if (weAreTheOnlyAdmin) {
configFactory.withMutableGroupConfigs(groupId) { configs ->
configs.groupInfo.destroyGroup()
// Must wait until the config is pushed, otherwise if we go through the rest
// of the code it will destroy the conversation, destroying the necessary configs
// along the way, we won't be able to push the "destroyed" state anymore.
configFactory.waitUntilGroupConfigsPushed(groupId)
}
}
// Must wait until the config is pushed, otherwise if we go through the rest
// of the code it will destroy the conversation, destroying the necessary configs
// along the way, we won't be able to push the "destroyed" state anymore.
configFactory.waitUntilGroupConfigsPushed(groupId)
}
}
pollerFactory.pollerFor(groupId)?.stop()
pollerFactory.pollerFor(groupId)?.stop()
// Delete conversation and group configs
storage.getThreadId(Address.fromSerialized(groupId.hexString))
?.let(storage::deleteConversation)
configFactory.removeGroup(groupId)
lokiAPIDatabase.clearLastMessageHashes(groupId.hexString)
lokiAPIDatabase.clearReceivedMessageHashValues(groupId.hexString)
}
// Delete conversation and group configs
storage.getThreadId(Address.fromSerialized(groupId.hexString))
?.let(storage::deleteConversation)
configFactory.removeGroup(groupId)
lokiAPIDatabase.clearLastMessageHashes(groupId.hexString)
lokiAPIDatabase.clearReceivedMessageHashValues(groupId.hexString)
}
} }
override suspend fun promoteMember(
group: AccountId,
members: List<AccountId>
): Unit = withContext(dispatcher + SupervisorJob()) {
val adminKey = requireAdminAccess(group)
val groupName = configFactory.withMutableGroupConfigs(group) { configs ->
// Update the group member's promotion status
members.asSequence()
.mapNotNull { configs.groupMembers.get(it.hexString) }
.onEach(GroupMember::setPromoted)
.forEach(configs.groupMembers::set)
configs.groupInfo.getName()
}
): Unit = scope.launchAndWait(group, "Promote member") {
withContext(SupervisorJob()) {
val adminKey = requireAdminAccess(group)
val groupName = configFactory.withMutableGroupConfigs(group) { configs ->
// Update the group member's promotion status
members.asSequence()
.mapNotNull { configs.groupMembers.get(it.hexString) }
.onEach(GroupMember::setPromoted)
.forEach(configs.groupMembers::set)
configs.groupInfo.getName()
}
// Send out the promote message to the members concurrently
val promoteMessage = GroupUpdated(
GroupUpdateMessage.newBuilder()
.setPromoteMessage(
DataMessage.GroupUpdatePromoteMessage.newBuilder()
.setGroupIdentitySeed(ByteString.copyFrom(adminKey).substring(0, 32))
.setName(groupName)
)
.build()
)
// Send out the promote message to the members concurrently
val promoteMessage = GroupUpdated(
GroupUpdateMessage.newBuilder()
.setPromoteMessage(
DataMessage.GroupUpdatePromoteMessage.newBuilder()
.setGroupIdentitySeed(ByteString.copyFrom(adminKey).substring(0, 32))
.setName(groupName)
)
.build()
)
val promotionDeferred = members.associateWith { member ->
async {
MessageSender.sendNonDurably(
message = promoteMessage,
address = Address.fromSerialized(member.hexString),
isSyncMessage = false
).await()
val promotionDeferred = members.associateWith { member ->
async {
MessageSender.sendNonDurably(
message = promoteMessage,
address = Address.fromSerialized(member.hexString),
isSyncMessage = false
).await()
}
}
}
// Wait and gather all the promote message sending result into a result map
val promotedByMemberIDs = promotionDeferred
.mapValues {
runCatching { it.value.await() }.isSuccess
}
// Wait and gather all the promote message sending result into a result map
val promotedByMemberIDs = promotionDeferred
.mapValues {
runCatching { it.value.await() }.isSuccess
}
// Update each member's status
configFactory.withMutableGroupConfigs(group) { configs ->
promotedByMemberIDs.asSequence()
.mapNotNull { (member, success) ->
configs.groupMembers.get(member.hexString)?.apply {
if (success) {
setPromotionSent()
} else {
setPromotionFailed()
// Update each member's status
configFactory.withMutableGroupConfigs(group) { configs ->
promotedByMemberIDs.asSequence()
.mapNotNull { (member, success) ->
configs.groupMembers.get(member.hexString)?.apply {
if (success) {
setPromotionSent()
} else {
setPromotionFailed()
}
}
}
}
.forEach(configs.groupMembers::set)
}
.forEach(configs.groupMembers::set)
}
// Send a group update message to the group telling members someone has been promoted
val timestamp = clock.currentTimeMills()
val signature = SodiumUtilities.sign(
buildMemberChangeSignature(GroupUpdateMemberChangeMessage.Type.PROMOTED, timestamp),
adminKey
)
val message = GroupUpdated(
GroupUpdateMessage.newBuilder()
.setMemberChangeMessage(
GroupUpdateMemberChangeMessage.newBuilder()
.addAllMemberSessionIds(members.map { it.hexString })
.setType(GroupUpdateMemberChangeMessage.Type.PROMOTED)
.setAdminSignature(ByteString.copyFrom(signature))
)
.build()
).apply {
sentTimestamp = timestamp
}
// Send a group update message to the group telling members someone has been promoted
val timestamp = clock.currentTimeMills()
val signature = SodiumUtilities.sign(
buildMemberChangeSignature(GroupUpdateMemberChangeMessage.Type.PROMOTED, timestamp),
adminKey
)
val message = GroupUpdated(
GroupUpdateMessage.newBuilder()
.setMemberChangeMessage(
GroupUpdateMemberChangeMessage.newBuilder()
.addAllMemberSessionIds(members.map { it.hexString })
.setType(GroupUpdateMemberChangeMessage.Type.PROMOTED)
.setAdminSignature(ByteString.copyFrom(signature))
)
.build()
).apply {
sentTimestamp = timestamp
}
MessageSender.send(message, Destination.ClosedGroup(group.hexString), false).await()
storage.insertGroupInfoChange(message, group)
MessageSender.send(message, Destination.ClosedGroup(group.hexString), false).await()
storage.insertGroupInfoChange(message, group)
}
}
/**
* Mark this member as "removed" in the group config.
*
@ -591,7 +596,7 @@ class GroupManagerV2Impl @Inject constructor(
}
override suspend fun respondToInvitation(groupId: AccountId, approved: Boolean) =
withContext(dispatcher) {
scope.launchAndWait(groupId, "Respond to invitation") {
val group = requireNotNull(
configFactory.withUserConfigs { it.userGroups.getClosedGroup(groupId.hexString) }
) { "User groups config is not available" }
@ -633,6 +638,17 @@ class GroupManagerV2Impl @Inject constructor(
"Our account ID is not available"
}
val poller = checkNotNull(pollerFactory.pollerFor(group.groupAccountId)) { "Unable to start a poller for groups " }
poller.start()
// We need to wait until we have the first data polled from the poller, otherwise
// we won't have the necessary configs to send invite response/or do anything else.
// We can't hang on here forever if things don't work out, bail out if it's the camse
withTimeout(20_000L) {
poller.state.filterIsInstance<ClosedGroupPoller.StartedState>()
.filter { it.hadAtLeastOneSuccessfulPoll }
.first()
}
// Clear the invited flag of the group in the config
configFactory.withMutableUserConfigs { configs ->
configs.userGroups.set(group.copy(
@ -641,15 +657,6 @@ class GroupManagerV2Impl @Inject constructor(
))
}
val poller = checkNotNull(pollerFactory.pollerFor(group.groupAccountId)) { "Unable to start a poller for groups " }
poller.start()
// We need to wait until we have the first data polled from the poller, otherwise
// we won't have the necessary configs to send invite response/or do anything else
poller.state.filterIsInstance<ClosedGroupPoller.StartedState>()
.filter { it.hadAtLeastOneSuccessfulPoll }
.first()
if (group.adminKey == null) {
// Send an invite response to the group if we are invited as a regular member
val inviteResponse = GroupUpdateInviteResponseMessage.newBuilder()
@ -695,7 +702,7 @@ class GroupManagerV2Impl @Inject constructor(
inviterName: String?,
inviteMessageHash: String,
inviteMessageTimestamp: Long,
): Unit = withContext(dispatcher) {
): Unit = scope.launchAndWait(groupId, "Handle invitation") {
handleInvitation(
groupId = groupId,
groupName = groupName,
@ -716,7 +723,7 @@ class GroupManagerV2Impl @Inject constructor(
promoterName: String?,
promoteMessageHash: String,
promoteMessageTimestamp: Long,
): Unit = withContext(dispatcher) {
): Unit = scope.launchAndWait(groupId, "Handle promotion") {
val userAuth = requireNotNull(storage.userAuth) { "No current user available" }
val group = configFactory.getGroup(groupId)
@ -834,15 +841,15 @@ class GroupManagerV2Impl @Inject constructor(
groupId: AccountId,
sender: AccountId,
approved: Boolean
): Unit = withContext(dispatcher) {
): Unit = scope.launchAndWait(groupId, "Handle invite response") {
if (!approved) {
// We should only see approved coming through
return@withContext
return@launchAndWait
}
val adminKey = configFactory.getGroup(groupId)?.adminKey
if (adminKey == null || adminKey.isEmpty()) {
return@withContext // We don't have the admin key, we can't process the invite response
return@launchAndWait // We don't have the admin key, we can't process the invite response
}
configFactory.withMutableGroupConfigs(groupId) { configs ->
@ -857,14 +864,14 @@ class GroupManagerV2Impl @Inject constructor(
}
}
override suspend fun handleKicked(groupId: AccountId): Unit = withContext(dispatcher) {
override suspend fun handleKicked(groupId: AccountId): Unit = scope.launchAndWait(groupId, "Handle kicked") {
Log.d(TAG, "We were kicked from the group, delete and stop polling")
// Stop polling the group immediately
pollerFactory.pollerFor(groupId)?.stop()
val userId = requireNotNull(storage.getUserPublicKey()) { "No current user available" }
val group = configFactory.getGroup(groupId) ?: return@withContext
val group = configFactory.getGroup(groupId) ?: return@launchAndWait
// Retrieve the group name one last time from the group info,
// as we are going to clear the keys, we won't have the chance to
@ -897,7 +904,7 @@ class GroupManagerV2Impl @Inject constructor(
}
override suspend fun setName(groupId: AccountId, newName: String): Unit =
withContext(dispatcher) {
scope.launchAndWait(groupId, "Set group name") {
val adminKey = requireAdminAccess(groupId)
val nameChanged = configFactory.withMutableGroupConfigs(groupId) { configs ->
@ -910,7 +917,7 @@ class GroupManagerV2Impl @Inject constructor(
}
if (!nameChanged) {
return@withContext
return@launchAndWait
}
val timestamp = clock.currentTimeMills()
@ -940,7 +947,7 @@ class GroupManagerV2Impl @Inject constructor(
override suspend fun requestMessageDeletion(
groupId: AccountId,
messageHashes: Set<String>
): Unit = withContext(dispatcher) {
): Unit = scope.launchAndWait(groupId, "Request message deletion") {
// To delete messages from a group, there are a few considerations:
// 1. Messages are stored on every member's device, we need a way to ask them to delete their stored messages
// 2. Messages are also stored on the group swarm, only the group admin can delete them
@ -1014,7 +1021,7 @@ class GroupManagerV2Impl @Inject constructor(
timestamp: Long,
sender: AccountId,
senderIsVerifiedAdmin: Boolean,
): Unit = withContext(dispatcher) {
): Unit = scope.launchAndWait(groupId, "Handle delete member content") {
val threadId =
requireNotNull(storage.getThreadId(Address.fromSerialized(groupId.hexString))) {
"No thread ID found for the group"
@ -1091,7 +1098,7 @@ class GroupManagerV2Impl @Inject constructor(
}
override fun onBlocked(groupAccountId: AccountId) {
GlobalScope.launch(dispatcher) {
scope.launch(groupAccountId, "On blocked") {
respondToInvitation(groupAccountId, false)
// Remove this group from config regardless

@ -10,7 +10,7 @@ import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsession.utilities.ConfigUpdateNotification
import org.session.libsession.utilities.waitUntilGroupConfigsPushed
import org.session.libsignal.utilities.Log
import org.thoughtcrime.securesms.database.Storage
import org.session.libsession.messaging.groups.GroupScope
import javax.inject.Inject
import javax.inject.Singleton
@ -21,6 +21,7 @@ import javax.inject.Singleton
@Singleton
class DestroyedGroupSync @Inject constructor(
private val configFactory: ConfigFactoryProtocol,
private val groupScope: GroupScope,
private val storage: StorageProtocol,
) {
private var job: Job? = null
@ -39,15 +40,17 @@ class DestroyedGroupSync @Inject constructor(
Log.d("DestroyedGroupSync", "Group is destroyed: $isDestroyed")
if (isDestroyed) {
// If there's un-pushed group config updates, wait until they are pushed.
// This is important, as the pushing process might need to access the UserGroupConfig,
// if we delete the UserGroupConfig before the pushing process, the pushing
// process will fail.
configFactory.waitUntilGroupConfigsPushed(update.groupId)
configFactory.withMutableUserConfigs { configs ->
configs.userGroups.getClosedGroup(update.groupId.hexString)?.let { group ->
configs.userGroups.set(group.copy(destroyed = true))
groupScope.launch(update.groupId, "DestroyedGroupSync") {
// If there's un-pushed group config updates, wait until they are pushed.
// This is important, as the pushing process might need to access the UserGroupConfig,
// if we delete the UserGroupConfig before the pushing process, the pushing
// process will fail.
configFactory.waitUntilGroupConfigsPushed(update.groupId)
configFactory.withMutableUserConfigs { configs ->
configs.userGroups.getClosedGroup(update.groupId.hexString)?.let { group ->
configs.userGroups.set(group.copy(destroyed = true))
}
}
}

@ -1,14 +1,14 @@
package org.thoughtcrime.securesms.groups.handler
import android.content.Context
import android.os.SystemClock
import com.google.protobuf.ByteString
import dagger.hilt.android.qualifiers.ApplicationContext
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.launch
import network.loki.messenger.R
import network.loki.messenger.libsession_util.ReadableGroupKeysConfig
@ -17,7 +17,6 @@ import network.loki.messenger.libsession_util.util.GroupMember
import network.loki.messenger.libsession_util.util.Sodium
import org.session.libsession.database.MessageDataProvider
import org.session.libsession.database.StorageProtocol
import org.session.libsession.messaging.groups.GroupManagerV2
import org.session.libsession.messaging.messages.Destination
import org.session.libsession.messaging.messages.control.GroupUpdated
import org.session.libsession.messaging.sending_receiving.MessageSender
@ -30,18 +29,20 @@ import org.session.libsession.snode.SnodeMessage
import org.session.libsession.snode.utilities.await
import org.session.libsession.utilities.Address
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsession.utilities.ConfigUpdateNotification
import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsession.utilities.getGroup
import org.session.libsession.utilities.waitUntilGroupConfigsPushed
import org.session.libsignal.protos.SignalServiceProtos
import org.session.libsignal.utilities.AccountId
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Namespace
import org.session.libsession.messaging.groups.GroupScope
import javax.inject.Inject
import javax.inject.Singleton
private const val TAG = "RemoveGroupMemberHandler"
private const val MIN_PROCESS_INTERVAL_MILLS = 1_000L
/**
* This handler is responsible for processing pending group member removals.
@ -56,55 +57,41 @@ class RemoveGroupMemberHandler @Inject constructor(
private val clock: SnodeClock,
private val messageDataProvider: MessageDataProvider,
private val storage: StorageProtocol,
private val groupScope: GroupScope,
) {
private var job: Job? = null
@OptIn(ExperimentalCoroutinesApi::class)
fun start() {
require(job == null) { "Already started" }
job = GlobalScope.launch {
while (true) {
// Make sure we have a local number before we start processing
textSecurePreferences.watchLocalNumber().first { it != null }
val processStartedAt = SystemClock.uptimeMillis()
textSecurePreferences
.watchLocalNumber()
.flatMapLatest { localNumber ->
if (localNumber == null) {
return@flatMapLatest emptyFlow()
}
try {
processPendingMemberRemoval()
} catch (e: Exception) {
Log.e(TAG, "Error processing pending member removal", e)
configFactory.configUpdateNotifications
}
configFactory.configUpdateNotifications.firstOrNull()
// Make sure we don't process too often. As some of the config changes don't apply
// to us, but we have no way to tell if it does or not. The safest way is to process
// everytime any config changes, with a minimum interval.
val delayMills =
MIN_PROCESS_INTERVAL_MILLS - (SystemClock.uptimeMillis() - processStartedAt)
if (delayMills > 0) {
delay(delayMills)
.filterIsInstance<ConfigUpdateNotification.GroupConfigsUpdated>()
.collect { update ->
val adminKey = configFactory.getGroup(update.groupId)?.adminKey
if (adminKey != null) {
groupScope.launch(update.groupId, "Handle possible group removals") {
processPendingRemovalsForGroup(update.groupId, adminKey)
}
}
}
}
}
}
private suspend fun processPendingMemberRemoval() {
configFactory.withUserConfigs { it.userGroups.allClosedGroupInfo() }
.asSequence()
.filter { it.hasAdminKey() }
.forEach { group ->
processPendingRemovalsForGroup(group.groupAccountId, group.adminKey!!)
}
}
private suspend fun processPendingRemovalsForGroup(
groupAccountId: AccountId,
adminKey: ByteArray
) {
val groupAuth = OwnedSwarmAuth.ofClosedGroup(groupAccountId, adminKey)
val (pendingRemovals, batchCalls) = configFactory.withGroupConfigs(groupAccountId) { configs ->
val pendingRemovals = configs.groupMembers.allWithStatus()
.filter { (member, status) -> member.isRemoved(status) }
@ -124,6 +111,8 @@ class RemoveGroupMemberHandler @Inject constructor(
// can be performed by everyone in the group.
val calls = ArrayList<SnodeAPI.SnodeBatchRequestInfo>(3)
val groupAuth = OwnedSwarmAuth.ofClosedGroup(groupAccountId, adminKey)
// Call No 1. Revoke sub-key. This call is crucial and must not fail for the rest of the operation to be successful.
calls += checkNotNull(
SnodeAPI.buildAuthenticatedRevokeSubKeyBatchRequest(
@ -171,7 +160,12 @@ class RemoveGroupMemberHandler @Inject constructor(
val node = SnodeAPI.getSingleTargetSnode(groupAccountId.hexString).await()
val response =
SnodeAPI.getBatchResponse(node, groupAccountId.hexString, batchCalls, sequence = true)
SnodeAPI.getBatchResponse(
node,
groupAccountId.hexString,
batchCalls,
sequence = true
)
val firstError = response.results.firstOrNull { !it.isSuccessful }
check(firstError == null) {
@ -195,7 +189,8 @@ class RemoveGroupMemberHandler @Inject constructor(
// Try to delete members' message. It's ok to fail as they will be re-tried in different
// cases (a.k.a the GroupUpdateDeleteMemberContent message handling) and could be by different admins.
val deletingMessagesForMembers = pendingRemovals.filter { (member, status) -> member.shouldRemoveMessages(status) }
val deletingMessagesForMembers =
pendingRemovals.filter { (member, status) -> member.shouldRemoveMessages(status) }
if (deletingMessagesForMembers.isNotEmpty()) {
val threadId = storage.getThreadId(Address.fromSerialized(groupAccountId.hexString))
if (threadId != null) {

@ -5,6 +5,7 @@ import com.goterl.lazysodium.utils.KeyPair
import org.session.libsession.database.MessageDataProvider
import org.session.libsession.database.StorageProtocol
import org.session.libsession.messaging.groups.GroupManagerV2
import org.session.libsession.messaging.groups.GroupScope
import org.session.libsession.messaging.notifications.TokenFetcher
import org.session.libsession.snode.OwnedSwarmAuth
import org.session.libsession.snode.SnodeClock

@ -0,0 +1,109 @@
package org.session.libsession.messaging.groups
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import org.session.libsignal.utilities.AccountId
import org.session.libsignal.utilities.Log
import java.util.concurrent.atomic.AtomicLong
private const val TAG = "GroupScope"
/**
* A coroutine utility that limit the tasks into a group to be executed sequentially.
*
* This is useful for tasks that are related to group management, where the order of execution is important.
* It's probably harmful if you apply the scope on message retrieval, as normally the message retriveal
* doesn't have any order requirement and it will likly slow down usual group operations.
*/
class GroupScope(private val scope: CoroutineScope = GlobalScope) {
private val tasksByGroupId = hashMapOf<AccountId, ArrayDeque<Task<*>>>()
/**
* Launch a coroutine in a group context. The coroutine will be executed sequentially
* in the order they are launched, and the next coroutine will not be started until the previous one is completed.
* Each group has their own queue of tasks so they won't block each other.
*
* @groupId The group id that the coroutine belongs to.
* @debugName A debug name for the coroutine.
* @block The coroutine block.
*/
fun launch(groupId: AccountId, debugName: String, block: suspend () -> Unit) : Job {
return async(groupId, debugName) { block() }
}
/**
* Launch a coroutine in the given group scope and wait for it to complete.
*
* See [launch] for more details.
*/
suspend fun <T> launchAndWait(groupId: AccountId, debugName: String, block: suspend () -> T): T {
return async(groupId, debugName, block).await()
}
/**
* Launch a coroutine in the given group scope and return a deferred result.
*
* See [launch] for more details.
*/
fun <T> async(groupId: AccountId, debugName: String, block: suspend () -> T) : Deferred<T> {
val completion = CompletableDeferred<T>()
synchronized(tasksByGroupId) {
val tasks = tasksByGroupId.getOrPut(groupId) { ArrayDeque() }
val task = Task(groupId, debugName, block, completion)
tasks.addLast(task)
Log.d(TAG, "Added $task to queue, queue size: ${tasks.size}")
// If this is the first task in the queue, start it directly (otherwise the next will be started by the previous task)
if (tasks.size == 1) {
scope.launch {
task.run()
}
}
}
return completion
}
private val taskIdSeq = AtomicLong(0)
private inner class Task<T>(val groupId: AccountId, val debugName: String, val block: suspend () -> T, val completion: CompletableDeferred<T>) {
private val id = taskIdSeq.getAndIncrement()
suspend fun run() {
Log.d(TAG, "Task started: $this")
try {
completion.complete(block())
} catch (e: Throwable) {
completion.completeExceptionally(e)
} finally {
Log.d(TAG, "Task completed: $this")
// Remove self from the queue and start the next task
synchronized(tasksByGroupId) {
val tasks = tasksByGroupId[groupId]
require(tasks != null && tasks.firstOrNull() == this) { "Task is not the first in the queue: $this" }
tasks.removeFirst()
if (tasks.isEmpty()) {
tasksByGroupId.remove(groupId)
} else {
val nextTask = tasks.first()
scope.launch {
nextTask.run()
}
}
}
}
}
override fun toString(): String {
return "Task($debugName, id=$id)"
}
}
}

@ -260,7 +260,7 @@ object MessageSender {
SnodeAPI.sendMessage(
auth = groupAuth,
message = snodeMessage,
namespace = namespace
namespace = namespace,
)
}
} else {

@ -238,6 +238,13 @@ class ClosedGroupPoller(
saveLastMessageHash(snode, infoMessage, Namespace.CLOSED_GROUP_INFO())
saveLastMessageHash(snode, membersMessage, Namespace.CLOSED_GROUP_MEMBERS())
// As soon as we have handled config messages, the polling count as successful,
// as normally the outside world really only cares about configs.
val currentState = state.value as? StartedState
if (currentState != null && !currentState.hadAtLeastOneSuccessfulPoll) {
mutableState.value = currentState.copy(hadAtLeastOneSuccessfulPoll = true)
}
val regularMessages = groupMessageRetrieval.await()
handleMessages(regularMessages, snode)
}
@ -267,12 +274,6 @@ class ClosedGroupPoller(
}
}
}
// Update the state to indicate that we had at least one successful poll
val currentState = state.value as? StartedState
if (currentState != null && !currentState.hadAtLeastOneSuccessfulPoll) {
mutableState.value = currentState.copy(hadAtLeastOneSuccessfulPoll = true)
}
}
private fun RetrieveMessageResponse.Message.toConfigMessage(): ConfigMessage {

@ -644,9 +644,7 @@ object SnodeAPI {
getBatchResponse(
snode = snode,
publicKey = batch.first().publicKey,
requests = batch.mapNotNull { info ->
info.request.takeIf { !info.callback.isClosedForSend }
},
requests = batch.map { it.request },
sequence = false
)
} catch (e: Exception) {
@ -691,9 +689,15 @@ object SnodeAPI {
request: SnodeBatchRequestInfo,
responseType: Class<T>,
): T {
val callback = Channel<Result<T>>()
val callback = Channel<Result<T>>(capacity = 1)
@Suppress("UNCHECKED_CAST")
batchedRequestsSender.send(RequestInfo(snode, publicKey, request, responseType, callback as SendChannel<Any>))
batchedRequestsSender.send(RequestInfo(
snode = snode,
publicKey = publicKey,
request = request,
responseType = responseType,
callback = callback as SendChannel<Any>
))
try {
return callback.receive().getOrThrow()
} catch (e: CancellationException) {

Loading…
Cancel
Save