diff --git a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPoller.kt b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPoller.kt index c4b53df40b..84012a6d1b 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPoller.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPoller.kt @@ -13,8 +13,6 @@ import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.stateIn import kotlinx.coroutines.launch import kotlinx.coroutines.supervisorScope -import org.session.libsession.database.StorageProtocol -import org.session.libsession.messaging.groups.GroupManagerV2 import org.session.libsession.messaging.jobs.BatchMessageReceiveJob import org.session.libsession.messaging.jobs.JobQueue import org.session.libsession.messaging.jobs.MessageReceiveParameters @@ -22,6 +20,7 @@ import org.session.libsession.messaging.messages.Destination import org.session.libsession.snode.RawResponse import org.session.libsession.snode.SnodeAPI import org.session.libsession.snode.SnodeClock +import org.session.libsession.snode.model.BatchResponse import org.session.libsession.snode.model.RetrieveMessageResponse import org.session.libsession.utilities.ConfigFactoryProtocol import org.session.libsession.utilities.ConfigMessage @@ -33,6 +32,7 @@ import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Namespace import org.session.libsignal.utilities.Snode import org.thoughtcrime.securesms.util.AppVisibilityManager +import org.thoughtcrime.securesms.util.getRootCause import java.time.Instant import kotlin.coroutines.cancellation.CancellationException import kotlin.time.Duration.Companion.days @@ -41,8 +41,6 @@ class GroupPoller( scope: CoroutineScope, private val groupId: AccountId, private val configFactoryProtocol: ConfigFactoryProtocol, - private val groupManagerV2: GroupManagerV2, - private val storage: StorageProtocol, private val lokiApiDatabase: LokiAPIDatabaseProtocol, private val clock: SnodeClock, private val 
appVisibilityManager: AppVisibilityManager, @@ -50,6 +48,7 @@ class GroupPoller( ) { companion object { private const val POLL_INTERVAL = 3_000L + private const val SWARM_FETCH_INTERVAL = 1800_000L // Every 30 minutes private const val TAG = "GroupPoller" } @@ -73,9 +72,16 @@ class GroupPoller( } private class InternalPollState( - var swarmNodes: MutableSet = mutableSetOf(), - var currentSnode: Snode? = null, - ) + // The nodes for current swarm + var swarmNodes: Set = emptySet(), + + // The pool of snodes that are currently being used for polling + val pollPool: MutableSet = hashSetOf() + ) { + fun shouldFetchSwarmNodes(): Boolean { + return swarmNodes.isEmpty() + } + } // A channel to send tokens to trigger a poll private val pollOnceTokens = Channel() @@ -162,20 +168,34 @@ class GroupPoller( val pollStartedAt = Instant.now() var groupExpired: Boolean? = null + var currentSnode: Snode? = null + val result = runCatching { supervisorScope { - // Grab current snode or pick (and remove) a random one from the pool - val snode = pollState.currentSnode ?: run { - if (pollState.swarmNodes.isEmpty()) { - Log.d(TAG, "Fetching swarm nodes for $groupId") - pollState.swarmNodes.addAll(SnodeAPI.fetchSwarmNodes(groupId.hexString)) - } + // Fetch snodes if we don't have any + val swarmNodes = if (pollState.shouldFetchSwarmNodes()) { + Log.d(TAG, "Fetching swarm nodes for $groupId") + val fetched = SnodeAPI.fetchSwarmNodes(groupId.hexString).toSet() + pollState.swarmNodes = fetched + fetched + } else { + pollState.swarmNodes + } - check(pollState.swarmNodes.isNotEmpty()) { "No swarm nodes found" } - pollState.swarmNodes.random().also { - pollState.currentSnode = it - pollState.swarmNodes.remove(it) - } + // Ensure we have at least one snode + check(swarmNodes.isNotEmpty()) { + "No swarm nodes found for $groupId" + } + + // Fill the pool if it's empty + if (pollState.pollPool.isEmpty()) { + pollState.pollPool.addAll(swarmNodes) + } + + // Take a random snode from the pool + val 
snode = pollState.pollPool.random().also { + pollState.pollPool.remove(it) + currentSnode = it } val groupAuth = @@ -241,7 +261,7 @@ class GroupPoller( Namespace.GROUP_MESSAGES() ).orEmpty() - Log.d(TAG, "Retrieving group message since lastHash = $lastHash") + Log.v(TAG, "Retrieving group($groupId) message since lastHash = $lastHash, snode = ${snode.publicKeySet}") SnodeAPI.sendBatchRequest( snode = snode, @@ -331,9 +351,18 @@ class GroupPoller( val error = result.exceptionOrNull() Log.e(TAG, "Error polling group", error) - if (error !is NonRetryableException && error !is CancellationException) { - // If the error can be retried, reset the current snode so we use another one - pollState.currentSnode = null + // Find out whether any exception thrown in the process has a root cause of a node returning a bad response; + // if so, remove this snode from our swarm nodes set + if (error != null && currentSnode != null) { + val badResponse = (sequenceOf(error) + error.suppressedExceptions.asSequence()) + .firstOrNull { err -> + err.getRootCause()?.item?.let { it.isServerError || it.isSnodeNoLongerPartOfSwarm } == true + } + + if (badResponse != null) { + Log.e(TAG, "Group polling failed due to a server error", badResponse) + pollState.swarmNodes -= currentSnode!! 
+ } } } @@ -344,8 +373,6 @@ class GroupPoller( groupExpired = groupExpired ) - Log.d(TAG, "Polling group $groupId result = $pollResult") - return pollResult } diff --git a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPollerManager.kt b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPollerManager.kt index 5a8b9593dc..1780622e3c 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPollerManager.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPollerManager.kt @@ -54,8 +54,6 @@ import javax.inject.Singleton @Singleton class GroupPollerManager @Inject constructor( configFactory: ConfigFactory, - groupManagerV2: Lazy, - storage: StorageProtocol, lokiApiDatabase: LokiAPIDatabaseProtocol, clock: SnodeClock, preferences: TextSecurePreferences, @@ -113,8 +111,6 @@ class GroupPollerManager @Inject constructor( scope = scope, groupId = groupId, configFactoryProtocol = configFactory, - groupManagerV2 = groupManagerV2.get(), - storage = storage, lokiApiDatabase = lokiApiDatabase, clock = clock, appVisibilityManager = appVisibilityManager, diff --git a/app/src/main/java/org/thoughtcrime/securesms/util/ThrowableUtil.kt b/app/src/main/java/org/thoughtcrime/securesms/util/ThrowableUtil.kt new file mode 100644 index 0000000000..fb97eddd54 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/util/ThrowableUtil.kt @@ -0,0 +1,21 @@ +package org.thoughtcrime.securesms.util + +/** + * Walk the cause chain of this throwable. This chain includes itself as the first element. + */ +fun Throwable.causes(): Sequence = sequence { + var current: Throwable? = this@causes + while (current != null) { + yield(current) + current = current.cause + } +} + +/** + * Find out if this throwable has a cause of the specified type anywhere in its cause chain (itself included); if so, return the first match. + */ +inline fun Throwable.getRootCause(): E? 
{ + return causes() + .filterIsInstance() + .firstOrNull() +} \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt index f1634d8e2e..c27d1955e1 100644 --- a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt +++ b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt @@ -4,6 +4,8 @@ package org.session.libsession.snode import android.os.SystemClock import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.NullNode +import com.fasterxml.jackson.databind.node.TextNode import com.goterl.lazysodium.exceptions.SodiumException import com.goterl.lazysodium.interfaces.GenericHash import com.goterl.lazysodium.interfaces.PwHash @@ -657,14 +659,14 @@ object SnodeAPI { // back through the request's callback. for ((req, resp) in batch.zip(responses.results)) { val result = runCatching { - check(resp.code == 200) { - "Error calling \"${req.request.method}\" with code = ${resp.code}, msg = ${resp.body}" + if (!resp.isSuccessful) { + throw BatchResponse.Error(resp) } JsonUtil.fromJson(resp.body, req.responseType) } - runCatching{ + runCatching { req.callback.send(result) } } diff --git a/libsession/src/main/java/org/session/libsession/snode/model/BatchResponse.kt b/libsession/src/main/java/org/session/libsession/snode/model/BatchResponse.kt index 7bc4308417..723abfc79f 100644 --- a/libsession/src/main/java/org/session/libsession/snode/model/BatchResponse.kt +++ b/libsession/src/main/java/org/session/libsession/snode/model/BatchResponse.kt @@ -13,5 +13,20 @@ data class BatchResponse @JsonCreator constructor( ) { val isSuccessful: Boolean get() = code in 200..299 + + val isServerError: Boolean + get() = code in 500..599 + + val isSnodeNoLongerPartOfSwarm: Boolean + get() = code == 421 + } + + data class Error(val item: Item) + : RuntimeException("Batch request failed with code ${item.code}") { + init { + 
require(!item.isSuccessful) { + "This response item does not represent an error state" + } + } } }