@ -2,6 +2,7 @@ package org.session.libsession.messaging.sending_receiving.pollers
import android.util.SparseArray
import androidx.core.util.valueIterator
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import network.loki.messenger.libsession_util.ConfigBase
@ -13,6 +14,11 @@ import java.util.Timer
import java.util.TimerTask
import kotlin.time.Duration.Companion.days
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import nl.komponents.kovenant.Deferred
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.deferred
@ -26,8 +32,12 @@ import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveParameters
import org.session.libsession.snode.RawResponse
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeAPI.KEY_BODY
import org.session.libsession.snode.SnodeAPI.KEY_CODE
import org.session.libsession.snode.SnodeAPI.KEY_RESULTS
import org.session.libsession.snode.SnodeModule
import org.session.libsession.snode.utilities.asyncPromise
import org.session.libsession.snode.utilities.await
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsession.utilities.ConfigMessage
import org.session.libsession.utilities.UserConfigType
@ -40,8 +50,6 @@ import org.session.libsignal.utilities.Util.SECURE_RANDOM
private const val TAG = " Poller "
private class PromiseCanceledException : Exception ( " Promise canceled. " )
class Poller (
private val configFactory : ConfigFactoryProtocol ,
private val storage : StorageProtocol ,
@ -50,9 +58,9 @@ class Poller(
private val userPublicKey : String
get ( ) = storage . getUserPublicKey ( ) . orEmpty ( )
private var hasStarted : Boolean = false
private val usedSnodes : MutableSet < Snode > = mutableSetOf ( )
var is CaughtUp = false
var scope : CoroutineScope ? = null
var is Polling: Boolean = false
// region Settings
companion object {
@ -64,25 +72,26 @@ class Poller(
// region Public API
fun startIfNeeded ( ) {
if ( hasStarted ) { return }
if ( scope != null ) { return }
Log . d ( TAG , " Started polling. " )
hasStarted = true
setUpPolling ( RETRY _INTERVAL _MS )
scope = CoroutineScope ( Dispatchers . Default )
scope ?. launch {
setUpPolling ( )
}
}
fun stopIfNeeded ( ) {
Log . d ( TAG , " Stopped polling. " )
hasStarted = false
usedSnodes . clear ( )
scope ?. cancel ( )
scope = null
isPolling = false
}
fun retrieveUserProfile ( ) {
Log . d ( TAG , " Retrieving user profile. for key = $userPublicKey " )
SnodeAPI . getSwarm ( userPublicKey ) . bind {
usedSnodes . clear ( )
deferred < Unit , Exception > ( ) . also { exception ->
pollNextSnode ( userProfileOnly = true , exception )
} . promise
SnodeAPI . getSwarm ( userPublicKey ) . success {
pollUserProfile ( it . random ( ) )
} . fail { exception ->
Log . e ( TAG , " Failed to retrieve user profile. " , exception )
}
@ -90,51 +99,41 @@ class Poller(
// endregion
// region Private API
private fun setUpPolling ( delay : Long ) {
if ( ! hasStarted ) { return ; }
val thread = Thread . currentThread ( )
SnodeAPI . getSwarm ( userPublicKey ) . bind {
usedSnodes . clear ( )
val deferred = deferred < Unit , Exception > ( )
pollNextSnode ( deferred = deferred )
deferred . promise
} . success {
val nextDelay = if ( isCaughtUp ) RETRY _INTERVAL _MS else 0
Timer ( ) . schedule ( object : TimerTask ( ) {
override fun run ( ) {
thread . run { setUpPolling ( RETRY _INTERVAL _MS ) }
}
} , nextDelay )
} . fail {
val nextDelay = minOf ( MAX _RETRY _INTERVAL _MS , ( delay * NEXT _RETRY _MULTIPLIER ) . toLong ( ) )
Timer ( ) . schedule ( object : TimerTask ( ) {
override fun run ( ) {
thread . run { setUpPolling ( nextDelay ) }
}
} , nextDelay )
}
}
private suspend fun setUpPolling ( ) {
val pollPool = hashSetOf < Snode > ( ) // pollPool is the list of snodes we can use while rotating snodes from our swarm
var retryScalingFactor = 1.0f // We increment the retry interval by NEXT_RETRY_MULTIPLIER times this value, which we bump on each failure
private fun pollNextSnode ( userProfileOnly : Boolean = false , deferred : Deferred < Unit , Exception > ) {
val swarm = SnodeModule . shared . storage . getSwarm ( userPublicKey ) ?: setOf ( )
val unusedSnodes = swarm . subtract ( usedSnodes )
if ( unusedSnodes . isNotEmpty ( ) ) {
val index = SECURE_RANDOM . nextInt ( unusedSnodes . size )
val nextSnode = unusedSnodes . elementAt ( index )
usedSnodes . add ( nextSnode )
Log . d ( TAG , " Polling $nextSnode . " )
poll ( userProfileOnly , nextSnode , deferred ) . fail { exception ->
if ( exception is PromiseCanceledException ) {
Log . d ( TAG , " Polling $nextSnode canceled. " )
} else {
Log . d ( TAG , " Polling $nextSnode failed; dropping it and switching to next snode. " )
SnodeAPI . dropSnodeFromSwarmIfNeeded ( nextSnode , userPublicKey )
pollNextSnode ( userProfileOnly , deferred )
}
while ( true ) {
Log . d ( TAG , " Polling... " )
isPolling = true
// check if the polling pool is empty
if ( pollPool . isEmpty ( ) ) {
// if it is empty, fill it with the snodes from our swarm
pollPool . addAll ( SnodeAPI . getSwarm ( userPublicKey ) . await ( ) )
}
// randomly get a snode from the pool
val currentNode = pollPool . random ( )
// remove that snode from the pool
pollPool . remove ( currentNode )
var pollDelay = RETRY _INTERVAL _MS
try {
poll ( currentNode )
retryScalingFactor = 1f
} catch ( e : Exception ) {
Log . e ( TAG , " Error while polling: " , e )
pollDelay = minOf ( MAX _RETRY _INTERVAL _MS , ( RETRY _INTERVAL _MS * ( NEXT _RETRY _MULTIPLIER * retryScalingFactor ) ) . toLong ( ) )
retryScalingFactor ++
} finally {
isPolling = false
}
} else {
isCaughtUp = true
deferred . resolve ( )
// wait before polling again
de lay( pollDelay )
}
}
@ -184,14 +183,8 @@ class Poller(
}
}
private fun poll ( userProfileOnly : Boolean , snode : Snode , deferred : Deferred < Unit , Exception > ) : Promise < Unit , Exception > {
if ( userProfileOnly ) {
return pollUserProfile ( snode , deferred )
}
return poll ( snode , deferred )
}
private fun pollUserProfile ( snode : Snode , deferred : Deferred < Unit , Exception > ) : Promise < Unit , Exception > = GlobalScope . asyncPromise {
//todo we will need to modify this further to fit within the new coroutine setup (currently used by ApplicationContext which is a java class)
private fun pollUserProfile ( snode : Snode ) {
val requests = mutableListOf < SnodeAPI . SnodeBatchRequestInfo > ( )
val hashesToExtend = mutableSetOf < String > ( )
val userAuth = requireNotNull ( MessagingModuleConfiguration . shared . storage . userAuth )
@ -224,22 +217,21 @@ class Poller(
if ( requests . isNotEmpty ( ) ) {
SnodeAPI . getRawBatchResponse ( snode , userPublicKey , requests ) . bind { rawResponses ->
isCaughtUp = true
if ( ! deferred . promise . isDone ( ) ) {
val responseList = ( rawResponses [ " results " ] as List < RawResponse > )
responseList . getOrNull ( 0 ) ?. let { rawResponse ->
if ( rawResponse [ " code " ] as ? Int != 200 ) {
Log . e ( TAG , " Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"} " )
val responseList = ( rawResponses [ KEY _RESULTS ] as List < RawResponse > )
responseList . getOrNull ( 0 ) ?. let { rawResponse ->
if ( rawResponse [ KEY _CODE ] as ? Int != 200 ) {
Log . e ( TAG , " Batch sub-request had non-200 response code, returned code ${(rawResponse[KEY_CODE] as? Int) ?: "[unknown]"} " )
} else {
val body = rawResponse [ KEY _BODY ] as ? RawResponse
if ( body == null ) {
Log . e ( TAG , " Batch sub-request didn't contain a body " )
} else {
val body = rawResponse [ " body " ] as ? RawResponse
if ( body == null ) {
Log . e ( TAG , " Batch sub-request didn't contain a body " )
} else {
processConfig ( snode , body , UserConfigType . USER _PROFILE )
}
processConfig ( snode , body , UserConfigType . USER _PROFILE )
}
}
}
Promise . ofSuccess ( Unit )
} . fail {
Log . e ( TAG , " Failed to get raw batch response " , it )
@ -247,120 +239,102 @@ class Poller(
}
}
private suspend fun poll ( snode : Snode ) {
val userAuth = requireNotNull ( MessagingModuleConfiguration . shared . storage . userAuth )
val requestSparseArray = SparseArray < SnodeAPI . SnodeBatchRequestInfo > ( )
// get messages
SnodeAPI . buildAuthenticatedRetrieveBatchRequest (
lastHash = lokiApiDatabase . getLastMessageHashValue (
snode = snode ,
publicKey = userAuth . accountId . hexString ,
namespace = Namespace . DEFAULT ( )
) ,
auth = userAuth ,
maxSize = - 2 )
. also { personalMessages ->
// namespaces here should always be set
requestSparseArray [ personalMessages . namespace !! ] = personalMessages
}
// get the latest convo info volatile
val hashesToExtend = mutableSetOf < String > ( )
configFactory . withUserConfigs { configs ->
UserConfigType
. entries
. map { type ->
val config = configs . getConfig ( type )
hashesToExtend += config . currentHashes ( )
type . namespace to SnodeAPI . buildAuthenticatedRetrieveBatchRequest (
lastHash = lokiApiDatabase . getLastMessageHashValue (
snode = snode ,
publicKey = userAuth . accountId . hexString ,
namespace = type . namespace
) ,
auth = userAuth ,
namespace = type . namespace ,
maxSize = - 8
)
}
} . forEach { ( namespace , request ) ->
// namespaces here should always be set
requestSparseArray [ namespace ] = request
}
val requests = requestSparseArray . valueIterator ( ) . asSequence ( ) . toMutableList ( )
private fun poll ( snode : Snode , deferred : Deferred < Unit , Exception > ) : Promise < Unit , Exception > {
if ( ! hasStarted ) { return Promise . ofFail ( PromiseCanceledException ( ) ) }
return GlobalScope . asyncPromise {
val userAuth = requireNotNull ( MessagingModuleConfiguration . shared . storage . userAuth )
val requestSparseArray = SparseArray < SnodeAPI . SnodeBatchRequestInfo > ( )
// get messages
SnodeAPI . buildAuthenticatedRetrieveBatchRequest (
lastHash = lokiApiDatabase . getLastMessageHashValue (
snode = snode ,
publicKey = userAuth . accountId . hexString ,
namespace = Namespace . DEFAULT ( )
) ,
if ( hashesToExtend . isNotEmpty ( ) ) {
SnodeAPI . buildAuthenticatedAlterTtlBatchRequest (
messageHashes = hashesToExtend . toList ( ) ,
auth = userAuth ,
maxSize = - 2 )
. also { personalMessages ->
// namespaces here should always be set
requestSparseArray [ personalMessages . namespace !! ] = personalMessages
}
// get the latest convo info volatile
val hashesToExtend = mutableSetOf < String > ( )
configFactory . withUserConfigs { configs ->
UserConfigType
. entries
. map { type ->
val config = configs . getConfig ( type )
hashesToExtend += config . currentHashes ( )
type . namespace to SnodeAPI . buildAuthenticatedRetrieveBatchRequest (
lastHash = lokiApiDatabase . getLastMessageHashValue (
snode = snode ,
publicKey = userAuth . accountId . hexString ,
namespace = type . namespace
) ,
auth = userAuth ,
namespace = type . namespace ,
maxSize = - 8
)
}
} . forEach { ( namespace , request ) ->
// namespaces here should always be set
requestSparseArray [ namespace ] = request
newExpiry = SnodeAPI . nowWithOffset + 14. days . inWholeMilliseconds ,
extend = true
) . let { extensionRequest ->
requests += extensionRequest
}
}
val requests =
requestSparseArray . valueIterator ( ) . asSequence ( ) . toMutableList ( )
if ( requests . isNotEmpty ( ) ) {
val rawResponses = SnodeAPI . getRawBatchResponse ( snode , userPublicKey , requests ) . await ( )
val responseList = ( rawResponses [ KEY _RESULTS ] as List < RawResponse > )
// in case we had null configs, the array won't be fully populated
// index of the sparse array key iterator should be the request index, with the key being the namespace
UserConfigType . entries
. map { type -> type to requestSparseArray . indexOfKey ( type . namespace ) }
. filter { ( _ , i ) -> i >= 0 }
. forEach { ( configType , requestIndex ) ->
responseList . getOrNull ( requestIndex ) ?. let { rawResponse ->
if ( rawResponse [ KEY _CODE ] as ? Int != 200 ) {
Log . e ( TAG , " Batch sub-request had non-200 response code, returned code ${(rawResponse[KEY_CODE] as? Int) ?: "[unknown]"} " )
return @forEach
}
val body = rawResponse [ KEY _BODY ] as ? RawResponse
if ( body == null ) {
Log . e ( TAG , " Batch sub-request didn't contain a body " )
return @forEach
}
if ( hashesToExtend . isNotEmpty ( ) ) {
SnodeAPI . buildAuthenticatedAlterTtlBatchRequest (
messageHashes = hashesToExtend . toList ( ) ,
auth = userAuth ,
newExpiry = SnodeAPI . nowWithOffset + 14. days . inWholeMilliseconds ,
extend = true
) . let { extensionRequest ->
requests += extensionRequest
processConfig ( snode , body , configType )
}
}
}
if ( requests . isNotEmpty ( ) ) {
SnodeAPI . getRawBatchResponse ( snode , userPublicKey , requests ) . bind { rawResponses ->
isCaughtUp = true
if ( deferred . promise . isDone ( ) ) {
return @bind Promise . ofSuccess ( Unit )
// the first response will be the personal messages (we want these to be processed after config messages)
val personalResponseIndex = requestSparseArray . indexOfKey ( Namespace . DEFAULT ( ) )
if ( personalResponseIndex >= 0 ) {
responseList . getOrNull ( personalResponseIndex ) ?. let { rawResponse ->
if ( rawResponse [ KEY _CODE ] as ? Int != 200 ) {
// If we got a non-success response then the snode might be bad
throw ( RuntimeException ( " Batch sub-request for personal messages had non-200 response code, returned code ${(rawResponse[KEY_CODE] as? Int) ?: "[unknown]"} " ) )
} else {
val responseList = ( rawResponses [ " results " ] as List < RawResponse > )
// in case we had null configs, the array won't be fully populated
// index of the sparse array key iterator should be the request index, with the key being the namespace
UserConfigType . entries
. map { type -> type to requestSparseArray . indexOfKey ( type . namespace ) }
. filter { ( _ , i ) -> i >= 0 }
. forEach { ( configType , requestIndex ) ->
responseList . getOrNull ( requestIndex ) ?. let { rawResponse ->
if ( rawResponse [ " code " ] as ? Int != 200 ) {
Log . e ( TAG , " Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"} " )
return @forEach
}
val body = rawResponse [ " body " ] as ? RawResponse
if ( body == null ) {
Log . e ( TAG , " Batch sub-request didn't contain a body " )
return @forEach
}
processConfig ( snode , body , configType )
}
}
// the first response will be the personal messages (we want these to be processed after config messages)
val personalResponseIndex = requestSparseArray . indexOfKey ( Namespace . DEFAULT ( ) )
if ( personalResponseIndex >= 0 ) {
responseList . getOrNull ( personalResponseIndex ) ?. let { rawResponse ->
if ( rawResponse [ " code " ] as ? Int != 200 ) {
Log . e ( TAG , " Batch sub-request for personal messages had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"} " )
// If we got a non-success response then the snode might be bad so we should try rotate
// to a different one just in case
pollNextSnode ( deferred = deferred )
return @bind Promise . ofSuccess ( Unit )
} else {
val body = rawResponse [ " body " ] as ? RawResponse
if ( body == null ) {
Log . e ( TAG , " Batch sub-request for personal messages didn't contain a body " )
} else {
processPersonalMessages ( snode , body )
}
}
}
val body = rawResponse [ KEY _BODY ] as ? RawResponse
if ( body == null ) {
throw ( RuntimeException ( " Batch sub-request for personal messages didn't contain a body " ) )
} else {
processPersonalMessages ( snode , body )
}
poll ( snode , deferred )
}
} . fail {
Log . e ( TAG , " Failed to get raw batch response " , it )
poll ( snode , deferred )
}
} else {
throw ( SnodeAPI . Error . Generic )
}
}
}
// endregion
}