Merge branch 'mkirk/drain-queue-perf'

pull/1/head
Michael Kirk 7 years ago
commit 2a6df19e0e

@ -793,6 +793,9 @@ static NSString *const kURLHostVerifyPrefix = @"verify";
[AppVersion.instance appLaunchDidComplete];
[self ensureRootViewController];
// If there were any messages in our local queue which we hadn't yet processed.
[[OWSMessageReceiver sharedInstance] handleAnyUnprocessedEnvelopesAsync];
}
- (void)ensureRootViewController

@ -13,6 +13,7 @@ NS_ASSUME_NONNULL_BEGIN
+ (void)syncRegisterDatabaseExtension:(YapDatabase *)database;
- (void)handleReceivedEnvelope:(OWSSignalServiceProtosEnvelope *)envelope;
- (void)handleAnyUnprocessedEnvelopesAsync;
@end

@ -186,6 +186,7 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces
@property (nonatomic, readonly) TSMessagesManager *messagesManager;
@property (nonatomic, readonly) OWSMessageProcessingJobFinder *finder;
@property (nonatomic) BOOL isDrainingQueue;
- (instancetype)initWithMessagesManager:(TSMessagesManager *)messagesManager
finder:(OWSMessageProcessingJobFinder *)finder NS_DESIGNATED_INITIALIZER;
@ -207,6 +208,7 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces
_messagesManager = messagesManager;
_finder = finder;
_isDrainingQueue = NO;
return self;
}
@ -220,41 +222,43 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces
- (void)drainQueue
{
dispatch_async(self.class.serialGCDQueue, ^{
OWSMessageProcessingJob *_Nullable job = [self.finder nextJob];
if (job == nil) {
DDLogVerbose(@"%@ Queue is drained", self.tag);
return;
}
[self processJob:job
completion:^{
[self drainQueue];
}];
});
AssertIsOnMainThread();
if (self.isDrainingQueue) {
return;
}
self.isDrainingQueue = YES;
[self drainQueueWorkStep];
}
- (void)processJob:(OWSMessageProcessingJob *)job completion:(void (^)())completion
- (void)drainQueueWorkStep
{
dispatch_async(dispatch_get_main_queue(), ^{
[self.messagesManager processEnvelope:job.envelopeProto
completion:^{
[self.finder removeJobWithId:job.uniqueId];
completion();
}];
});
}
AssertIsOnMainThread();
OWSMessageProcessingJob *_Nullable job = [self.finder nextJob];
if (job == nil) {
self.isDrainingQueue = NO;
DDLogVerbose(@"%@ Queue is drained", self.tag);
return;
}
#pragma mark Helpers
[self processJob:job
completion:^{
DDLogVerbose(@"%@ completed job. %lu jobs left.",
self.tag,
(unsigned long)[OWSMessageProcessingJob numberOfKeysInCollection]);
[self drainQueueWorkStep];
}];
}
+ (dispatch_queue_t)serialGCDQueue
- (void)processJob:(OWSMessageProcessingJob *)job completion:(void (^)())completion
{
static dispatch_once_t onceToken;
static dispatch_queue_t queue;
dispatch_once(&onceToken, ^{
queue = dispatch_queue_create("org.whispersystems.signal.messageProcessingQueue", NULL);
});
return queue;
[self.messagesManager processEnvelope:job.envelopeProto
completion:^{
[self.finder removeJobWithId:job.uniqueId];
completion();
}];
}
#pragma mark Logging
@ -331,6 +335,13 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces
#pragma mark - instance methods
- (void)handleAnyUnprocessedEnvelopesAsync
{
dispatch_async(dispatch_get_main_queue(), ^{
[self.processingQueue drainQueue];
});
}
- (void)handleReceivedEnvelope:(OWSSignalServiceProtosEnvelope *)envelope
{
// Drop any too-large messages on the floor. Well behaving clients should never send them.

Loading…
Cancel
Save