Rework group poller lifecycle so that it supports polling once (#967)

pull/1710/head
SessionHero01 1 month ago committed by GitHub
parent df977ad15d
commit ad7792f659
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -266,6 +266,7 @@ dependencies {
ksp("com.google.dagger:hilt-compiler:$daggerHiltVersion")
ksp("com.github.bumptech.glide:ksp:$glideVersion")
implementation("androidx.hilt:hilt-navigation-compose:$androidxHiltVersion")
implementation("androidx.hilt:hilt-work:$androidxHiltVersion")
implementation("com.google.dagger:hilt-android:$daggerHiltVersion")
implementation "androidx.appcompat:appcompat:$appcompatVersion"

@ -447,7 +447,7 @@
</intent-filter>
</receiver>
<receiver
android:name="org.thoughtcrime.securesms.notifications.BackgroundPollWorker$BootBroadcastReceiver"
android:name="org.thoughtcrime.securesms.notifications.BackgroundPollManager$BootBroadcastReceiver"
android:enabled="true"
android:exported="true">
<intent-filter>
@ -473,6 +473,22 @@
<meta-data
android:name="com.sec.android.multiwindow.MINIMUM_SIZE_H"
android:value="598.0dip" />
<!-- Disable work manager initializer so it uses our configuration -->
<provider
android:name="androidx.startup.InitializationProvider"
android:authorities="${applicationId}.androidx-startup"
android:exported="false"
tools:node="merge">
<!-- If you are using androidx.startup to initialize other components -->
<meta-data
android:name="androidx.work.WorkManagerInitializer"
android:value="androidx.startup"
tools:node="remove" />
</provider>
</application>
</manifest>

@ -34,9 +34,11 @@ import androidx.annotation.StringRes;
import androidx.core.content.pm.ShortcutInfoCompat;
import androidx.core.content.pm.ShortcutManagerCompat;
import androidx.core.graphics.drawable.IconCompat;
import androidx.hilt.work.HiltWorkerFactory;
import androidx.lifecycle.DefaultLifecycleObserver;
import androidx.lifecycle.LifecycleOwner;
import androidx.lifecycle.ProcessLifecycleOwner;
import androidx.work.Configuration;
import com.squareup.phrase.Phrase;
@ -91,6 +93,7 @@ import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
import org.thoughtcrime.securesms.logging.AndroidLogger;
import org.thoughtcrime.securesms.logging.PersistentLogger;
import org.thoughtcrime.securesms.logging.UncaughtExceptionLogger;
import org.thoughtcrime.securesms.notifications.BackgroundPollManager;
import org.thoughtcrime.securesms.notifications.BackgroundPollWorker;
import org.thoughtcrime.securesms.notifications.NotificationChannels;
import org.thoughtcrime.securesms.notifications.PushRegistrationHandler;
@ -116,6 +119,7 @@ import java.util.Timer;
import java.util.concurrent.Executors;
import javax.inject.Inject;
import javax.inject.Provider;
import dagger.Lazy;
import dagger.hilt.EntryPoints;
@ -134,7 +138,7 @@ import network.loki.messenger.R;
* @author Moxie Marlinspike
*/
@HiltAndroidApp
public class ApplicationContext extends Application implements DefaultLifecycleObserver, Toaster {
public class ApplicationContext extends Application implements DefaultLifecycleObserver, Toaster, Configuration.Provider {
public static final String PREFERENCES_NAME = "SecureSMS-Preferences";
@ -147,6 +151,7 @@ public class ApplicationContext extends Application implements DefaultLifecycleO
private Handler conversationListHandler;
private PersistentLogger persistentLogger;
@Inject HiltWorkerFactory workerFactory;
@Inject LokiAPIDatabase lokiAPIDatabase;
@Inject public Storage storage;
@Inject Device device;
@ -176,6 +181,7 @@ public class ApplicationContext extends Application implements DefaultLifecycleO
@Inject LegacyClosedGroupPollerV2 legacyClosedGroupPollerV2;
@Inject LegacyGroupDeprecationManager legacyGroupDeprecationManager;
@Inject CleanupInvitationHandler cleanupInvitationHandler;
@Inject BackgroundPollManager backgroundPollManager; // Exists here only to start upon app starts
@Inject AppVisibilityManager appVisibilityManager; // Exists here only to start upon app starts
@Inject GroupPollerManager groupPollerManager; // Exists here only to start upon app starts
@Inject ExpiredGroupManager expiredGroupManager; // Exists here only to start upon app starts
@ -282,7 +288,6 @@ public class ApplicationContext extends Application implements DefaultLifecycleO
broadcaster = new Broadcaster(this);
boolean useTestNet = textSecurePreferences.getEnvironment() == Environment.TEST_NET;
SnodeModule.Companion.configure(apiDB, broadcaster, useTestNet);
initializePeriodicTasks();
SSKEnvironment.Companion.configure(typingStatusRepository, readReceiptManager, profileManager, getMessageNotifier(), expiringMessageManager);
initializeWebRtc();
initializeBlobProvider();
@ -318,6 +323,14 @@ public class ApplicationContext extends Application implements DefaultLifecycleO
}
}
@NonNull
@Override
public Configuration getWorkManagerConfiguration() {
return new Configuration.Builder()
.setWorkerFactory(workerFactory)
.build();
}
@Override
public void onStart(@NonNull LifecycleOwner owner) {
isAppVisible = true;
@ -434,10 +447,6 @@ public class ApplicationContext extends Application implements DefaultLifecycleO
Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionLogger(originalHandler));
}
private void initializePeriodicTasks() {
BackgroundPollWorker.schedulePeriodic(this);
}
private void initializeWebRtc() {
try {
PeerConnectionFactory.initialize(InitializationOptions.builder(this).createInitializationOptions());

@ -973,7 +973,7 @@ open class Storage @Inject constructor(
return lokiAPIDatabase.getLatestClosedGroupEncryptionKeyPair(groupPublicKey)
}
override fun getAllClosedGroupPublicKeys(): Set<String> {
override fun getAllLegacyGroupPublicKeys(): Set<String> {
return lokiAPIDatabase.getAllClosedGroupPublicKeys()
}

@ -13,7 +13,6 @@ import kotlinx.coroutines.GlobalScope
import org.session.libsession.database.StorageProtocol
import org.session.libsession.messaging.groups.GroupScope
import org.session.libsession.messaging.groups.LegacyGroupDeprecationManager
import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPoller
import org.session.libsession.messaging.sending_receiving.pollers.LegacyClosedGroupPollerV2
import org.session.libsession.snode.SnodeClock
import org.session.libsession.utilities.TextSecurePreferences

@ -6,7 +6,6 @@ import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.flow.stateIn
import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPoller
import org.session.libsignal.utilities.AccountId
import org.session.libsignal.utilities.Log
import javax.inject.Inject
@ -27,7 +26,7 @@ class ExpiredGroupManager @Inject constructor(
@Suppress("OPT_IN_USAGE")
val expiredGroups: StateFlow<Set<AccountId>> = pollerManager.watchAllGroupPollingState()
.mapNotNull { (groupId, state) ->
val expired = (state as? ClosedGroupPoller.StartedState)?.expired
val expired = state.lastPoll?.groupExpired
if (expired == null) {
// Poller doesn't know about the expiration state yet, so we skip

@ -7,7 +7,6 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout
@ -30,7 +29,6 @@ import org.session.libsession.messaging.messages.Destination
import org.session.libsession.messaging.messages.control.GroupUpdated
import org.session.libsession.messaging.messages.visible.Profile
import org.session.libsession.messaging.sending_receiving.MessageSender
import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPoller
import org.session.libsession.messaging.utilities.MessageAuthentication.buildDeleteMemberContentSignature
import org.session.libsession.messaging.utilities.MessageAuthentication.buildInfoChangeSignature
import org.session.libsession.messaging.utilities.MessageAuthentication.buildMemberChangeSignature
@ -676,10 +674,13 @@ class GroupManagerV2Impl @Inject constructor(
// We need to wait until we have the first data polled from the poller, otherwise
// we won't have the necessary configs to send invite response/or do anything else.
// We can't hang on here forever if things don't work out, bail out if it's the camse
// We can't hang on here forever if things don't work out, bail out if it's the case.
withTimeout(20_000L) {
// We must tell the poller to poll once, as we could have received this invitation
// in the background where the poller isn't running
groupPollerManager.pollOnce(group.groupAccountId)
groupPollerManager.watchGroupPollingState(group.groupAccountId)
.filterIsInstance<ClosedGroupPoller.StartedState>()
.filter { it.hadAtLeastOneSuccessfulPoll }
.first()
}

@ -0,0 +1,467 @@
package org.thoughtcrime.securesms.groups
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.supervisorScope
import network.loki.messenger.libsession_util.util.Sodium
import org.session.libsession.database.StorageProtocol
import org.session.libsession.messaging.groups.GroupManagerV2
import org.session.libsession.messaging.jobs.BatchMessageReceiveJob
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveParameters
import org.session.libsession.messaging.messages.Destination
import org.session.libsession.snode.RawResponse
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeClock
import org.session.libsession.snode.model.RetrieveMessageResponse
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsession.utilities.ConfigMessage
import org.session.libsession.utilities.getGroup
import org.session.libsignal.database.LokiAPIDatabaseProtocol
import org.session.libsignal.exceptions.NonRetryableException
import org.session.libsignal.utilities.AccountId
import org.session.libsignal.utilities.IdPrefix
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Namespace
import org.session.libsignal.utilities.Snode
import org.thoughtcrime.securesms.util.AppVisibilityManager
import java.time.Instant
import kotlin.coroutines.cancellation.CancellationException
import kotlin.time.Duration.Companion.days
class GroupPoller(
scope: CoroutineScope,
private val groupId: AccountId,
private val configFactoryProtocol: ConfigFactoryProtocol,
private val groupManagerV2: GroupManagerV2,
private val storage: StorageProtocol,
private val lokiApiDatabase: LokiAPIDatabaseProtocol,
private val clock: SnodeClock,
private val appVisibilityManager: AppVisibilityManager,
) {
companion object {
private const val POLL_INTERVAL = 3_000L
private const val TAG = "GroupPoller"
}
data class State(
val hadAtLeastOneSuccessfulPoll: Boolean = false,
val lastPoll: PollResult? = null,
val inProgress: Boolean = false,
)
data class PollResult(
val startedAt: Instant,
val finishedAt: Instant,
val result: Result<Unit>,
val groupExpired: Boolean?
) {
fun hasNonRetryableError(): Boolean {
val e = result.exceptionOrNull()
return e != null && (e is NonRetryableException || e is CancellationException)
}
}
private class InternalPollState(
var swarmNodes: MutableSet<Snode> = mutableSetOf(),
var currentSnode: Snode? = null,
)
// A channel to send tokens to trigger a poll
private val pollOnceTokens = Channel<PollOnceToken>()
// A flow that represents the state of the poller.
val state: StateFlow<State> = flow {
var lastState = State()
val pendingTokens = mutableListOf<PollOnceToken>()
val internalPollState = InternalPollState()
while (true) {
pendingTokens.add(pollOnceTokens.receive())
// Drain all the tokens we've received up to this point, so we can reply them all at once
while (true) {
val result = pollOnceTokens.tryReceive()
result.getOrNull()?.let(pendingTokens::add) ?: break
}
lastState = lastState.copy(inProgress = true).also { emit(it) }
val pollResult = doPollOnce(internalPollState)
lastState = lastState.copy(
hadAtLeastOneSuccessfulPoll = lastState.hadAtLeastOneSuccessfulPoll || pollResult.result.isSuccess,
lastPoll = pollResult,
inProgress = false
).also { emit(it) }
// Notify all pending tokens
pendingTokens.forEach { it.resultCallback.send(pollResult) }
pendingTokens.clear()
}
}.stateIn(scope, SharingStarted.Eagerly, State())
init {
// This coroutine is here to periodically request polling the group when the app
// becomes visible
scope.launch {
while (true) {
// Wait for the app becomes visible
appVisibilityManager.isAppVisible.first { visible -> visible }
// As soon as the app becomes visible, start polling
if (requestPollOnce().hasNonRetryableError()) {
Log.v(TAG, "Error polling group $groupId and stopped polling")
break
}
// As long as the app is visible, keep polling
while (true) {
// Wait POLL_INTERVAL
delay(POLL_INTERVAL)
val appInBackground = !appVisibilityManager.isAppVisible.value
if (appInBackground) {
Log.d(TAG, "App became invisible, stopping polling group $groupId")
break
}
if (requestPollOnce().hasNonRetryableError()) {
Log.v(TAG, "Error polling group $groupId and stopped polling")
return@launch
}
}
}
}
}
/**
* Request to poll the group once and return the result. It's guaranteed that
* the poll will be run AT LEAST once after the request is sent, but it's not guaranteed
* that one request will result in one poll, as the poller may choose to batch multiple requests
* together.
*/
suspend fun requestPollOnce(): PollResult {
val resultChannel = Channel<PollResult>()
pollOnceTokens.send(PollOnceToken(resultChannel))
return resultChannel.receive()
}
private suspend fun doPollOnce(pollState: InternalPollState): PollResult {
val pollStartedAt = Instant.now()
var groupExpired: Boolean? = null
val result = runCatching {
supervisorScope {
// Grab current snode or pick (and remove) a random one from the pool
val snode = pollState.currentSnode ?: run {
if (pollState.swarmNodes.isEmpty()) {
Log.d(TAG, "Fetching swarm nodes for $groupId")
pollState.swarmNodes.addAll(SnodeAPI.fetchSwarmNodes(groupId.hexString))
}
check(pollState.swarmNodes.isNotEmpty()) { "No swarm nodes found" }
pollState.swarmNodes.random().also {
pollState.currentSnode = it
pollState.swarmNodes.remove(it)
}
}
val groupAuth =
configFactoryProtocol.getGroupAuth(groupId) ?: return@supervisorScope
val configHashesToExtends = configFactoryProtocol.withGroupConfigs(groupId) {
buildSet {
addAll(it.groupKeys.currentHashes())
addAll(it.groupInfo.currentHashes())
addAll(it.groupMembers.currentHashes())
}
}
val group = configFactoryProtocol.getGroup(groupId)
if (group == null) {
throw NonRetryableException("Group doesn't exist")
}
if (group.kicked) {
throw NonRetryableException("Group has been kicked")
}
val adminKey = group.adminKey
val pollingTasks = mutableListOf<Pair<String, Deferred<*>>>()
val receiveRevokeMessage = async {
SnodeAPI.sendBatchRequest(
snode,
groupId.hexString,
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
lastHash = lokiApiDatabase.getLastMessageHashValue(
snode,
groupId.hexString,
Namespace.REVOKED_GROUP_MESSAGES()
).orEmpty(),
auth = groupAuth,
namespace = Namespace.REVOKED_GROUP_MESSAGES(),
maxSize = null,
),
RetrieveMessageResponse::class.java
).messages.filterNotNull()
}
if (configHashesToExtends.isNotEmpty() && adminKey != null) {
pollingTasks += "extending group config TTL" to async {
SnodeAPI.sendBatchRequest(
snode,
groupId.hexString,
SnodeAPI.buildAuthenticatedAlterTtlBatchRequest(
messageHashes = configHashesToExtends.toList(),
auth = groupAuth,
newExpiry = clock.currentTimeMills() + 14.days.inWholeMilliseconds,
extend = true
),
)
}
}
val groupMessageRetrieval = async {
val lastHash = lokiApiDatabase.getLastMessageHashValue(
snode,
groupId.hexString,
Namespace.CLOSED_GROUP_MESSAGES()
).orEmpty()
Log.d(TAG, "Retrieving group message since lastHash = $lastHash")
SnodeAPI.sendBatchRequest(
snode = snode,
publicKey = groupId.hexString,
request = SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
lastHash = lastHash,
auth = groupAuth,
namespace = Namespace.CLOSED_GROUP_MESSAGES(),
maxSize = null,
),
responseType = Map::class.java
)
}
val groupConfigRetrieval = listOf(
Namespace.ENCRYPTION_KEYS(),
Namespace.CLOSED_GROUP_INFO(),
Namespace.CLOSED_GROUP_MEMBERS()
).map { ns ->
async {
SnodeAPI.sendBatchRequest(
snode = snode,
publicKey = groupId.hexString,
request = SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
lastHash = lokiApiDatabase.getLastMessageHashValue(
snode,
groupId.hexString,
ns
).orEmpty(),
auth = groupAuth,
namespace = ns,
maxSize = null,
),
responseType = RetrieveMessageResponse::class.java
).messages.filterNotNull()
}
}
// The retrieval of the all group messages can be done concurrently,
// however, in order for the messages to be able to be decrypted, the config messages
// must be processed first.
pollingTasks += "polling and handling group config keys and messages" to async {
val result = runCatching {
val (keysMessage, infoMessage, membersMessage) = groupConfigRetrieval.map { it.await() }
handleGroupConfigMessages(keysMessage, infoMessage, membersMessage)
saveLastMessageHash(snode, keysMessage, Namespace.ENCRYPTION_KEYS())
saveLastMessageHash(snode, infoMessage, Namespace.CLOSED_GROUP_INFO())
saveLastMessageHash(snode, membersMessage, Namespace.CLOSED_GROUP_MEMBERS())
groupExpired = configFactoryProtocol.withGroupConfigs(groupId) {
it.groupKeys.size() == 0
}
val regularMessages = groupMessageRetrieval.await()
handleMessages(regularMessages, snode)
}
// Revoke message must be handled regardless, and at the end
val revokedMessages = receiveRevokeMessage.await()
handleRevoked(revokedMessages)
saveLastMessageHash(snode, revokedMessages, Namespace.REVOKED_GROUP_MESSAGES())
// Propagate any prior exceptions
result.getOrThrow()
}
// Wait for all tasks to complete, gather any exceptions happened during polling
val errors = pollingTasks.mapNotNull { (name, task) ->
runCatching { task.await() }
.exceptionOrNull()
?.takeIf { it !is CancellationException }
?.let { RuntimeException("Error $name", it) }
}
// If there were any errors, throw the first one and add the rest as "suppressed" exceptions
if (errors.isNotEmpty()) {
throw errors.first().apply {
for (index in 1 until errors.size) {
addSuppressed(errors[index])
}
}
}
}
}
if (result.isFailure) {
val error = result.exceptionOrNull()
Log.e(TAG, "Error polling group", error)
if (error !is NonRetryableException && error !is CancellationException) {
// If the error can be retried, reset the current snode so we use another one
pollState.currentSnode = null
}
}
val pollResult = PollResult(
startedAt = pollStartedAt,
finishedAt = Instant.now(),
result = result,
groupExpired = groupExpired
)
Log.d(TAG, "Polling group $groupId result = $pollResult")
return pollResult
}
private fun RetrieveMessageResponse.Message.toConfigMessage(): ConfigMessage {
return ConfigMessage(hash, data, timestamp ?: clock.currentTimeMills())
}
private fun saveLastMessageHash(
snode: Snode,
messages: List<RetrieveMessageResponse.Message>,
namespace: Int
) {
if (messages.isNotEmpty()) {
lokiApiDatabase.setLastMessageHashValue(
snode = snode,
publicKey = groupId.hexString,
newValue = messages.last().hash,
namespace = namespace
)
}
}
private suspend fun handleRevoked(messages: List<RetrieveMessageResponse.Message>) {
messages.forEach { msg ->
val decoded = configFactoryProtocol.decryptForUser(
msg.data,
Sodium.KICKED_DOMAIN,
groupId,
)
if (decoded != null) {
// The message should be in the format of "<sessionIdPubKeyBinary><messageGenerationASCII>",
// where the pub key is 32 bytes, so we need to have at least 33 bytes of data
if (decoded.size < 33) {
Log.w(TAG, "Received an invalid kicked message, expecting at least 33 bytes, got ${decoded.size}")
return@forEach
}
val sessionId = AccountId(IdPrefix.STANDARD, decoded.copyOfRange(0, 32))
val messageGeneration = decoded.copyOfRange(32, decoded.size).decodeToString().toIntOrNull()
if (messageGeneration == null) {
Log.w(TAG, "Received an invalid kicked message: missing message generation")
return@forEach
}
val currentKeysGeneration = configFactoryProtocol.withGroupConfigs(groupId) {
it.groupKeys.currentGeneration()
}
val isForMe = sessionId.hexString == storage.getUserPublicKey()
Log.d(TAG, "Received kicked message, for us? ${isForMe}, message key generation = $messageGeneration, our key generation = $currentKeysGeneration")
if (isForMe && messageGeneration >= currentKeysGeneration) {
groupManagerV2.handleKicked(groupId)
}
}
}
}
private fun handleGroupConfigMessages(
keysResponse: List<RetrieveMessageResponse.Message>,
infoResponse: List<RetrieveMessageResponse.Message>,
membersResponse: List<RetrieveMessageResponse.Message>
) {
if (keysResponse.isEmpty() && infoResponse.isEmpty() && membersResponse.isEmpty()) {
return
}
Log.d(
TAG, "Handling group config messages(" +
"info = ${infoResponse.size}, " +
"keys = ${keysResponse.size}, " +
"members = ${membersResponse.size})"
)
configFactoryProtocol.mergeGroupConfigMessages(
groupId = groupId,
keys = keysResponse.map { it.toConfigMessage() },
info = infoResponse.map { it.toConfigMessage() },
members = membersResponse.map { it.toConfigMessage() },
)
}
private fun handleMessages(body: RawResponse, snode: Snode) {
val messages = configFactoryProtocol.withGroupConfigs(groupId) {
SnodeAPI.parseRawMessagesResponse(
rawResponse = body,
snode = snode,
publicKey = groupId.hexString,
decrypt = it.groupKeys::decrypt,
namespace = Namespace.CLOSED_GROUP_MESSAGES(),
)
}
val parameters = messages.map { (envelope, serverHash) ->
MessageReceiveParameters(
envelope.toByteArray(),
serverHash = serverHash,
closedGroup = Destination.ClosedGroup(groupId.hexString)
)
}
parameters.chunked(BatchMessageReceiveJob.BATCH_DEFAULT_NUMBER).forEach { chunk ->
val job = BatchMessageReceiveJob(chunk)
JobQueue.shared.add(job)
}
if (messages.isNotEmpty()) {
Log.d(TAG, "Received and handled ${messages.size} group messages")
}
}
/**
* A token to poll a group once and receive the result. Note that it's not guaranteed that
* one token will trigger one poll, as the poller may batch multiple requests together.
*/
private data class PollOnceToken(val resultCallback: SendChannel<PollResult>)
}

@ -1,26 +1,31 @@
package org.thoughtcrime.securesms.groups
import dagger.Lazy
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.supervisorScope
import org.session.libsession.database.StorageProtocol
import org.session.libsession.messaging.groups.GroupManagerV2
import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPoller
import org.session.libsession.snode.SnodeClock
import org.session.libsession.utilities.ConfigUpdateNotification
import org.session.libsession.utilities.TextSecurePreferences
@ -28,26 +33,26 @@ import org.session.libsignal.database.LokiAPIDatabaseProtocol
import org.session.libsignal.utilities.AccountId
import org.session.libsignal.utilities.Log
import org.thoughtcrime.securesms.dependencies.ConfigFactory
import org.thoughtcrime.securesms.dependencies.POLLER_SCOPE
import org.thoughtcrime.securesms.util.AppVisibilityManager
import org.thoughtcrime.securesms.util.InternetConnectivity
import javax.inject.Inject
import javax.inject.Named
import javax.inject.Singleton
/**
* This class manages the lifecycle of group pollers.
*
* It listens for changes in the user's groups config and starts or stops pollers as needed. It
* also considers the app's visibility state to decide whether to start or stop pollers.
* It listens for changes in the user's groups config and create or destroyed pollers as needed. Other
* factors like network availability and user's local number are also taken into account.
*
* All the processes here are automatic and you don't need to do anything to start or stop pollers.
* All the processes here are automatic and you don't need to do anything to create or destroy pollers.
*
* This class also provide state monitoring facilities to check the state of a group poller.
*
* Note that whether a [GroupPoller] is polling things or not is determined by itself. The manager
* class is only responsible for the overall lifecycle of the pollers.
*/
@Singleton
class GroupPollerManager @Inject constructor(
@Named(POLLER_SCOPE) scope: CoroutineScope,
@Named(POLLER_SCOPE) executor: CoroutineDispatcher,
configFactory: ConfigFactory,
groupManagerV2: Lazy<GroupManagerV2>,
storage: StorageProtocol,
@ -55,15 +60,14 @@ class GroupPollerManager @Inject constructor(
clock: SnodeClock,
preferences: TextSecurePreferences,
appVisibilityManager: AppVisibilityManager,
connectivity: InternetConnectivity,
) {
@Suppress("OPT_IN_USAGE")
private val activeGroupPollers: StateFlow<Map<AccountId, ClosedGroupPoller>> =
private val groupPollers: StateFlow<Map<AccountId, GroupPollerHandle>> =
combine(
preferences.watchLocalNumber(),
appVisibilityManager.isAppVisible
) { localNumber, visible -> localNumber != null && visible }
.distinctUntilChanged()
connectivity.networkAvailable,
preferences.watchLocalNumber()
) { networkAvailable, localNumber -> networkAvailable && localNumber != null }
// This flatMap produces a flow of groups that should be polled now
.flatMapLatest { shouldPoll ->
if (shouldPoll) {
@ -86,12 +90,12 @@ class GroupPollerManager @Inject constructor(
// This scan compares the previous active group pollers with the incoming set of groups
// that should be polled now, to work out which pollers should be started or stopped,
// and finally emits the new state
.scan(emptyMap<AccountId, ClosedGroupPoller>()) { previous, newActiveGroupIDs ->
.scan(emptyMap<AccountId, GroupPollerHandle>()) { previous, newActiveGroupIDs ->
// Go through previous pollers and stop those that are not in the new set
for ((groupId, poller) in previous) {
if (groupId !in newActiveGroupIDs) {
Log.d(TAG, "Stopping poller for $groupId")
poller.stop()
poller.scope.cancel()
}
}
@ -102,16 +106,20 @@ class GroupPollerManager @Inject constructor(
if (poller == null) {
Log.d(TAG, "Starting poller for $groupId")
poller = ClosedGroupPoller(
scope = scope,
executor = executor,
closedGroupSessionId = groupId,
configFactoryProtocol = configFactory,
groupManagerV2 = groupManagerV2.get(),
storage = storage,
lokiApiDatabase = lokiApiDatabase,
clock = clock,
).also { it.start() }
val scope = CoroutineScope(Dispatchers.Default)
poller = GroupPollerHandle(
poller = GroupPoller(
scope = scope,
groupId = groupId,
configFactoryProtocol = configFactory,
groupManagerV2 = groupManagerV2.get(),
storage = storage,
lokiApiDatabase = lokiApiDatabase,
clock = clock,
appVisibilityManager = appVisibilityManager,
),
scope = scope
)
}
poller
@ -122,27 +130,55 @@ class GroupPollerManager @Inject constructor(
@Suppress("OPT_IN_USAGE")
fun watchGroupPollingState(groupId: AccountId): Flow<ClosedGroupPoller.State> {
return activeGroupPollers
fun watchGroupPollingState(groupId: AccountId): Flow<GroupPoller.State> {
return groupPollers
.flatMapLatest { pollers ->
pollers[groupId]?.state ?: flowOf(ClosedGroupPoller.IdleState)
pollers[groupId]?.poller?.state ?: flowOf(GroupPoller.State())
}
.distinctUntilChanged()
}
@OptIn(ExperimentalCoroutinesApi::class)
fun watchAllGroupPollingState(): Flow<Pair<AccountId, ClosedGroupPoller.State>> {
return activeGroupPollers
fun watchAllGroupPollingState(): Flow<Pair<AccountId, GroupPoller.State>> {
return groupPollers
.flatMapLatest { pollers ->
// Merge all poller states into a single flow of (groupId, state) pairs
merge(
*pollers
.map { (id, poller) -> poller.state.map { state -> id to state } }
.toTypedArray()
.map { (id, poller) -> poller.poller.state.map { state -> id to state } }
.toTypedArray()
)
}
}
suspend fun pollAllGroupsOnce() {
supervisorScope {
groupPollers.value.values.map {
async {
it.poller.requestPollOnce()
}
}.awaitAll()
}
}
/**
* Wait for a group to be polled once and return the poll result
*
* Note that if the group is not supposed to be polled (kicked, destroyed, etc) then
* this function will hang forever. It's your responsibility to set a timeout if needed.
*/
suspend fun pollOnce(groupId: AccountId): GroupPoller.PollResult {
return groupPollers.mapNotNull { it[groupId] }
.first()
.poller
.requestPollOnce()
}
data class GroupPollerHandle(
val poller: GroupPoller,
val scope: CoroutineScope,
)
companion object {
private const val TAG = "GroupPollerHandler"
}

@ -0,0 +1,65 @@
package org.thoughtcrime.securesms.notifications
import android.annotation.SuppressLint
import android.app.Application
import android.content.BroadcastReceiver
import android.content.Context
import android.content.Intent
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.launch
import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsignal.utilities.Log
import org.thoughtcrime.securesms.util.AppVisibilityManager
import javax.inject.Inject
import javax.inject.Singleton
/**
* This class automatically schedules and cancels the background polling work based on the
* visibility of the app and the availability of the logged in user.
*/
@OptIn(FlowPreview::class)
@Singleton
class BackgroundPollManager @Inject constructor(
application: Application,
appVisibilityManager: AppVisibilityManager,
textSecurePreferences: TextSecurePreferences,
) {
init {
@Suppress("OPT_IN_USAGE")
GlobalScope.launch {
combine(
textSecurePreferences.watchLocalNumber(),
// Debounce to avoid rapid toggling on visible app starts
appVisibilityManager.isAppVisible.debounce(1_000L)
) { localNumber, appVisible -> localNumber != null && !appVisible }
.distinctUntilChanged()
.collectLatest { shouldSchedule ->
if (shouldSchedule) {
Log.i(TAG, "Scheduling background polling work.")
BackgroundPollWorker.schedulePeriodic(application)
} else {
Log.i(TAG, "Cancelling background polling work.")
BackgroundPollWorker.cancelPeriodic(application)
}
}
}
}
class BootBroadcastReceiver : BroadcastReceiver() {
@SuppressLint("UnsafeProtectedBroadcastReceiver")
override fun onReceive(context: Context, intent: Intent) {
// This broadcast receiver does nothing but to bring up the app,
// once the app is up, the `BackgroundPollWorker` will have the chance to
// schedule any background polling work accordingly
}
}
companion object {
private const val TAG = "BackgroundPollManager"
}
}

@ -1,58 +1,64 @@
package org.thoughtcrime.securesms.notifications
import android.content.BroadcastReceiver
import android.content.Context
import android.content.Intent
import androidx.hilt.work.HiltWorker
import androidx.work.Constraints
import androidx.work.CoroutineWorker
import androidx.work.Data
import androidx.work.ExistingPeriodicWorkPolicy
import androidx.work.NetworkType
import androidx.work.OneTimeWorkRequestBuilder
import androidx.work.PeriodicWorkRequestBuilder
import androidx.work.WorkManager
import androidx.work.Worker
import androidx.work.WorkerParameters
import kotlinx.coroutines.GlobalScope
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.all
import nl.komponents.kovenant.functional.bind
import dagger.assisted.Assisted
import dagger.assisted.AssistedInject
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.supervisorScope
import org.session.libsession.database.StorageProtocol
import org.session.libsession.database.userAuth
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.groups.LegacyGroupDeprecationManager
import org.session.libsession.messaging.jobs.BatchMessageReceiveJob
import org.session.libsession.messaging.jobs.MessageReceiveParameters
import org.session.libsession.messaging.sending_receiving.pollers.LegacyClosedGroupPollerV2
import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupPoller
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.utilities.asyncPromise
import org.session.libsession.snode.utilities.await
import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.recover
import org.thoughtcrime.securesms.dependencies.DatabaseComponent
import org.thoughtcrime.securesms.database.LokiThreadDatabase
import org.thoughtcrime.securesms.groups.GroupPollerManager
import java.util.concurrent.TimeUnit
class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Worker(context, params) {
enum class Targets {
DMS, CLOSED_GROUPS, OPEN_GROUPS
import kotlin.time.Duration.Companion.minutes
@HiltWorker
class BackgroundPollWorker @AssistedInject constructor(
@Assisted val context: Context,
@Assisted params: WorkerParameters,
private val storage: StorageProtocol,
private val deprecationManager: LegacyGroupDeprecationManager,
private val lokiThreadDatabase: LokiThreadDatabase,
private val groupPollerManager: GroupPollerManager,
) : CoroutineWorker(context, params) {
enum class Target {
ONE_TO_ONE,
LEGACY_GROUPS,
GROUPS,
OPEN_GROUPS
}
companion object {
const val TAG = "BackgroundPollWorker"
const val INITIAL_SCHEDULE_TIME = "INITIAL_SCHEDULE_TIME"
const val REQUEST_TARGETS = "REQUEST_TARGETS"
@JvmStatic
fun schedulePeriodic(context: Context) = schedulePeriodic(context, targets = Targets.values())
private const val TAG = "BackgroundPollWorker"
private const val REQUEST_TARGETS = "REQUEST_TARGETS"
@JvmStatic
fun schedulePeriodic(context: Context, targets: Array<Targets>) {
fun schedulePeriodic(context: Context, targets: Collection<Target> = Target.entries) {
Log.v(TAG, "Scheduling periodic work.")
val durationMinutes: Long = 15
val builder = PeriodicWorkRequestBuilder<BackgroundPollWorker>(durationMinutes, TimeUnit.MINUTES)
val interval = 15.minutes
val builder = PeriodicWorkRequestBuilder<BackgroundPollWorker>(interval.inWholeSeconds, TimeUnit.SECONDS)
builder.setConstraints(Constraints.Builder().setRequiredNetworkType(NetworkType.CONNECTED).build())
.setInitialDelay(interval.inWholeSeconds, TimeUnit.SECONDS)
val dataBuilder = Data.Builder()
dataBuilder.putLong(INITIAL_SCHEDULE_TIME, System.currentTimeMillis() + (durationMinutes * 60 * 1000))
dataBuilder.putStringArray(REQUEST_TARGETS, targets.map { it.name }.toTypedArray())
builder.setInputData(dataBuilder.build())
@ -64,8 +70,12 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor
)
}
@JvmStatic
fun scheduleOnce(context: Context, targets: Array<Targets> = Targets.values()) {
fun cancelPeriodic(context: Context) {
Log.v(TAG, "Cancelling periodic work.")
WorkManager.getInstance(context).cancelUniqueWork(TAG)
}
fun scheduleOnce(context: Context, targets: Collection<Target> = Target.entries) {
Log.v(TAG, "Scheduling single run.")
val builder = OneTimeWorkRequestBuilder<BackgroundPollWorker>()
builder.setConstraints(Constraints.Builder().setRequiredNetworkType(NetworkType.CONNECTED).build())
@ -79,99 +89,85 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor
}
}
override fun doWork(): Result {
if (TextSecurePreferences.getLocalNumber(context) == null) {
override suspend fun doWork(): Result {
val userAuth = storage.userAuth
if (userAuth == null) {
Log.v(TAG, "User not registered yet.")
return Result.failure()
}
// If this is a scheduled run and it is happening before the initial scheduled time (as
// periodic background tasks run immediately when scheduled) then don't actually do anything
// because this might slow requests on initial startup or triggered by PNs
val initialScheduleTime = inputData.getLong(INITIAL_SCHEDULE_TIME, -1)
if (initialScheduleTime != -1L && System.currentTimeMillis() < (initialScheduleTime - (60 * 1000))) {
Log.v(TAG, "Skipping initial run.")
return Result.success()
}
// Retrieve the desired targets (defaulting to all if not provided or empty)
val requestTargets: List<Targets> = (inputData.getStringArray(REQUEST_TARGETS) ?: emptyArray())
.map {
try { Targets.valueOf(it) }
catch(e: Exception) { null }
}
.filterNotNull()
.ifEmpty { Targets.values().toList() }
val requestTargets: List<Target> = (inputData.getStringArray(REQUEST_TARGETS) ?: emptyArray())
.map { enumValueOf<Target>(it) }
try {
Log.v(TAG, "Performing background poll for ${requestTargets.joinToString { it.name }}.")
val promises = mutableListOf<Promise<Unit, Exception>>()
// DMs
var dmsPromise: Promise<Unit, Exception> = Promise.ofSuccess(Unit)
supervisorScope {
val tasks = mutableListOf<Deferred<*>>()
// DMs
if (requestTargets.contains(Target.ONE_TO_ONE)) {
tasks += async {
Log.d(TAG, "Polling messages.")
val params = SnodeAPI.getMessages(userAuth).await().map { (envelope, serverHash) ->
MessageReceiveParameters(envelope.toByteArray(), serverHash, null)
}
if (requestTargets.contains(Targets.DMS)) {
val userAuth = requireNotNull(MessagingModuleConfiguration.shared.storage.userAuth)
dmsPromise = SnodeAPI.getMessages(userAuth).bind { envelopes ->
val params = envelopes.map { (envelope, serverHash) ->
// FIXME: Using a job here seems like a bad idea...
MessageReceiveParameters(envelope.toByteArray(), serverHash, null)
}
GlobalScope.asyncPromise {
BatchMessageReceiveJob(params).executeAsync("background")
}
}
promises.add(dmsPromise)
}
// Closed groups
if (requestTargets.contains(Targets.CLOSED_GROUPS)) {
val closedGroupPoller = LegacyClosedGroupPollerV2(
MessagingModuleConfiguration.shared.storage,
MessagingModuleConfiguration.shared.legacyClosedGroupPollerV2.deprecationManager
) // Intentionally don't use shared
val storage = MessagingModuleConfiguration.shared.storage
val allGroupPublicKeys = storage.getAllClosedGroupPublicKeys()
allGroupPublicKeys.iterator().forEach { closedGroupPoller.poll(it) }
}
// Open Groups
var ogPollError: Exception? = null
// Legacy groups
if (requestTargets.contains(Target.LEGACY_GROUPS)) {
val poller = LegacyClosedGroupPollerV2(storage, deprecationManager)
if (requestTargets.contains(Targets.OPEN_GROUPS)) {
val threadDB = DatabaseComponent.get(context).lokiThreadDatabase()
val openGroups = threadDB.getAllOpenGroups()
val openGroupServers = openGroups.map { it.value.server }.toSet()
for (server in openGroupServers) {
val poller = OpenGroupPoller(server, null)
poller.hasStarted = true
// If one of the open group pollers fails we don't want it to cancel the DM
// poller so just hold on to the error for later
promises.add(
poller.poll().recover {
if (dmsPromise.isDone()) {
throw it
storage.getAllLegacyGroupPublicKeys()
.mapTo(tasks) { key ->
async {
Log.d(TAG, "Polling legacy group ${key.substring(0, 8)}...")
poller.poll(key)
}
}
}
ogPollError = it
// Open groups
if (requestTargets.contains(Target.OPEN_GROUPS)) {
lokiThreadDatabase.getAllOpenGroups()
.mapTo(hashSetOf()) { it.value.server }
.mapTo(tasks) { server ->
async {
Log.d(TAG, "Polling open group server $server.")
OpenGroupPoller(server, null)
.apply { hasStarted = true }
.poll()
.await()
}
}
)
}
}
// Wait until all the promises are resolved
all(promises).get()
// Close group
if (requestTargets.contains(Target.GROUPS)) {
tasks += async {
Log.d(TAG, "Polling all groups.")
groupPollerManager.pollAllGroupsOnce()
}
}
// If the Open Group pollers threw an exception then re-throw it here (now that
// the DM promise has completed)
val localOgPollException = ogPollError
val caughtException = tasks
.fold(null) { acc: Throwable?, result ->
try {
result.await()
acc
} catch (ec: Exception) {
Log.e(TAG, "Failed to poll group due to error.", ec)
acc?.also { it.addSuppressed(ec) } ?: ec
}
}
if (localOgPollException != null) {
throw localOgPollException
if (caughtException != null) {
throw caughtException
}
}
return Result.success()
@ -181,13 +177,4 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor
}
}
class BootBroadcastReceiver: BroadcastReceiver() {
override fun onReceive(context: Context, intent: Intent) {
if (intent.action == Intent.ACTION_BOOT_COMPLETED) {
Log.v(TAG, "Boot broadcast caught.")
schedulePeriodic(context)
}
}
}
}

@ -426,7 +426,7 @@ class WebRtcCallService : LifecycleService(), CallManager.WebRtcListener {
BackgroundPollWorker.scheduleOnce(
this,
arrayOf(BackgroundPollWorker.Targets.DMS)
listOf(BackgroundPollWorker.Target.ONE_TO_ONE)
)
}
}

@ -144,7 +144,7 @@ interface StorageProtocol {
fun removeMember(groupID: String, member: Address)
fun updateMembers(groupID: String, members: List<Address>)
fun setZombieMembers(groupID: String, members: List<Address>)
fun getAllClosedGroupPublicKeys(): Set<String>
fun getAllLegacyGroupPublicKeys(): Set<String>
fun getAllActiveClosedGroupPublicKeys(): Set<String>
fun addClosedGroupPublicKey(groupPublicKey: String)
fun removeClosedGroupPublicKey(groupPublicKey: String)

@ -227,7 +227,7 @@ private fun handleConfigurationMessage(message: ConfigurationMessage) {
TextSecurePreferences.setHasLegacyConfig(context, true)
if (!firstTimeSync) return
val allClosedGroupPublicKeys = storage.getAllClosedGroupPublicKeys()
val allClosedGroupPublicKeys = storage.getAllLegacyGroupPublicKeys()
for (closedGroup in message.closedGroups) {
if (allClosedGroupPublicKeys.contains(closedGroup.publicKey)) {
// just handle the closed group encryption key pairs to avoid sync'd devices getting out of sync

@ -36,7 +36,7 @@ object PushRegistryV1 {
device: Device,
isPushEnabled: Boolean = TextSecurePreferences.isPushEnabled(context),
publicKey: String? = TextSecurePreferences.getLocalNumber(context),
legacyGroupPublicKeys: Collection<String> = MessagingModuleConfiguration.shared.storage.getAllClosedGroupPublicKeys()
legacyGroupPublicKeys: Collection<String> = MessagingModuleConfiguration.shared.storage.getAllLegacyGroupPublicKeys()
): Promise<*, Exception> = scope.asyncPromise {
if (isPushEnabled) {
retryWithUniformInterval(maxRetryCount = MAX_RETRY_COUNT) { doRegister(publicKey, device, legacyGroupPublicKeys) }

@ -1,398 +0,0 @@
package org.session.libsession.messaging.sending_receiving.pollers
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.supervisorScope
import network.loki.messenger.libsession_util.util.Sodium
import org.session.libsession.database.StorageProtocol
import org.session.libsession.messaging.groups.GroupManagerV2
import org.session.libsession.messaging.jobs.BatchMessageReceiveJob
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveParameters
import org.session.libsession.messaging.messages.Destination
import org.session.libsession.snode.RawResponse
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeClock
import org.session.libsession.snode.model.RetrieveMessageResponse
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsession.utilities.ConfigMessage
import org.session.libsession.utilities.getGroup
import org.session.libsignal.database.LokiAPIDatabaseProtocol
import org.session.libsignal.exceptions.NonRetryableException
import org.session.libsignal.utilities.AccountId
import org.session.libsignal.utilities.IdPrefix
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Namespace
import org.session.libsignal.utilities.Snode
import kotlin.coroutines.cancellation.CancellationException
import kotlin.time.Duration.Companion.days
class ClosedGroupPoller(
private val scope: CoroutineScope,
private val executor: CoroutineDispatcher,
private val closedGroupSessionId: AccountId,
private val configFactoryProtocol: ConfigFactoryProtocol,
private val groupManagerV2: GroupManagerV2,
private val storage: StorageProtocol,
private val lokiApiDatabase: LokiAPIDatabaseProtocol,
private val clock: SnodeClock,
) {
companion object {
private const val POLL_INTERVAL = 3_000L
private const val POLL_ERROR_RETRY_DELAY = 10_000L
private const val TAG = "ClosedGroupPoller"
}
sealed interface State
data object IdleState : State
data class StartedState(
internal val job: Job,
val expired: Boolean? = null,
val hadAtLeastOneSuccessfulPoll: Boolean = false,
) : State
private val mutableState = MutableStateFlow<State>(IdleState)
val state: StateFlow<State> get() = mutableState
fun start() {
if ((state.value as? StartedState)?.job?.isActive == true) return // already started, don't restart
Log.d(TAG, "Starting closed group poller for ${closedGroupSessionId.hexString.take(4)}")
val job = scope.launch(executor) {
while (isActive) {
try {
val swarmNodes =
SnodeAPI.fetchSwarmNodes(closedGroupSessionId.hexString).toMutableSet()
var currentSnode: Snode? = null
while (isActive) {
if (currentSnode == null) {
check(swarmNodes.isNotEmpty()) { "No more swarm nodes found" }
Log.d(
TAG,
"No current snode, getting a new one. Remaining in pool = ${swarmNodes.size - 1}"
)
currentSnode = swarmNodes.random()
swarmNodes.remove(currentSnode)
}
val result = runCatching { poll(currentSnode!!) }
when {
result.isSuccess -> {
delay(POLL_INTERVAL)
}
result.isFailure -> {
val error = result.exceptionOrNull()!!
if (error is CancellationException || error is NonRetryableException) {
throw error
}
Log.e(TAG, "Error polling closed group", error)
// Clearing snode so we get a new one next time
currentSnode = null
delay(POLL_INTERVAL)
}
}
}
} catch (e: CancellationException) {
throw e
} catch (e: NonRetryableException) {
Log.e(TAG, "Non-retryable error during group poller", e)
throw e
} catch (e: Exception) {
Log.e(TAG, "Error during group poller", e)
delay(POLL_ERROR_RETRY_DELAY)
}
}
}
mutableState.value = StartedState(job = job)
job.invokeOnCompletion {
mutableState.value = IdleState
}
}
fun stop() {
Log.d(TAG, "Stopping closed group poller for $closedGroupSessionId")
(state.value as? StartedState)?.job?.cancel()
}
private suspend fun poll(snode: Snode): Unit = supervisorScope {
val groupAuth =
configFactoryProtocol.getGroupAuth(closedGroupSessionId) ?: return@supervisorScope
val configHashesToExtends = configFactoryProtocol.withGroupConfigs(closedGroupSessionId) {
buildSet {
addAll(it.groupKeys.currentHashes())
addAll(it.groupInfo.currentHashes())
addAll(it.groupMembers.currentHashes())
}
}
val group = configFactoryProtocol.getGroup(closedGroupSessionId)
if (group == null) {
throw NonRetryableException("Group doesn't exist")
}
if (group.kicked) {
throw NonRetryableException("Group has been kicked")
}
val adminKey = group.adminKey
val pollingTasks = mutableListOf<Pair<String, Deferred<*>>>()
val receiveRevokeMessage = async {
SnodeAPI.sendBatchRequest(
snode,
closedGroupSessionId.hexString,
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
lastHash = lokiApiDatabase.getLastMessageHashValue(
snode,
closedGroupSessionId.hexString,
Namespace.REVOKED_GROUP_MESSAGES()
).orEmpty(),
auth = groupAuth,
namespace = Namespace.REVOKED_GROUP_MESSAGES(),
maxSize = null,
),
RetrieveMessageResponse::class.java
).messages.filterNotNull()
}
if (configHashesToExtends.isNotEmpty() && adminKey != null) {
pollingTasks += "extending group config TTL" to async {
SnodeAPI.sendBatchRequest(
snode,
closedGroupSessionId.hexString,
SnodeAPI.buildAuthenticatedAlterTtlBatchRequest(
messageHashes = configHashesToExtends.toList(),
auth = groupAuth,
newExpiry = clock.currentTimeMills() + 14.days.inWholeMilliseconds,
extend = true
),
)
}
}
val groupMessageRetrieval = async {
val lastHash = lokiApiDatabase.getLastMessageHashValue(
snode,
closedGroupSessionId.hexString,
Namespace.CLOSED_GROUP_MESSAGES()
).orEmpty()
Log.d(TAG, "Retrieving group message since lastHash = $lastHash")
SnodeAPI.sendBatchRequest(
snode = snode,
publicKey = closedGroupSessionId.hexString,
request = SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
lastHash = lastHash,
auth = groupAuth,
namespace = Namespace.CLOSED_GROUP_MESSAGES(),
maxSize = null,
),
responseType = Map::class.java
)
}
val groupConfigRetrieval = listOf(
Namespace.ENCRYPTION_KEYS(),
Namespace.CLOSED_GROUP_INFO(),
Namespace.CLOSED_GROUP_MEMBERS()
).map { ns ->
async {
SnodeAPI.sendBatchRequest(
snode = snode,
publicKey = closedGroupSessionId.hexString,
request = SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
lastHash = lokiApiDatabase.getLastMessageHashValue(
snode,
closedGroupSessionId.hexString,
ns
).orEmpty(),
auth = groupAuth,
namespace = ns,
maxSize = null,
),
responseType = RetrieveMessageResponse::class.java
).messages.filterNotNull()
}
}
// The retrieval of the all group messages can be done concurrently,
// however, in order for the messages to be able to be decrypted, the config messages
// must be processed first.
pollingTasks += "polling and handling group config keys and messages" to async {
val result = runCatching {
val (keysMessage, infoMessage, membersMessage) = groupConfigRetrieval.map { it.await() }
handleGroupConfigMessages(keysMessage, infoMessage, membersMessage)
saveLastMessageHash(snode, keysMessage, Namespace.ENCRYPTION_KEYS())
saveLastMessageHash(snode, infoMessage, Namespace.CLOSED_GROUP_INFO())
saveLastMessageHash(snode, membersMessage, Namespace.CLOSED_GROUP_MEMBERS())
val isGroupExpired = configFactoryProtocol.withGroupConfigs(closedGroupSessionId) {
it.groupKeys.size() == 0
}
// As soon as we have handled config messages, the polling count as successful,
// as normally the outside world really only cares about configs.
mutableState.update {
(it as? StartedState)?.copy(
hadAtLeastOneSuccessfulPoll = true,
expired = isGroupExpired,
) ?: it
}
val regularMessages = groupMessageRetrieval.await()
handleMessages(regularMessages, snode)
}
// Revoke message must be handled regardless, and at the end
val revokedMessages = receiveRevokeMessage.await()
handleRevoked(revokedMessages)
saveLastMessageHash(snode, revokedMessages, Namespace.REVOKED_GROUP_MESSAGES())
// Propagate any prior exceptions
result.getOrThrow()
}
// Wait for all tasks to complete, gather any exceptions happened during polling
val errors = pollingTasks.mapNotNull { (name, task) ->
runCatching { task.await() }
.exceptionOrNull()
?.takeIf { it !is CancellationException }
?.let { RuntimeException("Error $name", it) }
}
// If there were any errors, throw the first one and add the rest as "suppressed" exceptions
if (errors.isNotEmpty()) {
throw errors.first().apply {
for (index in 1 until errors.size) {
addSuppressed(errors[index])
}
}
}
}
private fun RetrieveMessageResponse.Message.toConfigMessage(): ConfigMessage {
return ConfigMessage(hash, data, timestamp ?: clock.currentTimeMills())
}
private fun saveLastMessageHash(
snode: Snode,
messages: List<RetrieveMessageResponse.Message>,
namespace: Int
) {
if (messages.isNotEmpty()) {
lokiApiDatabase.setLastMessageHashValue(
snode = snode,
publicKey = closedGroupSessionId.hexString,
newValue = messages.last().hash,
namespace = namespace
)
}
}
private suspend fun handleRevoked(messages: List<RetrieveMessageResponse.Message>) {
messages.forEach { msg ->
val decoded = configFactoryProtocol.decryptForUser(
msg.data,
Sodium.KICKED_DOMAIN,
closedGroupSessionId,
)
if (decoded != null) {
// The message should be in the format of "<sessionIdPubKeyBinary><messageGenerationASCII>",
// where the pub key is 32 bytes, so we need to have at least 33 bytes of data
if (decoded.size < 33) {
Log.w(TAG, "Received an invalid kicked message, expecting at least 33 bytes, got ${decoded.size}")
return@forEach
}
val sessionId = AccountId(IdPrefix.STANDARD, decoded.copyOfRange(0, 32))
val messageGeneration = decoded.copyOfRange(32, decoded.size).decodeToString().toIntOrNull()
if (messageGeneration == null) {
Log.w(TAG, "Received an invalid kicked message: missing message generation")
return@forEach
}
val currentKeysGeneration = configFactoryProtocol.withGroupConfigs(closedGroupSessionId) {
it.groupKeys.currentGeneration()
}
val isForMe = sessionId.hexString == storage.getUserPublicKey()
Log.d(TAG, "Received kicked message, for us? ${isForMe}, message key generation = $messageGeneration, our key generation = $currentKeysGeneration")
if (isForMe && messageGeneration >= currentKeysGeneration) {
groupManagerV2.handleKicked(closedGroupSessionId)
}
}
}
}
private fun handleGroupConfigMessages(
keysResponse: List<RetrieveMessageResponse.Message>,
infoResponse: List<RetrieveMessageResponse.Message>,
membersResponse: List<RetrieveMessageResponse.Message>
) {
if (keysResponse.isEmpty() && infoResponse.isEmpty() && membersResponse.isEmpty()) {
return
}
Log.d(
TAG, "Handling group config messages(" +
"info = ${infoResponse.size}, " +
"keys = ${keysResponse.size}, " +
"members = ${membersResponse.size})"
)
configFactoryProtocol.mergeGroupConfigMessages(
groupId = closedGroupSessionId,
keys = keysResponse.map { it.toConfigMessage() },
info = infoResponse.map { it.toConfigMessage() },
members = membersResponse.map { it.toConfigMessage() },
)
}
private fun handleMessages(body: RawResponse, snode: Snode) {
val messages = configFactoryProtocol.withGroupConfigs(closedGroupSessionId) {
SnodeAPI.parseRawMessagesResponse(
rawResponse = body,
snode = snode,
publicKey = closedGroupSessionId.hexString,
decrypt = it.groupKeys::decrypt,
namespace = Namespace.CLOSED_GROUP_MESSAGES(),
)
}
val parameters = messages.map { (envelope, serverHash) ->
MessageReceiveParameters(
envelope.toByteArray(),
serverHash = serverHash,
closedGroup = Destination.ClosedGroup(closedGroupSessionId.hexString)
)
}
parameters.chunked(BatchMessageReceiveJob.BATCH_DEFAULT_NUMBER).forEach { chunk ->
val job = BatchMessageReceiveJob(chunk)
JobQueue.shared.add(job)
}
if (messages.isNotEmpty()) {
Log.d(TAG, "Received and handled ${messages.size} group messages")
}
}
}

@ -48,7 +48,7 @@ class LegacyClosedGroupPollerV2(
class PollingCanceledException() : Exception("Polling canceled.")
fun start() {
val allGroupPublicKeys = storage.getAllClosedGroupPublicKeys()
val allGroupPublicKeys = storage.getAllLegacyGroupPublicKeys()
allGroupPublicKeys.iterator().forEach { startPolling(it) }
}

Loading…
Cancel
Save