Merge remote-tracking branch 'upstream/dev' into SES-2009-blinded-conversation
commit
f16735d4ee
@ -1,71 +1,88 @@
|
||||
package org.thoughtcrime.securesms.home
|
||||
|
||||
import android.content.ContentResolver
|
||||
import android.content.Context
|
||||
import androidx.lifecycle.LiveData
|
||||
import androidx.lifecycle.MutableLiveData
|
||||
import androidx.lifecycle.ViewModel
|
||||
import androidx.lifecycle.asFlow
|
||||
import androidx.lifecycle.viewModelScope
|
||||
import app.cash.copper.flow.observeQuery
|
||||
import dagger.hilt.android.lifecycle.HiltViewModel
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.flow.collect
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.channels.BufferOverflow
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.SharingStarted
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.combine
|
||||
import kotlinx.coroutines.flow.debounce
|
||||
import kotlinx.coroutines.flow.distinctUntilChanged
|
||||
import kotlinx.coroutines.flow.mapLatest
|
||||
import kotlinx.coroutines.flow.merge
|
||||
import kotlinx.coroutines.flow.onStart
|
||||
import kotlinx.coroutines.flow.stateIn
|
||||
import kotlinx.coroutines.withContext
|
||||
import org.thoughtcrime.securesms.ApplicationContext
|
||||
import org.thoughtcrime.securesms.database.DatabaseContentProviders
|
||||
import org.thoughtcrime.securesms.database.ThreadDatabase
|
||||
import org.thoughtcrime.securesms.database.model.ThreadRecord
|
||||
import java.lang.ref.WeakReference
|
||||
import org.thoughtcrime.securesms.util.observeChanges
|
||||
import javax.inject.Inject
|
||||
import dagger.hilt.android.qualifiers.ApplicationContext as ApplicationContextQualifier
|
||||
|
||||
@HiltViewModel
|
||||
class HomeViewModel @Inject constructor(private val threadDb: ThreadDatabase): ViewModel() {
|
||||
class HomeViewModel @Inject constructor(
|
||||
private val threadDb: ThreadDatabase,
|
||||
private val contentResolver: ContentResolver,
|
||||
@ApplicationContextQualifier private val context: Context,
|
||||
) : ViewModel() {
|
||||
// SharedFlow that emits whenever the user asks us to reload the conversation
|
||||
private val manualReloadTrigger = MutableSharedFlow<Unit>(
|
||||
extraBufferCapacity = 1,
|
||||
onBufferOverflow = BufferOverflow.DROP_OLDEST
|
||||
)
|
||||
|
||||
private val executor = viewModelScope + SupervisorJob()
|
||||
private var lastContext: WeakReference<Context>? = null
|
||||
private var updateJobs: MutableList<Job> = mutableListOf()
|
||||
/**
|
||||
* A [StateFlow] that emits the list of threads and the typing status of each thread.
|
||||
*
|
||||
* This flow will emit whenever the user asks us to reload the conversation list or
|
||||
* whenever the conversation list changes.
|
||||
*/
|
||||
val threads: StateFlow<Data?> = combine(observeConversationList(), observeTypingStatus(), ::Data)
|
||||
.stateIn(viewModelScope, SharingStarted.Eagerly, null)
|
||||
|
||||
private val _conversations = MutableLiveData<List<ThreadRecord>>()
|
||||
val conversations: LiveData<List<ThreadRecord>> = _conversations
|
||||
private fun observeTypingStatus(): Flow<Set<Long>> =
|
||||
ApplicationContext.getInstance(context).typingStatusRepository
|
||||
.typingThreads
|
||||
.asFlow()
|
||||
.onStart { emit(emptySet()) }
|
||||
.distinctUntilChanged()
|
||||
|
||||
private val listUpdateChannel = Channel<Unit>(capacity = Channel.CONFLATED)
|
||||
|
||||
fun tryUpdateChannel() = listUpdateChannel.trySend(Unit)
|
||||
|
||||
fun getObservable(context: Context): LiveData<List<ThreadRecord>> {
|
||||
// If the context has changed (eg. the activity gets recreated) then
|
||||
// we need to cancel the old executors and recreate them to prevent
|
||||
// the app from triggering extra updates when data changes
|
||||
if (context != lastContext?.get()) {
|
||||
lastContext = WeakReference(context)
|
||||
updateJobs.forEach { it.cancel() }
|
||||
updateJobs.clear()
|
||||
|
||||
updateJobs.add(
|
||||
executor.launch(Dispatchers.IO) {
|
||||
context.contentResolver
|
||||
.observeQuery(DatabaseContentProviders.ConversationList.CONTENT_URI)
|
||||
.onEach { listUpdateChannel.trySend(Unit) }
|
||||
.collect()
|
||||
}
|
||||
)
|
||||
updateJobs.add(
|
||||
executor.launch(Dispatchers.IO) {
|
||||
for (update in listUpdateChannel) {
|
||||
threadDb.approvedConversationList.use { openCursor ->
|
||||
val reader = threadDb.readerFor(openCursor)
|
||||
val threads = mutableListOf<ThreadRecord>()
|
||||
@Suppress("OPT_IN_USAGE")
|
||||
private fun observeConversationList(): Flow<List<ThreadRecord>> = merge(
|
||||
manualReloadTrigger,
|
||||
contentResolver.observeChanges(DatabaseContentProviders.ConversationList.CONTENT_URI))
|
||||
.debounce(CHANGE_NOTIFICATION_DEBOUNCE_MILLS)
|
||||
.onStart { emit(Unit) }
|
||||
.mapLatest { _ ->
|
||||
withContext(Dispatchers.IO) {
|
||||
threadDb.approvedConversationList.use { openCursor ->
|
||||
val reader = threadDb.readerFor(openCursor)
|
||||
buildList(reader.count) {
|
||||
while (true) {
|
||||
threads += reader.next ?: break
|
||||
}
|
||||
withContext(Dispatchers.Main) {
|
||||
_conversations.value = threads
|
||||
add(reader.next ?: break)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
return conversations
|
||||
}
|
||||
}
|
||||
|
||||
fun tryReload() = manualReloadTrigger.tryEmit(Unit)
|
||||
|
||||
data class Data(
|
||||
val threads: List<ThreadRecord>,
|
||||
val typingThreadIDs: Set<Long>
|
||||
)
|
||||
|
||||
companion object {
|
||||
private const val CHANGE_NOTIFICATION_DEBOUNCE_MILLS = 100L
|
||||
}
|
||||
}
|
@ -0,0 +1,31 @@
|
||||
package org.thoughtcrime.securesms.util
|
||||
|
||||
import android.content.ContentResolver
|
||||
import android.database.ContentObserver
|
||||
import android.net.Uri
|
||||
import android.os.Handler
|
||||
import android.os.Looper
|
||||
import androidx.annotation.CheckResult
|
||||
import kotlinx.coroutines.channels.awaitClose
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.callbackFlow
|
||||
|
||||
/**
|
||||
* Observe changes to a content Uri. This function will emit the Uri whenever the content or
|
||||
* its descendants change, according to the parameter [notifyForDescendants].
|
||||
*/
|
||||
@CheckResult
|
||||
fun ContentResolver.observeChanges(uri: Uri, notifyForDescendants: Boolean = false): Flow<Uri> {
|
||||
return callbackFlow {
|
||||
val observer = object : ContentObserver(Handler(Looper.getMainLooper())) {
|
||||
override fun onChange(selfChange: Boolean) {
|
||||
trySend(uri)
|
||||
}
|
||||
}
|
||||
|
||||
registerContentObserver(uri, notifyForDescendants, observer)
|
||||
awaitClose {
|
||||
unregisterContentObserver(observer)
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue