From afc753e7edb7feeb8b5819c3e7bdcaf3c4c692dd Mon Sep 17 00:00:00 2001 From: Matthew Chen Date: Wed, 13 Sep 2017 12:49:19 -0400 Subject: [PATCH] Add batch message processor. // FREEBIE --- Signal/src/AppDelegate.m | 4 +- .../src/Messages/OWSBatchMessageProcessor.h | 20 + .../src/Messages/OWSBatchMessageProcessor.m | 375 ++++++++++++++++++ .../src/Messages/OWSMessageReceiver.m | 33 +- .../src/Storage/TSStorageManager.m | 4 +- 5 files changed, 421 insertions(+), 15 deletions(-) create mode 100644 SignalServiceKit/src/Messages/OWSBatchMessageProcessor.h create mode 100644 SignalServiceKit/src/Messages/OWSBatchMessageProcessor.m diff --git a/Signal/src/AppDelegate.m b/Signal/src/AppDelegate.m index e7ef19ec0..009767219 100644 --- a/Signal/src/AppDelegate.m +++ b/Signal/src/AppDelegate.m @@ -25,11 +25,11 @@ #import "VersionMigrations.h" #import "ViewControllerUtils.h" #import +#import #import #import #import #import -#import #import #import #import @@ -817,7 +817,7 @@ static NSString *const kURLHostVerifyPrefix = @"verify"; // If there were any messages in our local queue which we hadn't yet processed. [[OWSMessageReceiver sharedInstance] handleAnyUnprocessedEnvelopesAsync]; - // [[OWSMessageDecrypter sharedInstance] handleAnyUnprocessedEnvelopesAsync]; + [[OWSBatchMessageProcessor sharedInstance] handleAnyUnprocessedEnvelopesAsync]; [OWSProfileManager.sharedManager fetchLocalUsersProfile]; } diff --git a/SignalServiceKit/src/Messages/OWSBatchMessageProcessor.h b/SignalServiceKit/src/Messages/OWSBatchMessageProcessor.h new file mode 100644 index 000000000..66009908c --- /dev/null +++ b/SignalServiceKit/src/Messages/OWSBatchMessageProcessor.h @@ -0,0 +1,20 @@ +// +// Copyright (c) 2017 Open Whisper Systems. All rights reserved. +// + +NS_ASSUME_NONNULL_BEGIN + +@class OWSSignalServiceProtosEnvelope; +@class YapDatabase; + +@interface OWSBatchMessageProcessor : NSObject + ++ (instancetype)sharedInstance; ++ (void)syncRegisterDatabaseExtension:(YapDatabase *)database; + +- (void)enqueueEnvelopeData:(NSData *)envelopeData plaintextData:(NSData *_Nullable)plaintextData; +- (void)handleAnyUnprocessedEnvelopesAsync; + +@end + +NS_ASSUME_NONNULL_END diff --git a/SignalServiceKit/src/Messages/OWSBatchMessageProcessor.m b/SignalServiceKit/src/Messages/OWSBatchMessageProcessor.m new file mode 100644 index 000000000..45fdb55e0 --- /dev/null +++ b/SignalServiceKit/src/Messages/OWSBatchMessageProcessor.m @@ -0,0 +1,375 @@ +// +// Copyright (c) 2017 Open Whisper Systems. All rights reserved. +// + +#import "OWSBatchMessageProcessor.h" +#import "OWSSignalServiceProtos.pb.h" +#import "TSDatabaseView.h" +#import "TSMessagesManager.h" +#import "TSStorageManager.h" +#import "TSYapDatabaseObject.h" +#import +#import +#import + +NS_ASSUME_NONNULL_BEGIN + +#pragma mark - Persisted data model + +@class OWSSignalServiceProtosEnvelope; + +@interface OWSBatchMessageProcessingJob : TSYapDatabaseObject + +@property (nonatomic, readonly) NSDate *createdAt; + +- (instancetype)initWithEnvelopeData:(NSData *)envelopeData + plaintextData:(NSData *_Nullable)plaintextData NS_DESIGNATED_INITIALIZER; +- (instancetype)initWithUniqueId:(NSString *)uniqueId NS_UNAVAILABLE; +- (OWSSignalServiceProtosEnvelope *)envelopeProto; + +@end + +#pragma mark - + +@interface OWSBatchMessageProcessingJob () + +@property (nonatomic, readonly) NSData *envelopeData; +@property (nonatomic, readonly, nullable) NSData *plaintextData; + +@end + +#pragma mark - + +@implementation OWSBatchMessageProcessingJob + +- (instancetype)initWithEnvelopeData:(NSData *)envelopeData plaintextData:(NSData *_Nullable)plaintextData +{ + OWSAssert(envelopeData); + + self = [super initWithUniqueId:[NSUUID new].UUIDString]; + if (!self) { + return self; + } + + _envelopeData = envelopeData; + _plaintextData = plaintextData; + _createdAt = [NSDate new]; + + return self; +} + +- (OWSSignalServiceProtosEnvelope *)envelopeProto +{ + return [OWSSignalServiceProtosEnvelope parseFromData:self.envelopeData]; +} + +@end + +#pragma mark - Finder + +NSString *const OWSBatchMessageProcessingJobFinderExtensionName = @"OWSBatchMessageProcessingJobFinderExtensionName"; +NSString *const OWSBatchMessageProcessingJobFinderExtensionGroup = @"OWSBatchMessageProcessingJobFinderExtensionGroup"; + +@interface OWSBatchMessageProcessingJobFinder : NSObject + +- (nullable OWSBatchMessageProcessingJob *)nextJob; +- (void)addJobWithEnvelopeData:(NSData *)envelopeData plaintextData:(NSData *_Nullable)plaintextData; +- (void)removeJobWithId:(NSString *)uniqueId; + +@end + +#pragma mark - + +@interface OWSBatchMessageProcessingJobFinder () + +@property (nonatomic, readonly) YapDatabaseConnection *dbConnection; + +@end + +#pragma mark - + +@implementation OWSBatchMessageProcessingJobFinder + +- (instancetype)initWithDBConnection:(YapDatabaseConnection *)dbConnection +{ + OWSSingletonAssert(); + + self = [super init]; + if (!self) { + return self; + } + + _dbConnection = dbConnection; + + return self; +} + +- (nullable OWSBatchMessageProcessingJob *)nextJob +{ + __block OWSBatchMessageProcessingJob *_Nullable job; + [self.dbConnection readWithBlock:^(YapDatabaseReadTransaction *_Nonnull transaction) { + YapDatabaseViewTransaction *viewTransaction = [transaction ext:OWSBatchMessageProcessingJobFinderExtensionName]; + OWSAssert(viewTransaction != nil); + job = [viewTransaction firstObjectInGroup:OWSBatchMessageProcessingJobFinderExtensionGroup]; + }]; + + return job; +} + +- (void)addJobWithEnvelopeData:(NSData *)envelopeData plaintextData:(NSData *_Nullable)plaintextData +{ + [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) { + [[[OWSBatchMessageProcessingJob alloc] initWithEnvelopeData:envelopeData plaintextData:plaintextData] + saveWithTransaction:transaction]; + }]; +} + +- (void)removeJobWithId:(NSString *)uniqueId +{ + [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) { + [transaction removeObjectForKey:uniqueId inCollection:[OWSBatchMessageProcessingJob collection]]; + }]; +} + ++ (YapDatabaseView *)databaseExtension +{ + YapDatabaseViewSorting *sorting = + [YapDatabaseViewSorting withObjectBlock:^NSComparisonResult(YapDatabaseReadTransaction *transaction, + NSString *group, + NSString *collection1, + NSString *key1, + id object1, + NSString *collection2, + NSString *key2, + id object2) { + + if (![object1 isKindOfClass:[OWSBatchMessageProcessingJob class]]) { + OWSFail(@"Unexpected object: %@ in collection: %@", [object1 class], collection1); + return NSOrderedSame; + } + OWSBatchMessageProcessingJob *job1 = (OWSBatchMessageProcessingJob *)object1; + + if (![object2 isKindOfClass:[OWSBatchMessageProcessingJob class]]) { + OWSFail(@"Unexpected object: %@ in collection: %@", [object2 class], collection2); + return NSOrderedSame; + } + OWSBatchMessageProcessingJob *job2 = (OWSBatchMessageProcessingJob *)object2; + + return [job1.createdAt compare:job2.createdAt]; + }]; + + YapDatabaseViewGrouping *grouping = + [YapDatabaseViewGrouping withObjectBlock:^NSString *_Nullable(YapDatabaseReadTransaction *_Nonnull transaction, + NSString *_Nonnull collection, + NSString *_Nonnull key, + id _Nonnull object) { + if (![object isKindOfClass:[OWSBatchMessageProcessingJob class]]) { + OWSFail(@"Unexpected object: %@ in collection: %@", object, collection); + return nil; + } + + // Arbitrary string - all in the same group. We're only using the view for sorting. + return OWSBatchMessageProcessingJobFinderExtensionGroup; + }]; + + YapDatabaseViewOptions *options = [YapDatabaseViewOptions new]; + options.allowedCollections = [[YapWhitelistBlacklist alloc] + initWithWhitelist:[NSSet setWithObject:[OWSBatchMessageProcessingJob collection]]]; + + return [[YapDatabaseView alloc] initWithGrouping:grouping sorting:sorting versionTag:@"1" options:options]; +} + + ++ (void)syncRegisterDatabaseExtension:(YapDatabase *)database +{ + YapDatabaseView *existingView = [database registeredExtension:OWSBatchMessageProcessingJobFinderExtensionName]; + if (existingView) { + OWSFail(@"%@ was already initialized.", OWSBatchMessageProcessingJobFinderExtensionName); + // already initialized + return; + } + [database registerExtension:[self databaseExtension] withName:OWSBatchMessageProcessingJobFinderExtensionName]; +} + +@end + +#pragma mark - Queue Processing + +@interface OWSBatchMessageProcessingQueue : NSObject + +@property (nonatomic, readonly) TSMessagesManager *messagesManager; +@property (nonatomic, readonly) OWSBatchMessageProcessingJobFinder *finder; +@property (nonatomic) BOOL isDrainingQueue; + +- (instancetype)initWithMessagesManager:(TSMessagesManager *)messagesManager + finder:(OWSBatchMessageProcessingJobFinder *)finder NS_DESIGNATED_INITIALIZER; +- (instancetype)init NS_UNAVAILABLE; + +@end + +#pragma mark - + +@implementation OWSBatchMessageProcessingQueue + +- (instancetype)initWithMessagesManager:(TSMessagesManager *)messagesManager + finder:(OWSBatchMessageProcessingJobFinder *)finder +{ + OWSSingletonAssert(); + + self = [super init]; + if (!self) { + return self; + } + + _messagesManager = messagesManager; + _finder = finder; + _isDrainingQueue = NO; + + return self; +} + +#pragma mark - instance methods + +- (void)enqueueEnvelopeData:(NSData *)envelopeData plaintextData:(NSData *_Nullable)plaintextData +{ + OWSAssert(envelopeData); + + [self.finder addJobWithEnvelopeData:envelopeData plaintextData:plaintextData]; +} + +- (void)drainQueue +{ + dispatch_async(dispatch_get_main_queue(), ^{ + if (self.isDrainingQueue) { + return; + } + self.isDrainingQueue = YES; + + [self drainQueueWorkStep]; + }); +} + +- (void)drainQueueWorkStep +{ + AssertIsOnMainThread(); + + OWSBatchMessageProcessingJob *_Nullable job = [self.finder nextJob]; + if (job == nil) { + self.isDrainingQueue = NO; + DDLogVerbose(@"%@ Queue is drained", self.tag); + return; + } + + [self processJob:job + completion:^{ + dispatch_async(dispatch_get_main_queue(), ^{ + DDLogVerbose(@"%@ completed job. %lu jobs left.", + self.tag, + (unsigned long)[OWSBatchMessageProcessingJob numberOfKeysInCollection]); + [self.finder removeJobWithId:job.uniqueId]; + [self drainQueueWorkStep]; + }); + }]; +} + +- (void)processJob:(OWSBatchMessageProcessingJob *)job completion:(void (^)())completion +{ + dispatch_async(dispatch_get_main_queue(), ^{ + [self.messagesManager processEnvelope:job.envelopeProto plaintextData:job.plaintextData]; + completion(); + }); +} + +#pragma mark Logging + ++ (NSString *)tag +{ + return [NSString stringWithFormat:@"[%@]", self.class]; +} + +- (NSString *)tag +{ + return self.class.tag; +} + +@end + +#pragma mark - OWSBatchMessageProcessor + +@interface OWSBatchMessageProcessor () + +@property (nonatomic, readonly) OWSBatchMessageProcessingQueue *processingQueue; +@property (nonatomic, readonly) YapDatabaseConnection *dbConnection; + +@end + +#pragma mark - + +@implementation OWSBatchMessageProcessor + +- (instancetype)initWithDBConnection:(YapDatabaseConnection *)dbConnection + messagesManager:(TSMessagesManager *)messagesManager +{ + OWSSingletonAssert(); + + self = [super init]; + if (!self) { + return self; + } + + OWSBatchMessageProcessingJobFinder *finder = + [[OWSBatchMessageProcessingJobFinder alloc] initWithDBConnection:dbConnection]; + OWSBatchMessageProcessingQueue *processingQueue = + [[OWSBatchMessageProcessingQueue alloc] initWithMessagesManager:messagesManager finder:finder]; + + _processingQueue = processingQueue; + + return self; +} + +- (instancetype)initDefault +{ + // For concurrency coherency we use the same dbConnection to persist and read the unprocessed envelopes + YapDatabaseConnection *dbConnection = [[TSStorageManager sharedManager].database newConnection]; + TSMessagesManager *messagesManager = [TSMessagesManager sharedManager]; + + return [self initWithDBConnection:dbConnection messagesManager:messagesManager]; +} + ++ (instancetype)sharedInstance +{ + static OWSBatchMessageProcessor *sharedInstance; + + static dispatch_once_t onceToken; + dispatch_once(&onceToken, ^{ + sharedInstance = [[self alloc] initDefault]; + }); + + return sharedInstance; +} + +#pragma mark - class methods + ++ (void)syncRegisterDatabaseExtension:(YapDatabase *)database +{ + [OWSBatchMessageProcessingJobFinder syncRegisterDatabaseExtension:database]; +} + +#pragma mark - instance methods + +- (void)handleAnyUnprocessedEnvelopesAsync +{ + [self.processingQueue drainQueue]; +} + +- (void)enqueueEnvelopeData:(NSData *)envelopeData plaintextData:(NSData *_Nullable)plaintextData +{ + OWSAssert(envelopeData); + + [self.processingQueue enqueueEnvelopeData:envelopeData plaintextData:plaintextData]; + [self.processingQueue drainQueue]; +} + +@end + +NS_ASSUME_NONNULL_END diff --git a/SignalServiceKit/src/Messages/OWSMessageReceiver.m b/SignalServiceKit/src/Messages/OWSMessageReceiver.m index eb2e4a00b..cde7ade6b 100644 --- a/SignalServiceKit/src/Messages/OWSMessageReceiver.m +++ b/SignalServiceKit/src/Messages/OWSMessageReceiver.m @@ -3,6 +3,7 @@ // #import "OWSMessageReceiver.h" +#import "OWSBatchMessageProcessor.h" #import "OWSSignalServiceProtos.pb.h" #import "TSDatabaseView.h" #import "TSMessagesManager.h" @@ -194,10 +195,12 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces @interface OWSMessageProcessingQueue : NSObject @property (nonatomic, readonly) TSMessagesManager *messagesManager; +@property (nonatomic, readonly) OWSBatchMessageProcessor *batchMessageProcessor; @property (nonatomic, readonly) OWSMessageProcessingJobFinder *finder; @property (nonatomic) BOOL isDrainingQueue; - (instancetype)initWithMessagesManager:(TSMessagesManager *)messagesManager + batchMessageProcessor:(OWSBatchMessageProcessor *)batchMessageProcessor finder:(OWSMessageProcessingJobFinder *)finder NS_DESIGNATED_INITIALIZER; - (instancetype)init NS_UNAVAILABLE; @@ -208,6 +211,7 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces @implementation OWSMessageProcessingQueue - (instancetype)initWithMessagesManager:(TSMessagesManager *)messagesManager + batchMessageProcessor:(OWSBatchMessageProcessor *)batchMessageProcessor finder:(OWSMessageProcessingJobFinder *)finder { OWSSingletonAssert(); @@ -218,6 +222,7 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces } _messagesManager = messagesManager; + _batchMessageProcessor = batchMessageProcessor; _finder = finder; _isDrainingQueue = NO; @@ -256,11 +261,13 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces [self processJob:job completion:^{ - DDLogVerbose(@"%@ completed job. %lu jobs left.", - self.tag, - (unsigned long)[OWSMessageProcessingJob numberOfKeysInCollection]); - [self.finder removeJobWithId:job.uniqueId]; - [self drainQueueWorkStep]; + dispatch_async(dispatch_get_main_queue(), ^{ + DDLogVerbose(@"%@ completed job. %lu jobs left.", + self.tag, + (unsigned long)[OWSMessageProcessingJob numberOfKeysInCollection]); + [self.finder removeJobWithId:job.uniqueId]; + [self drainQueueWorkStep]; + }); }]; } @@ -269,10 +276,8 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ [self.messagesManager decryptEnvelope:job.envelopeProto successBlock:^(NSData *_Nullable plaintextData) { - dispatch_async(dispatch_get_main_queue(), ^{ - [self.messagesManager processEnvelope:job.envelopeProto plaintextData:plaintextData]; - completion(); - }); + [self.batchMessageProcessor enqueueEnvelopeData:job.envelopeData plaintextData:plaintextData]; + completion(); } failureBlock:^{ completion(); @@ -309,6 +314,7 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces - (instancetype)initWithDBConnection:(YapDatabaseConnection *)dbConnection messagesManager:(TSMessagesManager *)messagesManager + batchMessageProcessor:(OWSBatchMessageProcessor *)batchMessageProcessor { OWSSingletonAssert(); @@ -319,7 +325,9 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces OWSMessageProcessingJobFinder *finder = [[OWSMessageProcessingJobFinder alloc] initWithDBConnection:dbConnection]; OWSMessageProcessingQueue *processingQueue = - [[OWSMessageProcessingQueue alloc] initWithMessagesManager:messagesManager finder:finder]; + [[OWSMessageProcessingQueue alloc] initWithMessagesManager:messagesManager + batchMessageProcessor:batchMessageProcessor + finder:finder]; _processingQueue = processingQueue; @@ -331,8 +339,11 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces // For concurrency coherency we use the same dbConnection to persist and read the unprocessed envelopes YapDatabaseConnection *dbConnection = [[TSStorageManager sharedManager].database newConnection]; TSMessagesManager *messagesManager = [TSMessagesManager sharedManager]; + OWSBatchMessageProcessor *batchMessageProcessor = [OWSBatchMessageProcessor sharedInstance]; - return [self initWithDBConnection:dbConnection messagesManager:messagesManager]; + return [self initWithDBConnection:dbConnection + messagesManager:messagesManager + batchMessageProcessor:batchMessageProcessor]; } + (instancetype)sharedInstance diff --git a/SignalServiceKit/src/Storage/TSStorageManager.m b/SignalServiceKit/src/Storage/TSStorageManager.m index ca923ff3e..898432694 100644 --- a/SignalServiceKit/src/Storage/TSStorageManager.m +++ b/SignalServiceKit/src/Storage/TSStorageManager.m @@ -18,7 +18,7 @@ #import "TSThread.h" #import <25519/Randomness.h> #import -#import +#import #import #import @@ -306,7 +306,7 @@ void setDatabaseInitialized() [TSDatabaseView registerUnreadDatabaseView]; [self.database registerExtension:[TSDatabaseSecondaryIndexes registerTimeStampIndex] withName:@"idx"]; [OWSMessageReceiver syncRegisterDatabaseExtension:self.database]; - // [OWSMessageDecrypter syncRegisterDatabaseExtension:self.database]; + [OWSBatchMessageProcessor syncRegisterDatabaseExtension:self.database]; // See comments on OWSDatabaseConnection. //