Use private queues in message decrypter and batch processor.

// FREEBIE
pull/1/head
Matthew Chen 8 years ago
parent 077b74a0aa
commit 874ebf7038

@ -5,6 +5,7 @@
#import "OWSBatchMessageProcessor.h" #import "OWSBatchMessageProcessor.h"
#import "NSArray+OWS.h" #import "NSArray+OWS.h"
#import "OWSMessageManager.h" #import "OWSMessageManager.h"
#import "OWSQueues.h"
#import "OWSSignalServiceProtos.pb.h" #import "OWSSignalServiceProtos.pb.h"
#import "TSDatabaseView.h" #import "TSDatabaseView.h"
#import "TSStorageManager.h" #import "TSStorageManager.h"
@ -277,6 +278,16 @@ NSString *const OWSMessageContentJobFinderExtensionGroup = @"OWSBatchMessageProc
#pragma mark - instance methods #pragma mark - instance methods
- (dispatch_queue_t)serialQueue
{
static dispatch_queue_t queue = nil;
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
queue = dispatch_queue_create("org.whispersystems.message.process", DISPATCH_QUEUE_SERIAL);
});
return queue;
}
- (void)enqueueEnvelopeData:(NSData *)envelopeData plaintextData:(NSData *_Nullable)plaintextData - (void)enqueueEnvelopeData:(NSData *)envelopeData plaintextData:(NSData *_Nullable)plaintextData
{ {
OWSAssert(envelopeData); OWSAssert(envelopeData);
@ -287,7 +298,7 @@ NSString *const OWSMessageContentJobFinderExtensionGroup = @"OWSBatchMessageProc
- (void)drainQueue - (void)drainQueue
{ {
DispatchMainThreadSafe(^{ dispatch_async(self.serialQueue, ^{
if ([TSDatabaseView hasPendingViewRegistrations]) { if ([TSDatabaseView hasPendingViewRegistrations]) {
// We don't want to process incoming messages until database // We don't want to process incoming messages until database
// view registration is complete. // view registration is complete.
@ -305,7 +316,7 @@ NSString *const OWSMessageContentJobFinderExtensionGroup = @"OWSBatchMessageProc
- (void)drainQueueWorkStep - (void)drainQueueWorkStep
{ {
AssertIsOnMainThread(); AssertOnDispatchQueue(self.serialQueue);
NSArray<OWSMessageContentJob *> *jobs = [self.finder nextJobsForBatchSize:kIncomingMessageBatchSize]; NSArray<OWSMessageContentJob *> *jobs = [self.finder nextJobsForBatchSize:kIncomingMessageBatchSize];
OWSAssert(jobs); OWSAssert(jobs);
@ -315,40 +326,35 @@ NSString *const OWSMessageContentJobFinderExtensionGroup = @"OWSBatchMessageProc
return; return;
} }
[self processJobs:jobs [self processJobs:jobs];
completion:^{
dispatch_async(dispatch_get_main_queue(), ^{ [self.finder removeJobsWithIds:jobs.uniqueIds];
[self.finder removeJobsWithIds:jobs.uniqueIds];
DDLogVerbose(@"%@ completed %zd jobs. %zd jobs left.",
DDLogVerbose(@"%@ completed %zd jobs. %zd jobs left.", self.tag,
self.tag, jobs.count,
jobs.count, [OWSMessageContentJob numberOfKeysInCollection]);
[OWSMessageContentJob numberOfKeysInCollection]);
// Wait a bit in hopes of increasing the batch size.
// Wait a bit in hopes of increasing the batch size. // This delay won't affect the first message to arrive when this queue is idle,
// This delay won't affect the first message to arrive when this queue is idle, // so by definition we're receiving more than one message and can benefit from
// so by definition we're receiving more than one message and can benefit from // batching.
// batching. dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(0.1f * NSEC_PER_SEC)), self.serialQueue, ^{
dispatch_after( [self drainQueueWorkStep];
dispatch_time(DISPATCH_TIME_NOW, (int64_t)0.1f * NSEC_PER_SEC), dispatch_get_main_queue(), ^{ });
[self drainQueueWorkStep];
});
});
}];
} }
- (void)processJobs:(NSArray<OWSMessageContentJob *> *)jobs completion:(void (^)())completion - (void)processJobs:(NSArray<OWSMessageContentJob *> *)jobs
{ {
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ AssertOnDispatchQueue(self.serialQueue);
[self.dbReadWriteConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
for (OWSMessageContentJob *job in jobs) { [self.dbReadWriteConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
[self.messagesManager processEnvelope:job.envelopeProto for (OWSMessageContentJob *job in jobs) {
plaintextData:job.plaintextData [self.messagesManager processEnvelope:job.envelopeProto
transaction:transaction]; plaintextData:job.plaintextData
} transaction:transaction];
}]; }
completion(); }];
});
} }
#pragma mark Logging #pragma mark Logging

@ -6,6 +6,7 @@
#import "NSArray+OWS.h" #import "NSArray+OWS.h"
#import "OWSBatchMessageProcessor.h" #import "OWSBatchMessageProcessor.h"
#import "OWSMessageDecrypter.h" #import "OWSMessageDecrypter.h"
#import "OWSQueues.h"
#import "OWSSignalServiceProtos.pb.h" #import "OWSSignalServiceProtos.pb.h"
#import "TSDatabaseView.h" #import "TSDatabaseView.h"
#import "TSStorageManager.h" #import "TSStorageManager.h"
@ -249,6 +250,16 @@ NSString *const OWSMessageDecryptJobFinderExtensionGroup = @"OWSMessageProcessin
#pragma mark - instance methods #pragma mark - instance methods
- (dispatch_queue_t)serialQueue
{
static dispatch_queue_t queue = nil;
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
queue = dispatch_queue_create("org.whispersystems.message.decrypt", DISPATCH_QUEUE_SERIAL);
});
return queue;
}
- (void)enqueueEnvelopeForProcessing:(OWSSignalServiceProtosEnvelope *)envelope - (void)enqueueEnvelopeForProcessing:(OWSSignalServiceProtosEnvelope *)envelope
{ {
[self.finder addJobForEnvelope:envelope]; [self.finder addJobForEnvelope:envelope];
@ -256,7 +267,7 @@ NSString *const OWSMessageDecryptJobFinderExtensionGroup = @"OWSMessageProcessin
- (void)drainQueue - (void)drainQueue
{ {
DispatchMainThreadSafe(^{ dispatch_async(self.serialQueue, ^{
if ([TSDatabaseView hasPendingViewRegistrations]) { if ([TSDatabaseView hasPendingViewRegistrations]) {
// We don't want to process incoming messages until database // We don't want to process incoming messages until database
// view registration is complete. // view registration is complete.
@ -274,7 +285,7 @@ NSString *const OWSMessageDecryptJobFinderExtensionGroup = @"OWSMessageProcessin
- (void)drainQueueWorkStep - (void)drainQueueWorkStep
{ {
AssertIsOnMainThread(); AssertOnDispatchQueue(self.serialQueue);
OWSMessageDecryptJob *_Nullable job = [self.finder nextJob]; OWSMessageDecryptJob *_Nullable job = [self.finder nextJob];
if (!job) { if (!job) {
@ -296,27 +307,26 @@ NSString *const OWSMessageDecryptJobFinderExtensionGroup = @"OWSMessageProcessin
- (void)processJob:(OWSMessageDecryptJob *)job completion:(void (^)(BOOL))completion - (void)processJob:(OWSMessageDecryptJob *)job completion:(void (^)(BOOL))completion
{ {
AssertOnDispatchQueue(self.serialQueue);
OWSAssert(job); OWSAssert(job);
OWSSignalServiceProtosEnvelope *envelope = job.envelopeProto; OWSSignalServiceProtosEnvelope *envelope = job.envelopeProto;
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ [self.messageDecrypter decryptEnvelope:envelope
[self.messageDecrypter decryptEnvelope:envelope successBlock:^(NSData *_Nullable plaintextData) {
successBlock:^(NSData *_Nullable plaintextData) {
// We can't decrypt the same message twice, so we need to persist // We can't decrypt the same message twice, so we need to persist
// the decrypted envelope data ASAP to prevent data loss. // the decrypted envelope data ASAP to prevent data loss.
[self.batchMessageProcessor enqueueEnvelopeData:job.envelopeData plaintextData:plaintextData]; [self.batchMessageProcessor enqueueEnvelopeData:job.envelopeData plaintextData:plaintextData];
dispatch_async(dispatch_get_main_queue(), ^{ dispatch_async(self.serialQueue, ^{
completion(YES); completion(YES);
}); });
} }
failureBlock:^{ failureBlock:^{
dispatch_async(dispatch_get_main_queue(), ^{ dispatch_async(self.serialQueue, ^{
completion(NO); completion(NO);
}); });
}]; }];
});
} }
#pragma mark Logging #pragma mark Logging

Loading…
Cancel
Save