From 874ebf70385c3633c585c1e979269985c2e33b0e Mon Sep 17 00:00:00 2001 From: Matthew Chen Date: Thu, 21 Sep 2017 14:36:52 -0400 Subject: [PATCH] Use private queues in message decrypter and batch processor. // FREEBIE --- .../src/Messages/OWSBatchMessageProcessor.m | 72 ++++++++++--------- .../src/Messages/OWSMessageReceiver.m | 46 +++++++----- 2 files changed, 67 insertions(+), 51 deletions(-) diff --git a/SignalServiceKit/src/Messages/OWSBatchMessageProcessor.m b/SignalServiceKit/src/Messages/OWSBatchMessageProcessor.m index 2f5674cd4..91609dad3 100644 --- a/SignalServiceKit/src/Messages/OWSBatchMessageProcessor.m +++ b/SignalServiceKit/src/Messages/OWSBatchMessageProcessor.m @@ -5,6 +5,7 @@ #import "OWSBatchMessageProcessor.h" #import "NSArray+OWS.h" #import "OWSMessageManager.h" +#import "OWSQueues.h" #import "OWSSignalServiceProtos.pb.h" #import "TSDatabaseView.h" #import "TSStorageManager.h" @@ -277,6 +278,16 @@ NSString *const OWSMessageContentJobFinderExtensionGroup = @"OWSBatchMessageProc #pragma mark - instance methods +- (dispatch_queue_t)serialQueue +{ + static dispatch_queue_t queue = nil; + static dispatch_once_t onceToken; + dispatch_once(&onceToken, ^{ + queue = dispatch_queue_create("org.whispersystems.message.process", DISPATCH_QUEUE_SERIAL); + }); + return queue; +} + - (void)enqueueEnvelopeData:(NSData *)envelopeData plaintextData:(NSData *_Nullable)plaintextData { OWSAssert(envelopeData); @@ -287,7 +298,7 @@ NSString *const OWSMessageContentJobFinderExtensionGroup = @"OWSBatchMessageProc - (void)drainQueue { - DispatchMainThreadSafe(^{ + dispatch_async(self.serialQueue, ^{ if ([TSDatabaseView hasPendingViewRegistrations]) { // We don't want to process incoming messages until database // view registration is complete. @@ -305,7 +316,7 @@ NSString *const OWSMessageContentJobFinderExtensionGroup = @"OWSBatchMessageProc - (void)drainQueueWorkStep { - AssertIsOnMainThread(); + AssertOnDispatchQueue(self.serialQueue); NSArray *jobs = [self.finder nextJobsForBatchSize:kIncomingMessageBatchSize]; OWSAssert(jobs); @@ -315,40 +326,35 @@ NSString *const OWSMessageContentJobFinderExtensionGroup = @"OWSBatchMessageProc return; } - [self processJobs:jobs - completion:^{ - dispatch_async(dispatch_get_main_queue(), ^{ - [self.finder removeJobsWithIds:jobs.uniqueIds]; - - DDLogVerbose(@"%@ completed %zd jobs. %zd jobs left.", - self.tag, - jobs.count, - [OWSMessageContentJob numberOfKeysInCollection]); - - // Wait a bit in hopes of increasing the batch size. - // This delay won't affect the first message to arrive when this queue is idle, - // so by definition we're receiving more than one message and can benefit from - // batching. - dispatch_after( - dispatch_time(DISPATCH_TIME_NOW, (int64_t)0.1f * NSEC_PER_SEC), dispatch_get_main_queue(), ^{ - [self drainQueueWorkStep]; - }); - }); - }]; + [self processJobs:jobs]; + + [self.finder removeJobsWithIds:jobs.uniqueIds]; + + DDLogVerbose(@"%@ completed %zd jobs. %zd jobs left.", + self.tag, + jobs.count, + [OWSMessageContentJob numberOfKeysInCollection]); + + // Wait a bit in hopes of increasing the batch size. + // This delay won't affect the first message to arrive when this queue is idle, + // so by definition we're receiving more than one message and can benefit from + // batching. + dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(0.1f * NSEC_PER_SEC)), self.serialQueue, ^{ + [self drainQueueWorkStep]; + }); } -- (void)processJobs:(NSArray *)jobs completion:(void (^)())completion +- (void)processJobs:(NSArray *)jobs { - dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ - [self.dbReadWriteConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { - for (OWSMessageContentJob *job in jobs) { - [self.messagesManager processEnvelope:job.envelopeProto - plaintextData:job.plaintextData - transaction:transaction]; - } - }]; - completion(); - }); + AssertOnDispatchQueue(self.serialQueue); + + [self.dbReadWriteConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { + for (OWSMessageContentJob *job in jobs) { + [self.messagesManager processEnvelope:job.envelopeProto + plaintextData:job.plaintextData + transaction:transaction]; + } + }]; } #pragma mark Logging diff --git a/SignalServiceKit/src/Messages/OWSMessageReceiver.m b/SignalServiceKit/src/Messages/OWSMessageReceiver.m index a3e7ce5ea..6d58214da 100644 --- a/SignalServiceKit/src/Messages/OWSMessageReceiver.m +++ b/SignalServiceKit/src/Messages/OWSMessageReceiver.m @@ -6,6 +6,7 @@ #import "NSArray+OWS.h" #import "OWSBatchMessageProcessor.h" #import "OWSMessageDecrypter.h" +#import "OWSQueues.h" #import "OWSSignalServiceProtos.pb.h" #import "TSDatabaseView.h" #import "TSStorageManager.h" @@ -249,6 +250,16 @@ NSString *const OWSMessageDecryptJobFinderExtensionGroup = @"OWSMessageProcessin #pragma mark - instance methods +- (dispatch_queue_t)serialQueue +{ + static dispatch_queue_t queue = nil; + static dispatch_once_t onceToken; + dispatch_once(&onceToken, ^{ + queue = dispatch_queue_create("org.whispersystems.message.decrypt", DISPATCH_QUEUE_SERIAL); + }); + return queue; +} + - (void)enqueueEnvelopeForProcessing:(OWSSignalServiceProtosEnvelope *)envelope { [self.finder addJobForEnvelope:envelope]; @@ -256,7 +267,7 @@ NSString *const OWSMessageDecryptJobFinderExtensionGroup = @"OWSMessageProcessin - (void)drainQueue { - DispatchMainThreadSafe(^{ + dispatch_async(self.serialQueue, ^{ if ([TSDatabaseView hasPendingViewRegistrations]) { // We don't want to process incoming messages until database // view registration is complete. @@ -274,7 +285,7 @@ NSString *const OWSMessageDecryptJobFinderExtensionGroup = @"OWSMessageProcessin - (void)drainQueueWorkStep { - AssertIsOnMainThread(); + AssertOnDispatchQueue(self.serialQueue); OWSMessageDecryptJob *_Nullable job = [self.finder nextJob]; if (!job) { @@ -296,27 +307,26 @@ NSString *const OWSMessageDecryptJobFinderExtensionGroup = @"OWSMessageProcessin - (void)processJob:(OWSMessageDecryptJob *)job completion:(void (^)(BOOL))completion { + AssertOnDispatchQueue(self.serialQueue); OWSAssert(job); OWSSignalServiceProtosEnvelope *envelope = job.envelopeProto; - dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ - [self.messageDecrypter decryptEnvelope:envelope - successBlock:^(NSData *_Nullable plaintextData) { + [self.messageDecrypter decryptEnvelope:envelope + successBlock:^(NSData *_Nullable plaintextData) { - // We can't decrypt the same message twice, so we need to persist - // the decrypted envelope data ASAP to prevent data loss. - [self.batchMessageProcessor enqueueEnvelopeData:job.envelopeData plaintextData:plaintextData]; + // We can't decrypt the same message twice, so we need to persist + // the decrypted envelope data ASAP to prevent data loss. + [self.batchMessageProcessor enqueueEnvelopeData:job.envelopeData plaintextData:plaintextData]; - dispatch_async(dispatch_get_main_queue(), ^{ - completion(YES); - }); - } - failureBlock:^{ - dispatch_async(dispatch_get_main_queue(), ^{ - completion(NO); - }); - }]; - }); + dispatch_async(self.serialQueue, ^{ + completion(YES); + }); + } + failureBlock:^{ + dispatch_async(self.serialQueue, ^{ + completion(NO); + }); + }]; } #pragma mark Logging