WIP: refactor jobs (basic)

pull/420/head
Ryan ZHAO 4 years ago
parent da71fdfe44
commit bfb16c581a

@ -1,4 +1,17 @@
package org.session.messaging.jobs
package org.session.libsession.messaging.jobs
class AttachmentDownloadJob: Job {
override var delegate: JobDelegate? = null
override var id: String? = null
override var failureCount: Int = 0
// Settings
override val maxFailureCount: Int = 100
companion object {
val collection: String = "AttachmentDownloadJobCollection"
}
override fun execute() {
TODO("Not yet implemented")
}
}

@ -1,4 +1,17 @@
package org.session.messaging.jobs
package org.session.libsession.messaging.jobs
class AttachmentUploadJob : Job {
override var delegate: JobDelegate? = null
override var id: String? = null
override var failureCount: Int = 0
// Settings
override val maxFailureCount: Int = 20
companion object {
val collection: String = "AttachmentUploadJobCollection"
}
override fun execute() {
TODO("Not yet implemented")
}
}

@ -1,4 +1,11 @@
package org.session.messaging.jobs
package org.session.libsession.messaging.jobs
interface Job {
var delegate: JobDelegate?
var id: String?
var failureCount: Int
val maxFailureCount: Int
fun execute()
}

@ -1,4 +1,7 @@
package org.session.messaging.jobs
package org.session.libsession.messaging.jobs
interface JobDelegate {
fun handleJobSucceeded(job: Job)
fun handleJobFailed(job: Job, error: Exception)
fun handleJobFailedPermanently(job: Job, error: Exception)
}

@ -1,4 +1,89 @@
package org.session.messaging.jobs
package org.session.libsession.messaging.jobs
import kotlin.math.min
import kotlin.math.pow
import java.util.Timer
import org.session.libsession.messaging.Configuration
import org.session.libsignal.libsignal.logging.Log
import kotlin.concurrent.schedule
import kotlin.math.roundToLong
class JobQueue : JobDelegate {
private var hasResumedPendingJobs = false // Just for debugging
companion object {
val shared: JobQueue by lazy { JobQueue() }
}
fun add(job: Job) {
addWithoutExecuting(job)
job.execute()
}
fun addWithoutExecuting(job: Job) {
job.id = System.currentTimeMillis().toString()
Configuration.shared.storage.persist(job)
job.delegate = this
}
fun resumePendingJobs() {
if (hasResumedPendingJobs) {
Log.d("Loki", "resumePendingJobs() should only be called once.")
return
}
hasResumedPendingJobs = true
val allJobTypes = listOf(AttachmentDownloadJob.collection, AttachmentDownloadJob.collection, MessageReceiveJob.collection, MessageSendJob.collection, NotifyPNServerJob.collection)
allJobTypes.forEach { type ->
val allPendingJobs = Configuration.shared.storage.getAllPendingJobs(type)
allPendingJobs.sortedBy { it.id }.forEach { job ->
Log.i("Jobs", "Resuming pending job of type: ${job::class.simpleName}.")
job.delegate = this
job.execute()
}
}
}
override fun handleJobSucceeded(job: Job) {
Configuration.shared.storage.markJobAsSucceeded(job)
}
override fun handleJobFailed(job: Job, error: Exception) {
job.failureCount += 1
val storage = Configuration.shared.storage
if (storage.isJobCanceled(job)) { return Log.i("Jobs", "${job::class.simpleName} canceled.")}
storage.persist(job)
if (job.failureCount == job.maxFailureCount) {
storage.markJobAsFailed(job)
} else {
val retryInterval = getRetryInterval(job)
Log.i("Jobs", "${job::class.simpleName} failed; scheduling retry (failure count is ${job.failureCount}).")
Timer().schedule(delay = retryInterval) {
Log.i("Jobs", "Retrying ${job::class.simpleName}.")
job.execute()
}
}
}
override fun handleJobFailedPermanently(job: Job, error: Exception) {
job.failureCount += 1
val storage = Configuration.shared.storage
storage.persist(job)
storage.markJobAsFailed(job)
}
private fun getRetryInterval(job: Job): Long {
// Arbitrary backoff factor...
// try 1 delay: 0ms
// try 2 delay: 190ms
// ...
// try 5 delay: 1300ms
// ...
// try 11 delay: 61310ms
val backoffFactor = 1.9
val maxBackoff = (60 * 60 * 1000).toDouble()
return (100 * min(maxBackoff, backoffFactor.pow(job.failureCount))).roundToLong()
}
}

@ -1,4 +1,17 @@
package org.session.messaging.jobs
package org.session.libsession.messaging.jobs
class MessageReceiveJob : Job {
override var delegate: JobDelegate? = null
override var id: String? = null
override var failureCount: Int = 0
// Settings
override val maxFailureCount: Int = 10
companion object {
val collection: String = "MessageReceiveJobCollection"
}
override fun execute() {
TODO("Not yet implemented")
}
}

@ -1,4 +1,17 @@
package org.session.messaging.jobs
package org.session.libsession.messaging.jobs
class MessageSendJob : Job {
override var delegate: JobDelegate? = null
override var id: String? = null
override var failureCount: Int = 0
// Settings
override val maxFailureCount: Int = 10
companion object {
val collection: String = "MessageSendJobCollection"
}
override fun execute() {
TODO("Not yet implemented")
}
}

@ -1,4 +1,57 @@
package org.session.messaging.jobs
package org.session.libsession.messaging.jobs
class NotifyPNServerJob : Job {
import nl.komponents.kovenant.functional.map
import okhttp3.MediaType
import okhttp3.Request
import okhttp3.RequestBody
import org.session.libsession.messaging.sending_receiving.notifications.PushNotificationAPI
import org.session.libsession.snode.SnodeMessage
import org.session.libsession.snode.OnionRequestAPI
import org.session.libsignal.libsignal.logging.Log
import org.session.libsignal.service.internal.util.JsonUtil
import org.session.libsignal.service.loki.utilities.retryIfNeeded
class NotifyPNServerJob(val message: SnodeMessage) : Job {
override var delegate: JobDelegate? = null
override var id: String? = null
override var failureCount: Int = 0
// Settings
override val maxFailureCount: Int = 20
companion object {
val collection: String = "NotifyPNServerJobCollection"
}
// Running
override fun execute() {
val server = PushNotificationAPI.server
val parameters = mapOf( "data" to message.data, "send_to" to message.recipient )
val url = "${server}/notify"
val body = RequestBody.create(MediaType.get("application/json"), JsonUtil.toJson(parameters))
val request = Request.Builder().url(url).post(body)
retryIfNeeded(4) {
OnionRequestAPI.sendOnionRequest(request.build(), server, PushNotificationAPI.serverPublicKey, "/loki/v2/lsrpc").map { json ->
val code = json["code"] as? Int
if (code == null || code == 0) {
Log.d("Loki", "[Loki] Couldn't notify PN server due to error: ${json["message"] as? String ?: "null"}.")
}
}.fail { exception ->
Log.d("Loki", "[Loki] Couldn't notify PN server due to error: $exception.")
}
}.success {
handleSuccess()
}. fail {
handleFailure(it)
}
}
private fun handleSuccess() {
delegate?.handleJobSucceeded(this)
}
private fun handleFailure(error: Exception) {
delegate?.handleJobFailed(this, error)
}
}
Loading…
Cancel
Save