From bd90383a024271f83ebc03d06c2f50b43f416258 Mon Sep 17 00:00:00 2001 From: SessionHero01 <180888785+SessionHero01@users.noreply.github.com> Date: Fri, 28 Feb 2025 09:37:07 +1100 Subject: [PATCH 1/5] Disable three dot menu for kicked groups too (#993) --- .../securesms/conversation/v2/ConversationViewModel.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationViewModel.kt b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationViewModel.kt index ceb8cefa5e..0850fbb797 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationViewModel.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationViewModel.kt @@ -201,11 +201,11 @@ class ConversationViewModel( } val showOptionsMenu: Boolean - get() = !isMessageRequestThread && !isDeprecatedLegacyGroup && !isKickedGroupV2Thread + get() = !isMessageRequestThread && !isDeprecatedLegacyGroup && !isInactiveGroupV2Thread - private val isKickedGroupV2Thread: Boolean + private val isInactiveGroupV2Thread: Boolean get() = recipient?.isGroupV2Recipient == true && - configFactory.getGroup(AccountId(recipient!!.address.toString()))?.kicked == true + configFactory.getGroup(AccountId(recipient!!.address.toString()))?.shouldPoll == false private val isDeprecatedLegacyGroup: Boolean get() = recipient?.isLegacyGroupRecipient == true && legacyGroupDeprecationManager.isDeprecated From b7ab2c51dbf92c29d8e0708bf470b470b0b20ef0 Mon Sep 17 00:00:00 2001 From: SessionHero01 <180888785+SessionHero01@users.noreply.github.com> Date: Mon, 3 Mar 2025 14:13:25 +1100 Subject: [PATCH 2/5] Group poller tweaks (#995) --- .../securesms/groups/GroupPoller.kt | 75 +++++++++++++------ .../securesms/groups/GroupPollerManager.kt | 4 - .../securesms/util/ThrowableUtil.kt | 21 ++++++ .../org/session/libsession/snode/SnodeAPI.kt | 8 +- .../libsession/snode/model/BatchResponse.kt | 15 ++++ 5 files changed, 92 insertions(+), 31 deletions(-) create mode 100644 app/src/main/java/org/thoughtcrime/securesms/util/ThrowableUtil.kt diff --git a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPoller.kt b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPoller.kt index c4b53df40b..84012a6d1b 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPoller.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPoller.kt @@ -13,8 +13,6 @@ import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.stateIn import kotlinx.coroutines.launch import kotlinx.coroutines.supervisorScope -import org.session.libsession.database.StorageProtocol -import org.session.libsession.messaging.groups.GroupManagerV2 import org.session.libsession.messaging.jobs.BatchMessageReceiveJob import org.session.libsession.messaging.jobs.JobQueue import org.session.libsession.messaging.jobs.MessageReceiveParameters @@ -22,6 +20,7 @@ import org.session.libsession.messaging.messages.Destination import org.session.libsession.snode.RawResponse import org.session.libsession.snode.SnodeAPI import org.session.libsession.snode.SnodeClock +import org.session.libsession.snode.model.BatchResponse import org.session.libsession.snode.model.RetrieveMessageResponse import org.session.libsession.utilities.ConfigFactoryProtocol import org.session.libsession.utilities.ConfigMessage @@ -33,6 +32,7 @@ import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Namespace import org.session.libsignal.utilities.Snode import org.thoughtcrime.securesms.util.AppVisibilityManager +import org.thoughtcrime.securesms.util.getRootCause import java.time.Instant import kotlin.coroutines.cancellation.CancellationException import kotlin.time.Duration.Companion.days @@ -41,8 +41,6 @@ class GroupPoller( scope: CoroutineScope, private val groupId: AccountId, private val configFactoryProtocol: ConfigFactoryProtocol, - private val groupManagerV2: GroupManagerV2, - private val storage: StorageProtocol, private val lokiApiDatabase: LokiAPIDatabaseProtocol, private val clock: SnodeClock, private val appVisibilityManager: AppVisibilityManager, @@ -50,6 +48,7 @@ class GroupPoller( ) { companion object { private const val POLL_INTERVAL = 3_000L + private const val SWARM_FETCH_INTERVAL = 1800_000L // Every 30 minutes private const val TAG = "GroupPoller" } @@ -73,9 +72,16 @@ class GroupPoller( } private class InternalPollState( - var swarmNodes: MutableSet = mutableSetOf(), - var currentSnode: Snode? = null, - ) + // The nodes for current swarm + var swarmNodes: Set = emptySet(), + + // The pool of snodes that are currently being used for polling + val pollPool: MutableSet = hashSetOf() + ) { + fun shouldFetchSwarmNodes(): Boolean { + return swarmNodes.isEmpty() + } + } // A channel to send tokens to trigger a poll private val pollOnceTokens = Channel() @@ -162,20 +168,34 @@ class GroupPoller( val pollStartedAt = Instant.now() var groupExpired: Boolean? = null + var currentSnode: Snode? = null + val result = runCatching { supervisorScope { - // Grab current snode or pick (and remove) a random one from the pool - val snode = pollState.currentSnode ?: run { - if (pollState.swarmNodes.isEmpty()) { - Log.d(TAG, "Fetching swarm nodes for $groupId") - pollState.swarmNodes.addAll(SnodeAPI.fetchSwarmNodes(groupId.hexString)) - } + // Fetch snodes if we don't have any + val swarmNodes = if (pollState.shouldFetchSwarmNodes()) { + Log.d(TAG, "Fetching swarm nodes for $groupId") + val fetched = SnodeAPI.fetchSwarmNodes(groupId.hexString).toSet() + pollState.swarmNodes = fetched + fetched + } else { + pollState.swarmNodes + } - check(pollState.swarmNodes.isNotEmpty()) { "No swarm nodes found" } - pollState.swarmNodes.random().also { - pollState.currentSnode = it - pollState.swarmNodes.remove(it) - } + // Ensure we have at least one snode + check(swarmNodes.isNotEmpty()) { + "No swarm nodes found for $groupId" + } + + // Fill the pool if it's empty + if (pollState.pollPool.isEmpty()) { + pollState.pollPool.addAll(swarmNodes) + } + + // Take a random snode from the pool + val snode = pollState.pollPool.random().also { + pollState.pollPool.remove(it) + currentSnode = it } val groupAuth = @@ -241,7 +261,7 @@ class GroupPoller( Namespace.GROUP_MESSAGES() ).orEmpty() - Log.d(TAG, "Retrieving group message since lastHash = $lastHash") + Log.v(TAG, "Retrieving group($groupId) message since lastHash = $lastHash, snode = ${snode.publicKeySet}") SnodeAPI.sendBatchRequest( snode = snode, @@ -331,9 +351,18 @@ class GroupPoller( val error = result.exceptionOrNull() Log.e(TAG, "Error polling group", error) - if (error !is NonRetryableException && error !is CancellationException) { - // If the error can be retried, reset the current snode so we use another one - pollState.currentSnode = null + // Find if any exception throws in the process has a root cause of a node returning bad response, + // then we will remove this snode from our swarm nodes set + if (error != null && currentSnode != null) { + val badResponse = (sequenceOf(error) + error.suppressedExceptions.asSequence()) + .firstOrNull { err -> + err.getRootCause()?.item?.let { it.isServerError || it.isSnodeNoLongerPartOfSwarm } == true + } + + if (badResponse != null) { + Log.e(TAG, "Group polling failed due to a server error", badResponse) + pollState.swarmNodes -= currentSnode!! + } } } @@ -344,8 +373,6 @@ class GroupPoller( groupExpired = groupExpired ) - Log.d(TAG, "Polling group $groupId result = $pollResult") - return pollResult } diff --git a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPollerManager.kt b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPollerManager.kt index 5a8b9593dc..1780622e3c 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPollerManager.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPollerManager.kt @@ -54,8 +54,6 @@ import javax.inject.Singleton @Singleton class GroupPollerManager @Inject constructor( configFactory: ConfigFactory, - groupManagerV2: Lazy, - storage: StorageProtocol, lokiApiDatabase: LokiAPIDatabaseProtocol, clock: SnodeClock, preferences: TextSecurePreferences, @@ -113,8 +111,6 @@ class GroupPollerManager @Inject constructor( scope = scope, groupId = groupId, configFactoryProtocol = configFactory, - groupManagerV2 = groupManagerV2.get(), - storage = storage, lokiApiDatabase = lokiApiDatabase, clock = clock, appVisibilityManager = appVisibilityManager, diff --git a/app/src/main/java/org/thoughtcrime/securesms/util/ThrowableUtil.kt b/app/src/main/java/org/thoughtcrime/securesms/util/ThrowableUtil.kt new file mode 100644 index 0000000000..fb97eddd54 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/util/ThrowableUtil.kt @@ -0,0 +1,21 @@ +package org.thoughtcrime.securesms.util + +/** + * Walk the cause chain of this throwable. This chain includes itself as the first element. + */ +fun Throwable.causes(): Sequence = sequence { + var current: Throwable? = this@causes + while (current != null) { + yield(current) + current = current.cause + } +} + +/** + * Find out if this throwable as a root cause of the specified type, if so return it. + */ +inline fun Throwable.getRootCause(): E? { + return causes() + .filterIsInstance() + .firstOrNull() +} \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt index f1634d8e2e..c27d1955e1 100644 --- a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt +++ b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt @@ -4,6 +4,8 @@ package org.session.libsession.snode import android.os.SystemClock import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.NullNode +import com.fasterxml.jackson.databind.node.TextNode import com.goterl.lazysodium.exceptions.SodiumException import com.goterl.lazysodium.interfaces.GenericHash import com.goterl.lazysodium.interfaces.PwHash @@ -657,14 +659,14 @@ object SnodeAPI { // back through the request's callback. for ((req, resp) in batch.zip(responses.results)) { val result = runCatching { - check(resp.code == 200) { - "Error calling \"${req.request.method}\" with code = ${resp.code}, msg = ${resp.body}" + if (!resp.isSuccessful) { + throw BatchResponse.Error(resp) } JsonUtil.fromJson(resp.body, req.responseType) } - runCatching{ + runCatching { req.callback.send(result) } } diff --git a/libsession/src/main/java/org/session/libsession/snode/model/BatchResponse.kt b/libsession/src/main/java/org/session/libsession/snode/model/BatchResponse.kt index 7bc4308417..723abfc79f 100644 --- a/libsession/src/main/java/org/session/libsession/snode/model/BatchResponse.kt +++ b/libsession/src/main/java/org/session/libsession/snode/model/BatchResponse.kt @@ -13,5 +13,20 @@ data class BatchResponse @JsonCreator constructor( ) { val isSuccessful: Boolean get() = code in 200..299 + + val isServerError: Boolean + get() = code in 500..599 + + val isSnodeNoLongerPartOfSwarm: Boolean + get() = code == 421 + } + + data class Error(val item: Item) + : RuntimeException("Batch request failed with code ${item.code}") { + init { + require(!item.isSuccessful) { + "This response item does not represent an error state" + } + } } } From 3a339b15f94f8ff13c6efb70d4eda92c7930c0c9 Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Mon, 3 Mar 2025 17:43:18 +1100 Subject: [PATCH 3/5] Added code to output libSession logs --- libsession-util/src/main/cpp/CMakeLists.txt | 2 + libsession-util/src/main/cpp/logging.cpp | 48 +++++++++++++++++++ libsession-util/src/main/cpp/logging.h | 17 +++++++ .../messenger/libsession_util/util/Logger.kt | 11 +++++ 4 files changed, 78 insertions(+) create mode 100644 libsession-util/src/main/cpp/logging.cpp create mode 100644 libsession-util/src/main/cpp/logging.h create mode 100644 libsession-util/src/main/java/network/loki/messenger/libsession_util/util/Logger.kt diff --git a/libsession-util/src/main/cpp/CMakeLists.txt b/libsession-util/src/main/cpp/CMakeLists.txt index 9f3eee8a5e..97fc9a1817 100644 --- a/libsession-util/src/main/cpp/CMakeLists.txt +++ b/libsession-util/src/main/cpp/CMakeLists.txt @@ -37,6 +37,7 @@ set(SOURCES group_keys.cpp group_info.cpp config_common.cpp + logging.cpp ) add_library( # Sets the name of the library. @@ -65,6 +66,7 @@ find_library( # Sets the name of the path variable. target_link_libraries( # Specifies the target library. session_util PUBLIC + libsession::util libsession::config libsession::crypto libsodium::sodium-internal diff --git a/libsession-util/src/main/cpp/logging.cpp b/libsession-util/src/main/cpp/logging.cpp new file mode 100644 index 0000000000..8362df04ce --- /dev/null +++ b/libsession-util/src/main/cpp/logging.cpp @@ -0,0 +1,48 @@ +#include +#include +#include +#include + +#include "logging.h" +#include "session/logging.hpp" +#include "session/log_level.h" + +#define LOG_TAG "LibSession" + +extern "C" JNIEXPORT void JNICALL +Java_network_loki_messenger_libsession_1util_util_Logger_initLogger(JNIEnv* env, jclass clazz) { + session::add_logger([](std::string_view msg, std::string_view category, session::LogLevel level) { + android_LogPriority prio = ANDROID_LOG_VERBOSE; + + switch (level.level) { + case LOG_LEVEL_TRACE: + prio = ANDROID_LOG_VERBOSE; + break; + + case LOG_LEVEL_DEBUG: + prio = ANDROID_LOG_DEBUG; + break; + + case LOG_LEVEL_INFO: + prio = ANDROID_LOG_INFO; + break; + + case LOG_LEVEL_WARN: + prio = ANDROID_LOG_WARN; + break; + + case LOG_LEVEL_ERROR: + case LOG_LEVEL_CRITICAL: + prio = ANDROID_LOG_ERROR; + break; + + default: + prio = ANDROID_LOG_INFO; + break; + } + + __android_log_print(prio, LOG_TAG, "%.*s [%.*s]", + static_cast(msg.size()), msg.data(), + static_cast(category.size()), category.data()); +}); +} \ No newline at end of file diff --git a/libsession-util/src/main/cpp/logging.h b/libsession-util/src/main/cpp/logging.h new file mode 100644 index 0000000000..c8a67cbda1 --- /dev/null +++ b/libsession-util/src/main/cpp/logging.h @@ -0,0 +1,17 @@ +#ifndef SESSION_ANDROID_LOGGING_H +#define SESSION_ANDROID_LOGGING_H + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +// Declaration of the JNI function following the JNI naming convention. +JNIEXPORT void JNICALL Java_network_loki_messenger_libsession_1util_util_Logger_initLogger(JNIEnv* env, jclass clazz); + +#ifdef __cplusplus +} +#endif + +#endif //SESSION_ANDROID_LOGGING_H diff --git a/libsession-util/src/main/java/network/loki/messenger/libsession_util/util/Logger.kt b/libsession-util/src/main/java/network/loki/messenger/libsession_util/util/Logger.kt new file mode 100644 index 0000000000..afb20e23ee --- /dev/null +++ b/libsession-util/src/main/java/network/loki/messenger/libsession_util/util/Logger.kt @@ -0,0 +1,11 @@ +package network.loki.messenger.libsession_util.util + +object Logger { + + init { + System.loadLibrary("session_util") + } + + @JvmStatic + external fun initLogger() +} \ No newline at end of file From c3c903caed71263cda3dd11bf022d9ec65b402ba Mon Sep 17 00:00:00 2001 From: ThomasSession Date: Tue, 4 Mar 2025 01:45:52 +0200 Subject: [PATCH 4/5] Polling node rotation (#996) * Rethinking poller logic * Reworking poller logic Reworked the poller as the logic wasn't correctly rotating across snodes beyond on exception thrown, which caused issues when landing on a snode with no data which isn't an exception. Moved away from Promises in favour of coroutines * Update libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt Co-authored-by: AL-Session <160798022+AL-Session@users.noreply.github.com> * PR feedback * Using same size as messages for input text in convo * PR feedback --------- Co-authored-by: AL-Session <160798022+AL-Session@users.noreply.github.com> --- .../securesms/ApplicationContext.java | 12 +- .../OptimizedMessageNotifier.java | 8 +- app/src/main/res/layout/view_input_bar.xml | 2 +- .../sending_receiving/pollers/Poller.kt | 334 ++++++++---------- .../org/session/libsession/snode/SnodeAPI.kt | 15 +- 5 files changed, 173 insertions(+), 198 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java index 3bcf748788..7658f358d6 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java +++ b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java @@ -344,13 +344,9 @@ public class ApplicationContext extends Application implements DefaultLifecycleO return; } - ThreadUtils.queue(()->{ - if (poller != null) { - poller.setCaughtUp(false); - } - - startPollingIfNeeded(); + startPollingIfNeeded(); + ThreadUtils.queue(()->{ OpenGroupManager.INSTANCE.startPolling(); return Unit.INSTANCE; }); @@ -466,7 +462,9 @@ public class ApplicationContext extends Application implements DefaultLifecycleO private void setUpPollingIfNeeded() { String userPublicKey = textSecurePreferences.getLocalNumber(); if (userPublicKey == null) return; - poller = new Poller(configFactory, storage, lokiAPIDatabase); + if(poller == null) { + poller = new Poller(configFactory, storage, lokiAPIDatabase); + } } public void startPollingIfNeeded() { diff --git a/app/src/main/java/org/thoughtcrime/securesms/notifications/OptimizedMessageNotifier.java b/app/src/main/java/org/thoughtcrime/securesms/notifications/OptimizedMessageNotifier.java index 8bcf43887a..5f712e3210 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/notifications/OptimizedMessageNotifier.java +++ b/app/src/main/java/org/thoughtcrime/securesms/notifications/OptimizedMessageNotifier.java @@ -52,7 +52,7 @@ public class OptimizedMessageNotifier implements MessageNotifier { Poller poller = ApplicationContext.getInstance(context).poller; boolean isCaughtUp = true; if (poller != null) { - isCaughtUp = isCaughtUp && poller.isCaughtUp(); + isCaughtUp = isCaughtUp && !poller.isPolling(); } isCaughtUp = isCaughtUp && OpenGroupManager.INSTANCE.isAllCaughtUp(); @@ -69,7 +69,7 @@ public class OptimizedMessageNotifier implements MessageNotifier { Poller lokiPoller = ApplicationContext.getInstance(context).poller; boolean isCaughtUp = true; if (lokiPoller != null) { - isCaughtUp = isCaughtUp && lokiPoller.isCaughtUp(); + isCaughtUp = isCaughtUp && !lokiPoller.isPolling(); } isCaughtUp = isCaughtUp && OpenGroupManager.INSTANCE.isAllCaughtUp(); @@ -86,7 +86,7 @@ public class OptimizedMessageNotifier implements MessageNotifier { Poller lokiPoller = ApplicationContext.getInstance(context).poller; boolean isCaughtUp = true; if (lokiPoller != null) { - isCaughtUp = isCaughtUp && lokiPoller.isCaughtUp(); + isCaughtUp = isCaughtUp && !lokiPoller.isPolling(); } isCaughtUp = isCaughtUp && OpenGroupManager.INSTANCE.isAllCaughtUp(); @@ -103,7 +103,7 @@ public class OptimizedMessageNotifier implements MessageNotifier { Poller lokiPoller = ApplicationContext.getInstance(context).poller; boolean isCaughtUp = true; if (lokiPoller != null) { - isCaughtUp = isCaughtUp && lokiPoller.isCaughtUp(); + isCaughtUp = isCaughtUp && !lokiPoller.isPolling(); } isCaughtUp = isCaughtUp && OpenGroupManager.INSTANCE.isAllCaughtUp(); diff --git a/app/src/main/res/layout/view_input_bar.xml b/app/src/main/res/layout/view_input_bar.xml index 2b868ece36..4d145a642d 100644 --- a/app/src/main/res/layout/view_input_bar.xml +++ b/app/src/main/res/layout/view_input_bar.xml @@ -49,7 +49,7 @@ android:hint="@string/message" android:textColorHint="?attr/input_bar_text_hint" android:textColor="?input_bar_text_user" - android:textSize="@dimen/small_font_size" /> + android:textSize="@dimen/medium_font_size" /> = mutableSetOf() - var isCaughtUp = false + var scope: CoroutineScope? = null + + var isPolling: Boolean = false // region Settings companion object { @@ -64,25 +72,26 @@ class Poller( // region Public API fun startIfNeeded() { - if (hasStarted) { return } + if (scope != null) { return } + Log.d(TAG, "Started polling.") - hasStarted = true - setUpPolling(RETRY_INTERVAL_MS) + scope = CoroutineScope(Dispatchers.Default) + scope?.launch { + setUpPolling() + } } fun stopIfNeeded() { Log.d(TAG, "Stopped polling.") - hasStarted = false - usedSnodes.clear() + scope?.cancel() + scope = null + isPolling = false } fun retrieveUserProfile() { Log.d(TAG, "Retrieving user profile. for key = $userPublicKey") - SnodeAPI.getSwarm(userPublicKey).bind { - usedSnodes.clear() - deferred().also { exception -> - pollNextSnode(userProfileOnly = true, exception) - }.promise + SnodeAPI.getSwarm(userPublicKey).success { + pollUserProfile(it.random()) }.fail { exception -> Log.e(TAG, "Failed to retrieve user profile.", exception) } @@ -90,51 +99,41 @@ class Poller( // endregion // region Private API - private fun setUpPolling(delay: Long) { - if (!hasStarted) { return; } - val thread = Thread.currentThread() - SnodeAPI.getSwarm(userPublicKey).bind { - usedSnodes.clear() - val deferred = deferred() - pollNextSnode(deferred = deferred) - deferred.promise - }.success { - val nextDelay = if (isCaughtUp) RETRY_INTERVAL_MS else 0 - Timer().schedule(object : TimerTask() { - override fun run() { - thread.run { setUpPolling(RETRY_INTERVAL_MS) } - } - }, nextDelay) - }.fail { - val nextDelay = minOf(MAX_RETRY_INTERVAL_MS, (delay * NEXT_RETRY_MULTIPLIER).toLong()) - Timer().schedule(object : TimerTask() { - override fun run() { - thread.run { setUpPolling(nextDelay) } - } - }, nextDelay) - } - } + private suspend fun setUpPolling() { + val pollPool = hashSetOf() // pollPool is the list of snodes we can use while rotating snodes from our swarm + var retryScalingFactor = 1.0f // We increment the retry interval by NEXT_RETRY_MULTIPLIER times this value, which we bump on each failure - private fun pollNextSnode(userProfileOnly: Boolean = false, deferred: Deferred) { - val swarm = SnodeModule.shared.storage.getSwarm(userPublicKey) ?: setOf() - val unusedSnodes = swarm.subtract(usedSnodes) - if (unusedSnodes.isNotEmpty()) { - val index = SECURE_RANDOM.nextInt(unusedSnodes.size) - val nextSnode = unusedSnodes.elementAt(index) - usedSnodes.add(nextSnode) - Log.d(TAG, "Polling $nextSnode.") - poll(userProfileOnly, nextSnode, deferred).fail { exception -> - if (exception is PromiseCanceledException) { - Log.d(TAG, "Polling $nextSnode canceled.") - } else { - Log.d(TAG, "Polling $nextSnode failed; dropping it and switching to next snode.") - SnodeAPI.dropSnodeFromSwarmIfNeeded(nextSnode, userPublicKey) - pollNextSnode(userProfileOnly, deferred) - } + while(true){ + Log.d(TAG, "Polling...") + + isPolling = true + + // check if the polling pool is empty + if(pollPool.isEmpty()){ + // if it is empty, fill it with the snodes from our swarm + pollPool.addAll(SnodeAPI.getSwarm(userPublicKey).await()) + } + + // randomly get a snode from the pool + val currentNode = pollPool.random() + + // remove that snode from the pool + pollPool.remove(currentNode) + + var pollDelay = RETRY_INTERVAL_MS + try { + poll(currentNode) + retryScalingFactor = 1f + } catch (e: Exception){ + Log.e(TAG, "Error while polling:", e) + pollDelay = minOf(MAX_RETRY_INTERVAL_MS, (RETRY_INTERVAL_MS * (NEXT_RETRY_MULTIPLIER * retryScalingFactor)).toLong()) + retryScalingFactor++ + } finally { + isPolling = false } - } else { - isCaughtUp = true - deferred.resolve() + + // wait before polling again + delay(pollDelay) } } @@ -184,14 +183,8 @@ class Poller( } } - private fun poll(userProfileOnly: Boolean, snode: Snode, deferred: Deferred): Promise { - if (userProfileOnly) { - return pollUserProfile(snode, deferred) - } - return poll(snode, deferred) - } - - private fun pollUserProfile(snode: Snode, deferred: Deferred): Promise = GlobalScope.asyncPromise { + //todo we will need to modify this further to fit within the new coroutine setup (currently used by ApplicationContext which is a java class) + private fun pollUserProfile(snode: Snode) { val requests = mutableListOf() val hashesToExtend = mutableSetOf() val userAuth = requireNotNull(MessagingModuleConfiguration.shared.storage.userAuth) @@ -224,22 +217,21 @@ class Poller( if (requests.isNotEmpty()) { SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).bind { rawResponses -> - isCaughtUp = true - if (!deferred.promise.isDone()) { - val responseList = (rawResponses["results"] as List) - responseList.getOrNull(0)?.let { rawResponse -> - if (rawResponse["code"] as? Int != 200) { - Log.e(TAG, "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}") + + val responseList = (rawResponses[KEY_RESULTS] as List) + responseList.getOrNull(0)?.let { rawResponse -> + if (rawResponse[KEY_CODE] as? Int != 200) { + Log.e(TAG, "Batch sub-request had non-200 response code, returned code ${(rawResponse[KEY_CODE] as? Int) ?: "[unknown]"}") + } else { + val body = rawResponse[KEY_BODY] as? RawResponse + if (body == null) { + Log.e(TAG, "Batch sub-request didn't contain a body") } else { - val body = rawResponse["body"] as? RawResponse - if (body == null) { - Log.e(TAG, "Batch sub-request didn't contain a body") - } else { - processConfig(snode, body, UserConfigType.USER_PROFILE) - } + processConfig(snode, body, UserConfigType.USER_PROFILE) } } } + Promise.ofSuccess(Unit) }.fail { Log.e(TAG, "Failed to get raw batch response", it) @@ -247,120 +239,102 @@ class Poller( } } + private suspend fun poll(snode: Snode) { + val userAuth = requireNotNull(MessagingModuleConfiguration.shared.storage.userAuth) + val requestSparseArray = SparseArray() + // get messages + SnodeAPI.buildAuthenticatedRetrieveBatchRequest( + lastHash = lokiApiDatabase.getLastMessageHashValue( + snode = snode, + publicKey = userAuth.accountId.hexString, + namespace = Namespace.DEFAULT() + ), + auth = userAuth, + maxSize = -2) + .also { personalMessages -> + // namespaces here should always be set + requestSparseArray[personalMessages.namespace!!] = personalMessages + } + // get the latest convo info volatile + val hashesToExtend = mutableSetOf() + configFactory.withUserConfigs { configs -> + UserConfigType + .entries + .map { type -> + val config = configs.getConfig(type) + hashesToExtend += config.currentHashes() + type.namespace to SnodeAPI.buildAuthenticatedRetrieveBatchRequest( + lastHash = lokiApiDatabase.getLastMessageHashValue( + snode = snode, + publicKey = userAuth.accountId.hexString, + namespace = type.namespace + ), + auth = userAuth, + namespace = type.namespace, + maxSize = -8 + ) + } + }.forEach { (namespace, request) -> + // namespaces here should always be set + requestSparseArray[namespace] = request + } + + val requests = requestSparseArray.valueIterator().asSequence().toMutableList() - private fun poll(snode: Snode, deferred: Deferred): Promise { - if (!hasStarted) { return Promise.ofFail(PromiseCanceledException()) } - return GlobalScope.asyncPromise { - val userAuth = requireNotNull(MessagingModuleConfiguration.shared.storage.userAuth) - val requestSparseArray = SparseArray() - // get messages - SnodeAPI.buildAuthenticatedRetrieveBatchRequest( - lastHash = lokiApiDatabase.getLastMessageHashValue( - snode = snode, - publicKey = userAuth.accountId.hexString, - namespace = Namespace.DEFAULT() - ), + if (hashesToExtend.isNotEmpty()) { + SnodeAPI.buildAuthenticatedAlterTtlBatchRequest( + messageHashes = hashesToExtend.toList(), auth = userAuth, - maxSize = -2) - .also { personalMessages -> - // namespaces here should always be set - requestSparseArray[personalMessages.namespace!!] = personalMessages - } - // get the latest convo info volatile - val hashesToExtend = mutableSetOf() - configFactory.withUserConfigs { configs -> - UserConfigType - .entries - .map { type -> - val config = configs.getConfig(type) - hashesToExtend += config.currentHashes() - type.namespace to SnodeAPI.buildAuthenticatedRetrieveBatchRequest( - lastHash = lokiApiDatabase.getLastMessageHashValue( - snode = snode, - publicKey = userAuth.accountId.hexString, - namespace = type.namespace - ), - auth = userAuth, - namespace = type.namespace, - maxSize = -8 - ) - } - }.forEach { (namespace, request) -> - // namespaces here should always be set - requestSparseArray[namespace] = request + newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds, + extend = true + ).let { extensionRequest -> + requests += extensionRequest } + } - val requests = - requestSparseArray.valueIterator().asSequence().toMutableList() + if (requests.isNotEmpty()) { + val rawResponses = SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).await() + val responseList = (rawResponses[KEY_RESULTS] as List) + // in case we had null configs, the array won't be fully populated + // index of the sparse array key iterator should be the request index, with the key being the namespace + UserConfigType.entries + .map { type -> type to requestSparseArray.indexOfKey(type.namespace) } + .filter { (_, i) -> i >= 0 } + .forEach { (configType, requestIndex) -> + responseList.getOrNull(requestIndex)?.let { rawResponse -> + if (rawResponse[KEY_CODE] as? Int != 200) { + Log.e(TAG, "Batch sub-request had non-200 response code, returned code ${(rawResponse[KEY_CODE] as? Int) ?: "[unknown]"}") + return@forEach + } + val body = rawResponse[KEY_BODY] as? RawResponse + if (body == null) { + Log.e(TAG, "Batch sub-request didn't contain a body") + return@forEach + } - if (hashesToExtend.isNotEmpty()) { - SnodeAPI.buildAuthenticatedAlterTtlBatchRequest( - messageHashes = hashesToExtend.toList(), - auth = userAuth, - newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds, - extend = true - ).let { extensionRequest -> - requests += extensionRequest + processConfig(snode, body, configType) + } } - } - if (requests.isNotEmpty()) { - SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).bind { rawResponses -> - isCaughtUp = true - if (deferred.promise.isDone()) { - return@bind Promise.ofSuccess(Unit) + // the first response will be the personal messages (we want these to be processed after config messages) + val personalResponseIndex = requestSparseArray.indexOfKey(Namespace.DEFAULT()) + if (personalResponseIndex >= 0) { + responseList.getOrNull(personalResponseIndex)?.let { rawResponse -> + if (rawResponse[KEY_CODE] as? Int != 200) { + // If we got a non-success response then the snode might be bad + throw(RuntimeException("Batch sub-request for personal messages had non-200 response code, returned code ${(rawResponse[KEY_CODE] as? Int) ?: "[unknown]"}")) } else { - val responseList = (rawResponses["results"] as List) - // in case we had null configs, the array won't be fully populated - // index of the sparse array key iterator should be the request index, with the key being the namespace - UserConfigType.entries - .map { type -> type to requestSparseArray.indexOfKey(type.namespace) } - .filter { (_, i) -> i >= 0 } - .forEach { (configType, requestIndex) -> - responseList.getOrNull(requestIndex)?.let { rawResponse -> - if (rawResponse["code"] as? Int != 200) { - Log.e(TAG, "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}") - return@forEach - } - val body = rawResponse["body"] as? RawResponse - if (body == null) { - Log.e(TAG, "Batch sub-request didn't contain a body") - return@forEach - } - - processConfig(snode, body, configType) - } - } - - // the first response will be the personal messages (we want these to be processed after config messages) - val personalResponseIndex = requestSparseArray.indexOfKey(Namespace.DEFAULT()) - if (personalResponseIndex >= 0) { - responseList.getOrNull(personalResponseIndex)?.let { rawResponse -> - if (rawResponse["code"] as? Int != 200) { - Log.e(TAG, "Batch sub-request for personal messages had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}") - // If we got a non-success response then the snode might be bad so we should try rotate - // to a different one just in case - pollNextSnode(deferred = deferred) - return@bind Promise.ofSuccess(Unit) - } else { - val body = rawResponse["body"] as? RawResponse - if (body == null) { - Log.e(TAG, "Batch sub-request for personal messages didn't contain a body") - } else { - processPersonalMessages(snode, body) - } - } - } + val body = rawResponse[KEY_BODY] as? RawResponse + if (body == null) { + throw(RuntimeException("Batch sub-request for personal messages didn't contain a body")) + } else { + processPersonalMessages(snode, body) } - - poll(snode, deferred) } - }.fail { - Log.e(TAG, "Failed to get raw batch response", it) - poll(snode, deferred) } + } else { + throw(SnodeAPI.Error.Generic) } } } - // endregion } diff --git a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt index c27d1955e1..1484235293 100644 --- a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt +++ b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt @@ -96,6 +96,9 @@ object SnodeAPI { private const val snodeFailureThreshold = 3 private const val useOnionRequests = true + const val KEY_BODY = "body" + const val KEY_CODE = "code" + const val KEY_RESULTS = "results" private const val KEY_IP = "public_ip" private const val KEY_PORT = "storage_port" private const val KEY_X25519 = "pubkey_x25519" @@ -577,14 +580,14 @@ object SnodeAPI { parameters, publicKey ).success { rawResponses -> - rawResponses["results"].let { it as List } + rawResponses[KEY_RESULTS].let { it as List } .asSequence() - .filter { it["code"] as? Int != 200 } + .filter { it[KEY_CODE] as? Int != 200 } .forEach { response -> Log.w("Loki", "response code was not 200") handleSnodeError( - response["code"] as? Int ?: 0, - response["body"] as? Map<*, *>, + response[KEY_CODE] as? Int ?: 0, + response[KEY_BODY] as? Map<*, *>, snode, publicKey ) @@ -901,7 +904,7 @@ object SnodeAPI { val deletedMessages = swarms.mapValuesNotNull { (hexSnodePublicKey, rawJSON) -> (rawJSON as? Map)?.let { json -> val isFailed = json["failed"] as? Boolean ?: false - val statusCode = json["code"] as? String + val statusCode = json[KEY_CODE] as? String val reason = json["reason"] as? String if (isFailed) { @@ -1072,7 +1075,7 @@ object SnodeAPI { val json = rawJSON as? Map ?: return@mapValuesNotNull null if (json["failed"] as? Boolean == true) { val reason = json["reason"] as? String - val statusCode = json["code"] as? String + val statusCode = json[KEY_CODE] as? String Log.e("Loki", "Failed to delete all messages from: $hexSnodePublicKey due to error: $reason ($statusCode).") false } else { From fbd1975673e5714fd65c953f791363f4ae7f4c04 Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Tue, 4 Mar 2025 17:31:23 +1100 Subject: [PATCH 5/5] Removed unnecessary header file --- libsession-util/src/main/cpp/logging.cpp | 1 - libsession-util/src/main/cpp/logging.h | 17 ----------------- 2 files changed, 18 deletions(-) delete mode 100644 libsession-util/src/main/cpp/logging.h diff --git a/libsession-util/src/main/cpp/logging.cpp b/libsession-util/src/main/cpp/logging.cpp index 8362df04ce..3f39b4ac78 100644 --- a/libsession-util/src/main/cpp/logging.cpp +++ b/libsession-util/src/main/cpp/logging.cpp @@ -3,7 +3,6 @@ #include #include -#include "logging.h" #include "session/logging.hpp" #include "session/log_level.h" diff --git a/libsession-util/src/main/cpp/logging.h b/libsession-util/src/main/cpp/logging.h deleted file mode 100644 index c8a67cbda1..0000000000 --- a/libsession-util/src/main/cpp/logging.h +++ /dev/null @@ -1,17 +0,0 @@ -#ifndef SESSION_ANDROID_LOGGING_H -#define SESSION_ANDROID_LOGGING_H - -#include - -#ifdef __cplusplus -extern "C" { -#endif - -// Declaration of the JNI function following the JNI naming convention. -JNIEXPORT void JNICALL Java_network_loki_messenger_libsession_1util_util_Logger_initLogger(JNIEnv* env, jclass clazz); - -#ifdef __cplusplus -} -#endif - -#endif //SESSION_ANDROID_LOGGING_H