mirror of https://github.com/oxen-io/session-ios
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
304 lines
11 KiB
Swift
304 lines
11 KiB
Swift
5 years ago
|
//
|
||
|
// Copyright (c) 2018 Open Whisper Systems. All rights reserved.
|
||
|
//
|
||
|
|
||
|
import Foundation
|
||
|
|
||
|
/// JobQueue - A durable work queue
|
||
|
///
|
||
|
/// When work needs to be done, add it to the JobQueue.
|
||
|
/// The JobQueue will persist a JobRecord to be sure that work can be restarted if the app is killed.
|
||
|
///
|
||
|
/// The actual work is carried out in a DurableOperation which the JobQueue spins off, based on the contents
|
||
|
/// of a JobRecord.
|
||
|
///
|
||
|
/// For a concrete example, take message sending.
|
||
|
/// Add an outgoing message to the MessageSenderJobQueue, which first records a SSKMessageSenderJobRecord.
|
||
|
/// The MessageSenderJobQueue then uses that SSKMessageSenderJobRecord to create a MessageSenderOperation which
|
||
|
/// takes care of the actual business of communicating with the service.
|
||
|
///
|
||
|
/// DurableOperations are retryable - via their `remainingRetries` logic. However, if the operation encounters
|
||
|
/// an error where `error.isRetryable == false`, the operation will fail, regardless of available retries.
|
||
|
|
||
|
public extension Error {
    /// Whether work that failed with this error may be attempted again.
    ///
    /// The value is read from the `isRetryable` property of the error bridged to `NSError`.
    var isRetryable: Bool {
        let bridgedError = self as NSError
        return bridgedError.isRetryable
    }
}
|
||
|
|
||
|
extension SSKJobRecordStatus: CustomStringConvertible {
    /// A short, human-readable name for the status, suitable for log output.
    public var description: String {
        let name: String

        // Deliberately no `default:` so that every status is spelled out explicitly.
        switch self {
        case .ready:
            name = "ready"
        case .unknown:
            name = "unknown"
        case .running:
            name = "running"
        case .permanentlyFailed:
            name = "permanentlyFailed"
        case .obsolete:
            name = "obsolete"
        }

        return name
    }
}
|
||
|
|
||
|
/// Errors with dedicated handling in `JobQueue.workStep()` when a job record is being started.
public enum JobError: Error {
    /// An invariant did not hold. `workStep()` reports the description via `owsFailDebug`
    /// and saves the job record as permanently failed.
    case assertionFailure(description: String)

    /// The job is no longer worth running. `workStep()` logs the description and saves the
    /// job record as obsolete.
    case obsolete(description: String)
}
|
||
|
|
||
|
/// A unit of work that a `JobQueue` builds from a job record and runs on an operation queue.
public protocol DurableOperation: class {
    /// The kind of job record this operation is built from.
    associatedtype JobRecordType: SSKJobRecord

    /// The delegate type that is told about this operation's success and failures.
    associatedtype DurableOperationDelegateType: DurableOperationDelegate

    /// The persisted record describing the work.
    var jobRecord: JobRecordType { get }

    /// Receiver of success/error/failure callbacks; `JobQueue.workStep()` assigns the queue itself.
    var durableOperationDelegate: DurableOperationDelegateType? { get set }

    /// The underlying operation that is added to the operation queue.
    var operation: OWSOperation { get }

    /// Number of retries still available; assigned by the queue before the operation is enqueued.
    var remainingRetries: UInt { get set }
}
|
||
|
|
||
|
/// Callbacks through which a `DurableOperation` reports its outcome.
///
/// Every callback receives a read-write transaction in which the job record can be updated.
public protocol DurableOperationDelegate: class {
    /// The kind of operation this delegate observes.
    associatedtype DurableOperationType: DurableOperation

    /// The operation finished successfully.
    func durableOperationDidSucceed(_ operation: DurableOperationType, transaction: YapDatabaseReadWriteTransaction)

    /// The operation reported an error.
    func durableOperation(_ operation: DurableOperationType, didReportError: Error, transaction: YapDatabaseReadWriteTransaction)

    /// The operation failed with `error`.
    func durableOperation(_ operation: DurableOperationType, didFailWithError error: Error, transaction: YapDatabaseReadWriteTransaction)
}
|
||
|
|
||
|
/// A durable work queue: job records are persisted, turned into `DurableOperation`s and run.
///
/// The queue acts as the delegate of the operations it creates.
public protocol JobQueue: DurableOperationDelegate {
    typealias DurableOperationDelegateType = Self
    typealias JobRecordType = DurableOperationType.JobRecordType

    // MARK: Dependencies

    /// Database connection available to the queue.
    var dbConnection: YapDatabaseConnection { get }

    /// Looks up job records by label and status.
    var finder: JobRecordFinder { get }

    // MARK: Default Implementations

    /// Persists a ready job record and schedules work.
    func add(jobRecord: JobRecordType, transaction: YapDatabaseReadWriteTransaction)

    /// Marks records left in the `running` state as ready again.
    func restartOldJobs()

    /// Starts the next ready job, if there is one.
    func workStep()

