From 0c2bb439f935bc728121ae57bbb0e9cffd792746 Mon Sep 17 00:00:00 2001 From: Michael Kirk Date: Mon, 22 Oct 2018 10:53:14 -0600 Subject: [PATCH] kick-queue upon reachability --- Signal/src/Jobs/SessionResetJob.swift | 9 +++-- .../src/Network/MessageSenderJobQueue.swift | 8 ++-- SignalServiceKit/src/Util/JobQueue.swift | 39 ++++++++++++++++++- SignalServiceKit/src/Util/OWSOperation.h | 6 ++- SignalServiceKit/src/Util/OWSOperation.m | 35 ++++++++++++++--- 5 files changed, 84 insertions(+), 13 deletions(-) diff --git a/Signal/src/Jobs/SessionResetJob.swift b/Signal/src/Jobs/SessionResetJob.swift index 7dbdff30b..7f68eaf5c 100644 --- a/Signal/src/Jobs/SessionResetJob.swift +++ b/Signal/src/Jobs/SessionResetJob.swift @@ -20,6 +20,8 @@ public class SessionResetJobQueue: NSObject, JobQueue { public typealias DurableOperationType = SessionResetOperation public let jobRecordLabel: String = "SessionReset" public static let maxRetries: UInt = 10 + public let requiresInternet: Bool = true + public var runningOperations: [SessionResetOperation] = [] @objc public func setup() { @@ -60,7 +62,7 @@ public class SessionResetOperation: OWSOperation, DurableOperation { weak public var durableOperationDelegate: SessionResetJobQueue? - public var operation: Operation { + public var operation: OWSOperation { return self } @@ -143,7 +145,7 @@ public class SessionResetOperation: OWSOperation, DurableOperation { } } - override public func retryDelay() -> dispatch_time_t { + override public func retryInterval() -> TimeInterval { // Arbitrary backoff factor... // With backOffFactor of 1.9 // try 1 delay: 0.00s @@ -156,7 +158,8 @@ public class SessionResetOperation: OWSOperation, DurableOperation { let maxBackoff = kHourInterval let seconds = 0.1 * min(maxBackoff, pow(backoffFactor, Double(self.jobRecord.failureCount))) - return UInt64(seconds) * NSEC_PER_SEC + + return seconds } override public func didFail(error: Error) { diff --git a/SignalServiceKit/src/Network/MessageSenderJobQueue.swift b/SignalServiceKit/src/Network/MessageSenderJobQueue.swift index f8ef6bb81..7976a3a0f 100644 --- a/SignalServiceKit/src/Network/MessageSenderJobQueue.swift +++ b/SignalServiceKit/src/Network/MessageSenderJobQueue.swift @@ -66,6 +66,8 @@ public class MessageSenderJobQueue: NSObject, JobQueue { public typealias DurableOperationType = MessageSenderOperation public static let jobRecordLabel: String = "MessageSender" public static let maxRetries: UInt = 10 + public let requiresInternet: Bool = true + public var runningOperations: [MessageSenderOperation] = [] public var jobRecordLabel: String { return type(of: self).jobRecordLabel @@ -136,7 +138,7 @@ public class MessageSenderOperation: OWSOperation, DurableOperation { weak public var durableOperationDelegate: MessageSenderJobQueue? - public var operation: Operation { + public var operation: OWSOperation { return self } @@ -183,7 +185,7 @@ public class MessageSenderOperation: OWSOperation, DurableOperation { } } - override public func retryDelay() -> dispatch_time_t { + override public func retryInterval() -> TimeInterval { guard !CurrentAppContext().isRunningTests else { return 0 } @@ -200,7 +202,7 @@ public class MessageSenderOperation: OWSOperation, DurableOperation { let maxBackoff = kHourInterval let seconds = 0.1 * min(maxBackoff, pow(backoffFactor, Double(self.jobRecord.failureCount))) - return UInt64(seconds) * NSEC_PER_SEC + return seconds } override public func didFail(error: Error) { diff --git a/SignalServiceKit/src/Util/JobQueue.swift b/SignalServiceKit/src/Util/JobQueue.swift index 6037d5630..95cd6655d 100644 --- a/SignalServiceKit/src/Util/JobQueue.swift +++ b/SignalServiceKit/src/Util/JobQueue.swift @@ -37,7 +37,7 @@ public protocol DurableOperation: class { var jobRecord: JobRecordType { get } var durableOperationDelegate: DurableOperationDelegateType? { get set } - var operation: Operation { get } + var operation: OWSOperation { get } var remainingRetries: UInt { get set } } @@ -67,6 +67,7 @@ public protocol JobQueue: DurableOperationDelegate { // MARK: Required + var runningOperations: [DurableOperationType] { get set } var jobRecordLabel: String { get } var isSetup: Bool { get set } @@ -76,6 +77,12 @@ public protocol JobQueue: DurableOperationDelegate { func operationQueue(jobRecord: JobRecordType) -> OperationQueue func buildOperation(jobRecord: JobRecordType, transaction: YapDatabaseReadTransaction) throws -> DurableOperationType + /// When `requiresInternet` is true, we immediately run any jobs which are waiting for retry upon detecting Reachability. + /// + /// Because `Reachability` isn't 100% reliable, the jobs will be attempted regardless of what we think our current Reachability is. + /// However, because these jobs will likely fail many times in succession, their `retryInterval` could be quite long by the time we + /// are back online. + var requiresInternet: Bool { get } static var maxRetries: UInt { get } } @@ -91,6 +98,10 @@ public extension JobQueue { return JobRecordFinder() } + var reachabilityManager: SSKReachabilityManager { + return SSKEnvironment.shared.reachabilityManager + } + // MARK: func add(jobRecord: JobRecordType, transaction: YapDatabaseReadWriteTransaction) { @@ -131,6 +142,8 @@ public extension JobQueue { let remainingRetries = self.remainingRetries(durableOperation: durableOperation) durableOperation.remainingRetries = remainingRetries + self.runningOperations.append(durableOperation) + Logger.debug("adding operation: \(durableOperation) with remainingRetries: \(remainingRetries)") operationQueue.addOperation(durableOperation.operation) } catch JobError.assertionFailure(let description) { @@ -186,6 +199,20 @@ public extension JobQueue { } self.restartOldJobs() + if self.requiresInternet { + NotificationCenter.default.addObserver(forName: .reachabilityChanged, + object: self.reachabilityManager.observationContext, + queue: nil) { _ in + + if self.reachabilityManager.isReachable { + Logger.verbose("isReachable: true") + self.becameReachable() + } else { + Logger.verbose("isReachable: false") + } + } + } + self.isSetup = true DispatchQueue.global().async { @@ -204,9 +231,18 @@ public extension JobQueue { return maxRetries - failureCount } + func becameReachable() { + guard requiresInternet else { + return + } + + self.runningOperations.first?.operation.runAnyQueuedRetry() + } + // MARK: DurableOperationDelegate func durableOperationDidSucceed(_ operation: DurableOperationType, transaction: YapDatabaseReadWriteTransaction) { + self.runningOperations = self.runningOperations.filter { $0 !== operation } operation.jobRecord.remove(with: transaction) } @@ -220,6 +256,7 @@ public extension JobQueue { } func durableOperation(_ operation: DurableOperationType, didFailWithError error: Error, transaction: YapDatabaseReadWriteTransaction) { + self.runningOperations = self.runningOperations.filter { $0 !== operation } operation.jobRecord.saveAsPermanentlyFailed(transaction: transaction) } } diff --git a/SignalServiceKit/src/Util/OWSOperation.h b/SignalServiceKit/src/Util/OWSOperation.h index 463a7e3cb..ebeeaa53f 100644 --- a/SignalServiceKit/src/Util/OWSOperation.h +++ b/SignalServiceKit/src/Util/OWSOperation.h @@ -52,10 +52,14 @@ typedef NS_ENUM(NSInteger, OWSOperationState) { - (void)didFailWithError:(NSError *)error NS_SWIFT_NAME(didFail(error:)); // How long to wait before retry, if possible -- (dispatch_time_t)retryDelay; +- (NSTimeInterval)retryInterval; #pragma mark - Success/Error - Do Not Override +// Runs now if a retry timer has been set by a previous failure, +// otherwise assumes we're currently running and does nothing. +- (void)runAnyQueuedRetry; + // Report that the operation completed successfully. // // Each invocation of `run` must make exactly one call to one of: `reportSuccess`, `reportCancelled`, or `reportError:` diff --git a/SignalServiceKit/src/Util/OWSOperation.m b/SignalServiceKit/src/Util/OWSOperation.m index 6e3da8a92..b04da4498 100644 --- a/SignalServiceKit/src/Util/OWSOperation.m +++ b/SignalServiceKit/src/Util/OWSOperation.m @@ -4,6 +4,7 @@ #import "OWSOperation.h" #import "NSError+MessageSending.h" +#import "NSTimer+OWS.h" #import "OWSBackgroundTask.h" #import "OWSError.h" @@ -17,6 +18,8 @@ NSString *const OWSOperationKeyIsFinished = @"isFinished"; @property (nullable) NSError *failingError; @property (atomic) OWSOperationState operationState; @property (nonatomic) OWSBackgroundTask *backgroundTask; +@property (nonatomic) NSTimer *_Nullable retryTimer; +@property (nonatomic, readonly) dispatch_queue_t retryTimerSerialQueue; @end @@ -31,7 +34,8 @@ NSString *const OWSOperationKeyIsFinished = @"isFinished"; _operationState = OWSOperationStateNew; _backgroundTask = [OWSBackgroundTask backgroundTaskWithLabel:self.logTag]; - + _retryTimerSerialQueue = dispatch_queue_create("SignalServiceKit.OWSOperation.retryTimer", DISPATCH_QUEUE_SERIAL); + // Operations are not retryable by default. _remainingRetries = 0; @@ -125,6 +129,22 @@ NSString *const OWSOperationKeyIsFinished = @"isFinished"; [self run]; } +- (void)runAnyQueuedRetry +{ + __block NSTimer *_Nullable retryTimer; + dispatch_sync(self.retryTimerSerialQueue, ^{ + retryTimer = self.retryTimer; + self.retryTimer = nil; + [retryTimer invalidate]; + }); + + if (retryTimer != nil) { + [self run]; + } else { + OWSLogVerbose(@"not re-running since operation is already running."); + } +} + #pragma mark - Public Methods // These methods are not intended to be subclassed @@ -170,15 +190,20 @@ NSString *const OWSOperationKeyIsFinished = @"isFinished"; self.remainingRetries--; - dispatch_after(dispatch_time(DISPATCH_TIME_NOW, self.retryDelay), dispatch_get_main_queue(), ^{ - [self run]; + dispatch_sync(self.retryTimerSerialQueue, ^{ + [self.retryTimer invalidate]; + self.retryTimer = [NSTimer weakScheduledTimerWithTimeInterval:self.retryInterval + target:self + selector:@selector(runAnyQueuedRetry) + userInfo:nil + repeats:NO]; }); } // Override in subclass if you want something more sophisticated, e.g. exponential backoff -- (dispatch_time_t)retryDelay +- (NSTimeInterval)retryInterval { - return (0.1 * NSEC_PER_SEC); + return 0.1; } #pragma mark - Life Cycle