Made some tweaks to help properly stop the JobRunner

pull/771/head
Morgan Pretty 2 years ago
parent b622e9342c
commit d38a44515d

@ -111,6 +111,7 @@ public final class JobRunner {
fileprivate static var perSessionJobsCompleted: Atomic<Set<Int64>> = Atomic([]) fileprivate static var perSessionJobsCompleted: Atomic<Set<Int64>> = Atomic([])
private static var hasCompletedInitialBecomeActive: Atomic<Bool> = Atomic(false) private static var hasCompletedInitialBecomeActive: Atomic<Bool> = Atomic(false)
private static var shutdownBackgroundTask: Atomic<OWSBackgroundTask?> = Atomic(nil) private static var shutdownBackgroundTask: Atomic<OWSBackgroundTask?> = Atomic(nil)
fileprivate static var canStartQueues: Atomic<Bool> = Atomic(false)
// MARK: - Configuration // MARK: - Configuration
@ -161,6 +162,9 @@ public final class JobRunner {
queues.wrappedValue[job.variant]?.upsert(job, canStartJob: canStartJob) queues.wrappedValue[job.variant]?.upsert(job, canStartJob: canStartJob)
// Don't start the queue if the job can't be started
guard canStartJob else { return }
// Start the job runner if needed // Start the job runner if needed
db.afterNextTransaction { _ in db.afterNextTransaction { _ in
queues.wrappedValue[job.variant]?.start() queues.wrappedValue[job.variant]?.start()
@ -188,15 +192,13 @@ public final class JobRunner {
queues.wrappedValue[updatedJob.variant]?.insert(updatedJob, before: otherJob) queues.wrappedValue[updatedJob.variant]?.insert(updatedJob, before: otherJob)
// Start the job runner if needed
db.afterNextTransaction { _ in
queues.wrappedValue[updatedJob.variant]?.start()
}
return (jobId, updatedJob) return (jobId, updatedJob)
} }
public static func appDidFinishLaunching() { public static func appDidFinishLaunching() {
// Flag that the JobRunner can start it's queues
JobRunner.canStartQueues.mutate { $0 = true }
// Note: 'appDidBecomeActive' will run on first launch anyway so we can // Note: 'appDidBecomeActive' will run on first launch anyway so we can
// leave those jobs out and can wait until then to start the JobRunner // leave those jobs out and can wait until then to start the JobRunner
let jobsToRun: (blocking: [Job], nonBlocking: [Job]) = Storage.shared let jobsToRun: (blocking: [Job], nonBlocking: [Job]) = Storage.shared
@ -242,6 +244,9 @@ public final class JobRunner {
} }
public static func appDidBecomeActive() { public static func appDidBecomeActive() {
// Flag that the JobRunner can start it's queues
JobRunner.canStartQueues.mutate { $0 = true }
// If we have a running "sutdownBackgroundTask" then we want to cancel it as otherwise it // If we have a running "sutdownBackgroundTask" then we want to cancel it as otherwise it
// can result in the database being suspended and us being unable to interact with it at all // can result in the database being suspended and us being unable to interact with it at all
shutdownBackgroundTask.mutate { shutdownBackgroundTask.mutate {
@ -291,6 +296,11 @@ public final class JobRunner {
exceptForVariant: Job.Variant? = nil, exceptForVariant: Job.Variant? = nil,
onComplete: (() -> ())? = nil onComplete: (() -> ())? = nil
) { ) {
// Inform the JobRunner that it can't start any queues (this is to prevent queues from
// rescheduling themselves while in the background, when the app restarts or becomes active
// the JobRunenr will update this flag)
JobRunner.canStartQueues.mutate { $0 = false }
// Stop all queues except for the one containing the `exceptForVariant` // Stop all queues except for the one containing the `exceptForVariant`
queues.wrappedValue queues.wrappedValue
.values .values
@ -632,6 +642,7 @@ private final class JobQueue {
fileprivate func start(force: Bool = false) { fileprivate func start(force: Bool = false) {
// We only want the JobRunner to run in the main app // We only want the JobRunner to run in the main app
guard CurrentAppContext().isMainApp else { return } guard CurrentAppContext().isMainApp else { return }
guard JobRunner.canStartQueues.wrappedValue else { return }
guard force || !isRunning.wrappedValue else { return } guard force || !isRunning.wrappedValue else { return }
// The JobRunner runs synchronously we need to ensure this doesn't start // The JobRunner runs synchronously we need to ensure this doesn't start
@ -852,8 +863,9 @@ private final class JobQueue {
.fetchOne(db) .fetchOne(db)
} }
// If there are no remaining jobs the trigger the 'onQueueDrained' callback and stop // If there are no remaining jobs or the JobRunner isn't allowed to start any queues then trigger
guard let nextJobTimestamp: TimeInterval = nextJobTimestamp else { // the 'onQueueDrained' callback and stop
guard let nextJobTimestamp: TimeInterval = nextJobTimestamp, JobRunner.canStartQueues.wrappedValue else {
if executionType != .concurrent || jobsCurrentlyRunning.wrappedValue.isEmpty { if executionType != .concurrent || jobsCurrentlyRunning.wrappedValue.isEmpty {
self.onQueueDrained?() self.onQueueDrained?()
} }
@ -1064,6 +1076,8 @@ private final class JobQueue {
queue = queue.filter { !dependantJobIds.contains($0.id ?? -1) } queue = queue.filter { !dependantJobIds.contains($0.id ?? -1) }
} }
} }
performCleanUp(for: job, result: .failed)
return return
} }
@ -1082,7 +1096,7 @@ private final class JobQueue {
try job.dependantJobs try job.dependantJobs
.updateAll( .updateAll(
db, db,
Job.Columns.failureCount.set(to: job.failureCount), Job.Columns.failureCount.set(to: (job.failureCount + 1)),
Job.Columns.nextRunTimestamp.set(to: (nextRunTimestamp + (1 / 1000))) Job.Columns.nextRunTimestamp.set(to: (nextRunTimestamp + (1 / 1000)))
) )

Loading…
Cancel
Save