//
//  Copyright (c) 2018 Open Whisper Systems. All rights reserved.
//

import Foundation

/// JobQueue - A durable work queue
///
/// When work needs to be done, add it to the JobQueue.
/// The JobQueue will persist a JobRecord to be sure that work can be restarted if the app is killed.
///
/// The actual work is carried out in a DurableOperation which the JobQueue spins off, based on the contents
/// of a JobRecord.
///
/// For a concrete example, take message sending.
/// Add an outgoing message to the MessageSenderJobQueue, which first records a SSKMessageSenderJobRecord.
/// The MessageSenderJobQueue then uses that SSKMessageSenderJobRecord to create a MessageSenderOperation which
/// takes care of the actual business of communicating with the service.
///
/// DurableOperations are retryable - via their `remainingRetries` logic. However, if the operation encounters
/// an error where `error.isRetryable == false`, the operation will fail, regardless of available retries.

public extension Error {
    /// Forwards to the `isRetryable` property of the bridged `NSError`.
    var isRetryable: Bool {
        return (self as NSError).isRetryable
    }
}

extension SSKJobRecordStatus: CustomStringConvertible {
    /// Human-readable name of the status, for logging.
    public var description: String {
        switch self {
        case .ready:
            return "ready"
        case .unknown:
            return "unknown"
        case .running:
            return "running"
        case .permanentlyFailed:
            return "permanentlyFailed"
        case .obsolete:
            return "obsolete"
        }
    }
}

/// Errors which `JobQueue.workStep()` handles specially while starting a job.
public enum JobError: Error {
    /// The job record is marked as permanently failed.
    case assertionFailure(description: String)
    /// The job record is marked as obsolete.
    case obsolete(description: String)
}

/// The unit of work which a `JobQueue` builds from a job record and enqueues.
public protocol DurableOperation: class {
    associatedtype JobRecordType: SSKJobRecord
    associatedtype DurableOperationDelegateType: DurableOperationDelegate

    /// The persisted record this operation was built from.
    var jobRecord: JobRecordType { get }

    /// Assigned by the `JobQueue` (to itself) just before the operation is enqueued.
    var durableOperationDelegate: DurableOperationDelegateType? { get set }

    /// The operation which is actually added to the `OperationQueue`.
    var operation: OWSOperation { get }

    /// Assigned by the `JobQueue` from `maxRetries` and the record's `failureCount`.
    var remainingRetries: UInt { get set }
}

/// Receives the outcome of a `DurableOperation`. `JobQueue` provides default implementations.
public protocol DurableOperationDelegate: class {
    associatedtype DurableOperationType: DurableOperation

    func durableOperationDidSucceed(_ operation: DurableOperationType, transaction: YapDatabaseReadWriteTransaction)
    func durableOperation(_ operation: DurableOperationType, didReportError: Error, transaction: YapDatabaseReadWriteTransaction)
    func durableOperation(_ operation: DurableOperationType, didFailWithError error: Error, transaction: YapDatabaseReadWriteTransaction)
}

public protocol JobQueue: DurableOperationDelegate {
    typealias DurableOperationDelegateType = Self
    typealias JobRecordType = DurableOperationType.JobRecordType

    // MARK: Dependencies

    var dbConnection: YapDatabaseConnection { get }
    var finder: JobRecordFinder { get }

    // MARK: Default Implementations

    func add(jobRecord: JobRecordType, transaction: YapDatabaseReadWriteTransaction)
    func restartOldJobs()
    func workStep()
    func defaultSetup()

    // MARK: Required

    var runningOperations: [DurableOperationType] { get set }
    var jobRecordLabel: String { get }

    var isSetup: Bool { get set }
    func setup()
    func didMarkAsReady(oldJobRecord: JobRecordType, transaction: YapDatabaseReadWriteTransaction)

    func operationQueue(jobRecord: JobRecordType) -> OperationQueue
    func buildOperation(jobRecord: JobRecordType, transaction: YapDatabaseReadTransaction) throws -> DurableOperationType

    /// When `requiresInternet` is true, we immediately run any jobs which are waiting for retry upon detecting Reachability.
    ///
    /// Because `Reachability` isn't 100% reliable, the jobs will be attempted regardless of what we think our current Reachability is.
    /// However, because these jobs will likely fail many times in succession, their `retryInterval` could be quite long by the time we
    /// are back online.
    var requiresInternet: Bool { get }
    static var maxRetries: UInt { get }
}

