|
|
|
@ -11,6 +11,7 @@ import kotlinx.coroutines.coroutineScope
|
|
|
|
|
import kotlinx.coroutines.flow.Flow
|
|
|
|
|
import kotlinx.coroutines.flow.asFlow
|
|
|
|
|
import kotlinx.coroutines.flow.debounce
|
|
|
|
|
import kotlinx.coroutines.flow.distinctUntilChanged
|
|
|
|
|
import kotlinx.coroutines.flow.emptyFlow
|
|
|
|
|
import kotlinx.coroutines.flow.filter
|
|
|
|
|
import kotlinx.coroutines.flow.filterIsInstance
|
|
|
|
@ -33,6 +34,7 @@ import org.session.libsession.snode.utilities.await
|
|
|
|
|
import org.session.libsession.utilities.ConfigFactoryProtocol
|
|
|
|
|
import org.session.libsession.utilities.ConfigPushResult
|
|
|
|
|
import org.session.libsession.utilities.ConfigUpdateNotification
|
|
|
|
|
import org.session.libsession.utilities.TextSecurePreferences
|
|
|
|
|
import org.session.libsession.utilities.UserConfigType
|
|
|
|
|
import org.session.libsession.utilities.getGroup
|
|
|
|
|
import org.session.libsignal.utilities.AccountId
|
|
|
|
@ -43,6 +45,7 @@ import org.session.libsignal.utilities.Snode
|
|
|
|
|
import org.session.libsignal.utilities.retryWithUniformInterval
|
|
|
|
|
import org.thoughtcrime.securesms.util.InternetConnectivity
|
|
|
|
|
import javax.inject.Inject
|
|
|
|
|
import kotlin.math.log
|
|
|
|
|
|
|
|
|
|
private const val TAG = "ConfigUploader"
|
|
|
|
|
|
|
|
|
@ -62,6 +65,7 @@ class ConfigUploader @Inject constructor(
|
|
|
|
|
private val storageProtocol: StorageProtocol,
|
|
|
|
|
private val clock: SnodeClock,
|
|
|
|
|
private val internetConnectivity: InternetConnectivity,
|
|
|
|
|
private val textSecurePreferences: TextSecurePreferences,
|
|
|
|
|
) {
|
|
|
|
|
private var job: Job? = null
|
|
|
|
|
|
|
|
|
@ -82,6 +86,11 @@ class ConfigUploader @Inject constructor(
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// A flow that emits true when there's a logged in user
|
|
|
|
|
private fun hasLoggedInUser(): Flow<Boolean> = textSecurePreferences.watchLocalNumber()
|
|
|
|
|
.map { it != null }
|
|
|
|
|
.distinctUntilChanged()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@OptIn(DelicateCoroutinesApi::class, FlowPreview::class, ExperimentalCoroutinesApi::class)
|
|
|
|
|
fun start() {
|
|
|
|
@ -92,41 +101,58 @@ class ConfigUploader @Inject constructor(
|
|
|
|
|
// For any of these events, we need to push the user configs:
|
|
|
|
|
// - The onion path has just become available to use
|
|
|
|
|
// - The user configs have been modified
|
|
|
|
|
// Also, these events are only relevant when there's a logged in user
|
|
|
|
|
val job1 = launch {
|
|
|
|
|
merge(
|
|
|
|
|
pathBecomesAvailable(),
|
|
|
|
|
configFactory.configUpdateNotifications
|
|
|
|
|
.filterIsInstance<ConfigUpdateNotification.UserConfigsModified>()
|
|
|
|
|
.debounce(1000L)
|
|
|
|
|
).collect {
|
|
|
|
|
try {
|
|
|
|
|
retryWithUniformInterval {
|
|
|
|
|
pushUserConfigChangesIfNeeded()
|
|
|
|
|
hasLoggedInUser()
|
|
|
|
|
.flatMapLatest { loggedIn ->
|
|
|
|
|
if (loggedIn) {
|
|
|
|
|
merge(
|
|
|
|
|
pathBecomesAvailable(),
|
|
|
|
|
configFactory.configUpdateNotifications
|
|
|
|
|
.filterIsInstance<ConfigUpdateNotification.UserConfigsModified>()
|
|
|
|
|
.debounce(1000L)
|
|
|
|
|
)
|
|
|
|
|
} else {
|
|
|
|
|
emptyFlow()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
.collect {
|
|
|
|
|
try {
|
|
|
|
|
retryWithUniformInterval {
|
|
|
|
|
pushUserConfigChangesIfNeeded()
|
|
|
|
|
}
|
|
|
|
|
} catch (e: Exception) {
|
|
|
|
|
Log.e(TAG, "Failed to push user configs", e)
|
|
|
|
|
}
|
|
|
|
|
} catch (e: Exception) {
|
|
|
|
|
Log.e(TAG, "Failed to push user configs", e)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val job2 = launch {
|
|
|
|
|
merge(
|
|
|
|
|
// When the onion request path changes, we need to examine all the groups
|
|
|
|
|
// and push the pending configs for them
|
|
|
|
|
pathBecomesAvailable().flatMapLatest {
|
|
|
|
|
configFactory.withUserConfigs { configs -> configs.userGroups.allClosedGroupInfo() }
|
|
|
|
|
.asSequence()
|
|
|
|
|
.filter { !it.destroyed && !it.kicked }
|
|
|
|
|
.map { it.groupAccountId }
|
|
|
|
|
.asFlow()
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
// Or, when a group config is updated, we need to push the changes for that group
|
|
|
|
|
configFactory.configUpdateNotifications
|
|
|
|
|
.filterIsInstance<ConfigUpdateNotification.GroupConfigsUpdated>()
|
|
|
|
|
.map { it.groupId }
|
|
|
|
|
.debounce(1000L)
|
|
|
|
|
).collect { groupId ->
|
|
|
|
|
hasLoggedInUser()
|
|
|
|
|
.flatMapLatest { loggedIn ->
|
|
|
|
|
if (loggedIn) {
|
|
|
|
|
merge(
|
|
|
|
|
// When the onion request path changes, we need to examine all the groups
|
|
|
|
|
// and push the pending configs for them
|
|
|
|
|
pathBecomesAvailable().flatMapLatest {
|
|
|
|
|
configFactory.withUserConfigs { configs -> configs.userGroups.allClosedGroupInfo() }
|
|
|
|
|
.asSequence()
|
|
|
|
|
.filter { !it.destroyed && !it.kicked }
|
|
|
|
|
.map { it.groupAccountId }
|
|
|
|
|
.asFlow()
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
// Or, when a group config is updated, we need to push the changes for that group
|
|
|
|
|
configFactory.configUpdateNotifications
|
|
|
|
|
.filterIsInstance<ConfigUpdateNotification.GroupConfigsUpdated>()
|
|
|
|
|
.map { it.groupId }
|
|
|
|
|
.debounce(1000L)
|
|
|
|
|
)
|
|
|
|
|
} else {
|
|
|
|
|
emptyFlow()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
.collect { groupId ->
|
|
|
|
|
try {
|
|
|
|
|
retryWithUniformInterval {
|
|
|
|
|
pushGroupConfigsChangesIfNeeded(groupId)
|
|
|
|
|