Fixed up and added more unit tests to the JobRunner

pull/813/head
Morgan Pretty 1 year ago
parent 77b6faccb3
commit fc94d24ddf

@ -151,7 +151,7 @@ internal extension ControlMessageProcessRecord {
.infoClosedGroupCreated: .infoClosedGroupCreated:
return nil return nil
case .infoClosedGroupUpdated, .infoClosedGroupCurrentUserLeft, .infoClosedGroupCurrentUserLeaving, .infoClosedGroupCurrentUserErrorLeaving: case .infoClosedGroupUpdated, .infoClosedGroupCurrentUserLeft, .infoClosedGroupCurrentUserLeaving, .infoClosedGroupCurrentUserErrorLeaving:
self.variant = .closedGroupControlMessage self.variant = .closedGroupControlMessage
case .infoDisappearingMessagesUpdate: case .infoDisappearingMessagesUpdate:

@ -40,6 +40,12 @@ open class Dependencies {
set { _date.mutate { $0 = newValue } } set { _date.mutate { $0 = newValue } }
} }
public var _fixedTime: Atomic<Int?>
public var fixedTime: Int {
get { Dependencies.getValueSettingIfNull(&_fixedTime) { 0 } }
set { _fixedTime.mutate { $0 = newValue } }
}
// MARK: - Initialization // MARK: - Initialization
public init( public init(
@ -48,7 +54,8 @@ open class Dependencies {
jobRunner: JobRunnerType? = nil, jobRunner: JobRunnerType? = nil,
scheduler: ValueObservationScheduler? = nil, scheduler: ValueObservationScheduler? = nil,
standardUserDefaults: UserDefaultsType? = nil, standardUserDefaults: UserDefaultsType? = nil,
date: Date? = nil date: Date? = nil,
fixedTime: Int? = nil
) { ) {
_generalCache = Atomic(generalCache) _generalCache = Atomic(generalCache)
_storage = Atomic(storage) _storage = Atomic(storage)
@ -56,6 +63,7 @@ open class Dependencies {
_scheduler = Atomic(scheduler) _scheduler = Atomic(scheduler)
_standardUserDefaults = Atomic(standardUserDefaults) _standardUserDefaults = Atomic(standardUserDefaults)
_date = Atomic(date) _date = Atomic(date)
_fixedTime = Atomic(fixedTime)
} }
// MARK: - Convenience // MARK: - Convenience

@ -475,10 +475,7 @@ public final class JobRunner: JobRunnerType {
return return
} }
queues.mutate { queues.wrappedValue[updatedJob.variant]?.add(db, job: updatedJob, canStartJob: canStartJob, dependencies: dependencies)
$0[updatedJob.variant]?
.add(updatedJob, canStartJob: canStartJob, dependencies: dependencies)
}
// Don't start the queue if the job can't be started // Don't start the queue if the job can't be started
guard canStartJob else { return } guard canStartJob else { return }
@ -501,7 +498,7 @@ public final class JobRunner: JobRunnerType {
return return
} }
queues.wrappedValue[job.variant]?.upsert(job, canStartJob: canStartJob, dependencies: dependencies) queues.wrappedValue[job.variant]?.upsert(db, job: job, canStartJob: canStartJob, dependencies: dependencies)
// Don't start the queue if the job can't be started // Don't start the queue if the job can't be started
guard canStartJob else { return } guard canStartJob else { return }
@ -618,14 +615,28 @@ public final class JobQueue {
/// on our random queue threads results in the timer never firing, the `start` method will redirect itself to /// on our random queue threads results in the timer never firing, the `start` method will redirect itself to
/// the correct thread /// the correct thread
let trigger: Trigger = Trigger() let trigger: Trigger = Trigger()
trigger.fireTimestamp = max(1, (timestamp - Date().timeIntervalSince1970)) trigger.fireTimestamp = max(1, (timestamp - dependencies.date.timeIntervalSince1970))
trigger.timer = Timer.scheduledTimerOnMainThread(
withTimeInterval: trigger.fireTimestamp, switch HasAppContext() && CurrentAppContext().isRunningTests {
repeats: false, case true:
block: { [weak queue] _ in // When running unit tests don't schedule a proper Timer, use a while loop instead
queue?.start(dependencies: dependencies) DispatchQueue.global(qos: .default).async { [weak queue] in
} while timestamp < dependencies.date.timeIntervalSince1970 {
) Thread.sleep(forTimeInterval: 0.01) // Wait for 10ms
}
queue?.start(dependencies: dependencies)
}
case false:
trigger.timer = Timer.scheduledTimerOnMainThread(
withTimeInterval: trigger.fireTimestamp,
repeats: false,
block: { [weak queue] _ in
queue?.start(dependencies: dependencies)
}
)
}
return trigger return trigger
} }
@ -711,12 +722,12 @@ public final class JobQueue {
// MARK: - Execution // MARK: - Execution
fileprivate func add(_ job: Job, canStartJob: Bool = true, dependencies: Dependencies) { fileprivate func add(_ db: Database, job: Job, canStartJob: Bool = true, dependencies: Dependencies) {
// Check if the job should be added to the queue // Check if the job should be added to the queue
guard guard
canStartJob, canStartJob,
job.behaviour != .runOnceNextLaunch, job.behaviour != .runOnceNextLaunch,
job.nextRunTimestamp <= Date().timeIntervalSince1970 job.nextRunTimestamp <= dependencies.date.timeIntervalSince1970
else { return } else { return }
guard job.id != nil else { guard job.id != nil else {
SNLog("[JobRunner] Prevented attempt to add \(job.variant) job without id to queue") SNLog("[JobRunner] Prevented attempt to add \(job.variant) job without id to queue")
@ -724,6 +735,15 @@ public final class JobQueue {
} }
pendingJobsQueue.mutate { $0.append(job) } pendingJobsQueue.mutate { $0.append(job) }
// If this is a concurrent queue then we should immediately start the next job
guard executionType == .concurrent else { return }
// Ensure that the database commit has completed and then trigger the next job to run (need
// to ensure any interactions have been correctly inserted first)
db.afterNextTransactionNestedOnce(dedupeId: "JobRunner-Add: \(job.variant)") { [weak self] _ in
self?.runNextJob(dependencies: dependencies)
}
} }
/// Upsert a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start /// Upsert a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start
@ -731,7 +751,7 @@ public final class JobQueue {
/// ///
/// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp` /// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp`
/// is in the future then the job won't be started /// is in the future then the job won't be started
fileprivate func upsert(_ job: Job, canStartJob: Bool = true, dependencies: Dependencies) { fileprivate func upsert(_ db: Database, job: Job, canStartJob: Bool = true, dependencies: Dependencies) {
guard let jobId: Int64 = job.id else { guard let jobId: Int64 = job.id else {
SNLog("[JobRunner] Prevented attempt to upsert \(job.variant) job without id to queue") SNLog("[JobRunner] Prevented attempt to upsert \(job.variant) job without id to queue")
return return
@ -754,7 +774,7 @@ public final class JobQueue {
// If we didn't update an existing job then we need to add it to the pendingJobsQueue // If we didn't update an existing job then we need to add it to the pendingJobsQueue
guard !didUpdateExistingJob else { return } guard !didUpdateExistingJob else { return }
add(job, canStartJob: canStartJob, dependencies: dependencies) add(db, job: job, canStartJob: canStartJob, dependencies: dependencies)
} }
fileprivate func insert(_ job: Job, before otherJob: Job, dependencies: Dependencies) { fileprivate func insert(_ job: Job, before otherJob: Job, dependencies: Dependencies) {
@ -790,7 +810,11 @@ public final class JobQueue {
} }
} }
fileprivate func appDidBecomeActive(with jobs: [Job], canStart: Bool) { fileprivate func appDidBecomeActive(
with jobs: [Job],
canStart: Bool,
dependencies: Dependencies
) {
let currentlyRunningJobIds: Set<Int64> = jobsCurrentlyRunning.wrappedValue let currentlyRunningJobIds: Set<Int64> = jobsCurrentlyRunning.wrappedValue
pendingJobsQueue.mutate { queue in pendingJobsQueue.mutate { queue in
@ -826,7 +850,7 @@ public final class JobQueue {
fileprivate func hasPendingOrRunningJob(with detailsData: Data?) -> Bool { fileprivate func hasPendingOrRunningJob(with detailsData: Data?) -> Bool {
guard let detailsData: Data = detailsData else { return false } guard let detailsData: Data = detailsData else { return false }
let pendingJobs: [Job] = queue.wrappedValue let pendingJobs: [Job] = pendingJobsQueue.wrappedValue
guard !pendingJobs.contains(where: { job in job.details == detailsData }) else { return true } guard !pendingJobs.contains(where: { job in job.details == detailsData }) else { return true }
@ -1013,7 +1037,7 @@ public final class JobQueue {
/// ///
/// **Note:** We don't add the current job back the the queue because it should only be re-added if it's dependencies /// **Note:** We don't add the current job back the the queue because it should only be re-added if it's dependencies
/// are successfully completed /// are successfully completed
let currentlyRunningJobIds: [Int64] = Array(detailsForCurrentlyRunningJobs.wrappedValue.keys) let currentlyRunningJobIds: [Int64] = Array(detailsForCurrentlyRunningJobs.wrappedValue.keys.map { $0.id })
let dependencyJobsNotCurrentlyRunning: [Job] = dependencyInfo.jobs let dependencyJobsNotCurrentlyRunning: [Job] = dependencyInfo.jobs
.filter { job in !currentlyRunningJobIds.contains(job.id ?? -1) } .filter { job in !currentlyRunningJobIds.contains(job.id ?? -1) }
.sorted { lhs, rhs in (lhs.id ?? -1) < (rhs.id ?? -1) } .sorted { lhs, rhs in (lhs.id ?? -1) < (rhs.id ?? -1) }
@ -1023,7 +1047,7 @@ public final class JobQueue {
.filter { !dependencyJobsNotCurrentlyRunning.contains($0) } .filter { !dependencyJobsNotCurrentlyRunning.contains($0) }
.inserting(contentsOf: dependencyJobsNotCurrentlyRunning, at: 0) .inserting(contentsOf: dependencyJobsNotCurrentlyRunning, at: 0)
} }
handleJobDeferred(nextJob) handleJobDeferred(nextJob, dependencies: dependencies)
return return
} }
@ -1105,7 +1129,7 @@ public final class JobQueue {
} }
// If the next job isn't scheduled in the future then just restart the JobRunner immediately // If the next job isn't scheduled in the future then just restart the JobRunner immediately
let secondsUntilNextJob: TimeInterval = (nextJobTimestamp - Date().timeIntervalSince1970) let secondsUntilNextJob: TimeInterval = (nextJobTimestamp - dependencies.date.timeIntervalSince1970)
guard secondsUntilNextJob > 0 else { guard secondsUntilNextJob > 0 else {
// Only log that the queue is getting restarted if this queue had actually been about to stop // Only log that the queue is getting restarted if this queue had actually been about to stop
@ -1218,7 +1242,7 @@ public final class JobQueue {
/// **Note:** If any of these `dependantJobs` have other dependencies then when they attempt to start they will be /// **Note:** If any of these `dependantJobs` have other dependencies then when they attempt to start they will be
/// removed from the queue, replaced by their dependencies /// removed from the queue, replaced by their dependencies
if !dependantJobs.isEmpty { if !dependantJobs.isEmpty {
let currentlyRunningJobIds: [Int64] = Array(detailsForCurrentlyRunningJobs.wrappedValue.keys) let currentlyRunningJobIds: [Int64] = Array(detailsForCurrentlyRunningJobs.wrappedValue.keys.map { $0.id })
let dependantJobsNotCurrentlyRunning: [Job] = dependantJobs let dependantJobsNotCurrentlyRunning: [Job] = dependantJobs
.filter { job in !currentlyRunningJobIds.contains(job.id ?? -1) } .filter { job in !currentlyRunningJobIds.contains(job.id ?? -1) }
.sorted { lhs, rhs in (lhs.id ?? -1) < (rhs.id ?? -1) } .sorted { lhs, rhs in (lhs.id ?? -1) < (rhs.id ?? -1) }
@ -1288,7 +1312,7 @@ public final class JobQueue {
// Get the max failure count for the job (a value of '-1' means it will retry indefinitely) // Get the max failure count for the job (a value of '-1' means it will retry indefinitely)
let maxFailureCount: Int = (executorMap.wrappedValue[job.variant]?.maxFailureCount ?? 0) let maxFailureCount: Int = (executorMap.wrappedValue[job.variant]?.maxFailureCount ?? 0)
let nextRunTimestamp: TimeInterval = (Date().timeIntervalSince1970 + JobRunner.getRetryInterval(for: job)) let nextRunTimestamp: TimeInterval = (dependencies.date.timeIntervalSince1970 + JobRunner.getRetryInterval(for: job))
dependencies.storage.write { db in dependencies.storage.write { db in
/// Remove any dependant jobs from the queue (shouldn't be in there but filter the queue just in case so we don't try /// Remove any dependant jobs from the queue (shouldn't be in there but filter the queue just in case so we don't try
@ -1313,7 +1337,7 @@ public final class JobQueue {
updatedFailureCount <= maxFailureCount updatedFailureCount <= maxFailureCount
) )
else { else {
SNLog("[JobRunner] \(queueContext) \(job.variant) failed permanently\(maxFailureCount >= 0 ? "; too many retries" : "")") SNLog("[JobRunner] \(queueContext) \(job.variant) failed permanently\(maxFailureCount >= 0 && updatedFailureCount > maxFailureCount ? "; too many retries" : "")")
// If the job permanently failed or we have performed all of our retry attempts // If the job permanently failed or we have performed all of our retry attempts
// then delete the job and all of it's dependant jobs (it'll probably never succeed) // then delete the job and all of it's dependant jobs (it'll probably never succeed)
@ -1364,12 +1388,12 @@ public final class JobQueue {
guard let lastRecord: (count: Int, times: [TimeInterval]) = $0[job.id] else { guard let lastRecord: (count: Int, times: [TimeInterval]) = $0[job.id] else {
$0 = $0.setting( $0 = $0.setting(
job.id, job.id,
(1, [Date().timeIntervalSince1970]) (1, [dependencies.date.timeIntervalSince1970])
) )
return return
} }
let timeNow: TimeInterval = Date().timeIntervalSince1970 let timeNow: TimeInterval = dependencies.date.timeIntervalSince1970
stuckInDeferLoop = ( stuckInDeferLoop = (
lastRecord.count >= JobQueue.deferralLoopThreshold && lastRecord.count >= JobQueue.deferralLoopThreshold &&
(timeNow - lastRecord.times[0]) < CGFloat(lastRecord.count) (timeNow - lastRecord.times[0]) < CGFloat(lastRecord.count)

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save