public extension JobQueue {

    // MARK: Dependencies

    var dbConnection: YapDatabaseConnection {
        return SSKEnvironment.shared.primaryStorage.dbReadWriteConnection
    }

    var finder: JobRecordFinder {
        return JobRecordFinder()
    }

    var reachabilityManager: SSKReachabilityManager {
        return SSKEnvironment.shared.reachabilityManager
    }

    // MARK:

    /// Persists `jobRecord` and, once `transaction` completes, kicks off processing.
    ///
    /// - Parameters:
    ///   - jobRecord: The record to persist; asserted (debug builds only) to be `.ready`.
    ///   - transaction: The write transaction in which the record is saved.
    func add(jobRecord: JobRecordType, transaction: YapDatabaseReadWriteTransaction) {
        assert(jobRecord.status == .ready)

        jobRecord.save(with: transaction)

        transaction.addCompletionQueue(DispatchQueue.global()) {
            self.startWorkWhenAppIsReady()
        }
    }

    /// Schedules a `workStep()` on a global queue.
    ///
    /// When running tests the step is scheduled immediately; otherwise it is scheduled via
    /// `AppReadiness.runNowOrWhenAppDidBecomeReady`.
    func startWorkWhenAppIsReady() {
        guard !CurrentAppContext().isRunningTests else {
            DispatchQueue.global().async {
                self.workStep()
            }
            return
        }

        AppReadiness.runNowOrWhenAppDidBecomeReady {
            DispatchQueue.global().async {
                self.workStep()
            }
        }
    }

    /// Starts the next ready job, if there is one, then schedules another `workStep()`.
    ///
    /// Within a single write transaction: fetches the next ready record for `jobRecordLabel`, marks it
    /// as started, builds its operation, and adds that operation to the queue returned by
    /// `operationQueue(jobRecord:)`. Returns without rescheduling when no ready record is found.
    func workStep() {
        Logger.debug("")

        guard isSetup else {
            if !CurrentAppContext().isRunningTests {
                owsFailDebug("not setup")
            }

            return
        }

        Storage.writeSync { transaction in
            guard let nextJob: JobRecordType = self.finder.getNextReady(label: self.jobRecordLabel, transaction: transaction) as? JobRecordType else {
                Logger.verbose("nothing left to enqueue")
                return
            }

            do {
                try nextJob.saveAsStarted(transaction: transaction)

                let operationQueue = self.operationQueue(jobRecord: nextJob)
                let durableOperation = try self.buildOperation(jobRecord: nextJob, transaction: transaction)

                durableOperation.durableOperationDelegate = self as? Self.DurableOperationType.DurableOperationDelegateType
                assert(durableOperation.durableOperationDelegate != nil)

                let remainingRetries = self.remainingRetries(durableOperation: durableOperation)
                durableOperation.remainingRetries = remainingRetries

                self.runningOperations.append(durableOperation)

                Logger.debug("adding operation: \(durableOperation) with remainingRetries: \(remainingRetries)")
                operationQueue.addOperation(durableOperation.operation)
            } catch JobError.assertionFailure(let description) {
                owsFailDebug("assertion failure: \(description)")
                nextJob.saveAsPermanentlyFailed(transaction: transaction)
            } catch JobError.obsolete(let description) {
                // TODO is this even worthwhile to have obsolete state? Should we just delete the task outright?
                Logger.verbose("marking obsolete task as such. description:\(description)")
                nextJob.saveAsObsolete(transaction: transaction)
            } catch {
                // Include the underlying error so the failure can be diagnosed from logs.
                // NOTE(review): the record's status is not updated on this path. If `saveAsStarted`
                // succeeded it presumably stays `running` until `restartOldJobs()` - confirm intended.
                owsFailDebug("unexpected error: \(error)")
            }

            // Keep draining: each step handles at most one record.
            DispatchQueue.global().async {
                self.workStep()
            }
        }
    }

