Polling node rotation (#996)

* Rethinking poller logic

* Reworking poller logic

Reworked the poller as the logic wasn't correctly rotating across snodes beyond on exception thrown, which caused issues when landing on a snode with no data which isn't an exception.
Moved away from Promises in favour of coroutines

* Update libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt

Co-authored-by: AL-Session <160798022+AL-Session@users.noreply.github.com>

* PR feedback

* Using same size as messages for input text in convo

* PR feedback

---------

Co-authored-by: AL-Session <160798022+AL-Session@users.noreply.github.com>
pull/1710/head
ThomasSession 1 month ago committed by GitHub
parent b7ab2c51db
commit c3c903caed
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -344,13 +344,9 @@ public class ApplicationContext extends Application implements DefaultLifecycleO
return;
}
ThreadUtils.queue(()->{
if (poller != null) {
poller.setCaughtUp(false);
}
startPollingIfNeeded();
startPollingIfNeeded();
ThreadUtils.queue(()->{
OpenGroupManager.INSTANCE.startPolling();
return Unit.INSTANCE;
});
@ -466,7 +462,9 @@ public class ApplicationContext extends Application implements DefaultLifecycleO
private void setUpPollingIfNeeded() {
String userPublicKey = textSecurePreferences.getLocalNumber();
if (userPublicKey == null) return;
poller = new Poller(configFactory, storage, lokiAPIDatabase);
if(poller == null) {
poller = new Poller(configFactory, storage, lokiAPIDatabase);
}
}
public void startPollingIfNeeded() {

@ -52,7 +52,7 @@ public class OptimizedMessageNotifier implements MessageNotifier {
Poller poller = ApplicationContext.getInstance(context).poller;
boolean isCaughtUp = true;
if (poller != null) {
isCaughtUp = isCaughtUp && poller.isCaughtUp();
isCaughtUp = isCaughtUp && !poller.isPolling();
}
isCaughtUp = isCaughtUp && OpenGroupManager.INSTANCE.isAllCaughtUp();
@ -69,7 +69,7 @@ public class OptimizedMessageNotifier implements MessageNotifier {
Poller lokiPoller = ApplicationContext.getInstance(context).poller;
boolean isCaughtUp = true;
if (lokiPoller != null) {
isCaughtUp = isCaughtUp && lokiPoller.isCaughtUp();
isCaughtUp = isCaughtUp && !lokiPoller.isPolling();
}
isCaughtUp = isCaughtUp && OpenGroupManager.INSTANCE.isAllCaughtUp();
@ -86,7 +86,7 @@ public class OptimizedMessageNotifier implements MessageNotifier {
Poller lokiPoller = ApplicationContext.getInstance(context).poller;
boolean isCaughtUp = true;
if (lokiPoller != null) {
isCaughtUp = isCaughtUp && lokiPoller.isCaughtUp();
isCaughtUp = isCaughtUp && !lokiPoller.isPolling();
}
isCaughtUp = isCaughtUp && OpenGroupManager.INSTANCE.isAllCaughtUp();
@ -103,7 +103,7 @@ public class OptimizedMessageNotifier implements MessageNotifier {
Poller lokiPoller = ApplicationContext.getInstance(context).poller;
boolean isCaughtUp = true;
if (lokiPoller != null) {
isCaughtUp = isCaughtUp && lokiPoller.isCaughtUp();
isCaughtUp = isCaughtUp && !lokiPoller.isPolling();
}
isCaughtUp = isCaughtUp && OpenGroupManager.INSTANCE.isAllCaughtUp();

@ -49,7 +49,7 @@
android:hint="@string/message"
android:textColorHint="?attr/input_bar_text_hint"
android:textColor="?input_bar_text_user"
android:textSize="@dimen/small_font_size" />
android:textSize="@dimen/medium_font_size" />
<RelativeLayout
android:id="@+id/microphoneOrSendButtonContainer"

@ -2,6 +2,7 @@ package org.session.libsession.messaging.sending_receiving.pollers
import android.util.SparseArray
import androidx.core.util.valueIterator
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import network.loki.messenger.libsession_util.ConfigBase
@ -13,6 +14,11 @@ import java.util.Timer
import java.util.TimerTask
import kotlin.time.Duration.Companion.days
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import nl.komponents.kovenant.Deferred
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.deferred
@ -26,8 +32,12 @@ import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveParameters
import org.session.libsession.snode.RawResponse
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeAPI.KEY_BODY
import org.session.libsession.snode.SnodeAPI.KEY_CODE
import org.session.libsession.snode.SnodeAPI.KEY_RESULTS
import org.session.libsession.snode.SnodeModule
import org.session.libsession.snode.utilities.asyncPromise
import org.session.libsession.snode.utilities.await
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsession.utilities.ConfigMessage
import org.session.libsession.utilities.UserConfigType
@ -40,8 +50,6 @@ import org.session.libsignal.utilities.Util.SECURE_RANDOM
private const val TAG = "Poller"
private class PromiseCanceledException : Exception("Promise canceled.")
class Poller(
private val configFactory: ConfigFactoryProtocol,
private val storage: StorageProtocol,
@ -50,9 +58,9 @@ class Poller(
private val userPublicKey: String
get() = storage.getUserPublicKey().orEmpty()
private var hasStarted: Boolean = false
private val usedSnodes: MutableSet<Snode> = mutableSetOf()
var isCaughtUp = false
var scope: CoroutineScope? = null
var isPolling: Boolean = false
// region Settings
companion object {
@ -64,25 +72,26 @@ class Poller(
// region Public API
fun startIfNeeded() {
if (hasStarted) { return }
if (scope != null) { return }
Log.d(TAG, "Started polling.")
hasStarted = true
setUpPolling(RETRY_INTERVAL_MS)
scope = CoroutineScope(Dispatchers.Default)
scope?.launch {
setUpPolling()
}
}
fun stopIfNeeded() {
Log.d(TAG, "Stopped polling.")
hasStarted = false
usedSnodes.clear()
scope?.cancel()
scope = null
isPolling = false
}
fun retrieveUserProfile() {
Log.d(TAG, "Retrieving user profile. for key = $userPublicKey")
SnodeAPI.getSwarm(userPublicKey).bind {
usedSnodes.clear()
deferred<Unit, Exception>().also { exception ->
pollNextSnode(userProfileOnly = true, exception)
}.promise
SnodeAPI.getSwarm(userPublicKey).success {
pollUserProfile(it.random())
}.fail { exception ->
Log.e(TAG, "Failed to retrieve user profile.", exception)
}
@ -90,51 +99,41 @@ class Poller(
// endregion
// region Private API
private fun setUpPolling(delay: Long) {
if (!hasStarted) { return; }
val thread = Thread.currentThread()
SnodeAPI.getSwarm(userPublicKey).bind {
usedSnodes.clear()
val deferred = deferred<Unit, Exception>()
pollNextSnode(deferred = deferred)
deferred.promise
}.success {
val nextDelay = if (isCaughtUp) RETRY_INTERVAL_MS else 0
Timer().schedule(object : TimerTask() {
override fun run() {
thread.run { setUpPolling(RETRY_INTERVAL_MS) }
}
}, nextDelay)
}.fail {
val nextDelay = minOf(MAX_RETRY_INTERVAL_MS, (delay * NEXT_RETRY_MULTIPLIER).toLong())
Timer().schedule(object : TimerTask() {
override fun run() {
thread.run { setUpPolling(nextDelay) }
}
}, nextDelay)
}
}
private suspend fun setUpPolling() {
val pollPool = hashSetOf<Snode>() // pollPool is the list of snodes we can use while rotating snodes from our swarm
var retryScalingFactor = 1.0f // We increment the retry interval by NEXT_RETRY_MULTIPLIER times this value, which we bump on each failure
private fun pollNextSnode(userProfileOnly: Boolean = false, deferred: Deferred<Unit, Exception>) {
val swarm = SnodeModule.shared.storage.getSwarm(userPublicKey) ?: setOf()
val unusedSnodes = swarm.subtract(usedSnodes)
if (unusedSnodes.isNotEmpty()) {
val index = SECURE_RANDOM.nextInt(unusedSnodes.size)
val nextSnode = unusedSnodes.elementAt(index)
usedSnodes.add(nextSnode)
Log.d(TAG, "Polling $nextSnode.")
poll(userProfileOnly, nextSnode, deferred).fail { exception ->
if (exception is PromiseCanceledException) {
Log.d(TAG, "Polling $nextSnode canceled.")
} else {
Log.d(TAG, "Polling $nextSnode failed; dropping it and switching to next snode.")
SnodeAPI.dropSnodeFromSwarmIfNeeded(nextSnode, userPublicKey)
pollNextSnode(userProfileOnly, deferred)
}
while(true){
Log.d(TAG, "Polling...")
isPolling = true
// check if the polling pool is empty
if(pollPool.isEmpty()){
// if it is empty, fill it with the snodes from our swarm
pollPool.addAll(SnodeAPI.getSwarm(userPublicKey).await())
}
// randomly get a snode from the pool
val currentNode = pollPool.random()
// remove that snode from the pool
pollPool.remove(currentNode)
var pollDelay = RETRY_INTERVAL_MS
try {
poll(currentNode)
retryScalingFactor = 1f
} catch (e: Exception){
Log.e(TAG, "Error while polling:", e)
pollDelay = minOf(MAX_RETRY_INTERVAL_MS, (RETRY_INTERVAL_MS * (NEXT_RETRY_MULTIPLIER * retryScalingFactor)).toLong())
retryScalingFactor++
} finally {
isPolling = false
}
} else {
isCaughtUp = true
deferred.resolve()
// wait before polling again
delay(pollDelay)
}
}
@ -184,14 +183,8 @@ class Poller(
}
}
private fun poll(userProfileOnly: Boolean, snode: Snode, deferred: Deferred<Unit, Exception>): Promise<Unit, Exception> {
if (userProfileOnly) {
return pollUserProfile(snode, deferred)
}
return poll(snode, deferred)
}
private fun pollUserProfile(snode: Snode, deferred: Deferred<Unit, Exception>): Promise<Unit, Exception> = GlobalScope.asyncPromise {
//todo we will need to modify this further to fit within the new coroutine setup (currently used by ApplicationContext which is a java class)
private fun pollUserProfile(snode: Snode) {
val requests = mutableListOf<SnodeAPI.SnodeBatchRequestInfo>()
val hashesToExtend = mutableSetOf<String>()
val userAuth = requireNotNull(MessagingModuleConfiguration.shared.storage.userAuth)
@ -224,22 +217,21 @@ class Poller(
if (requests.isNotEmpty()) {
SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).bind { rawResponses ->
isCaughtUp = true
if (!deferred.promise.isDone()) {
val responseList = (rawResponses["results"] as List<RawResponse>)
responseList.getOrNull(0)?.let { rawResponse ->
if (rawResponse["code"] as? Int != 200) {
Log.e(TAG, "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}")
val responseList = (rawResponses[KEY_RESULTS] as List<RawResponse>)
responseList.getOrNull(0)?.let { rawResponse ->
if (rawResponse[KEY_CODE] as? Int != 200) {
Log.e(TAG, "Batch sub-request had non-200 response code, returned code ${(rawResponse[KEY_CODE] as? Int) ?: "[unknown]"}")
} else {
val body = rawResponse[KEY_BODY] as? RawResponse
if (body == null) {
Log.e(TAG, "Batch sub-request didn't contain a body")
} else {
val body = rawResponse["body"] as? RawResponse
if (body == null) {
Log.e(TAG, "Batch sub-request didn't contain a body")
} else {
processConfig(snode, body, UserConfigType.USER_PROFILE)
}
processConfig(snode, body, UserConfigType.USER_PROFILE)
}
}
}
Promise.ofSuccess(Unit)
}.fail {
Log.e(TAG, "Failed to get raw batch response", it)
@ -247,120 +239,102 @@ class Poller(
}
}
private suspend fun poll(snode: Snode) {
val userAuth = requireNotNull(MessagingModuleConfiguration.shared.storage.userAuth)
val requestSparseArray = SparseArray<SnodeAPI.SnodeBatchRequestInfo>()
// get messages
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
lastHash = lokiApiDatabase.getLastMessageHashValue(
snode = snode,
publicKey = userAuth.accountId.hexString,
namespace = Namespace.DEFAULT()
),
auth = userAuth,
maxSize = -2)
.also { personalMessages ->
// namespaces here should always be set
requestSparseArray[personalMessages.namespace!!] = personalMessages
}
// get the latest convo info volatile
val hashesToExtend = mutableSetOf<String>()
configFactory.withUserConfigs { configs ->
UserConfigType
.entries
.map { type ->
val config = configs.getConfig(type)
hashesToExtend += config.currentHashes()
type.namespace to SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
lastHash = lokiApiDatabase.getLastMessageHashValue(
snode = snode,
publicKey = userAuth.accountId.hexString,
namespace = type.namespace
),
auth = userAuth,
namespace = type.namespace,
maxSize = -8
)
}
}.forEach { (namespace, request) ->
// namespaces here should always be set
requestSparseArray[namespace] = request
}
val requests = requestSparseArray.valueIterator().asSequence().toMutableList()
private fun poll(snode: Snode, deferred: Deferred<Unit, Exception>): Promise<Unit, Exception> {
if (!hasStarted) { return Promise.ofFail(PromiseCanceledException()) }
return GlobalScope.asyncPromise {
val userAuth = requireNotNull(MessagingModuleConfiguration.shared.storage.userAuth)
val requestSparseArray = SparseArray<SnodeAPI.SnodeBatchRequestInfo>()
// get messages
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
lastHash = lokiApiDatabase.getLastMessageHashValue(
snode = snode,
publicKey = userAuth.accountId.hexString,
namespace = Namespace.DEFAULT()
),
if (hashesToExtend.isNotEmpty()) {
SnodeAPI.buildAuthenticatedAlterTtlBatchRequest(
messageHashes = hashesToExtend.toList(),
auth = userAuth,
maxSize = -2)
.also { personalMessages ->
// namespaces here should always be set
requestSparseArray[personalMessages.namespace!!] = personalMessages
}
// get the latest convo info volatile
val hashesToExtend = mutableSetOf<String>()
configFactory.withUserConfigs { configs ->
UserConfigType
.entries
.map { type ->
val config = configs.getConfig(type)
hashesToExtend += config.currentHashes()
type.namespace to SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
lastHash = lokiApiDatabase.getLastMessageHashValue(
snode = snode,
publicKey = userAuth.accountId.hexString,
namespace = type.namespace
),
auth = userAuth,
namespace = type.namespace,
maxSize = -8
)
}
}.forEach { (namespace, request) ->
// namespaces here should always be set
requestSparseArray[namespace] = request
newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds,
extend = true
).let { extensionRequest ->
requests += extensionRequest
}
}
val requests =
requestSparseArray.valueIterator().asSequence().toMutableList()
if (requests.isNotEmpty()) {
val rawResponses = SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).await()
val responseList = (rawResponses[KEY_RESULTS] as List<RawResponse>)
// in case we had null configs, the array won't be fully populated
// index of the sparse array key iterator should be the request index, with the key being the namespace
UserConfigType.entries
.map { type -> type to requestSparseArray.indexOfKey(type.namespace) }
.filter { (_, i) -> i >= 0 }
.forEach { (configType, requestIndex) ->
responseList.getOrNull(requestIndex)?.let { rawResponse ->
if (rawResponse[KEY_CODE] as? Int != 200) {
Log.e(TAG, "Batch sub-request had non-200 response code, returned code ${(rawResponse[KEY_CODE] as? Int) ?: "[unknown]"}")
return@forEach
}
val body = rawResponse[KEY_BODY] as? RawResponse
if (body == null) {
Log.e(TAG, "Batch sub-request didn't contain a body")
return@forEach
}
if (hashesToExtend.isNotEmpty()) {
SnodeAPI.buildAuthenticatedAlterTtlBatchRequest(
messageHashes = hashesToExtend.toList(),
auth = userAuth,
newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds,
extend = true
).let { extensionRequest ->
requests += extensionRequest
processConfig(snode, body, configType)
}
}
}
if (requests.isNotEmpty()) {
SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).bind { rawResponses ->
isCaughtUp = true
if (deferred.promise.isDone()) {
return@bind Promise.ofSuccess(Unit)
// the first response will be the personal messages (we want these to be processed after config messages)
val personalResponseIndex = requestSparseArray.indexOfKey(Namespace.DEFAULT())
if (personalResponseIndex >= 0) {
responseList.getOrNull(personalResponseIndex)?.let { rawResponse ->
if (rawResponse[KEY_CODE] as? Int != 200) {
// If we got a non-success response then the snode might be bad
throw(RuntimeException("Batch sub-request for personal messages had non-200 response code, returned code ${(rawResponse[KEY_CODE] as? Int) ?: "[unknown]"}"))
} else {
val responseList = (rawResponses["results"] as List<RawResponse>)
// in case we had null configs, the array won't be fully populated
// index of the sparse array key iterator should be the request index, with the key being the namespace
UserConfigType.entries
.map { type -> type to requestSparseArray.indexOfKey(type.namespace) }
.filter { (_, i) -> i >= 0 }
.forEach { (configType, requestIndex) ->
responseList.getOrNull(requestIndex)?.let { rawResponse ->
if (rawResponse["code"] as? Int != 200) {
Log.e(TAG, "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}")
return@forEach
}
val body = rawResponse["body"] as? RawResponse
if (body == null) {
Log.e(TAG, "Batch sub-request didn't contain a body")
return@forEach
}
processConfig(snode, body, configType)
}
}
// the first response will be the personal messages (we want these to be processed after config messages)
val personalResponseIndex = requestSparseArray.indexOfKey(Namespace.DEFAULT())
if (personalResponseIndex >= 0) {
responseList.getOrNull(personalResponseIndex)?.let { rawResponse ->
if (rawResponse["code"] as? Int != 200) {
Log.e(TAG, "Batch sub-request for personal messages had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}")
// If we got a non-success response then the snode might be bad so we should try rotate
// to a different one just in case
pollNextSnode(deferred = deferred)
return@bind Promise.ofSuccess(Unit)
} else {
val body = rawResponse["body"] as? RawResponse
if (body == null) {
Log.e(TAG, "Batch sub-request for personal messages didn't contain a body")
} else {
processPersonalMessages(snode, body)
}
}
}
val body = rawResponse[KEY_BODY] as? RawResponse
if (body == null) {
throw(RuntimeException("Batch sub-request for personal messages didn't contain a body"))
} else {
processPersonalMessages(snode, body)
}
poll(snode, deferred)
}
}.fail {
Log.e(TAG, "Failed to get raw batch response", it)
poll(snode, deferred)
}
} else {
throw(SnodeAPI.Error.Generic)
}
}
}
// endregion
}

@ -96,6 +96,9 @@ object SnodeAPI {
private const val snodeFailureThreshold = 3
private const val useOnionRequests = true
const val KEY_BODY = "body"
const val KEY_CODE = "code"
const val KEY_RESULTS = "results"
private const val KEY_IP = "public_ip"
private const val KEY_PORT = "storage_port"
private const val KEY_X25519 = "pubkey_x25519"
@ -577,14 +580,14 @@ object SnodeAPI {
parameters,
publicKey
).success { rawResponses ->
rawResponses["results"].let { it as List<RawResponse> }
rawResponses[KEY_RESULTS].let { it as List<RawResponse> }
.asSequence()
.filter { it["code"] as? Int != 200 }
.filter { it[KEY_CODE] as? Int != 200 }
.forEach { response ->
Log.w("Loki", "response code was not 200")
handleSnodeError(
response["code"] as? Int ?: 0,
response["body"] as? Map<*, *>,
response[KEY_CODE] as? Int ?: 0,
response[KEY_BODY] as? Map<*, *>,
snode,
publicKey
)
@ -901,7 +904,7 @@ object SnodeAPI {
val deletedMessages = swarms.mapValuesNotNull { (hexSnodePublicKey, rawJSON) ->
(rawJSON as? Map<String, Any>)?.let { json ->
val isFailed = json["failed"] as? Boolean ?: false
val statusCode = json["code"] as? String
val statusCode = json[KEY_CODE] as? String
val reason = json["reason"] as? String
if (isFailed) {
@ -1072,7 +1075,7 @@ object SnodeAPI {
val json = rawJSON as? Map<String, Any> ?: return@mapValuesNotNull null
if (json["failed"] as? Boolean == true) {
val reason = json["reason"] as? String
val statusCode = json["code"] as? String
val statusCode = json[KEY_CODE] as? String
Log.e("Loki", "Failed to delete all messages from: $hexSnodePublicKey due to error: $reason ($statusCode).")
false
} else {

Loading…
Cancel
Save