Merge branch 'charlesmchen/batchProcessingGlitch'

pull/1/head
Matthew Chen 7 years ago
commit f8b7c08be8

@ -8,6 +8,7 @@ NS_ASSUME_NONNULL_BEGIN
+ (NSDateFormatter *)dateFormatter;
+ (NSDateFormatter *)timeFormatter;
+ (BOOL)dateIsOlderThanOneDay:(NSDate *)date;
+ (BOOL)dateIsOlderThanOneWeek:(NSDate *)date;
+ (BOOL)dateIsToday:(NSDate *)date;

@ -265,24 +265,14 @@ NSString *const OWSMessageContentJobFinderExtensionGroup = @"OWSMessageContentJo
_finder = finder;
_isDrainingQueue = NO;
[[NSNotificationCenter defaultCenter] addObserver:self
selector:@selector(appIsReady)
name:AppIsReadyNotification
object:nil];
// Start processing.
[AppReadiness runNowOrWhenAppIsReady:^{
[self drainQueue];
}];
return self;
}
- (void)dealloc
{
[[NSNotificationCenter defaultCenter] removeObserver:self];
}
- (void)appIsReady
{
[self drainQueue];
}
#pragma mark - instance methods
- (dispatch_queue_t)serialQueue
@ -308,17 +298,14 @@ NSString *const OWSMessageContentJobFinderExtensionGroup = @"OWSMessageContentJo
- (void)drainQueue
{
OWSAssert(AppReadiness.isAppReady);
// Don't process incoming messages in app extensions.
if (!CurrentAppContext().isMainApp) {
return;
}
dispatch_async(self.serialQueue, ^{
if (!AppReadiness.isAppReady) {
// We don't want to process incoming messages until storage is ready.
return;
}
if (self.isDrainingQueue) {
return;
}
@ -471,7 +458,13 @@ NSString *const OWSMessageContentJobFinderExtensionGroup = @"OWSMessageContentJo
// We need to persist the decrypted envelope data ASAP to prevent data loss.
[self.processingQueue enqueueEnvelopeData:envelopeData plaintextData:plaintextData transaction:transaction];
[self.processingQueue drainQueue];
// The new envelope won't be visible to the finder until this transaction commits,
// so drainQueue in the transaction completion.
[transaction addCompletionQueue:dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)
completionBlock:^{
[self.processingQueue drainQueue];
}];
}
@end

@ -139,6 +139,13 @@ NS_ASSUME_NONNULL_BEGIN
{
if (AppReadiness.isAppReady) {
[OWSMessageUtils.sharedManager updateApplicationBadgeCount];
} else {
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
[AppReadiness runNowOrWhenAppIsReady:^{
[OWSMessageUtils.sharedManager updateApplicationBadgeCount];
}];
});
}
}

@ -123,7 +123,8 @@ NSString *const OWSMessageDecryptJobFinderExtensionGroup = @"OWSMessageProcessin
- (void)addJobForEnvelope:(OWSSignalServiceProtosEnvelope *)envelope
{
[self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) {
[[[OWSMessageDecryptJob alloc] initWithEnvelope:envelope] saveWithTransaction:transaction];
OWSMessageDecryptJob *job = [[OWSMessageDecryptJob alloc] initWithEnvelope:envelope];
[job saveWithTransaction:transaction];
}];
}
@ -242,24 +243,13 @@ NSString *const OWSMessageDecryptJobFinderExtensionGroup = @"OWSMessageProcessin
_finder = finder;
_isDrainingQueue = NO;
[[NSNotificationCenter defaultCenter] addObserver:self
selector:@selector(appIsReady)
name:AppIsReadyNotification
object:nil];
[AppReadiness runNowOrWhenAppIsReady:^{
[self drainQueue];
}];
return self;
}
- (void)dealloc
{
[[NSNotificationCenter defaultCenter] removeObserver:self];
}
- (void)appIsReady
{
[self drainQueue];
}
#pragma mark - instance methods
- (dispatch_queue_t)serialQueue
@ -279,17 +269,14 @@ NSString *const OWSMessageDecryptJobFinderExtensionGroup = @"OWSMessageProcessin
- (void)drainQueue
{
OWSAssert(AppReadiness.isAppReady);
// Don't decrypt messages in app extensions.
if (!CurrentAppContext().isMainApp) {
return;
}
dispatch_async(self.serialQueue, ^{
if (!AppReadiness.isAppReady) {
// We don't want to process incoming messages until storage is ready.
return;
}
if (self.isDrainingQueue) {
return;
}

@ -177,13 +177,10 @@ NSString *const OWSReadReceiptManagerAreReadReceiptsEnabled = @"areReadReceiptsE
OWSSingletonAssert();
[[NSNotificationCenter defaultCenter] addObserver:self
selector:@selector(appIsReady)
name:AppIsReadyNotification
object:nil];
// Try to start processing.
[self scheduleProcessing];
// Start processing.
[AppReadiness runNowOrWhenAppIsReady:^{
[self scheduleProcessing];
}];
return self;
}
@ -193,21 +190,14 @@ NSString *const OWSReadReceiptManagerAreReadReceiptsEnabled = @"areReadReceiptsE
[[NSNotificationCenter defaultCenter] removeObserver:self];
}
- (void)appIsReady
{
[self scheduleProcessing];
}
// Schedules a processing pass, unless one is already scheduled.
- (void)scheduleProcessing
{
OWSAssert(AppReadiness.isAppReady);
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
@synchronized(self)
{
if (!AppReadiness.isAppReady) {
DDLogInfo(@"%@ Deferring read receipt processing; storage not yet ready.", self.logTag);
return;
}
if (self.isProcessing) {
return;
}

@ -610,6 +610,12 @@ NSString *const kNSNotification_SocketManagerStateDidChange = @"kNSNotification_
OWSAssertIsOnMainThread();
if (!AppReadiness.isAppReady) {
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
[AppReadiness runNowOrWhenAppIsReady:^{
[self applyDesiredSocketState];
}];
});
return;
}

@ -4,8 +4,6 @@
NS_ASSUME_NONNULL_BEGIN
extern NSString *const AppIsReadyNotification;
typedef void (^AppReadyBlock)(void);
@interface AppReadiness : NSObject

@ -3,12 +3,9 @@
//
#import "AppReadiness.h"
#import "NSNotificationCenter+OWS.h"
NS_ASSUME_NONNULL_BEGIN
NSString *const AppIsReadyNotification = @"AppIsReadyNotification";
@interface AppReadiness ()
@property (atomic) BOOL isAppReady;
@ -85,8 +82,6 @@ NSString *const AppIsReadyNotification = @"AppIsReadyNotification";
self.isAppReady = YES;
[self runAppReadyBlocks];
[[NSNotificationCenter defaultCenter] postNotificationNameAsync:AppIsReadyNotification object:nil userInfo:nil];
}
- (void)runAppReadyBlocks

Loading…
Cancel
Save