    /// Marks every record for `jobRecordLabel` which is still `.running` as ready again, calling
    /// `didMarkAsReady(oldJobRecord:transaction:)` for each.
    ///
    /// A record which cannot be transitioned is marked as permanently failed.
    func restartOldJobs() {
        Storage.writeSync { transaction in
            let runningRecords = self.finder.allRecords(label: self.jobRecordLabel, status: .running, transaction: transaction)
            Logger.info("marking old `running` JobRecords as ready: \(runningRecords.count)")
            for record in runningRecords {
                guard let jobRecord = record as? JobRecordType else {
                    owsFailDebug("unexpectred jobRecord: \(record)")
                    continue
                }
                do {
                    try jobRecord.saveRunningAsReady(transaction: transaction)
                    self.didMarkAsReady(oldJobRecord: jobRecord, transaction: transaction)
                } catch {
                    owsFailDebug("failed to mark old running records as ready error: \(error)")
                    jobRecord.saveAsPermanentlyFailed(transaction: transaction)
                }
            }
        }
    }

    /// Unless you need special handling, your setup method can be as simple as
    ///
    ///     func setup() {
    ///         defaultSetup()
    ///     }
    ///
    /// So you might ask, why not just rename this method to `setup`? Because
    /// `setup` is called from objc, and default implementations from a protocol
    /// cannot be marked as @objc.
    func defaultSetup() {
        guard !isSetup else {
            owsFailDebug("already ready already")
            return
        }

        restartOldJobs()

        if requiresInternet {
            // The observer block is never unregistered, so NotificationCenter keeps it alive
            // indefinitely. Capture `self` weakly so the block does not pin the queue in memory.
            NotificationCenter.default.addObserver(forName: .reachabilityChanged,
                                                   object: reachabilityManager.observationContext,
                                                   queue: nil) { [weak self] _ in
                guard let strongSelf = self else {
                    return
                }

                if strongSelf.reachabilityManager.isReachable {
                    Logger.verbose("isReachable: true")
                    strongSelf.becameReachable()
                } else {
                    Logger.verbose("isReachable: false")
                }
            }
        }

        isSetup = true

        startWorkWhenAppIsReady()
    }

    /// Number of retries left for `durableOperation`: `maxRetries` minus the record's
    /// `failureCount`, or 0 when the failure count has reached or exceeded `maxRetries`.
    func remainingRetries(durableOperation: DurableOperationType) -> UInt {
        let maxRetries = type(of: self).maxRetries
        let failureCount = durableOperation.jobRecord.failureCount

        // Guard before subtracting: unsigned subtraction would trap on underflow.
        guard maxRetries > failureCount else {
            return 0
        }

        return maxRetries - failureCount
    }

    /// Called when reachability reports that we are reachable; runs any queued retry.
    func becameReachable() {
        guard requiresInternet else {
            owsFailDebug("should only be called if `requiresInternet` is true")
            return
        }

        runAnyQueuedRetry()
    }

    /// Asks the first running operation, if any, to run its queued retry.
    ///
    /// NOTE(review): only `runningOperations.first` is nudged; other running operations are left
    /// untouched - confirm this is intended for queues which run operations concurrently.
    ///
    /// - Returns: The operation which was nudged, or `nil` when nothing is running.
    @discardableResult
    func runAnyQueuedRetry() -> DurableOperationType? {
        guard let runningDurableOperation = runningOperations.first else {
            return nil
        }
        runningDurableOperation.operation.runAnyQueuedRetry()

        return runningDurableOperation
    }

    // MARK: DurableOperationDelegate

    /// Stops tracking `operation` and removes its job record.
    func durableOperationDidSucceed(_ operation: DurableOperationType, transaction: YapDatabaseReadWriteTransaction) {
        runningOperations = runningOperations.filter { $0 !== operation }
        operation.jobRecord.remove(with: transaction)
    }

    /// Records a failure on the operation's job record; if that throws, the record is marked
    /// as permanently failed.
    func durableOperation(_ operation: DurableOperationType, didReportError: Error, transaction: YapDatabaseReadWriteTransaction) {
        do {
            try operation.jobRecord.addFailure(transaction: transaction)
        } catch {
            owsFailDebug("error while addingFailure: \(error)")
            operation.jobRecord.saveAsPermanentlyFailed(transaction: transaction)
        }
    }

    /// Stops tracking `operation` and marks its job record as permanently failed.
    func durableOperation(_ operation: DurableOperationType, didFailWithError error: Error, transaction: YapDatabaseReadWriteTransaction) {
        runningOperations = runningOperations.filter { $0 !== operation }
        operation.jobRecord.saveAsPermanentlyFailed(transaction: transaction)
    }
}