|
|
|
@ -23,6 +23,7 @@ class JobQueue : JobDelegate {
|
|
|
|
|
private val attachmentDispatcher = Executors.newFixedThreadPool(2).asCoroutineDispatcher()
|
|
|
|
|
private val scope = GlobalScope + SupervisorJob()
|
|
|
|
|
private val queue = Channel<Job>(UNLIMITED)
|
|
|
|
|
private val pendingJobIds = mutableSetOf<String>()
|
|
|
|
|
|
|
|
|
|
val timer = Timer()
|
|
|
|
|
|
|
|
|
@ -86,6 +87,19 @@ class JobQueue : JobDelegate {
|
|
|
|
|
MessagingModuleConfiguration.shared.storage.persistJob(job)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fun resumePendingSendMessage(job: Job) {
|
|
|
|
|
val id = job.id ?: run {
|
|
|
|
|
Log.e("Loki", "tried to resume pending send job with no ID")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if (!pendingJobIds.add(id)) {
|
|
|
|
|
Log.e("Loki","tried to re-queue pending/in-progress job")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
queue.offer(job)
|
|
|
|
|
Log.d("Loki", "resumed pending send message $id")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fun resumePendingJobs() {
|
|
|
|
|
if (hasResumedPendingJobs) {
|
|
|
|
|
Log.d("Loki", "resumePendingJobs() should only be called once.")
|
|
|
|
@ -120,6 +134,7 @@ class JobQueue : JobDelegate {
|
|
|
|
|
override fun handleJobSucceeded(job: Job) {
|
|
|
|
|
val jobId = job.id ?: return
|
|
|
|
|
MessagingModuleConfiguration.shared.storage.markJobAsSucceeded(jobId)
|
|
|
|
|
pendingJobIds.remove(jobId)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override fun handleJobFailed(job: Job, error: Exception) {
|
|
|
|
@ -169,4 +184,7 @@ class JobQueue : JobDelegate {
|
|
|
|
|
val maxBackoff = (10 * 60).toDouble() // 10 minutes
|
|
|
|
|
return (1000 * 0.25 * min(maxBackoff, (2.0).pow(job.failureCount))).roundToLong()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private fun Job.isSend() = this is MessageSendJob || this is AttachmentUploadJob
|
|
|
|
|
|
|
|
|
|
}
|