Group poller tweaks (#995)

pull/1710/head
SessionHero01 1 month ago committed by GitHub
parent bd90383a02
commit b7ab2c51db
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -13,8 +13,6 @@ import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.supervisorScope
import org.session.libsession.database.StorageProtocol
import org.session.libsession.messaging.groups.GroupManagerV2
import org.session.libsession.messaging.jobs.BatchMessageReceiveJob
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveParameters
@ -22,6 +20,7 @@ import org.session.libsession.messaging.messages.Destination
import org.session.libsession.snode.RawResponse
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeClock
import org.session.libsession.snode.model.BatchResponse
import org.session.libsession.snode.model.RetrieveMessageResponse
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsession.utilities.ConfigMessage
@ -33,6 +32,7 @@ import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Namespace
import org.session.libsignal.utilities.Snode
import org.thoughtcrime.securesms.util.AppVisibilityManager
import org.thoughtcrime.securesms.util.getRootCause
import java.time.Instant
import kotlin.coroutines.cancellation.CancellationException
import kotlin.time.Duration.Companion.days
@ -41,8 +41,6 @@ class GroupPoller(
scope: CoroutineScope,
private val groupId: AccountId,
private val configFactoryProtocol: ConfigFactoryProtocol,
private val groupManagerV2: GroupManagerV2,
private val storage: StorageProtocol,
private val lokiApiDatabase: LokiAPIDatabaseProtocol,
private val clock: SnodeClock,
private val appVisibilityManager: AppVisibilityManager,
@ -50,6 +48,7 @@ class GroupPoller(
) {
companion object {
private const val POLL_INTERVAL = 3_000L
private const val SWARM_FETCH_INTERVAL = 1800_000L // Every 30 minutes
private const val TAG = "GroupPoller"
}
@ -73,9 +72,16 @@ class GroupPoller(
}
private class InternalPollState(
var swarmNodes: MutableSet<Snode> = mutableSetOf(),
var currentSnode: Snode? = null,
)
// The nodes for current swarm
var swarmNodes: Set<Snode> = emptySet(),
// The pool of snodes that are currently being used for polling
val pollPool: MutableSet<Snode> = hashSetOf()
) {
fun shouldFetchSwarmNodes(): Boolean {
return swarmNodes.isEmpty()
}
}
// A channel to send tokens to trigger a poll
private val pollOnceTokens = Channel<PollOnceToken>()
@ -162,20 +168,34 @@ class GroupPoller(
val pollStartedAt = Instant.now()
var groupExpired: Boolean? = null
var currentSnode: Snode? = null
val result = runCatching {
supervisorScope {
// Grab current snode or pick (and remove) a random one from the pool
val snode = pollState.currentSnode ?: run {
if (pollState.swarmNodes.isEmpty()) {
Log.d(TAG, "Fetching swarm nodes for $groupId")
pollState.swarmNodes.addAll(SnodeAPI.fetchSwarmNodes(groupId.hexString))
}
// Fetch snodes if we don't have any
val swarmNodes = if (pollState.shouldFetchSwarmNodes()) {
Log.d(TAG, "Fetching swarm nodes for $groupId")
val fetched = SnodeAPI.fetchSwarmNodes(groupId.hexString).toSet()
pollState.swarmNodes = fetched
fetched
} else {
pollState.swarmNodes
}
check(pollState.swarmNodes.isNotEmpty()) { "No swarm nodes found" }
pollState.swarmNodes.random().also {
pollState.currentSnode = it
pollState.swarmNodes.remove(it)
}
// Ensure we have at least one snode
check(swarmNodes.isNotEmpty()) {
"No swarm nodes found for $groupId"
}
// Fill the pool if it's empty
if (pollState.pollPool.isEmpty()) {
pollState.pollPool.addAll(swarmNodes)
}
// Take a random snode from the pool
val snode = pollState.pollPool.random().also {
pollState.pollPool.remove(it)
currentSnode = it
}
val groupAuth =
@ -241,7 +261,7 @@ class GroupPoller(
Namespace.GROUP_MESSAGES()
).orEmpty()
Log.d(TAG, "Retrieving group message since lastHash = $lastHash")
Log.v(TAG, "Retrieving group($groupId) message since lastHash = $lastHash, snode = ${snode.publicKeySet}")
SnodeAPI.sendBatchRequest(
snode = snode,
@ -331,9 +351,18 @@ class GroupPoller(
val error = result.exceptionOrNull()
Log.e(TAG, "Error polling group", error)
if (error !is NonRetryableException && error !is CancellationException) {
// If the error can be retried, reset the current snode so we use another one
pollState.currentSnode = null
// Find if any exception throws in the process has a root cause of a node returning bad response,
// then we will remove this snode from our swarm nodes set
if (error != null && currentSnode != null) {
val badResponse = (sequenceOf(error) + error.suppressedExceptions.asSequence())
.firstOrNull { err ->
err.getRootCause<BatchResponse.Error>()?.item?.let { it.isServerError || it.isSnodeNoLongerPartOfSwarm } == true
}
if (badResponse != null) {
Log.e(TAG, "Group polling failed due to a server error", badResponse)
pollState.swarmNodes -= currentSnode!!
}
}
}
@ -344,8 +373,6 @@ class GroupPoller(
groupExpired = groupExpired
)
Log.d(TAG, "Polling group $groupId result = $pollResult")
return pollResult
}

@ -54,8 +54,6 @@ import javax.inject.Singleton
@Singleton
class GroupPollerManager @Inject constructor(
configFactory: ConfigFactory,
groupManagerV2: Lazy<GroupManagerV2>,
storage: StorageProtocol,
lokiApiDatabase: LokiAPIDatabaseProtocol,
clock: SnodeClock,
preferences: TextSecurePreferences,
@ -113,8 +111,6 @@ class GroupPollerManager @Inject constructor(
scope = scope,
groupId = groupId,
configFactoryProtocol = configFactory,
groupManagerV2 = groupManagerV2.get(),
storage = storage,
lokiApiDatabase = lokiApiDatabase,
clock = clock,
appVisibilityManager = appVisibilityManager,

@ -0,0 +1,21 @@
package org.thoughtcrime.securesms.util
/**
* Walk the cause chain of this throwable. This chain includes itself as the first element.
*/
fun Throwable.causes(): Sequence<Throwable> = sequence {
var current: Throwable? = this@causes
while (current != null) {
yield(current)
current = current.cause
}
}
/**
* Find out if this throwable as a root cause of the specified type, if so return it.
*/
inline fun <reified E: Throwable> Throwable.getRootCause(): E? {
return causes()
.filterIsInstance<E>()
.firstOrNull()
}

@ -4,6 +4,8 @@ package org.session.libsession.snode
import android.os.SystemClock
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.NullNode
import com.fasterxml.jackson.databind.node.TextNode
import com.goterl.lazysodium.exceptions.SodiumException
import com.goterl.lazysodium.interfaces.GenericHash
import com.goterl.lazysodium.interfaces.PwHash
@ -657,14 +659,14 @@ object SnodeAPI {
// back through the request's callback.
for ((req, resp) in batch.zip(responses.results)) {
val result = runCatching {
check(resp.code == 200) {
"Error calling \"${req.request.method}\" with code = ${resp.code}, msg = ${resp.body}"
if (!resp.isSuccessful) {
throw BatchResponse.Error(resp)
}
JsonUtil.fromJson(resp.body, req.responseType)
}
runCatching{
runCatching {
req.callback.send(result)
}
}

@ -13,5 +13,20 @@ data class BatchResponse @JsonCreator constructor(
) {
val isSuccessful: Boolean
get() = code in 200..299
val isServerError: Boolean
get() = code in 500..599
val isSnodeNoLongerPartOfSwarm: Boolean
get() = code == 421
}
data class Error(val item: Item)
: RuntimeException("Batch request failed with code ${item.code}") {
init {
require(!item.isSuccessful) {
"This response item does not represent an error state"
}
}
}
}

Loading…
Cancel
Save