    /// Shared setup logic intended to be called from `setup()`.
    func defaultSetup()

    // MARK: Required

    /// Operations that have been enqueued and have neither succeeded nor permanently failed.
    var runningOperations: [DurableOperationType] { get set }

    /// Label used when asking `finder` for this queue's job records.
    var jobRecordLabel: String { get }

    /// `workStep()` does nothing until this is true; `defaultSetup()` sets it.
    var isSetup: Bool { get set }

    /// Prepares the queue for use.
    func setup()

    /// Called by `restartOldJobs()` after a previously running record was saved as ready.
    func didMarkAsReady(oldJobRecord: JobRecordType, transaction: YapDatabaseReadWriteTransaction)

    /// The operation queue on which the operation built for `jobRecord` is run.
    func operationQueue(jobRecord: JobRecordType) -> OperationQueue

    /// Creates the operation that performs the work described by `jobRecord`.
    ///
    /// - Throws: `JobError.assertionFailure` and `JobError.obsolete` are handled specifically by
    ///   `workStep()`; any other error is reported as unexpected.
    func buildOperation(jobRecord: JobRecordType, transaction: YapDatabaseReadTransaction) throws -> DurableOperationType

    /// When `requiresInternet` is true, we immediately run any jobs which are waiting for retry
    /// upon detecting Reachability.
    ///
    /// Because `Reachability` isn't 100% reliable, the jobs will be attempted regardless of what
    /// we think our current Reachability is. However, because these jobs will likely fail many
    /// times in succession, their `retryInterval` could be quite long by the time we are back
    /// online.
    var requiresInternet: Bool { get }

    /// Upper bound against which a job record's failure count is compared to compute the
    /// remaining retries.
    static var maxRetries: UInt { get }
}
|
||
|
|
||
|
public extension JobQueue {

    // MARK: Dependencies

    /// The read-write connection of the shared environment's primary storage.
    var dbConnection: YapDatabaseConnection {
        return SSKEnvironment.shared.primaryStorage.dbReadWriteConnection
    }

    /// A freshly created finder on every access.
    var finder: JobRecordFinder {
        return JobRecordFinder()
    }

    /// The shared environment's reachability manager.
    var reachabilityManager: SSKReachabilityManager {
        return SSKEnvironment.shared.reachabilityManager
    }

    // MARK:

    /// Saves `jobRecord` within `transaction` and registers a completion block on the transaction
    /// that schedules work.
    ///
    /// - Parameters:
    ///   - jobRecord: The record to persist; asserted to be in the `.ready` state.
    ///   - transaction: The write transaction used to save the record.
    func add(jobRecord: JobRecordType, transaction: YapDatabaseReadWriteTransaction) {
        assert(jobRecord.status == .ready)

        jobRecord.save(with: transaction)

        transaction.addCompletionQueue(DispatchQueue.global()) {
            self.startWorkWhenAppIsReady()
        }
    }

    /// Dispatches `workStep()` onto a global queue, directly when running tests and otherwise
    /// through `AppReadiness.runNowOrWhenAppDidBecomeReady`.
    func startWorkWhenAppIsReady() {
        let scheduleWorkStep = {
            DispatchQueue.global().async {
                self.workStep()
            }
        }

        if CurrentAppContext().isRunningTests {
            scheduleWorkStep()
        } else {
            AppReadiness.runNowOrWhenAppDidBecomeReady {
                scheduleWorkStep()
            }
        }
    }

    /// Picks the next ready job record, marks it as started, builds its operation and enqueues it,
    /// then schedules another `workStep()` until no ready record is left.
    ///
    /// Does nothing while the queue is not set up (and reports that, unless running tests).
    func workStep() {
        Logger.debug("")

        guard isSetup else {
            if !CurrentAppContext().isRunningTests {
                owsFailDebug("not setup")
            }

            return
        }

        Storage.writeSync { transaction in
            guard let nextJob: JobRecordType = self.finder.getNextReady(label: self.jobRecordLabel, transaction: transaction) as? JobRecordType else {
                Logger.verbose("nothing left to enqueue")
                return
            }

            do {
                try nextJob.saveAsStarted(transaction: transaction)

                let operationQueue = self.operationQueue(jobRecord: nextJob)
                let durableOperation = try self.buildOperation(jobRecord: nextJob, transaction: transaction)

                // The queue itself is the delegate of every operation it creates.
                durableOperation.durableOperationDelegate = self as? Self.DurableOperationType.DurableOperationDelegateType
                assert(durableOperation.durableOperationDelegate != nil)

                let remainingRetries = self.remainingRetries(durableOperation: durableOperation)
                durableOperation.remainingRetries = remainingRetries

                self.runningOperations.append(durableOperation)

                Logger.debug("adding operation: \(durableOperation) with remainingRetries: \(remainingRetries)")
                operationQueue.addOperation(durableOperation.operation)
            } catch JobError.assertionFailure(let description) {
                owsFailDebug("assertion failure: \(description)")
                nextJob.saveAsPermanentlyFailed(transaction: transaction)
            } catch JobError.obsolete(let description) {
                // TODO is this even worthwhile to have obsolete state? Should we just delete the task outright?
                Logger.verbose("marking obsolete task as such. description:\(description)")
                nextJob.saveAsObsolete(transaction: transaction)
            } catch {
                // Include the caught error so the failure can be diagnosed from the report.
                owsFailDebug("unexpected error: \(error)")
            }

            // Keep going: look for the next ready record on a global queue.
            DispatchQueue.global().async {
                self.workStep()
            }
        }
    }

