|
|
|
@ -36,6 +36,13 @@ public protocol JobExecutor {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public final class JobRunner {
|
|
|
|
|
public enum JobResult {
|
|
|
|
|
case succeeded
|
|
|
|
|
case failed
|
|
|
|
|
case deferred
|
|
|
|
|
case notFound
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static let blockingQueue: Atomic<JobQueue?> = Atomic(
|
|
|
|
|
JobQueue(
|
|
|
|
|
type: .blocking,
|
|
|
|
@ -332,6 +339,15 @@ public final class JobRunner {
|
|
|
|
|
.defaulting(to: [:])
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static func afterCurrentlyRunningJob(_ job: Job?, callback: @escaping (JobResult) -> ()) {
|
|
|
|
|
guard let job: Job = job, let jobId: Int64 = job.id, let queue: JobQueue = queues.wrappedValue[job.variant] else {
|
|
|
|
|
callback(.notFound)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
queue.afterCurrentlyRunningJob(jobId, callback: callback)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static func hasPendingOrRunningJob<T: Encodable>(with variant: Job.Variant, details: T) -> Bool {
|
|
|
|
|
guard let targetQueue: JobQueue = queues.wrappedValue[variant] else { return false }
|
|
|
|
|
guard let detailsData: Data = try? JSONEncoder().encode(details) else { return false }
|
|
|
|
@ -339,6 +355,12 @@ public final class JobRunner {
|
|
|
|
|
return targetQueue.hasPendingOrRunningJob(with: detailsData)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static func removePendingJob(_ job: Job?) {
|
|
|
|
|
guard let job: Job = job, let jobId: Int64 = job.id else { return }
|
|
|
|
|
|
|
|
|
|
queues.wrappedValue[job.variant]?.removePendingJob(jobId)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// MARK: - Convenience
|
|
|
|
|
|
|
|
|
|
fileprivate static func getRetryInterval(for job: Job) -> TimeInterval {
|
|
|
|
@ -445,6 +467,7 @@ private final class JobQueue {
|
|
|
|
|
fileprivate var isRunning: Atomic<Bool> = Atomic(false)
|
|
|
|
|
private var queue: Atomic<[Job]> = Atomic([])
|
|
|
|
|
private var jobsCurrentlyRunning: Atomic<Set<Int64>> = Atomic([])
|
|
|
|
|
private var jobCallbacks: Atomic<[Int64: [(JobRunner.JobResult) -> ()]]> = Atomic([:])
|
|
|
|
|
private var detailsForCurrentlyRunningJobs: Atomic<[Int64: Data?]> = Atomic([:])
|
|
|
|
|
private var deferLoopTracker: Atomic<[Int64: (count: Int, times: [TimeInterval])]> = Atomic([:])
|
|
|
|
|
|
|
|
|
@ -560,12 +583,29 @@ private final class JobQueue {
|
|
|
|
|
return detailsForCurrentlyRunningJobs.wrappedValue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fileprivate func afterCurrentlyRunningJob(_ jobId: Int64, callback: @escaping (JobRunner.JobResult) -> ()) {
|
|
|
|
|
guard isCurrentlyRunning(jobId) else {
|
|
|
|
|
callback(.notFound)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
jobCallbacks.mutate { jobCallbacks in
|
|
|
|
|
jobCallbacks[jobId] = (jobCallbacks[jobId] ?? []).appending(callback)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fileprivate func hasPendingOrRunningJob(with detailsData: Data?) -> Bool {
|
|
|
|
|
let pendingJobs: [Job] = queue.wrappedValue
|
|
|
|
|
|
|
|
|
|
return pendingJobs.contains { job in job.details == detailsData }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fileprivate func removePendingJob(_ jobId: Int64) {
|
|
|
|
|
queue.mutate { queue in
|
|
|
|
|
queue = queue.filter { $0.id != jobId }
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// MARK: - Job Running
|
|
|
|
|
|
|
|
|
|
fileprivate func start(force: Bool = false) {
|
|
|
|
@ -900,10 +940,8 @@ private final class JobQueue {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// The job is removed from the queue before it runs so all we need to to is remove it
|
|
|
|
|
// from the 'currentlyRunning' set and start the next one
|
|
|
|
|
jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }
|
|
|
|
|
detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) }
|
|
|
|
|
// Perform job cleanup and start the next job
|
|
|
|
|
performCleanUp(for: job, result: .succeeded)
|
|
|
|
|
internalQueue.async { [weak self] in
|
|
|
|
|
self?.runNextJob()
|
|
|
|
|
}
|
|
|
|
@ -914,8 +952,7 @@ private final class JobQueue {
|
|
|
|
|
private func handleJobFailed(_ job: Job, error: Error?, permanentFailure: Bool) {
|
|
|
|
|
guard Storage.shared.read({ db in try Job.exists(db, id: job.id ?? -1) }) == true else {
|
|
|
|
|
SNLog("[JobRunner] \(queueContext) \(job.variant) job canceled")
|
|
|
|
|
jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }
|
|
|
|
|
detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) }
|
|
|
|
|
performCleanUp(for: job, result: .failed)
|
|
|
|
|
|
|
|
|
|
internalQueue.async { [weak self] in
|
|
|
|
|
self?.runNextJob()
|
|
|
|
@ -923,12 +960,30 @@ private final class JobQueue {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// If this is the blocking queue and a "blocking" job failed then rerun it immediately
|
|
|
|
|
// If this is the blocking queue and a "blocking" job failed then rerun it
|
|
|
|
|
// immediately (in this case we don't trigger any job callbacks because the
|
|
|
|
|
// job isn't actually done, it's going to try again immediately)
|
|
|
|
|
if self.type == .blocking && job.shouldBlock {
|
|
|
|
|
SNLog("[JobRunner] \(queueContext) \(job.variant) job failed; retrying immediately")
|
|
|
|
|
jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }
|
|
|
|
|
detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) }
|
|
|
|
|
queue.mutate { $0.insert(job, at: 0) }
|
|
|
|
|
|
|
|
|
|
// If it was a possible deferral loop then we don't actually want to
|
|
|
|
|
// retry the job (even if it's a blocking one, this gives a small chance
|
|
|
|
|
// that the app could continue to function)
|
|
|
|
|
let wasPossibleDeferralLoop: Bool = {
|
|
|
|
|
if let error = error, case JobRunnerError.possibleDeferralLoop = error { return true }
|
|
|
|
|
|
|
|
|
|
return false
|
|
|
|
|
}()
|
|
|
|
|
performCleanUp(
|
|
|
|
|
for: job,
|
|
|
|
|
result: .failed,
|
|
|
|
|
shouldTriggerCallbacks: wasPossibleDeferralLoop
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Only add it back to the queue if it wasn't a deferral loop
|
|
|
|
|
if !wasPossibleDeferralLoop {
|
|
|
|
|
queue.mutate { $0.insert(job, at: 0) }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
internalQueue.async { [weak self] in
|
|
|
|
|
self?.runNextJob()
|
|
|
|
@ -1003,8 +1058,7 @@ private final class JobQueue {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }
|
|
|
|
|
detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) }
|
|
|
|
|
performCleanUp(for: job, result: .failed)
|
|
|
|
|
internalQueue.async { [weak self] in
|
|
|
|
|
self?.runNextJob()
|
|
|
|
|
}
|
|
|
|
@ -1014,8 +1068,7 @@ private final class JobQueue {
|
|
|
|
|
/// on other jobs, and it should automatically manage those dependencies)
|
|
|
|
|
private func handleJobDeferred(_ job: Job) {
|
|
|
|
|
var stuckInDeferLoop: Bool = false
|
|
|
|
|
jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }
|
|
|
|
|
detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) }
|
|
|
|
|
|
|
|
|
|
deferLoopTracker.mutate {
|
|
|
|
|
guard let lastRecord: (count: Int, times: [TimeInterval]) = $0[job.id] else {
|
|
|
|
|
$0 = $0.setting(
|
|
|
|
@ -1055,8 +1108,29 @@ private final class JobQueue {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
performCleanUp(for: job, result: .deferred)
|
|
|
|
|
internalQueue.async { [weak self] in
|
|
|
|
|
self?.runNextJob()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private func performCleanUp(for job: Job, result: JobRunner.JobResult, shouldTriggerCallbacks: Bool = true) {
|
|
|
|
|
// The job is removed from the queue before it runs so all we need to to is remove it
|
|
|
|
|
// from the 'currentlyRunning' set
|
|
|
|
|
jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }
|
|
|
|
|
detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) }
|
|
|
|
|
|
|
|
|
|
guard shouldTriggerCallbacks else { return }
|
|
|
|
|
|
|
|
|
|
// Run any job callbacks now that it's done
|
|
|
|
|
var jobCallbacksToRun: [(JobRunner.JobResult) -> ()] = []
|
|
|
|
|
jobCallbacks.mutate { jobCallbacks in
|
|
|
|
|
jobCallbacksToRun = (jobCallbacks[job.id] ?? [])
|
|
|
|
|
jobCallbacks = jobCallbacks.removingValue(forKey: job.id)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
DispatchQueue.global(qos: .default).async {
|
|
|
|
|
jobCallbacksToRun.forEach { $0(result) }
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|