Add a global state management for internet connectivity (#907)

pull/1709/head
SessionHero01 2 months ago committed by GitHub
parent e5e00c4548
commit 75ccd3dd2b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,19 +1,28 @@
package org.thoughtcrime.securesms.configs
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.launch
import kotlinx.coroutines.supervisorScope
import network.loki.messenger.libsession_util.util.ConfigPush
import org.session.libsession.database.StorageProtocol
import org.session.libsession.database.userAuth
import org.session.libsession.snode.OnionRequestAPI
import org.session.libsession.snode.OwnedSwarmAuth
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeClock
@ -32,6 +41,7 @@ import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Namespace
import org.session.libsignal.utilities.Snode
import org.session.libsignal.utilities.retryWithUniformInterval
import org.thoughtcrime.securesms.util.InternetConnectivity
import javax.inject.Inject
private const val TAG = "ConfigUploader"
@ -51,43 +61,80 @@ class ConfigUploader @Inject constructor(
private val configFactory: ConfigFactoryProtocol,
private val storageProtocol: StorageProtocol,
private val clock: SnodeClock,
private val internetConnectivity: InternetConnectivity,
) {
private var job: Job? = null
@OptIn(DelicateCoroutinesApi::class, FlowPreview::class)
/**
* A flow that only emits when
* 1. There's internet connection AND,
* 2. The onion path is available
*
* The value pushed doesn't matter as nothing is emitted when the conditions are not met.
*/
@OptIn(ExperimentalCoroutinesApi::class)
private fun pathBecomesAvailable(): Flow<*> = internetConnectivity.networkAvailable
.flatMapLatest { hasNetwork ->
if (hasNetwork) {
OnionRequestAPI.hasPath.filter { it }
} else {
emptyFlow()
}
}
@OptIn(DelicateCoroutinesApi::class, FlowPreview::class, ExperimentalCoroutinesApi::class)
fun start() {
require(job == null) { "Already started" }
job = GlobalScope.launch {
supervisorScope {
// For any of these events, we need to push the user configs:
// - The onion path has just become available to use
// - The user configs have been modified
val job1 = launch {
configFactory.configUpdateNotifications
.filterIsInstance<ConfigUpdateNotification.UserConfigsModified>()
.debounce(1000L)
.collect {
try {
retryWithUniformInterval {
pushUserConfigChangesIfNeeded()
}
} catch (e: Exception) {
Log.e(TAG, "Failed to push user configs", e)
merge(
pathBecomesAvailable(),
configFactory.configUpdateNotifications
.filterIsInstance<ConfigUpdateNotification.UserConfigsModified>()
.debounce(1000L)
).collect {
try {
retryWithUniformInterval {
pushUserConfigChangesIfNeeded()
}
} catch (e: Exception) {
Log.e(TAG, "Failed to push user configs", e)
}
}
}
val job2 = launch {
configFactory.configUpdateNotifications
.filterIsInstance<ConfigUpdateNotification.GroupConfigsUpdated>()
.debounce(1000L)
.collect { changes ->
try {
retryWithUniformInterval {
pushGroupConfigsChangesIfNeeded(changes.groupId)
}
} catch (e: Exception) {
Log.e(TAG, "Failed to push group configs", e)
merge(
// When the onion request path changes, we need to examine all the groups
// and push the pending configs for them
pathBecomesAvailable().flatMapLatest {
configFactory.withUserConfigs { configs -> configs.userGroups.allClosedGroupInfo() }
.asSequence()
.filter { !it.destroyed && !it.kicked }
.map { it.groupAccountId }
.asFlow()
},
// Or, when a group config is updated, we need to push the changes for that group
configFactory.configUpdateNotifications
.filterIsInstance<ConfigUpdateNotification.GroupConfigsUpdated>()
.map { it.groupId }
.debounce(1000L)
).collect { groupId ->
try {
retryWithUniformInterval {
pushGroupConfigsChangesIfNeeded(groupId)
}
} catch (e: Exception) {
Log.e(TAG, "Failed to push group configs", e)
}
}
}
job1.join()

@ -190,7 +190,6 @@ import org.thoughtcrime.securesms.showSessionDialog
import org.thoughtcrime.securesms.util.ActivityDispatcher
import org.thoughtcrime.securesms.util.DateUtils
import org.thoughtcrime.securesms.util.MediaUtil
import org.thoughtcrime.securesms.util.NetworkUtils
import org.thoughtcrime.securesms.util.PaddedImageSpan
import org.thoughtcrime.securesms.util.SaveAttachmentTask
import org.thoughtcrime.securesms.util.drawToBitmap

@ -962,8 +962,7 @@ class GroupManagerV2Impl @Inject constructor(
sentTimestamp = timestamp
}
MessageSender.send(message, Destination.ClosedGroup(groupId.hexString), false)
.await()
MessageSender.send(message, Address.fromSerialized(groupId.hexString))
storage.insertGroupInfoChange(message, groupId)
}

@ -103,7 +103,7 @@ import org.thoughtcrime.securesms.ui.theme.PreviewTheme
import org.thoughtcrime.securesms.ui.theme.SessionColorsParameterProvider
import org.thoughtcrime.securesms.ui.theme.ThemeColors
import org.thoughtcrime.securesms.ui.theme.dangerButtonColors
import org.thoughtcrime.securesms.util.NetworkUtils
import org.thoughtcrime.securesms.util.InternetConnectivity
import org.thoughtcrime.securesms.util.push
import java.io.File
import javax.inject.Inject
@ -307,7 +307,7 @@ class SettingsActivity : PassphraseRequiredActionBarActivity() {
// We'll assume we fail & flip the flag on success
var updateWasSuccessful = false
val haveNetworkConnection = NetworkUtils.haveValidNetworkConnection(this@SettingsActivity);
val haveNetworkConnection = viewModel.hasNetworkConnection()
if (!haveNetworkConnection) {
Log.w(TAG, "Cannot update display name - no network connection.")
} else {

@ -36,7 +36,7 @@ import org.thoughtcrime.securesms.preferences.SettingsViewModel.AvatarDialogStat
import org.thoughtcrime.securesms.profiles.ProfileMediaConstraints
import org.thoughtcrime.securesms.util.BitmapDecodingException
import org.thoughtcrime.securesms.util.BitmapUtil
import org.thoughtcrime.securesms.util.NetworkUtils
import org.thoughtcrime.securesms.util.InternetConnectivity
import java.io.File
import java.io.IOException
import javax.inject.Inject
@ -45,7 +45,8 @@ import javax.inject.Inject
class SettingsViewModel @Inject constructor(
@ApplicationContext private val context: Context,
private val prefs: TextSecurePreferences,
private val configFactory: ConfigFactory
private val configFactory: ConfigFactory,
private val connectivity: InternetConnectivity,
) : ViewModel() {
private val TAG = "SettingsViewModel"
@ -156,8 +157,7 @@ class SettingsViewModel @Inject constructor(
val tempAvatar = (avatarDialogState.value as? TempAvatar)?.data
?: return Toast.makeText(context, R.string.profileErrorUpdate, Toast.LENGTH_LONG).show()
val haveNetworkConnection = NetworkUtils.haveValidNetworkConnection(context);
if (!haveNetworkConnection) {
if (!hasNetworkConnection()) {
Log.w(TAG, "Cannot update profile picture - no network connection.")
Toast.makeText(context, R.string.profileErrorUpdate, Toast.LENGTH_LONG).show()
return
@ -173,7 +173,7 @@ class SettingsViewModel @Inject constructor(
fun removeAvatar() {
val haveNetworkConnection = NetworkUtils.haveValidNetworkConnection(context);
val haveNetworkConnection = connectivity.networkAvailable.value
if (!haveNetworkConnection) {
Log.w(TAG, "Cannot remove profile picture - no network connection.")
Toast.makeText(context, R.string.profileDisplayPictureRemoveError, Toast.LENGTH_LONG).show()
@ -257,6 +257,8 @@ class SettingsViewModel @Inject constructor(
_recoveryHidden.update { true }
}
fun hasNetworkConnection(): Boolean = connectivity.networkAvailable.value
sealed class AvatarDialogState() {
object NoAvatar : AvatarDialogState()
data class UserAvatar(val address: Address) : AvatarDialogState()

@ -19,6 +19,9 @@ import androidx.lifecycle.LifecycleService
import androidx.lifecycle.lifecycleScope
import androidx.localbroadcastmanager.content.LocalBroadcastManager
import dagger.hilt.android.AndroidEntryPoint
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.launch
import java.util.UUID
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executors
@ -40,11 +43,11 @@ import org.thoughtcrime.securesms.util.CallNotificationBuilder.Companion.TYPE_IN
import org.thoughtcrime.securesms.util.CallNotificationBuilder.Companion.TYPE_INCOMING_RINGING
import org.thoughtcrime.securesms.util.CallNotificationBuilder.Companion.TYPE_OUTGOING_RINGING
import org.thoughtcrime.securesms.util.CallNotificationBuilder.Companion.WEBRTC_NOTIFICATION
import org.thoughtcrime.securesms.util.InternetConnectivity
import org.thoughtcrime.securesms.webrtc.AudioManagerCommand
import org.thoughtcrime.securesms.webrtc.CallManager
import org.thoughtcrime.securesms.webrtc.CallViewModel
import org.thoughtcrime.securesms.webrtc.IncomingPstnCallReceiver
import org.thoughtcrime.securesms.webrtc.NetworkChangeReceiver
import org.thoughtcrime.securesms.webrtc.PeerConnectionException
import org.thoughtcrime.securesms.webrtc.PowerButtonReceiver
import org.thoughtcrime.securesms.webrtc.ProximityLockRelease
@ -215,6 +218,9 @@ class WebRtcCallService : LifecycleService(), CallManager.WebRtcListener {
@Inject
lateinit var callManager: CallManager
@Inject
lateinit var internetConnectivity: InternetConnectivity
private var wantsToAnswer = false
private var currentTimeouts = 0
private var isNetworkAvailable = true
@ -229,7 +235,6 @@ class WebRtcCallService : LifecycleService(), CallManager.WebRtcListener {
ContextCompat.startForegroundService(this, hangupIntent(this))
}
private var networkChangedReceiver: NetworkChangeReceiver? = null
private var callReceiver: IncomingPstnCallReceiver? = null
private var wantsToAnswerReceiver: BroadcastReceiver? = null
private var wiredHeadsetStateReceiver: WiredHeadsetStateReceiver? = null
@ -328,8 +333,10 @@ class WebRtcCallService : LifecycleService(), CallManager.WebRtcListener {
telephonyHandler.register(getSystemService(TelephonyManager::class.java))
}
registerUncaughtExceptionHandler()
networkChangedReceiver = NetworkChangeReceiver(::networkChange)
networkChangedReceiver!!.register(this)
GlobalScope.launch {
internetConnectivity.networkAvailable.collectLatest(::networkChange)
}
}
private fun registerUncaughtExceptionHandler() {
@ -813,14 +820,12 @@ class WebRtcCallService : LifecycleService(), CallManager.WebRtcListener {
}
wiredHeadsetStateReceiver?.let(::unregisterReceiver)
powerButtonReceiver?.let(::unregisterReceiver)
networkChangedReceiver?.unregister(this)
wantsToAnswerReceiver?.let { receiver ->
LocalBroadcastManager.getInstance(this).unregisterReceiver(receiver)
}
callManager.shutDownAudioManager()
powerButtonReceiver = null
wiredHeadsetStateReceiver = null
networkChangedReceiver = null
callReceiver = null
uncaughtExceptionHandlerManager?.unregister()
wantsToAnswer = false

@ -0,0 +1,80 @@
package org.thoughtcrime.securesms.util
import android.app.Application
import android.content.Context
import android.content.Context.CONNECTIVITY_SERVICE
import android.net.ConnectivityManager
import android.net.ConnectivityManager.NetworkCallback
import android.net.Network
import android.net.NetworkCapabilities
import android.net.NetworkRequest
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.stateIn
import javax.inject.Inject
import javax.inject.Singleton
/**
* Provides a flow that emits `true` when the device has internet connectivity and `false` otherwise.
*/
@Singleton
class InternetConnectivity @Inject constructor(application: Application) {
val networkAvailable = callbackFlow {
val connectivityManager = application.getSystemService(ConnectivityManager::class.java)
val callback = object : NetworkCallback() {
override fun onCapabilitiesChanged(
network: Network,
networkCapabilities: NetworkCapabilities
) {
trySend(networkCapabilities.hasCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET))
}
override fun onAvailable(network: Network) {
trySend(true)
}
override fun onLost(network: Network) {
trySend(false)
}
}
connectivityManager.registerNetworkCallback(
NetworkRequest.Builder()
.addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
.build(),
callback
)
awaitClose {
connectivityManager.unregisterNetworkCallback(callback)
}
}.stateIn(
scope = GlobalScope,
started = SharingStarted.WhileSubscribed(),
initialValue = haveValidNetworkConnection(application)
)
companion object {
// Method to determine if we have a valid Internet connection or not
private fun haveValidNetworkConnection(context: Context): Boolean {
val cm = context.getSystemService(CONNECTIVITY_SERVICE) as ConnectivityManager
// Early exit if we have no active network..
val activeNetwork = cm.activeNetwork ?: return false
// ..otherwise determine what capabilities are available to the active network.
val networkCapabilities = cm.getNetworkCapabilities(activeNetwork)
val internetConnectionValid = networkCapabilities != null &&
networkCapabilities.hasCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
return internetConnectionValid
}
}
}

@ -1,28 +0,0 @@
package org.thoughtcrime.securesms.util
import android.content.Context
import android.content.Context.CONNECTIVITY_SERVICE
import android.net.ConnectivityManager
import android.net.NetworkCapabilities
class NetworkUtils {
companion object {
// Method to determine if we have a valid Internet connection or not
fun haveValidNetworkConnection(context: Context) : Boolean {
val cm = context.getSystemService(CONNECTIVITY_SERVICE) as ConnectivityManager
// Early exit if we have no active network..
if (cm.activeNetwork == null) return false
// ..otherwise determine what capabilities are available to the active network.
val networkCapabilities = cm.getNetworkCapabilities(cm.activeNetwork)
val internetConnectionValid = cm.activeNetwork != null &&
networkCapabilities != null &&
networkCapabilities.hasCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
return internetConnectionValid
}
}
}

@ -1,71 +0,0 @@
package org.thoughtcrime.securesms.webrtc
import android.content.BroadcastReceiver
import android.content.Context
import android.content.Intent
import android.content.IntentFilter
import android.net.ConnectivityManager
import android.net.Network
import org.session.libsignal.utilities.Log
import org.thoughtcrime.securesms.util.NetworkUtils
class NetworkChangeReceiver(private val onNetworkChangedCallback: (Boolean)->Unit) {
private val networkList: MutableSet<Network> = mutableSetOf()
private val broadcastDelegate = object: BroadcastReceiver() {
override fun onReceive(context: Context, intent: Intent) {
receiveBroadcast(context, intent)
}
}
val defaultObserver = object: ConnectivityManager.NetworkCallback() {
override fun onAvailable(network: Network) {
Log.i("Loki", "onAvailable: $network")
networkList += network
onNetworkChangedCallback(networkList.isNotEmpty())
}
override fun onLosing(network: Network, maxMsToLive: Int) {
Log.i("Loki", "onLosing: $network, maxMsToLive: $maxMsToLive")
}
override fun onLost(network: Network) {
Log.i("Loki", "onLost: $network")
networkList -= network
onNetworkChangedCallback(networkList.isNotEmpty())
}
override fun onUnavailable() {
Log.i("Loki", "onUnavailable")
}
}
fun receiveBroadcast(context: Context, intent: Intent) {
val connected = NetworkUtils.haveValidNetworkConnection(context)
Log.i("Loki", "received broadcast, network connected: $connected")
onNetworkChangedCallback(connected)
}
fun register(context: Context) {
val intentFilter = IntentFilter("android.net.conn.CONNECTIVITY_CHANGE")
context.registerReceiver(broadcastDelegate, intentFilter)
// if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
// val cm = context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
// cm.registerDefaultNetworkCallback(defaultObserver)
// } else {
//
// }
}
fun unregister(context: Context) {
context.unregisterReceiver(broadcastDelegate)
// if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
// val cm = context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
// cm.unregisterNetworkCallback(defaultObserver)
// } else {
//
// }
}
}
Loading…
Cancel
Save