    /// Saves every record of this queue that is still marked `running` as ready again and
    /// notifies `didMarkAsReady(oldJobRecord:transaction:)` for each one.
    ///
    /// A record that cannot be transitioned is saved as permanently failed instead.
    public func restartOldJobs() {
        Storage.writeSync { transaction in
            let runningRecords = self.finder.allRecords(label: self.jobRecordLabel, status: .running, transaction: transaction)
            Logger.info("marking old `running` JobRecords as ready: \(runningRecords.count)")

            for record in runningRecords {
                guard let staleRecord = record as? JobRecordType else {
                    owsFailDebug("unexpectred jobRecord: \(record)")
                    continue
                }

                do {
                    try staleRecord.saveRunningAsReady(transaction: transaction)
                    self.didMarkAsReady(oldJobRecord: staleRecord, transaction: transaction)
                } catch {
                    owsFailDebug("failed to mark old running records as ready error: \(error)")
                    staleRecord.saveAsPermanentlyFailed(transaction: transaction)
                }
            }
        }
    }

    /// Unless you need special handling, your setup method can be as simple as
    ///
    ///     func setup() {
    ///         defaultSetup()
    ///     }
    ///
    /// So you might ask, why not just rename this method to `setup`? Because
    /// `setup` is called from objc, and default implementations from a protocol
    /// cannot be marked as @objc.
    ///
    /// Restarts old jobs, observes reachability changes when `requiresInternet` is true,
    /// marks the queue as set up and schedules work. Calling it again once set up only
    /// reports a failure.
    func defaultSetup() {
        guard !isSetup else {
            owsFailDebug("already ready already")
            return
        }

        self.restartOldJobs()

        if self.requiresInternet {
            NotificationCenter.default.addObserver(forName: .reachabilityChanged,
                                                   object: self.reachabilityManager.observationContext,
                                                   queue: nil) { _ in
                guard self.reachabilityManager.isReachable else {
                    Logger.verbose("isReachable: false")
                    return
                }

                Logger.verbose("isReachable: true")
                self.becameReachable()
            }
        }

        self.isSetup = true

        self.startWorkWhenAppIsReady()
    }

    /// Number of retries left for `durableOperation`: `maxRetries` minus the failure count of its
    /// job record, or 0 once the failure count has reached `maxRetries`.
    func remainingRetries(durableOperation: DurableOperationType) -> UInt {
        let maxRetries = type(of: self).maxRetries
        let failureCount = durableOperation.jobRecord.failureCount

        // Both values are unsigned, so only subtract when the result cannot underflow.
        return maxRetries > failureCount ? maxRetries - failureCount : 0
    }

    /// Reacts to the network becoming reachable by running any queued retry.
    ///
    /// Must only be called when `requiresInternet` is true; otherwise a failure is reported.
    func becameReachable() {
        guard requiresInternet else {
            owsFailDebug("should only be called if `requiresInternet` is true")
            return
        }

        _ = self.runAnyQueuedRetry()
    }

    /// Forwards `runAnyQueuedRetry()` to the underlying operation of the first running operation.
    ///
    /// - Returns: That operation, or `nil` when no operation is running.
    func runAnyQueuedRetry() -> DurableOperationType? {
        guard let firstRunning = self.runningOperations.first else {
            return nil
        }

        firstRunning.operation.runAnyQueuedRetry()

        return firstRunning
    }

    // MARK: DurableOperationDelegate

    /// Drops `operation` from `runningOperations` and removes its job record.
    func durableOperationDidSucceed(_ operation: DurableOperationType, transaction: YapDatabaseReadWriteTransaction) {
        self.runningOperations = self.runningOperations.filter { running in
            return running !== operation
        }
        operation.jobRecord.remove(with: transaction)
    }

    /// Records a failure on the operation's job record; if that itself fails, the record is
    /// saved as permanently failed.
    func durableOperation(_ operation: DurableOperationType, didReportError: Error, transaction: YapDatabaseReadWriteTransaction) {
        do {
            try operation.jobRecord.addFailure(transaction: transaction)
        } catch {
            owsFailDebug("error while addingFailure: \(error)")
            operation.jobRecord.saveAsPermanentlyFailed(transaction: transaction)
        }
    }

    /// Drops `operation` from `runningOperations` and saves its job record as permanently failed.
    func durableOperation(_ operation: DurableOperationType, didFailWithError error: Error, transaction: YapDatabaseReadWriteTransaction) {
        self.runningOperations = self.runningOperations.filter { running in
            return running !== operation
        }
        operation.jobRecord.saveAsPermanentlyFailed(transaction: transaction)
    }
}
|