[SES-1966] Attachment batch download and tidy-up (#1507)
* Attachment batch download * Addressed feedback and test issues * Feedback fixes * timedWindow for flow * Feedback * Dispatchers * Remove `flowOn` * New implementation of timedBuffer * Organise import * Feedback * Fix test * Tidied up logic around `eligibleForDownload` * Updated comment --------- Co-authored-by: fanchao <git@fanchao.dev>pull/1528/head
parent
fec67e282a
commit
0da949c8e6
@ -0,0 +1,115 @@
|
||||
package org.thoughtcrime.securesms.conversation.v2
|
||||
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.receiveAsFlow
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.plus
|
||||
import org.session.libsession.database.MessageDataProvider
|
||||
import org.session.libsession.database.StorageProtocol
|
||||
import org.session.libsession.messaging.jobs.AttachmentDownloadJob
|
||||
import org.session.libsession.messaging.jobs.AttachmentUploadJob
|
||||
import org.session.libsession.messaging.jobs.JobQueue
|
||||
import org.session.libsession.messaging.sending_receiving.attachments.AttachmentTransferProgress
|
||||
import org.session.libsession.messaging.sending_receiving.attachments.DatabaseAttachment
|
||||
import org.session.libsignal.utilities.Log
|
||||
import org.thoughtcrime.securesms.util.flatten
|
||||
import org.thoughtcrime.securesms.util.timedBuffer
|
||||
|
||||
/**
|
||||
* [AttachmentDownloadHandler] is responsible for handling attachment download requests. These
|
||||
* requests will go through different level of checking before they are queued for download.
|
||||
*
|
||||
* To use this handler, call [onAttachmentDownloadRequest] with the attachment that needs to be
|
||||
* downloaded. The call to [onAttachmentDownloadRequest] is cheap and can be called multiple times.
|
||||
*/
|
||||
class AttachmentDownloadHandler(
|
||||
private val storage: StorageProtocol,
|
||||
private val messageDataProvider: MessageDataProvider,
|
||||
jobQueue: JobQueue = JobQueue.shared,
|
||||
scope: CoroutineScope = CoroutineScope(Dispatchers.Default) + SupervisorJob(),
|
||||
) {
|
||||
companion object {
|
||||
private const val BUFFER_TIMEOUT_MILLS = 500L
|
||||
private const val BUFFER_MAX_ITEMS = 10
|
||||
private const val LOG_TAG = "AttachmentDownloadHelper"
|
||||
}
|
||||
|
||||
private val downloadRequests = Channel<DatabaseAttachment>(UNLIMITED)
|
||||
|
||||
init {
|
||||
scope.launch(Dispatchers.Default) {
|
||||
downloadRequests
|
||||
.receiveAsFlow()
|
||||
.timedBuffer(BUFFER_TIMEOUT_MILLS, BUFFER_MAX_ITEMS)
|
||||
.map(::filterEligibleAttachments)
|
||||
.flatten()
|
||||
.collect { attachment ->
|
||||
jobQueue.add(
|
||||
AttachmentDownloadJob(
|
||||
attachmentID = attachment.attachmentId.rowId,
|
||||
databaseMessageID = attachment.mmsId
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Filter attachments that are eligible for creating download jobs.
|
||||
*
|
||||
*/
|
||||
private fun filterEligibleAttachments(attachments: List<DatabaseAttachment>): List<DatabaseAttachment> {
|
||||
val pendingAttachmentIDs = storage
|
||||
.getAllPendingJobs(AttachmentDownloadJob.KEY, AttachmentUploadJob.KEY)
|
||||
.values
|
||||
.mapNotNull {
|
||||
(it as? AttachmentUploadJob)?.attachmentID
|
||||
?: (it as? AttachmentDownloadJob)?.attachmentID
|
||||
}
|
||||
.toSet()
|
||||
|
||||
|
||||
return attachments.filter { attachment ->
|
||||
eligibleForDownloadTask(
|
||||
attachment,
|
||||
pendingAttachmentIDs,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the attachment is eligible for download task.
|
||||
*/
|
||||
private fun eligibleForDownloadTask(
|
||||
attachment: DatabaseAttachment,
|
||||
pendingJobsAttachmentRowIDs: Set<Long>,
|
||||
): Boolean {
|
||||
if (attachment.attachmentId.rowId in pendingJobsAttachmentRowIDs) {
|
||||
return false
|
||||
}
|
||||
|
||||
val threadID = storage.getThreadIdForMms(attachment.mmsId)
|
||||
|
||||
return AttachmentDownloadJob.eligibleForDownload(
|
||||
threadID, storage, messageDataProvider, attachment.mmsId,
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
fun onAttachmentDownloadRequest(attachment: DatabaseAttachment) {
|
||||
if (attachment.transferState != AttachmentTransferProgress.TRANSFER_PROGRESS_PENDING) {
|
||||
Log.i(
|
||||
LOG_TAG,
|
||||
"Attachment ${attachment.attachmentId} is not pending, skipping download"
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
downloadRequests.trySend(attachment)
|
||||
}
|
||||
}
|
@ -0,0 +1,44 @@
|
||||
package org.thoughtcrime.securesms.util
|
||||
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.asFlow
|
||||
import kotlinx.coroutines.flow.channelFlow
|
||||
import kotlinx.coroutines.flow.collectLatest
|
||||
import kotlinx.coroutines.flow.flatMapConcat
|
||||
|
||||
/**
|
||||
* Buffers items from the flow and emits them in batches. The batch will have size [maxItems] and
|
||||
* time [timeoutMillis] limit.
|
||||
*/
|
||||
fun <T> Flow<T>.timedBuffer(timeoutMillis: Long, maxItems: Int): Flow<List<T>> {
|
||||
return channelFlow {
|
||||
val buffer = mutableListOf<T>()
|
||||
var bufferBeganAt = -1L
|
||||
|
||||
collectLatest { value ->
|
||||
if (buffer.isEmpty()) {
|
||||
bufferBeganAt = System.currentTimeMillis()
|
||||
}
|
||||
|
||||
buffer.add(value)
|
||||
|
||||
if (buffer.size < maxItems) {
|
||||
// If the buffer is not full, wait until the time limit is reached.
|
||||
// The delay here, as a suspension point, will be cancelled by `collectLatest`,
|
||||
// if another item is collected while we are waiting for the `delay` to complete.
|
||||
// Once the delay is cancelled, another round of `collectLatest` will be restarted.
|
||||
delay((System.currentTimeMillis() + timeoutMillis - bufferBeganAt).coerceAtLeast(0L))
|
||||
}
|
||||
|
||||
// When we reach here, it's either the buffer is full, or the timeout has been reached:
|
||||
// send out the buffer and reset the state
|
||||
send(buffer.toList())
|
||||
buffer.clear()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@OptIn(ExperimentalCoroutinesApi::class)
|
||||
fun <T> Flow<Iterable<T>>.flatten(): Flow<T> = flatMapConcat { it.asFlow() }
|
@ -0,0 +1,52 @@
|
||||
package org.thoughtcrime.securesms.util
|
||||
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.flow.flow
|
||||
import kotlinx.coroutines.flow.flowOf
|
||||
import kotlinx.coroutines.flow.toCollection
|
||||
import kotlinx.coroutines.flow.toList
|
||||
import kotlinx.coroutines.test.runTest
|
||||
import org.junit.Assert.assertEquals
|
||||
import org.junit.Test
|
||||
|
||||
class FlowUtilsTest {
|
||||
|
||||
@Test
|
||||
fun `timedBuffer should emit buffer when it's full`() = runTest {
|
||||
// Given
|
||||
val flow = flowOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
|
||||
val timeoutMillis = 1000L
|
||||
val maxItems = 5
|
||||
|
||||
// When
|
||||
val result = flow.timedBuffer(timeoutMillis, maxItems).toList()
|
||||
|
||||
// Then
|
||||
assertEquals(2, result.size)
|
||||
assertEquals(listOf(1, 2, 3, 4, 5), result[0])
|
||||
assertEquals(listOf(6, 7, 8, 9, 10), result[1])
|
||||
}
|
||||
|
||||
@OptIn(ExperimentalCoroutinesApi::class)
|
||||
@Test
|
||||
fun `timedBuffer should emit buffer when timeout expires`() = runTest {
|
||||
// Given
|
||||
val flow = flow {
|
||||
emit(1)
|
||||
emit(2)
|
||||
emit(3)
|
||||
testScheduler.advanceTimeBy(200L)
|
||||
emit(4)
|
||||
}
|
||||
val timeoutMillis = 100L
|
||||
val maxItems = 5
|
||||
|
||||
// When
|
||||
val result = flow.timedBuffer(timeoutMillis, maxItems).toList()
|
||||
|
||||
// Then
|
||||
assertEquals(2, result.size)
|
||||
assertEquals(listOf(1, 2, 3), result[0])
|
||||
assertEquals(listOf(4), result[1])
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue