You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
session-ios/SessionUtilitiesKit/JobRunner/JobRunner.swift

1352 lines
58 KiB
Swift

// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.
import Foundation
import GRDB
/// Describes a type which contains the logic needed to execute a `Job`
///
/// Executor types are registered against a `Job.Variant` via `JobRunner.add(executor:for:)`
public protocol JobExecutor {
/// The maximum number of times the job can fail before it fails permanently
///
/// **Note:** A value of `-1` means it will retry indefinitely
static var maxFailureCount: Int { get }
/// NOTE(review): presumably indicates that a job handled by this executor must have a `threadId`
/// in order to run - the code which reads this flag is outside this section, confirm there
static var requiresThreadId: Bool { get }
/// NOTE(review): presumably indicates that a job handled by this executor must have an
/// `interactionId` in order to run - the code which reads this flag is outside this section, confirm there
static var requiresInteractionId: Bool { get }
/// This method contains the logic needed to complete a job
///
/// **Note:** The code in this method should run synchronously and the various
/// "result" blocks should not be called within a database closure
///
/// - Parameters:
/// - job: The job which is being run
/// - queue: A `DispatchQueue` provided to the executor (NOTE(review): presumably the queue
/// the job is being run on - confirm against the caller)
/// - success: The closure which is called when the job succeeds (with an
/// updated `job` and a flag indicating whether the job should forcibly stop running)
/// - failure: The closure which is called when the job fails (with an updated
/// `job`, an `Error` (if applicable) and a flag indicating whether it was a permanent
/// failure)
/// - deferred: The closure which is called when the job is deferred (with an
/// updated `job`)
static func run(
_ job: Job,
queue: DispatchQueue,
success: @escaping (Job, Bool) -> (),
failure: @escaping (Job, Error?, Bool) -> (),
deferred: @escaping (Job) -> ()
)
}
public final class JobRunner {
/// The outcomes which can be reported to a callback waiting on a job (see `afterCurrentlyRunningJob`)
public enum JobResult {
    case succeeded, failed, deferred

    /// Reported when the job has no id or no queue is responsible for its variant
    case notFound
}
/// A lightweight snapshot of the identifying data for a job (returned, keyed by job id, from
/// `infoForCurrentlyRunningJobs(of:)`)
public struct JobInfo {
/// NOTE(review): presumably mirrors the `threadId` of the job this info describes - confirm where it is populated
public let threadId: String?
/// NOTE(review): presumably mirrors the `interactionId` of the job this info describes - confirm where it is populated
public let interactionId: Int64?
/// NOTE(review): presumably the encoded `details` of the job this info describes - confirm where it is populated
public let detailsData: Data?
}
/// The queue which runs the "blocking" jobs
///
/// Once this queue has drained every queue in `queues` is started and any callbacks registered
/// via `afterBlockingQueue(callback:)` are triggered (and then cleared)
private static let blockingQueue: Atomic<JobQueue?> = Atomic(
    JobQueue(
        type: .blocking,
        qos: .default,
        jobVariants: [],
        onQueueDrained: {
            // Once all blocking jobs have been completed we want to start running
            // the remaining job queues
            queues.wrappedValue.forEach { _, queue in queue.start() }

            // Trigger any callbacks which were waiting for the blocking queue to finish and
            // clear them so they are only ever triggered once
            blockingQueueDrainCallback.mutate {
                $0.forEach { $0() }
                $0 = []
            }
        }
    )
)
/// A lookup from each `Job.Variant` to the queue responsible for running it
///
/// Every variant is claimed by exactly one queue: the dedicated queues remove the variants they
/// handle from the `jobVariants` set and whatever remains is assigned to the general queue
private static let queues: Atomic<[Job.Variant: JobQueue]> = {
    var jobVariants: Set<Job.Variant> = Job.Variant.allCases.asSet()

    let expirationUpdateQueue: JobQueue = JobQueue(
        type: .expirationUpdate,
        executionType: .concurrent, // Allow as many jobs to run at once as supported by the device
        qos: .default,
        jobVariants: [
            jobVariants.remove(.expirationUpdate),
            jobVariants.remove(.getExpiration)
        ].compactMap { $0 }
    )
    let messageSendQueue: JobQueue = JobQueue(
        type: .messageSend,
        executionType: .concurrent, // Allow as many jobs to run at once as supported by the device
        qos: .default,
        jobVariants: [
            jobVariants.remove(.attachmentUpload),
            jobVariants.remove(.messageSend),
            jobVariants.remove(.notifyPushServer),
            jobVariants.remove(.sendReadReceipts),
            jobVariants.remove(.groupLeaving),
            jobVariants.remove(.configurationSync)
        ].compactMap { $0 }
    )
    let messageReceiveQueue: JobQueue = JobQueue(
        type: .messageReceive,
        // Explicitly serial as executing concurrently means message receives getting processed at
        // different speeds which can result in:
        // • Small batches of messages appearing in the UI before larger batches
        // • Closed group messages encrypted with updated keys could start parsing before its key
        //   update message has been processed (ie. guaranteed to fail)
        executionType: .serial,
        qos: .default,
        jobVariants: [
            jobVariants.remove(.messageReceive),
            jobVariants.remove(.configMessageReceive)
        ].compactMap { $0 }
    )
    let attachmentDownloadQueue: JobQueue = JobQueue(
        type: .attachmentDownload,
        qos: .utility,
        jobVariants: [
            jobVariants.remove(.attachmentDownload)
        ].compactMap { $0 }
    )
    // Any variants which haven't been claimed by one of the above queues run on the general queue
    let generalQueue: JobQueue = JobQueue(
        type: .general(number: 0),
        qos: .utility,
        jobVariants: Array(jobVariants)
    )

    // Flatten the queues into a `variant -> queue` lookup
    return Atomic([
        expirationUpdateQueue,
        messageSendQueue,
        messageReceiveQueue,
        attachmentDownloadQueue,
        generalQueue
    ].reduce(into: [:]) { prev, next in
        next.jobVariants.forEach { variant in
            prev[variant] = next
        }
    })
}()
/// The executor type registered for each job variant (populated via `add(executor:for:)`)
internal static var executorMap: Atomic<[Job.Variant: JobExecutor.Type]> = Atomic([:])
/// NOTE(review): presumably the ids of jobs which have already completed during the current app
/// session - this is only read/written outside this section, confirm there
fileprivate static var perSessionJobsCompleted: Atomic<Set<Int64>> = Atomic([])
/// Set to `true` by `appDidBecomeActive`; while it is `false` any jobs flagged with
/// `shouldSkipLaunchBecomeActive` are excluded from the jobs run by `appDidBecomeActive`
private static var hasCompletedInitialBecomeActive: Atomic<Bool> = Atomic(false)
/// The background task created by `stopAndClearPendingJobs` to give the excluded queue a chance
/// to drain (cancelled and cleared by `appDidBecomeActive`)
private static var shutdownBackgroundTask: Atomic<OWSBackgroundTask?> = Atomic(nil)
/// Set to `true` by `appDidFinishLaunching`/`appDidBecomeActive` and to `false` by
/// `stopAndClearPendingJobs`
fileprivate static var canStartQueues: Atomic<Bool> = Atomic(false)
Fixed a number of issues found during internal testing Added copy for an unrecoverable startup case Added some additional logs to better debug ValueObservation query errors Increased the pageSize to 20 on iPad devices (to prevent it immediately loading a second page) Cleaned up a bunch of threading logic (try to avoid overriding subscribe/receive threads specified at subscription) Consolidated the 'sendMessage' and 'sendAttachments' functions Updated the various frameworks to use 'DWARF with dSYM' to allow for better debugging during debug mode (at the cost of a longer build time) Updated the logic to optimistically insert messages when sending to avoid any database write delays Updated the logic to avoid sending notifications for messages which are already marked as read by the config Fixed an issue where multiple paths could incorrectly get built at the same time in some cases Fixed an issue where other job queues could be started before the blockingQueue finishes Fixed a potential bug with the snode version comparison (was just a string comparison which would fail when getting to double-digit values) Fixed a bug where you couldn't remove the last reaction on a message Fixed the broken media message zoom animations Fixed a bug where the last message read in a conversation wouldn't be correctly detected as already read Fixed a bug where the QuoteView had no line limits (resulting in the '@You' mention background highlight being incorrectly positioned in the quote preview) Fixed a bug where a large number of configSyncJobs could be scheduled (only one would run at a time but this could result in performance impacts)
1 year ago
/// Callbacks registered via `afterBlockingQueue(callback:)` which are triggered (and then cleared)
/// by the `onQueueDrained` closure of the `blockingQueue`
private static var blockingQueueDrainCallback: Atomic<[() -> ()]> = Atomic([])
/// Whether the blocking queue has been started at least once and is not currently running
fileprivate static var canStartNonBlockingQueue: Bool {
    // The blocking queue needs to have been given the chance to run before anything else can start
    guard blockingQueue.wrappedValue?.hasStartedAtLeastOnce.wrappedValue == true else { return false }

    // A missing queue (or one which has stopped) doesn't prevent the other queues from starting
    let blockingQueueIsRunning: Bool = (blockingQueue.wrappedValue?.isRunning.wrappedValue == true)

    return !blockingQueueIsRunning
}
// MARK: - Configuration
/// Registers the executor type responsible for jobs of the given variant (replacing any
/// executor previously registered for that variant)
///
/// - Parameters:
///   - executor: The type containing the logic to run the job
///   - variant: The job variant the executor handles
public static func add(executor: JobExecutor.Type, for variant: Job.Variant) {
    executorMap.mutate { registeredExecutors in
        registeredExecutors[variant] = executor
    }
}
Fixed a number of issues found during internal testing Added copy for an unrecoverable startup case Added some additional logs to better debug ValueObservation query errors Increased the pageSize to 20 on iPad devices (to prevent it immediately loading a second page) Cleaned up a bunch of threading logic (try to avoid overriding subscribe/receive threads specified at subscription) Consolidated the 'sendMessage' and 'sendAttachments' functions Updated the various frameworks to use 'DAWRF with DSYM' to allow for better debugging during debug mode (at the cost of a longer build time) Updated the logic to optimistically insert messages when sending to avoid any database write delays Updated the logic to avoid sending notifications for messages which are already marked as read by the config Fixed an issue where multiple paths could incorrectly get built at the same time in some cases Fixed an issue where other job queues could be started before the blockingQueue finishes Fixed a potential bug with the snode version comparison (was just a string comparison which would fail when getting to double-digit values) Fixed a bug where you couldn't remove the last reaction on a message Fixed the broken media message zoom animations Fixed a bug where the last message read in a conversation wouldn't be correctly detected as already read Fixed a bug where the QuoteView had no line limits (resulting in the '@You' mention background highlight being incorrectly positioned in the quote preview) Fixed a bug where a large number of configSyncJobs could be scheduled (only one would run at a time but this could result in performance impacts)
1 year ago
/// Runs the `callback` once the blocking queue has finished
///
/// If the blocking queue has already been started at least once and is no longer running then
/// the `callback` is triggered immediately, otherwise it is stored and triggered when the
/// blocking queue is drained
public static func afterBlockingQueue(callback: @escaping () -> ()) {
    // `canStartNonBlockingQueue` is only true when the blocking queue has started and is no
    // longer running, in which case there is nothing to wait for
    if canStartNonBlockingQueue {
        callback()
        return
    }

    blockingQueueDrainCallback.mutate { pendingCallbacks in
        pendingCallbacks.append(callback)
    }
}
// MARK: - Execution
/// Persists the job and (once the current transaction completes) hands it to the queue
/// responsible for its variant, starting that queue when `canStartJob` is true
///
/// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp`
/// is in the future then the job won't be started
///
/// - Returns: The inserted job, or `nil` if it couldn't be inserted (or if it should have
/// been started but ended up without an id)
@discardableResult public static func add(_ db: Database, job: Job?, canStartJob: Bool = true) -> Job? {
    // Inserting the job into the database is what assigns it an id
    let insertedJob: Job? = try? job?.inserted(db)

    guard let updatedJob: Job = insertedJob else {
        SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job")
        return nil
    }

    // A job can only be started if it has an id
    if canStartJob && updatedJob.id == nil {
        SNLog("[JobRunner] Not starting \(job.map { "\($0.variant)" } ?? "unknown") job due to missing id")
        return nil
    }

    // The queue is only updated after the transaction has completed, this ensures anything
    // created during the transaction has been saved to the database before any corresponding
    // jobs are run
    db.afterNextTransactionNested { _ in
        let targetQueue: JobQueue? = queues.wrappedValue[updatedJob.variant]
        targetQueue?.add(updatedJob, canStartJob: canStartJob)

        // Only start the queue if the job is allowed to start
        if canStartJob {
            targetQueue?.start()
        }
    }

    return updatedJob
}
/// Adds the job to (or updates the job within) the queue responsible for its variant once the
/// current transaction completes, starting that queue when `canStartJob` is true
///
/// A job without an id is treated as a new job and is passed to `add(_:job:canStartJob:)`
///
/// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp`
/// is in the future then the job won't be started
public static func upsert(_ db: Database, job: Job?, canStartJob: Bool = true) {
    // There is nothing to do for a null job
    guard let job: Job = job else { return }

    // Without an id the job can't exist in a queue yet so just add it
    if job.id == nil {
        add(db, job: job, canStartJob: canStartJob)
        return
    }

    // The queue is only updated after the transaction has completed, this ensures anything
    // created during the transaction has been saved to the database before any corresponding
    // jobs are run
    db.afterNextTransactionNested { _ in
        let targetQueue: JobQueue? = queues.wrappedValue[job.variant]
        targetQueue?.upsert(job, canStartJob: canStartJob)

        // Only start the queue if the job is allowed to start
        if canStartJob {
            targetQueue?.start()
        }
    }
}
/// Persists the job and (once the current transaction completes) inserts it into its queue
/// directly before `otherJob`
///
/// Jobs with a `behaviour` of `recurringOnActive`, `recurringOnLaunch` or `runOnceNextLaunch`
/// can't be inserted this way
///
/// **Note:** This function assumes the relevant job queue is already running and as such **will not** start the queue if it isn't running
///
/// - Returns: The id of the inserted job along with the inserted job, or `nil` if the job
/// wasn't inserted
@discardableResult public static func insert(_ db: Database, job: Job?, before otherJob: Job) -> (Int64, Job)? {
    // Launch/active driven jobs have their own scheduling so can't be run ahead of another job
    let unsupportedBehaviours: [Job.Behaviour] = [.recurringOnActive, .recurringOnLaunch, .runOnceNextLaunch]

    if let behaviour: Job.Behaviour = job?.behaviour, unsupportedBehaviours.contains(behaviour) {
        SNLog("[JobRunner] Attempted to insert \(job.map { "\($0.variant)" } ?? "unknown") job before the current one even though it's behaviour is \(job.map { "\($0.behaviour)" } ?? "unknown")")
        return nil
    }

    // Inserting the job into the database is what assigns it an id
    let insertedJob: Job? = try? job?.inserted(db)

    guard let updatedJob: Job = insertedJob else {
        SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job")
        return nil
    }
    guard let jobId: Int64 = updatedJob.id else {
        SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job due to missing id")
        return nil
    }

    // The queue is only updated after the transaction has completed, this ensures anything
    // created during the transaction has been saved to the database before any corresponding
    // jobs are run
    db.afterNextTransactionNested { _ in
        let targetQueue: JobQueue? = queues.wrappedValue[updatedJob.variant]
        targetQueue?.insert(updatedJob, before: otherJob)
    }

    return (jobId, updatedJob)
}
/// Loads the jobs which should run on launch (those with a `behaviour` of `recurringOnLaunch`
/// or `runOnceNextLaunch`), adding and starting the blocking ones on the blocking queue and
/// adding (without starting) the rest to the queues responsible for their variants
public static func appDidFinishLaunching() {
    // Flag that the JobRunner can start its queues
    JobRunner.canStartQueues.mutate { $0 = true }

    /// Retrieves the launch jobs with the given `shouldBlock` value (highest priority first,
    /// then in the order they were created)
    func fetchLaunchJobs(_ db: Database, shouldBlock: Bool) throws -> [Job] {
        return try Job
            .filter(
                [
                    Job.Behaviour.recurringOnLaunch,
                    Job.Behaviour.runOnceNextLaunch
                ].contains(Job.Columns.behaviour)
            )
            .filter(Job.Columns.shouldBlock == shouldBlock)
            .order(
                Job.Columns.priority.desc,
                Job.Columns.id
            )
            .fetchAll(db)
    }

    // Note: 'appDidBecomeActive' will run on first launch anyway so we can
    // leave those jobs out and can wait until then to start the JobRunner
    let jobsToRun: (blocking: [Job], nonBlocking: [Job]) = Storage.shared
        .read { db in
            let blockingJobs: [Job] = try fetchLaunchJobs(db, shouldBlock: true)
            let nonblockingJobs: [Job] = try fetchLaunchJobs(db, shouldBlock: false)

            return (blockingJobs, nonblockingJobs)
        }
        .defaulting(to: ([], []))

    // No need to do anything if there are no launch jobs
    if jobsToRun.blocking.isEmpty && jobsToRun.nonBlocking.isEmpty { return }

    // Add and start any blocking jobs
    blockingQueue.wrappedValue?.appDidFinishLaunching(with: jobsToRun.blocking, canStart: true)

    // Add any non-blocking jobs (we don't start these in case there are blocking "on active"
    // jobs as well)
    let jobQueues: [Job.Variant: JobQueue] = queues.wrappedValue

    for (variant, jobs) in jobsToRun.nonBlocking.grouped(by: \.variant) {
        jobQueues[variant]?.appDidFinishLaunching(with: jobs, canStart: false)
    }
}
/// Loads the jobs which should run when the app becomes active (those with a `behaviour` of
/// `recurringOnActive`) and passes them to the queues responsible for their variants, the
/// queues are only started if the blocking queue isn't currently running
///
/// **Note:** Jobs flagged with `shouldSkipLaunchBecomeActive` are excluded the first time this
/// is called
public static func appDidBecomeActive() {
    // Flag that the JobRunner can start its queues
    JobRunner.canStartQueues.mutate { $0 = true }

    // If we have a running "shutdownBackgroundTask" then we want to cancel it as otherwise it
    // can result in the database being suspended and us being unable to interact with it at all
    shutdownBackgroundTask.mutate {
        $0?.cancel()
        $0 = nil
    }

    // Retrieve any jobs which should run when becoming active
    let hasCompletedInitialBecomeActive: Bool = JobRunner.hasCompletedInitialBecomeActive.wrappedValue

    // The initial "become active" is complete regardless of whether there were any jobs to
    // run, if this was only flagged when jobs were found then an initial call which found no
    // jobs would result in the 'shouldSkipLaunchBecomeActive' jobs being skipped every time
    defer { JobRunner.hasCompletedInitialBecomeActive.mutate { $0 = true } }

    let jobsToRun: [Job] = Storage.shared
        .read { db in
            return try Job
                .filter(Job.Columns.behaviour == Job.Behaviour.recurringOnActive)
                .order(
                    Job.Columns.priority.desc,
                    Job.Columns.id
                )
                .fetchAll(db)
        }
        .defaulting(to: [])
        .filter { hasCompletedInitialBecomeActive || !$0.shouldSkipLaunchBecomeActive }

    // Store the current queue state locally to avoid multiple atomic retrievals
    let jobQueues: [Job.Variant: JobQueue] = queues.wrappedValue
    let blockingQueueIsRunning: Bool = (blockingQueue.wrappedValue?.isRunning.wrappedValue == true)

    // If there are no "on active" jobs then just make sure the queues are running (as long
    // as the blocking queue isn't still running)
    guard !jobsToRun.isEmpty else {
        if !blockingQueueIsRunning {
            jobQueues.forEach { _, queue in queue.start() }
        }
        return
    }

    // Add and start any non-blocking jobs (if there are no blocking jobs)
    let jobsByVariant: [Job.Variant: [Job]] = jobsToRun.grouped(by: \.variant)
    jobQueues.forEach { variant, queue in
        queue.appDidBecomeActive(
            with: (jobsByVariant[variant] ?? []),
            canStart: !blockingQueueIsRunning
        )
    }
}
/// Calling this will clear the JobRunner queues and stop it from running new jobs, any currently executing jobs will continue to run
/// though (this means if we suspend the database it's likely that any currently running jobs will fail to complete and fail to record their
/// failure - they _should_ be picked up again the next time the app is launched)
///
/// - Parameters:
/// - exceptForVariant: When provided the queue responsible for this variant is left running so
/// that it can be drained
/// - onComplete: Triggered immediately if there is no running queue to wait for, otherwise once
/// that queue has drained (or when the background task finishes in a state other than
/// `cancelled` or `success`)
public static func stopAndClearPendingJobs(
exceptForVariant: Job.Variant? = nil,
onComplete: (() -> ())? = nil
) {
// Inform the JobRunner that it can't start any queues (this is to prevent queues from
// rescheduling themselves while in the background, when the app restarts or becomes active
// the JobRunner will update this flag)
JobRunner.canStartQueues.mutate { $0 = false }
// Stop all queues except for the one containing the `exceptForVariant`
queues.wrappedValue
.values
.filter { queue -> Bool in
guard let exceptForVariant: Job.Variant = exceptForVariant else { return true }
return !queue.jobVariants.contains(exceptForVariant)
}
.forEach { $0.stopAndClearPendingJobs() }
// Ensure the queue is actually running (if not then trigger the callback immediately)
guard
let exceptForVariant: Job.Variant = exceptForVariant,
let queue: JobQueue = queues.wrappedValue[exceptForVariant],
queue.isRunning.wrappedValue == true
else {
onComplete?()
return
}
// Keep hold of the existing drain handler so it can be restored once we are done
let oldQueueDrained: (() -> ())? = queue.onQueueDrained
// Create a backgroundTask to give the queue the chance to properly be drained
shutdownBackgroundTask.mutate {
$0 = OWSBackgroundTask(labelStr: #function) { [weak queue] state in
// If the background task was cancelled then just restore the original drain handler
// (the `onComplete` isn't triggered in this case)
guard state != .cancelled else {
queue?.onQueueDrained = oldQueueDrained
return
}
// If the background task didn't succeed then trigger the onComplete (and hope we have
// enough time to complete its logic)
guard state != .success else { return }
onComplete?()
queue?.onQueueDrained = oldQueueDrained
queue?.stopAndClearPendingJobs()
}
}
// Add a callback to be triggered once the queue is drained (this triggers the original
// drain handler, restores it, triggers the `onComplete` and then clears the background task)
queue.onQueueDrained = { [weak queue] in
oldQueueDrained?()
queue?.onQueueDrained = oldQueueDrained
onComplete?()
shutdownBackgroundTask.mutate { $0 = nil }
}
}
/// Indicates whether the queue responsible for the job reports it as currently running
///
/// - Returns: `false` if the job is null, has no id or has no queue responsible for its variant
public static func isCurrentlyRunning(_ job: Job?) -> Bool {
    guard let job: Job = job else { return false }
    guard let jobId: Int64 = job.id else { return false }

    let responsibleQueue: JobQueue? = queues.wrappedValue[job.variant]

    return (responsibleQueue?.isCurrentlyRunning(jobId) == true)
}
/// Retrieves the info for the currently running jobs (keyed by job id) from the queue
/// responsible for the given variant
///
/// - Returns: An empty dictionary if there is no queue responsible for the variant
public static func infoForCurrentlyRunningJobs(of variant: Job.Variant) -> [Int64: JobInfo] {
    let responsibleQueue: JobQueue? = queues.wrappedValue[variant]
    let runningJobInfo: [Int64: JobInfo]? = responsibleQueue?.infoForAllCurrentlyRunningJobs()

    return runningJobInfo.defaulting(to: [:])
}
/// Registers a `callback` with the queue responsible for the job to be given the result of
/// the job
///
/// **Note:** The `callback` is immediately triggered with `notFound` if the job is null, has
/// no id or has no queue responsible for its variant
public static func afterCurrentlyRunningJob(_ job: Job?, callback: @escaping (JobResult) -> ()) {
    if
        let job: Job = job,
        let jobId: Int64 = job.id,
        let queue: JobQueue = queues.wrappedValue[job.variant]
    {
        queue.afterCurrentlyRunningJob(jobId, callback: callback)
        return
    }

    callback(.notFound)
}
/// Asks the queue responsible for the given variant whether it has a pending or running job
/// matching the provided values
///
/// - Parameters:
/// - variant: The variant used to find the responsible queue
/// - threadId: Passed through to the queue
/// - interactionId: Passed through to the queue
/// - details: JSON encoded and passed through to the queue as `detailsData`
/// - Returns: `false` if there is no queue responsible for the variant or if the `details` were
/// provided but couldn't be encoded, otherwise the result reported by the queue
public static func hasPendingOrRunningJob<T: Encodable>(
with variant: Job.Variant,
threadId: String? = nil,
interactionId: Int64? = nil,
details: T? = nil
) -> Bool {
guard let targetQueue: JobQueue = queues.wrappedValue[variant] else { return false }
// Ensure we can encode the details (if provided)
let detailsData: Data? = details.map { try? JSONEncoder().encode($0) }
// Having `details` but no `detailsData` means the encoding failed
guard details == nil || detailsData != nil else { return false }
return targetQueue.hasPendingOrRunningJobWith(
threadId: threadId,
interactionId: interactionId,
detailsData: detailsData
)
}
/// Asks the queue responsible for the job to remove it from its pending jobs (does nothing if
/// the job is null, has no id or has no queue responsible for its variant)
public static func removePendingJob(_ job: Job?) {
    guard let job: Job = job else { return }
    guard let jobId: Int64 = job.id else { return }

    let responsibleQueue: JobQueue? = queues.wrappedValue[job.variant]
    responsibleQueue?.removePendingJob(jobId)
}
// MARK: - Convenience
/// Calculates how long to wait before retrying a failed job, the delay doubles with every
/// failure and is capped at `maxBackoff`
///
/// - Parameter job: The job to calculate the retry delay for (based on its `failureCount`)
/// - Returns: The number of seconds to wait before the job should be retried
fileprivate static func getRetryInterval(for job: Job) -> TimeInterval {
    // Arbitrary backoff factor...
    // try  1 delay: 0.5s
    // try  2 delay: 1s
    // ...
    // try  5 delay: 8s
    // ...
    // try 11 delay: 512s
    // try 12 delay: 600s (capped from here on)
    let maxBackoff: TimeInterval = 10 * 60 // 10 minutes
    let backoff: TimeInterval = 0.25 * pow(2, Double(job.failureCount))

    // The cap needs to be applied to the final delay, applying it before the multiplier
    // resulted in the delay never exceeding a quarter of 'maxBackoff'
    return min(maxBackoff, backoff)
}
}
// MARK: - JobQueue
Fixed a number of issues found during internal testing Added copy for an unrecoverable startup case Added some additional logs to better debug ValueObservation query errors Increased the pageSize to 20 on iPad devices (to prevent it immediately loading a second page) Cleaned up a bunch of threading logic (try to avoid overriding subscribe/receive threads specified at subscription) Consolidated the 'sendMessage' and 'sendAttachments' functions Updated the various frameworks to use 'DAWRF with DSYM' to allow for better debugging during debug mode (at the cost of a longer build time) Updated the logic to optimistically insert messages when sending to avoid any database write delays Updated the logic to avoid sending notifications for messages which are already marked as read by the config Fixed an issue where multiple paths could incorrectly get built at the same time in some cases Fixed an issue where other job queues could be started before the blockingQueue finishes Fixed a potential bug with the snode version comparison (was just a string comparison which would fail when getting to double-digit values) Fixed a bug where you couldn't remove the last reaction on a message Fixed the broken media message zoom animations Fixed a bug where the last message read in a conversation wouldn't be correctly detected as already read Fixed a bug where the QuoteView had no line limits (resulting in the '@You' mention background highlight being incorrectly positioned in the quote preview) Fixed a bug where a large number of configSyncJobs could be scheduled (only one would run at a time but this could result in performance impacts)
1 year ago
public final class JobQueue {
/// Identifies a queue (the `name` is used when generating the label for the queue)
public enum QueueType: Hashable {
    case blocking
    case general(number: Int)
    case messageSend
    case messageReceive
    case attachmentDownload
    case expirationUpdate

    /// A human readable name for the queue type
    var name: String {
        switch self {
            case .blocking:
                return "Blocking"

            case let .general(number):
                return "General-\(number)"

            case .messageSend:
                return "MessageSend"

            case .messageReceive:
                return "MessageReceive"

            case .attachmentDownload:
                return "AttachmentDownload"

            case .expirationUpdate:
                return "ExpirationUpdate"
        }
    }
}
/// Determines how a queue executes its jobs (a `concurrent` queue creates its internal
/// `DispatchQueue` with the `.concurrent` attribute and immediately tries to run a job when
/// one is added)
fileprivate enum ExecutionType {
/// A serial queue will execute one job at a time until the queue is empty, then will load any new/deferred
/// jobs and run those one at a time
case serial
/// A concurrent queue will execute as many jobs as the device supports at once until the queue is empty,
/// then will load any new/deferred jobs and try to start them all
case concurrent
}
/// Wraps a one-off timer which starts a `JobQueue` when it fires
private class Trigger {
    private var timer: Timer?

    /// The delay (in seconds) the timer was scheduled with
    fileprivate var fireTimestamp: TimeInterval = 0

    /// Creates a trigger which will start the `queue` at the given `timestamp`
    ///
    /// - Parameters:
    ///   - queue: The queue to start (only weakly referenced by the timer)
    ///   - timestamp: The time (in seconds since 1970) the queue should be started at
    static func create(queue: JobQueue, timestamp: TimeInterval) -> Trigger? {
        /// Wait at least 1 second before triggering
        let delay: TimeInterval = max(1, (timestamp - Date().timeIntervalSince1970))
        let result: Trigger = Trigger()
        result.fireTimestamp = delay

        /// **Note:** We use the `Timer.scheduledTimerOnMainThread` method because running a timer
        /// on our random queue threads results in the timer never firing, the `start` method will redirect itself to
        /// the correct thread
        result.timer = Timer.scheduledTimerOnMainThread(
            withTimeInterval: delay,
            repeats: false,
            block: { [weak queue] _ in queue?.start() }
        )

        return result
    }

    /// Stops the timer and releases it
    func invalidate() {
        // Need to do this to prevent a strong reference cycle
        timer?.invalidate()
        timer = nil
    }
}
/// NOTE(review): presumably the number of deferrals tracked by `deferLoopTracker` before a job is
/// considered to be stuck in a deferral loop - the usage is outside this section, confirm there
private static let deferralLoopThreshold: Int = 3
/// Identifies the queue (its `name` is used to generate the `queueContext`)
private let type: QueueType
/// Whether the `internalQueue` is created as a serial or a concurrent `DispatchQueue`
private let executionType: ExecutionType
/// The quality of service the `internalQueue` is created with
private let qosClass: DispatchQoS
/// The key the `queueContext` is stored against on the `internalQueue` (via `setSpecific`)
private let queueKey: DispatchSpecificKey = DispatchSpecificKey<String>()
/// The label of the `internalQueue` ("JobQueue-" followed by the name of the queue type)
private let queueContext: String
/// The specific types of jobs this queue manages, if this is left empty it will handle all jobs not handled by other queues
fileprivate let jobVariants: [Job.Variant]
/// NOTE(review): presumably triggered once the queue has no more jobs to run - the call site is
/// outside this section, confirm there
fileprivate var onQueueDrained: (() -> ())?
/// The `DispatchQueue` backing this job queue, it is created on first access and is tagged
/// with the `queueContext` (stored against the `queueKey`)
private lazy var internalQueue: DispatchQueue = {
    // Only a 'concurrent' job queue should be backed by a concurrent dispatch queue
    let attributes: DispatchQueue.Attributes = (self.executionType == .concurrent ? [.concurrent] : [])
    let dispatchQueue: DispatchQueue = DispatchQueue(
        label: self.queueContext,
        qos: self.qosClass,
        attributes: attributes,
        autoreleaseFrequency: .inherit,
        target: nil
    )
    dispatchQueue.setSpecific(key: self.queueKey, value: self.queueContext)

    return dispatchQueue
}()
/// NOTE(review): presumably the `Trigger` scheduled to start this queue again at a later time -
/// it is only read/written outside this section, confirm there
private var nextTrigger: Atomic<Trigger?> = Atomic(nil)
Fixed a number of issues found during internal testing Added copy for an unrecoverable startup case Added some additional logs to better debug ValueObservation query errors Increased the pageSize to 20 on iPad devices (to prevent it immediately loading a second page) Cleaned up a bunch of threading logic (try to avoid overriding subscribe/receive threads specified at subscription) Consolidated the 'sendMessage' and 'sendAttachments' functions Updated the various frameworks to use 'DAWRF with DSYM' to allow for better debugging during debug mode (at the cost of a longer build time) Updated the logic to optimistically insert messages when sending to avoid any database write delays Updated the logic to avoid sending notifications for messages which are already marked as read by the config Fixed an issue where multiple paths could incorrectly get built at the same time in some cases Fixed an issue where other job queues could be started before the blockingQueue finishes Fixed a potential bug with the snode version comparison (was just a string comparison which would fail when getting to double-digit values) Fixed a bug where you couldn't remove the last reaction on a message Fixed the broken media message zoom animations Fixed a bug where the last message read in a conversation wouldn't be correctly detected as already read Fixed a bug where the QuoteView had no line limits (resulting in the '@You' mention background highlight being incorrectly positioned in the quote preview) Fixed a bug where a large number of configSyncJobs could be scheduled (only one would run at a time but this could result in performance impacts)
1 year ago
/// Read by the `JobRunner` to determine whether the blocking queue has been started yet
/// (NOTE(review): presumably set when `start` is first called - confirm in `start`)
fileprivate var hasStartedAtLeastOnce: Atomic<Bool> = Atomic(false)
/// Whether the queue is currently running (NOTE(review): it is only updated outside this
/// section, confirm where)
fileprivate var isRunning: Atomic<Bool> = Atomic(false)
/// The jobs which are waiting to be run
private var queue: Atomic<[Job]> = Atomic([])
/// NOTE(review): presumably the callbacks registered via `afterCurrentlyRunningJob`, keyed by
/// job id - confirm where they are stored
private var jobCallbacks: Atomic<[Int64: [(JobRunner.JobResult) -> ()]]> = Atomic([:])
/// The ids of the jobs which are currently running (used by `appDidBecomeActive`)
private var currentlyRunningJobIds: Atomic<Set<Int64>> = Atomic([])
/// NOTE(review): presumably the info for each of the currently running jobs, keyed by job id -
/// confirm where it is populated
private var currentlyRunningJobInfo: Atomic<[Int64: JobRunner.JobInfo]> = Atomic([:])
/// NOTE(review): presumably tracks how often (and when) each job was deferred, keyed by job id,
/// in order to detect deferral loops - confirm where it is updated
private var deferLoopTracker: Atomic<[Int64: (count: Int, times: [TimeInterval])]> = Atomic([:])
/// Whether there are any jobs waiting to be run
fileprivate var hasPendingJobs: Bool { !queue.wrappedValue.isEmpty }
// MARK: - Initialization
Fixed a number of issues found during internal testing Added copy for an unrecoverable startup case Added some additional logs to better debug ValueObservation query errors Increased the pageSize to 20 on iPad devices (to prevent it immediately loading a second page) Cleaned up a bunch of threading logic (try to avoid overriding subscribe/receive threads specified at subscription) Consolidated the 'sendMessage' and 'sendAttachments' functions Updated the various frameworks to use 'DAWRF with DSYM' to allow for better debugging during debug mode (at the cost of a longer build time) Updated the logic to optimistically insert messages when sending to avoid any database write delays Updated the logic to avoid sending notifications for messages which are already marked as read by the config Fixed an issue where multiple paths could incorrectly get built at the same time in some cases Fixed an issue where other job queues could be started before the blockingQueue finishes Fixed a potential bug with the snode version comparison (was just a string comparison which would fail when getting to double-digit values) Fixed a bug where you couldn't remove the last reaction on a message Fixed the broken media message zoom animations Fixed a bug where the last message read in a conversation wouldn't be correctly detected as already read Fixed a bug where the QuoteView had no line limits (resulting in the '@You' mention background highlight being incorrectly positioned in the quote preview) Fixed a bug where a large number of configSyncJobs could be scheduled (only one would run at a time but this could result in performance impacts)
1 year ago
/// Creates a queue for running jobs
///
/// - Parameters:
///   - type: Identifies the queue (its name is used to generate the label of the queue)
///   - executionType: Whether the jobs run one at a time or concurrently (defaults to `serial`)
///   - qos: The quality of service for the underlying `DispatchQueue`
///   - jobVariants: The job variants this queue is responsible for
///   - onQueueDrained: An optional closure stored as the `onQueueDrained` handler of the queue
fileprivate init(
    type: QueueType,
    executionType: ExecutionType = .serial,
    qos: DispatchQoS,
    jobVariants: [Job.Variant],
    onQueueDrained: (() -> ())? = nil
) {
    // Identity
    self.type = type
    self.queueContext = "JobQueue-\(type.name)"

    // Execution behaviour
    self.executionType = executionType
    self.qosClass = qos

    // Jobs & callbacks
    self.jobVariants = jobVariants
    self.onQueueDrained = onQueueDrained
}
// MARK: - Execution
/// Appends the job to the pending jobs as long as it can be started, it isn't a
/// `runOnceNextLaunch` job and its `nextRunTimestamp` isn't in the future
///
/// **Note:** A `concurrent` queue will immediately try to run the next job after the job is added
fileprivate func add(_ job: Job, canStartJob: Bool = true) {
    // Check if the job should be added to the queue
    let shouldAddToQueue: Bool = (
        canStartJob &&
        job.behaviour != .runOnceNextLaunch &&
        job.nextRunTimestamp <= Date().timeIntervalSince1970
    )

    if !shouldAddToQueue { return }

    // Jobs need to have been saved to the database (ie. have an id) to be run
    if job.id == nil {
        SNLog("[JobRunner] Prevented attempt to add \(job.variant) job without id to queue")
        return
    }

    queue.mutate { pendingJobs in pendingJobs.append(job) }

    // If this is a concurrent queue then we should immediately start the next job
    if executionType == .concurrent {
        runNextJob()
    }
}
/// Replaces the pending job which has the same id with the provided job, if there is no
/// pending job with that id then the job is passed to `add(_:canStartJob:)` instead
///
/// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp`
/// is in the future then the job won't be added
fileprivate func upsert(_ job: Job, canStartJob: Bool = true) {
    guard let jobId: Int64 = job.id else {
        SNLog("[JobRunner] Prevented attempt to upsert \(job.variant) job without id to queue")
        return
    }

    // The lookup and the replacement are both done within a single 'mutate' call to ensure
    // we don't run into any multi-threading shenanigans
    //
    // Note: currently running jobs are removed from the queue so we don't need to check
    // the 'jobsCurrentlyRunning' set
    var replacedPendingJob: Bool = false

    queue.mutate { pendingJobs in
        guard let existingIndex: Int = pendingJobs.firstIndex(where: { $0.id == jobId }) else {
            return
        }

        pendingJobs[existingIndex] = job
        replacedPendingJob = true
    }

    // If we didn't update an existing job then we need to add it to the queue
    if !replacedPendingJob {
        add(job, canStartJob: canStartJob)
    }
}
/// Places `job` directly in front of `otherJob` in the pending queue.
///
/// When `otherJob` is not in the queue, both jobs are inserted at the very start of the queue
/// (`job` first), so `job` runs and then `otherJob` runs (or runs again) once it is done.
fileprivate func insert(_ job: Job, before otherJob: Job) {
    if job.id == nil {
        SNLog("[JobRunner] Prevented attempt to insert \(job.variant) job without id to queue")
        return
    }

    queue.mutate { pendingJobs in
        if let otherJobIndex: Int = pendingJobs.firstIndex(of: otherJob) {
            // The other job is still queued, slot the new job in just ahead of it
            pendingJobs.insert(job, at: otherJobIndex)
        } else {
            // The other job isn't queued, re-add it to the start of the queue behind the new job
            pendingJobs.insert(contentsOf: [job, otherJob], at: 0)
        }
    }
}
/// Appends the provided jobs to the pending queue and, when `canStart` is `true` and the
/// queue isn't already running, starts the queue.
fileprivate func appDidFinishLaunching(with jobs: [Job], canStart: Bool) {
    queue.mutate { pendingJobs in pendingJobs += jobs }

    // Only kick off the job runner when permitted and when it isn't running yet
    guard canStart, !isRunning.wrappedValue else { return }

    start()
}
/// Appends the provided jobs to the pending queue, skipping any which are currently running or
/// already queued, and then starts the queue when `canStart` is `true` and it isn't running.
fileprivate func appDidBecomeActive(with jobs: [Job], canStart: Bool) {
    let runningJobIds: Set<Int64> = currentlyRunningJobIds.wrappedValue

    queue.mutate { pendingJobs in
        // Jobs can already be in the queue if the user sent the app to the background before
        // the 'onActive' jobs ran and then brought it back to the foreground, so only collect
        // the ones which are neither running nor queued
        var jobsToAppend: [Job] = []

        for job in jobs {
            guard
                !runningJobIds.contains(job.id ?? -1),
                !pendingJobs.contains(where: { $0.id == job.id })
            else { continue }

            jobsToAppend.append(job)
        }

        pendingJobs.append(contentsOf: jobsToAppend)
    }

    // Only kick off the job runner when permitted and when it isn't running yet
    guard canStart, !isRunning.wrappedValue else { return }

    start()
}
/// Indicates whether the job with the given id is in the set of currently running job ids.
fileprivate func isCurrentlyRunning(_ jobId: Int64) -> Bool {
    let runningJobIds: Set<Int64> = currentlyRunningJobIds.wrappedValue

    return runningJobIds.contains(jobId)
}
/// Returns a snapshot of the `JobInfo` stored for every currently running job, keyed by job id.
fileprivate func infoForAllCurrentlyRunningJobs() -> [Int64: JobRunner.JobInfo] {
    let runningInfo: [Int64: JobRunner.JobInfo] = currentlyRunningJobInfo.wrappedValue

    return runningInfo
}
/// Registers a callback to be stored against the given job id while that job is running.
///
/// When the job isn't currently running the callback is invoked immediately with `.notFound`
/// and nothing is stored.
fileprivate func afterCurrentlyRunningJob(_ jobId: Int64, callback: @escaping (JobRunner.JobResult) -> ()) {
    if !isCurrentlyRunning(jobId) {
        callback(.notFound)
        return
    }

    jobCallbacks.mutate { registeredCallbacks in
        // Add the callback after any which were previously registered for this job
        var callbacksForJob: [(JobRunner.JobResult) -> ()] = (registeredCallbacks[jobId] ?? [])
        callbacksForJob.append(callback)
        registeredCallbacks[jobId] = callbacksForJob
    }
}
/// Indicates whether there is a pending or currently running job matching all of the provided
/// values.
///
/// Every argument is optional, an argument which is `nil` places no restriction on the jobs
/// (so calling this with no arguments reports whether there is any pending job with an id or
/// any running job at all).
///
/// - Parameters:
///   - threadId: When provided, only jobs with this `threadId` are considered
///   - interactionId: When provided, only jobs with this `interactionId` are considered
///   - detailsData: When provided, only jobs whose details equal this data are considered
fileprivate func hasPendingOrRunningJobWith(
    threadId: String? = nil,
    interactionId: Int64? = nil,
    detailsData: Data? = nil
) -> Bool {
    let queuedJobs: [Job] = queue.wrappedValue
    let runningInfo: [Int64: JobRunner.JobInfo] = currentlyRunningJobInfo.wrappedValue

    // Start with the id of every running and queued job, then strike out the ids of any jobs
    // which fail one of the requested criteria
    var candidateJobIds: Set<Int64> = Set(runningInfo.keys)
        .union(queuedJobs.compactMap { $0.id })

    /// Removes the ids of the queued jobs and running jobs which are flagged by the predicates
    func discardJobs(
        queuedWhere isWrongJob: (Job) -> Bool,
        runningWhere isWrongInfo: (JobRunner.JobInfo) -> Bool
    ) {
        candidateJobIds.subtract(queuedJobs.filter(isWrongJob).compactMap { $0.id })
        candidateJobIds.subtract(runningInfo.filter { isWrongInfo($0.value) }.keys)
    }

    // Remove any which don't have the matching threadId (if provided)
    if let targetThreadId: String = threadId {
        discardJobs(
            queuedWhere: { $0.threadId != targetThreadId },
            runningWhere: { $0.threadId != targetThreadId }
        )
    }

    // Remove any which don't have the matching interactionId (if provided)
    if let targetInteractionId: Int64 = interactionId {
        discardJobs(
            queuedWhere: { $0.interactionId != targetInteractionId },
            runningWhere: { $0.interactionId != targetInteractionId }
        )
    }

    // Remove any which don't have the matching details (if provided)
    if let targetDetailsData: Data = detailsData {
        discardJobs(
            queuedWhere: { $0.details != targetDetailsData },
            runningWhere: { $0.detailsData != targetDetailsData }
        )
    }

    return !candidateJobIds.isEmpty
}
/// Removes every entry with the given job id from the pending queue.
fileprivate func removePendingJob(_ jobId: Int64) {
    queue.mutate { pendingJobs in
        pendingJobs.removeAll(where: { $0.id == jobId })
    }
}
// MARK: - Job Running
/// Starts the queue: loads any pending jobs for this queue's variants from the database (which
/// aren't already running or queued), appends them to the queue and runs the next job.
///
/// The call is ignored unless the app context is the (non-test) main app, the `JobRunner` is
/// allowed to start queues and this is either the blocking queue or non-blocking queues are
/// allowed to start. When invoked from outside of the queue's own dispatch queue the call
/// re-dispatches itself onto `internalQueue`.
///
/// - Parameter force: When `true` the queue reloads pending jobs even if it is already flagged
///   as running (when `false` an already running queue ignores the call)
fileprivate func start(force: Bool = false) {
    // We only want the JobRunner to run in the main app
    guard
        HasAppContext() &&
        CurrentAppContext().isMainApp &&
        !CurrentAppContext().isRunningTests &&
        JobRunner.canStartQueues.wrappedValue &&
        (
            type == .blocking ||
            JobRunner.canStartNonBlockingQueue
        )
    else { return }
    guard force || !isRunning.wrappedValue else { return }

    // The JobRunner runs synchronously so we need to ensure this doesn't start on the main
    // thread (if it isn't on the queue's own thread then swap to it)
    //
    // Note: The 'force' flag must be carried across the hop, otherwise a forced start would be
    // dropped by the 'isRunning' check above when the call gets re-evaluated on the queue
    guard DispatchQueue.getSpecific(key: queueKey) == queueContext else {
        internalQueue.async { [weak self] in
            self?.start(force: force)
        }
        return
    }

    // Flag the JobRunner as running (to prevent something else from trying to start it
    // and messing with the execution behaviour)
    var wasAlreadyRunning: Bool = false
    isRunning.mutate { isRunning in
        wasAlreadyRunning = isRunning
        isRunning = true
    }
    hasStartedAtLeastOnce.mutate { $0 = true }

    // Get any pending jobs
    let jobIdsAlreadyRunning: Set<Int64> = currentlyRunningJobIds.wrappedValue
    let jobsAlreadyInQueue: Set<Int64> = queue.wrappedValue.compactMap { $0.id }.asSet()
    let jobsToRun: [Job] = Storage.shared.read { db in
        try Job
            .filterPendingJobs(
                variants: jobVariants,
                excludeFutureJobs: true,
                includeJobsWithDependencies: false
            )
            .filter(!jobIdsAlreadyRunning.contains(Job.Columns.id)) // Exclude jobs already running
            .filter(!jobsAlreadyInQueue.contains(Job.Columns.id))   // Exclude jobs already in the queue
            .fetchAll(db)
    }
    .defaulting(to: [])

    // Determine the number of jobs to run
    var jobCount: Int = 0

    queue.mutate { queue in
        queue.append(contentsOf: jobsToRun)
        jobCount = queue.count
    }

    // If there are no pending jobs and nothing in the queue then schedule the JobRunner
    // to start again when the next scheduled job should start
    guard jobCount > 0 else {
        if jobIdsAlreadyRunning.isEmpty {
            isRunning.mutate { $0 = false }
            scheduleNextSoonestJob()
        }
        return
    }

    // Run the first job in the queue (only log the start if the queue wasn't already running)
    if !wasAlreadyRunning {
        SNLog("[JobRunner] Starting \(queueContext) with (\(jobCount) job\(jobCount != 1 ? "s" : ""))")
    }
    runNextJob()
}
/// Flags the queue as no longer running, then empties the pending queue and the deferral
/// loop tracker.
fileprivate func stopAndClearPendingJobs() {
    isRunning.mutate { running in running = false }
    queue.mutate { pendingJobs in pendingJobs.removeAll() }
    deferLoopTracker.mutate { trackedDeferrals in trackedDeferrals.removeAll() }
}
/// Pops the first job off the pending queue and hands it to the executor registered for its variant.
///
/// Nothing happens if the queue isn't flagged as running, and the call re-dispatches itself onto
/// `internalQueue` when invoked from elsewhere. If the queue is empty the `isRunning` flag is cleared
/// (for non-concurrent queues, or when nothing is currently running) and `scheduleNextSoonestJob` is called.
///
/// Before a job is executed it is failed if it has no registered executor, is missing a required
/// `threadId`/`interactionId`, is missing its `id` or has dependencies which can't be found; and it is
/// deferred if its `nextRunTimestamp` is in the future or if it still has dependencies (in which case
/// the dependencies that aren't currently running are moved to the start of the queue).
private func runNextJob() {
// Ensure the queue is running (if we've stopped the queue then we shouldn't start the next job)
guard isRunning.wrappedValue else { return }
// Ensure this is running on the correct queue
guard DispatchQueue.getSpecific(key: queueKey) == queueContext else {
internalQueue.async { [weak self] in
self?.runNextJob()
}
return
}
// Take the first job off the queue (along with the number of jobs left behind it)
guard let (nextJob, numJobsRemaining): (Job, Int) = queue.mutate({ queue in queue.popFirst().map { ($0, queue.count) } }) else {
// If it's a serial queue, or there are no more jobs running then update the 'isRunning' flag
if executionType != .concurrent || currentlyRunningJobIds.wrappedValue.isEmpty {
isRunning.mutate { $0 = false }
}
// Always attempt to schedule the next soonest job (otherwise if enough jobs get started in rapid
// succession then pending/failed jobs in the database may never get re-started in a concurrent queue)
scheduleNextSoonestJob()
return
}
// Validate the job can actually be run (missing executor/threadId/interactionId are permanent failures)
guard let jobExecutor: JobExecutor.Type = JobRunner.executorMap.wrappedValue[nextJob.variant] else {
SNLog("[JobRunner] \(queueContext) Unable to run \(nextJob.variant) job due to missing executor")
handleJobFailed(nextJob, error: JobRunnerError.executorMissing, permanentFailure: true)
return
}
guard !jobExecutor.requiresThreadId || nextJob.threadId != nil else {
SNLog("[JobRunner] \(queueContext) Unable to run \(nextJob.variant) job due to missing required threadId")
handleJobFailed(nextJob, error: JobRunnerError.requiredThreadIdMissing, permanentFailure: true)
return
}
guard !jobExecutor.requiresInteractionId || nextJob.interactionId != nil else {
SNLog("[JobRunner] \(queueContext) Unable to run \(nextJob.variant) job due to missing required interactionId")
handleJobFailed(nextJob, error: JobRunnerError.requiredInteractionIdMissing, permanentFailure: true)
return
}
// Note: A missing id is reported as a non-permanent failure
guard nextJob.id != nil else {
SNLog("[JobRunner] \(queueContext) Unable to run \(nextJob.variant) job due to missing id")
handleJobFailed(nextJob, error: JobRunnerError.jobIdMissing, permanentFailure: false)
return
}
// If the 'nextRunTimestamp' for the job is in the future then don't run it yet
guard nextJob.nextRunTimestamp <= Date().timeIntervalSince1970 else {
handleJobDeferred(nextJob)
return
}
// Check if the next job has any dependencies (the expected count is compared against the
// jobs which could actually be fetched, defaulting to no dependencies if the read returns nothing)
let dependencyInfo: (expectedCount: Int, jobs: Set<Job>) = Storage.shared.read { db in
let expectedDependencies: Set<JobDependencies> = try JobDependencies
.filter(JobDependencies.Columns.jobId == nextJob.id)
.fetchSet(db)
let jobDependencies: Set<Job> = try Job
.filter(ids: expectedDependencies.compactMap { $0.dependantId })
.fetchSet(db)
return (expectedDependencies.count, jobDependencies)
}
.defaulting(to: (0, []))
guard dependencyInfo.jobs.count == dependencyInfo.expectedCount else {
SNLog("[JobRunner] \(queueContext) found job with missing dependencies, removing the job")
handleJobFailed(nextJob, error: JobRunnerError.missingDependencies, permanentFailure: true)
return
}
guard dependencyInfo.jobs.isEmpty else {
SNLog("[JobRunner] \(queueContext) found job with \(dependencyInfo.jobs.count) dependencies, running those first")
/// Remove all jobs this one is dependant on that aren't currently running from the queue and re-insert them at the start
/// of the queue
///
/// **Note:** We don't add the current job back to the queue because it should only be re-added if its dependencies
/// are successfully completed
let currentlyRunningJobIds: [Int64] = Array(currentlyRunningJobIds.wrappedValue)
let dependencyJobsNotCurrentlyRunning: [Job] = dependencyInfo.jobs
.filter { job in !currentlyRunningJobIds.contains(job.id ?? -1) }
.sorted { lhs, rhs in (lhs.id ?? -1) < (rhs.id ?? -1) }
queue.mutate { queue in
queue = queue
.filter { !dependencyJobsNotCurrentlyRunning.contains($0) }
.inserting(contentsOf: dependencyJobsNotCurrentlyRunning, at: 0)
}
handleJobDeferred(nextJob)
return
}
// Update the state to indicate the particular job is running
//
// Note: We need to store 'numJobsRemaining' in its own variable because
// the 'SNLog' seems to dispatch to its own queue which ends up getting
// blocked by the JobRunner's queue because 'jobQueue' is Atomic
var numJobsRunning: Int = 0
nextTrigger.mutate { trigger in
trigger?.invalidate() // Need to invalidate to prevent a memory leak
trigger = nil
}
currentlyRunningJobIds.mutate { currentlyRunningJobIds in
currentlyRunningJobIds = currentlyRunningJobIds.inserting(nextJob.id)
numJobsRunning = currentlyRunningJobIds.count
}
currentlyRunningJobInfo.mutate { currentlyRunningJobInfo in
currentlyRunningJobInfo = currentlyRunningJobInfo.setting(
nextJob.id,
JobRunner.JobInfo(
threadId: nextJob.threadId,
interactionId: nextJob.interactionId,
detailsData: nextJob.details
)
)
}
SNLog("[JobRunner] \(queueContext) started \(nextJob.variant) job (\(executionType == .concurrent ? "\(numJobsRunning) currently running, " : "")\(numJobsRemaining) remaining)")
/// As it turns out Combine doesn't play too nicely with concurrent Dispatch Queues, in Combine events are dispatched asynchronously to
/// the queue which means an odd situation can occasionally occur where the `finished` event can actually run before the `output`
/// event - this can result in unexpected behaviours (for more information see https://github.com/groue/GRDB.swift/issues/1334)
///
/// Due to this if a job is meant to run on a concurrent queue then we actually want to create a temporary serial queue just for the execution
/// of that job
let targetQueue: DispatchQueue = {
guard executionType == .concurrent else { return internalQueue }
return DispatchQueue(
label: "\(self.queueContext)-serial",
qos: self.qosClass,
attributes: [],
autoreleaseFrequency: .inherit,
target: nil
)
}()
// Hand the job to its executor, the result handlers below perform the clean up and trigger the next job
jobExecutor.run(
nextJob,
queue: targetQueue,
success: handleJobSucceeded,
failure: handleJobFailed,
deferred: handleJobDeferred
)
// If this queue executes concurrently and there are still jobs remaining then immediately attempt
// to start the next job
if executionType == .concurrent && numJobsRemaining > 0 {
internalQueue.async { [weak self] in
self?.runNextJob()
}
}
}
/// Looks up the `nextRunTimestamp` of the next pending job (which isn't currently running) and
/// either restarts the queue immediately (when that time has already passed) or sets up a
/// trigger for that time.
///
/// When there is no such job, or the `JobRunner` isn't allowed to start queues, the
/// `onQueueDrained` closure is called instead (for concurrent queues only once nothing is
/// running).
private func scheduleNextSoonestJob() {
    let runningJobIds: Set<Int64> = currentlyRunningJobIds.wrappedValue
    let soonestTimestamp: TimeInterval? = Storage.shared.read { db in
        try Job
            .filterPendingJobs(
                variants: jobVariants,
                excludeFutureJobs: false,
                includeJobsWithDependencies: false
            )
            .select(.nextRunTimestamp)
            .filter(!runningJobIds.contains(Job.Columns.id)) // Exclude jobs already running
            .asRequest(of: TimeInterval.self)
            .fetchOne(db)
    }

    /// A non-concurrent queue is always considered to have completed at this point, a concurrent
    /// queue only once none of its jobs are running (evaluated at the time of each call)
    func queueHasCompleted() -> Bool {
        return (executionType != .concurrent || currentlyRunningJobIds.wrappedValue.isEmpty)
    }

    // If there are no remaining jobs or the JobRunner isn't allowed to start any queues then
    // trigger the 'onQueueDrained' callback and stop
    guard let nextJobTimestamp: TimeInterval = soonestTimestamp, JobRunner.canStartQueues.wrappedValue else {
        if queueHasCompleted() {
            onQueueDrained?()
        }
        return
    }

    let secondsUntilNextJob: TimeInterval = (nextJobTimestamp - Date().timeIntervalSince1970)

    /// Describes the (absolute, rounded up) time difference, eg. "1 second" or "5 seconds"
    func secondsDescription() -> String {
        let wholeSeconds: Int = Int(ceil(abs(secondsUntilNextJob)))

        return "\(wholeSeconds) second\(wholeSeconds == 1 ? "" : "s")"
    }

    // If the next job isn't scheduled in the future then just restart the JobRunner immediately
    if secondsUntilNextJob <= 0 {
        // Only log that the queue is getting restarted if this queue had actually been about to stop
        if queueHasCompleted() {
            let timingString: String

            if nextJobTimestamp == 0 {
                timingString = "that should be in the queue"
            } else {
                timingString = "scheduled \(secondsDescription()) ago"
            }

            SNLog("[JobRunner] Restarting \(queueContext) immediately for job \(timingString)")
        }

        // Trigger the 'start' function to load in any pending jobs that aren't already in the
        // queue (for concurrent queues we want to force them to load in pending jobs and add
        // them to the queue regardless of whether the queue is already running)
        internalQueue.async { [weak self] in
            self?.start(force: (self?.executionType == .concurrent))
        }
        return
    }

    // Only schedule a trigger if this queue has actually completed
    if !queueHasCompleted() { return }

    // Setup a trigger
    SNLog("[JobRunner] Stopping \(queueContext) until next job in \(secondsDescription())")
    nextTrigger.mutate { currentTrigger in
        currentTrigger?.invalidate() // Need to invalidate the old trigger to prevent a memory leak
        currentTrigger = Trigger.create(queue: self, timestamp: nextJobTimestamp)
    }
}
// MARK: - Handling Results
/// Handles a job which completed successfully: updates (or removes) its database record based
/// on its `behaviour`, moves any jobs which were dependant on it to the start of the queue,
/// performs the job clean up and then triggers the next job.
///
/// - Parameters:
///   - job: The job which succeeded
///   - shouldStop: When `true` a `recurring` job is deleted rather than being rescheduled
private func handleJobSucceeded(_ job: Job, shouldStop: Bool) {
    /// Retrieve the dependant jobs first (the `JobDependecies` table has cascading deletion when the original `Job` is
    /// removed so we need to retrieve these records before that happens)
    let jobsDependantOnThisJob: [Job] = Storage.shared
        .read { db in try job.dependantJobs.fetchAll(db) }
        .defaulting(to: [])

    /// Deletes the job, first removing any `JobDependencies` requiring this job to be completed (if we
    /// don't then the dependant jobs will automatically be deleted)
    func deleteJobKeepingDependantJobs() {
        Storage.shared.write { db in
            _ = try JobDependencies
                .filter(JobDependencies.Columns.dependantId == job.id)
                .deleteAll(db)

            _ = try job.delete(db)
        }
    }

    switch job.behaviour {
        case .runOnce, .runOnceNextLaunch: deleteJobKeepingDependantJobs()
        case .recurring where shouldStop == true: deleteJobKeepingDependantJobs()

        // For `recurring` jobs which have already run, they should automatically run again
        // but we want at least 1 second to pass before doing so - the job itself should
        // really update it's own 'nextRunTimestamp' (this is just a safety net)
        case .recurring where job.nextRunTimestamp <= Date().timeIntervalSince1970:
            guard let jobId: Int64 = job.id else { break }

            Storage.shared.write { db in
                _ = try Job
                    .filter(id: jobId)
                    .updateAll(
                        db,
                        Job.Columns.failureCount.set(to: 0),
                        Job.Columns.nextRunTimestamp.set(to: (Date().timeIntervalSince1970 + 1))
                    )
            }

        // For `recurringOnLaunch/Active` jobs which have already run but failed once, we need to
        // clear their `failureCount` and `nextRunTimestamp` to prevent them from endlessly running
        // over and over again
        case .recurringOnLaunch, .recurringOnActive:
            guard
                let jobId: Int64 = job.id,
                job.failureCount != 0 &&
                job.nextRunTimestamp > TimeInterval.leastNonzeroMagnitude
            else { break }

            Storage.shared.write { db in
                _ = try Job
                    .filter(id: jobId)
                    .updateAll(
                        db,
                        Job.Columns.failureCount.set(to: 0),
                        Job.Columns.nextRunTimestamp.set(to: 0)
                    )
            }

        default: break
    }

    /// Now that the job has been completed we want to insert any jobs that were dependant on it, that aren't already running
    /// to the start of the queue (the most likely case is that we want an entire job chain to be completed at the same time rather
    /// than being blocked by other unrelated jobs)
    ///
    /// **Note:** If any of these dependant jobs have other dependencies then when they attempt to start they will be
    /// removed from the queue, replaced by their dependencies
    if !jobsDependantOnThisJob.isEmpty {
        let runningJobIds: Set<Int64> = currentlyRunningJobIds.wrappedValue
        let jobsToPrioritise: [Job] = jobsDependantOnThisJob
            .filter { dependantJob in !runningJobIds.contains(dependantJob.id ?? -1) }
            .sorted { lhs, rhs in (lhs.id ?? -1) < (rhs.id ?? -1) }

        queue.mutate { pendingJobs in
            pendingJobs.removeAll(where: { jobsToPrioritise.contains($0) })
            pendingJobs.insert(contentsOf: jobsToPrioritise, at: 0)
        }
    }

    // Perform job cleanup and start the next job
    performCleanUp(for: job, result: .succeeded)
    internalQueue.async { [weak self] in
        self?.runNextJob()
    }
}
/// Handles a failed job. Unless the failure is permanent (or the retry limit has been passed) the
/// job's `failureCount` is incremented and it is scheduled to run again after a retry interval,
/// otherwise the job and the jobs which depend on it are deleted.
///
/// - Parameters:
///   - job: The job which failed
///   - error: The error which caused the failure (if there was one)
///   - permanentFailure: When `true` the job is removed rather than being retried
private func handleJobFailed(_ job: Job, error: Error?, permanentFailure: Bool) {
    // If the job no longer exists in the database then treat it as having been cancelled
    let jobStillExists: Bool? = Storage.shared.read({ db in try Job.exists(db, id: job.id ?? -1) })

    if jobStillExists != true {
        SNLog("[JobRunner] \(queueContext) \(job.variant) job canceled")
        performCleanUp(for: job, result: .failed)

        internalQueue.async { [weak self] in
            self?.runNextJob()
        }
        return
    }

    // If this is the blocking queue and a "blocking" job failed then rerun it
    // immediately (in this case we don't trigger any job callbacks because the
    // job isn't actually done, it's going to try again immediately)
    if type == .blocking && job.shouldBlock {
        SNLog("[JobRunner] \(queueContext) \(job.variant) job failed; retrying immediately")

        // If it was a possible deferral loop then we don't actually want to
        // retry the job (even if it's a blocking one, this gives a small chance
        // that the app could continue to function)
        let wasPossibleDeferralLoop: Bool

        if let error = error, case JobRunnerError.possibleDeferralLoop = error {
            wasPossibleDeferralLoop = true
        } else {
            wasPossibleDeferralLoop = false
        }

        performCleanUp(
            for: job,
            result: .failed,
            shouldTriggerCallbacks: wasPossibleDeferralLoop
        )

        // Only add it back to the queue if it wasn't a deferral loop
        if !wasPossibleDeferralLoop {
            queue.mutate { pendingJobs in pendingJobs.insert(job, at: 0) }
        }

        internalQueue.async { [weak self] in
            self?.runNextJob()
        }
        return
    }

    // Get the max failure count for the job (a value of '-1' means it will retry indefinitely)
    let maxFailureCount: Int = (JobRunner.executorMap.wrappedValue[job.variant]?.maxFailureCount ?? 0)
    let retryTimestamp: TimeInterval = (Date().timeIntervalSince1970 + JobRunner.getRetryInterval(for: job))
    var idsOfDependantJobs: [Int64] = []
    var outcomeText: String = "failed"

    Storage.shared.write { db in
        /// Retrieve a list of dependant jobs so we can clear them from the queue
        idsOfDependantJobs = try job.dependantJobs
            .select(.id)
            .asRequest(of: Int64.self)
            .fetchAll(db)

        /// Delete/update the failed jobs and any dependencies
        let updatedFailureCount: UInt = (job.failureCount + 1)
        let canRetry: Bool = (
            !permanentFailure && (
                maxFailureCount < 0 ||
                updatedFailureCount <= maxFailureCount
            )
        )

        if !canRetry {
            if maxFailureCount >= 0 && updatedFailureCount > maxFailureCount {
                outcomeText = "failed permanently; too many retries"
            } else {
                outcomeText = "failed permanently"
            }

            // If the job permanently failed or we have performed all of our retry attempts
            // then delete the job and all of it's dependant jobs (it'll probably never succeed)
            _ = try job.dependantJobs
                .deleteAll(db)

            _ = try job.delete(db)
            return
        }

        outcomeText = "failed; scheduling retry (failure count is \(updatedFailureCount))"

        _ = try job
            .with(
                failureCount: updatedFailureCount,
                nextRunTimestamp: retryTimestamp
            )
            .saved(db)

        // Update the failureCount and nextRunTimestamp on dependant jobs as well (update the
        // 'nextRunTimestamp' value to be 1ms later so when the queue gets regenerated it'll
        // come after the dependency)
        try job.dependantJobs
            .updateAll(
                db,
                Job.Columns.failureCount.set(to: updatedFailureCount),
                Job.Columns.nextRunTimestamp.set(to: (retryTimestamp + (1 / 1000)))
            )
    }

    /// Remove any dependant jobs from the queue (shouldn't be in there but filter the queue just in case so we don't try
    /// to run a deleted job or get stuck in a loop of trying to run dependencies indefinitely)
    if !idsOfDependantJobs.isEmpty {
        queue.mutate { pendingJobs in
            pendingJobs.removeAll(where: { idsOfDependantJobs.contains($0.id ?? -1) })
        }
    }

    SNLog("[JobRunner] \(queueContext) \(job.variant) job \(outcomeText)")
    performCleanUp(for: job, result: .failed)

    internalQueue.async { [weak self] in
        self?.runNextJob()
    }
}
/// This function is called when a job neither succeeds or fails (this should only occur if the job has specific logic that makes it dependant
/// on other jobs, and it should automatically manage those dependencies)
///
/// Each deferral is recorded in the `deferLoopTracker`, if the job gets deferred too quickly (see below) it is treated as being stuck
/// in a deferral loop and is failed (non-permanently) with `JobRunnerError.possibleDeferralLoop` instead of being deferred again
private func handleJobDeferred(_ job: Job) {
    var stuckInDeferLoop: Bool = false

    deferLoopTracker.mutate {
        // First deferral for this job, just start tracking it
        guard let lastRecord: (count: Int, times: [TimeInterval]) = $0[job.id] else {
            $0 = $0.setting(
                job.id,
                (1, [Date().timeIntervalSince1970])
            )
            return
        }

        let timeNow: TimeInterval = Date().timeIntervalSince1970

        // Only the last 'deferralLoopThreshold' deferral times are stored so the window they get
        // compared against needs to be a fixed 'deferralLoopThreshold' seconds - the 'count'
        // never stops growing so using it as the window would eventually flag any job which has
        // been deferred enough times (regardless of how slowly those deferrals happened)
        stuckInDeferLoop = (
            lastRecord.count >= JobQueue.deferralLoopThreshold &&
            (timeNow - lastRecord.times[0]) < TimeInterval(JobQueue.deferralLoopThreshold)
        )

        $0 = $0.setting(
            job.id,
            (
                lastRecord.count + 1,
                // Only store the last 'deferralLoopThreshold' times to ensure we aren't running faster
                // than one loop per second
                lastRecord.times.suffix(JobQueue.deferralLoopThreshold - 1) + [timeNow]
            )
        )
    }

    // It's possible (by introducing bugs) to create a loop where a Job tries to run and immediately
    // defers itself but then attempts to run again (resulting in an infinite loop); this won't block
    // the app since it's on a background thread but can result in 100% of a CPU being used (and a
    // battery drain)
    //
    // This code will maintain an in-memory store for any jobs which are deferred too quickly (ie.
    // more than 'deferralLoopThreshold' times within 'deferralLoopThreshold' seconds)
    guard !stuckInDeferLoop else {
        deferLoopTracker.mutate { $0 = $0.removingValue(forKey: job.id) }
        handleJobFailed(job, error: JobRunnerError.possibleDeferralLoop, permanentFailure: false)
        return
    }

    performCleanUp(for: job, result: .deferred)

    internalQueue.async { [weak self] in
        self?.runNextJob()
    }
}
/// Removes the job from the 'currently running' state and (optionally) runs the callbacks which
/// were registered for it.
///
/// - Parameters:
///   - job: The job which has finished running
///   - result: The result which gets passed to each of the job's callbacks
///   - shouldTriggerCallbacks: When `false` the registered callbacks are left in place and are
///     not called
private func performCleanUp(for job: Job, result: JobRunner.JobResult, shouldTriggerCallbacks: Bool = true) {
    // The job is removed from the queue before it runs so all we need to do is remove it
    // from the 'currentlyRunning' state
    currentlyRunningJobIds.mutate { runningIds in runningIds = runningIds.removing(job.id) }
    currentlyRunningJobInfo.mutate { runningInfo in runningInfo = runningInfo.removingValue(forKey: job.id) }

    if !shouldTriggerCallbacks { return }

    // Take the callbacks for this job out of the store now that it's done
    let callbacksToRun: [(JobRunner.JobResult) -> ()] = jobCallbacks.mutate { registeredCallbacks in
        let callbacksForJob: [(JobRunner.JobResult) -> ()] = (registeredCallbacks[job.id] ?? [])
        registeredCallbacks = registeredCallbacks.removingValue(forKey: job.id)

        return callbacksForJob
    }

    // Run the callbacks on a global queue
    DispatchQueue.global(qos: .default).async {
        for callback in callbacksToRun {
            callback(result)
        }
    }
}
}