|
|
|
@ -3,6 +3,7 @@
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
#import "OWSMessageReceiver.h"
|
|
|
|
|
#import "NSArray+OWS.h"
|
|
|
|
|
#import "OWSBatchMessageProcessor.h"
|
|
|
|
|
#import "OWSSignalServiceProtos.pb.h"
|
|
|
|
|
#import "TSDatabaseView.h"
|
|
|
|
@ -20,7 +21,7 @@ NS_ASSUME_NONNULL_BEGIN
|
|
|
|
|
|
|
|
|
|
@class OWSSignalServiceProtosEnvelope;
|
|
|
|
|
|
|
|
|
|
@interface OWSMessageProcessingJob : TSYapDatabaseObject
|
|
|
|
|
@interface OWSMessageDecryptJob : TSYapDatabaseObject
|
|
|
|
|
|
|
|
|
|
@property (nonatomic, readonly) NSDate *createdAt;
|
|
|
|
|
|
|
|
|
@ -32,7 +33,7 @@ NS_ASSUME_NONNULL_BEGIN
|
|
|
|
|
|
|
|
|
|
#pragma mark -
|
|
|
|
|
|
|
|
|
|
@interface OWSMessageProcessingJob ()
|
|
|
|
|
@interface OWSMessageDecryptJob ()
|
|
|
|
|
|
|
|
|
|
@property (nonatomic, readonly) NSData *envelopeData;
|
|
|
|
|
|
|
|
|
@ -40,7 +41,12 @@ NS_ASSUME_NONNULL_BEGIN
|
|
|
|
|
|
|
|
|
|
#pragma mark -
|
|
|
|
|
|
|
|
|
|
@implementation OWSMessageProcessingJob
|
|
|
|
|
@implementation OWSMessageDecryptJob
|
|
|
|
|
|
|
|
|
|
+ (NSString *)collection
|
|
|
|
|
{
|
|
|
|
|
return @"OWSMessageProcessingJob";
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
- (instancetype)initWithEnvelope:(OWSSignalServiceProtosEnvelope *)envelope
|
|
|
|
|
{
|
|
|
|
@ -66,20 +72,16 @@ NS_ASSUME_NONNULL_BEGIN
|
|
|
|
|
|
|
|
|
|
#pragma mark - Finder
|
|
|
|
|
|
|
|
|
|
NSString *const OWSMessageProcessingJobFinderExtensionName = @"OWSMessageProcessingJobFinderExtensionName";
|
|
|
|
|
NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProcessingJobFinderExtensionGroup";
|
|
|
|
|
NSString *const OWSMessageDecryptJobFinderExtensionName = @"OWSMessageProcessingJobFinderExtensionName";
|
|
|
|
|
NSString *const OWSMessageDecryptJobFinderExtensionGroup = @"OWSMessageProcessingJobFinderExtensionGroup";
|
|
|
|
|
|
|
|
|
|
@interface OWSMessageProcessingJobFinder : NSObject
|
|
|
|
|
|
|
|
|
|
- (NSArray<OWSMessageProcessingJob *> *)nextJobsForBatchSize:(NSUInteger)maxBatchSize;
|
|
|
|
|
- (void)addJobForEnvelope:(OWSSignalServiceProtosEnvelope *)envelope;
|
|
|
|
|
- (void)removeJobWithId:(NSString *)uniqueId;
|
|
|
|
|
@interface OWSMessageDecryptJobFinder : NSObject
|
|
|
|
|
|
|
|
|
|
@end
|
|
|
|
|
|
|
|
|
|
#pragma mark -
|
|
|
|
|
|
|
|
|
|
@interface OWSMessageProcessingJobFinder ()
|
|
|
|
|
@interface OWSMessageDecryptJobFinder ()
|
|
|
|
|
|
|
|
|
|
@property (nonatomic, readonly) YapDatabaseConnection *dbConnection;
|
|
|
|
|
|
|
|
|
@ -87,7 +89,7 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces
|
|
|
|
|
|
|
|
|
|
#pragma mark -
|
|
|
|
|
|
|
|
|
|
@implementation OWSMessageProcessingJobFinder
|
|
|
|
|
@implementation OWSMessageDecryptJobFinder
|
|
|
|
|
|
|
|
|
|
- (instancetype)initWithDBConnection:(YapDatabaseConnection *)dbConnection
|
|
|
|
|
{
|
|
|
|
@ -103,49 +105,40 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces
|
|
|
|
|
return self;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
- (NSArray<OWSMessageProcessingJob *> *)nextJobsForBatchSize:(NSUInteger)maxBatchSize
|
|
|
|
|
- (NSArray<OWSMessageDecryptJob *> *)nextJobsForBatchSize:(NSUInteger)maxBatchSize
|
|
|
|
|
{
|
|
|
|
|
NSMutableArray<OWSMessageProcessingJob *> *jobs = [NSMutableArray new];
|
|
|
|
|
NSMutableArray<OWSMessageDecryptJob *> *jobs = [NSMutableArray new];
|
|
|
|
|
[self.dbConnection readWithBlock:^(YapDatabaseReadTransaction *_Nonnull transaction) {
|
|
|
|
|
YapDatabaseViewTransaction *viewTransaction = [transaction ext:OWSMessageProcessingJobFinderExtensionName];
|
|
|
|
|
YapDatabaseViewTransaction *viewTransaction = [transaction ext:OWSMessageDecryptJobFinderExtensionName];
|
|
|
|
|
OWSAssert(viewTransaction != nil);
|
|
|
|
|
NSMutableArray<NSString *> *jobIds = [NSMutableArray new];
|
|
|
|
|
[viewTransaction enumerateKeysInGroup:OWSMessageProcessingJobFinderExtensionGroup
|
|
|
|
|
usingBlock:^(NSString *_Nonnull collection,
|
|
|
|
|
NSString *_Nonnull key,
|
|
|
|
|
NSUInteger index,
|
|
|
|
|
BOOL *_Nonnull stop) {
|
|
|
|
|
[jobIds addObject:key];
|
|
|
|
|
if (jobIds.count >= maxBatchSize) {
|
|
|
|
|
*stop = YES;
|
|
|
|
|
}
|
|
|
|
|
}];
|
|
|
|
|
|
|
|
|
|
for (NSString *jobId in jobIds) {
|
|
|
|
|
OWSMessageProcessingJob *_Nullable job =
|
|
|
|
|
[OWSMessageProcessingJob fetchObjectWithUniqueID:jobId transaction:transaction];
|
|
|
|
|
if (job) {
|
|
|
|
|
[jobs addObject:job];
|
|
|
|
|
} else {
|
|
|
|
|
OWSFail(@"Could not load job: %@", jobId);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
[viewTransaction enumerateKeysAndObjectsInGroup:OWSMessageDecryptJobFinderExtensionGroup
|
|
|
|
|
usingBlock:^(NSString *_Nonnull collection,
|
|
|
|
|
NSString *_Nonnull key,
|
|
|
|
|
id _Nonnull object,
|
|
|
|
|
NSUInteger index,
|
|
|
|
|
BOOL *_Nonnull stop) {
|
|
|
|
|
OWSMessageDecryptJob *job = object;
|
|
|
|
|
[jobs addObject:job];
|
|
|
|
|
if (jobs.count >= maxBatchSize) {
|
|
|
|
|
*stop = YES;
|
|
|
|
|
}
|
|
|
|
|
}];
|
|
|
|
|
}];
|
|
|
|
|
|
|
|
|
|
return jobs;
|
|
|
|
|
return [jobs copy];
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
- (void)addJobForEnvelope:(OWSSignalServiceProtosEnvelope *)envelope
|
|
|
|
|
{
|
|
|
|
|
[self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) {
|
|
|
|
|
[[[OWSMessageProcessingJob alloc] initWithEnvelope:envelope] saveWithTransaction:transaction];
|
|
|
|
|
[[[OWSMessageDecryptJob alloc] initWithEnvelope:envelope] saveWithTransaction:transaction];
|
|
|
|
|
}];
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
- (void)removeJobWithId:(NSString *)uniqueId
|
|
|
|
|
- (void)removeJobsWithIds:(NSArray<NSString *> *)uniqueIds
|
|
|
|
|
{
|
|
|
|
|
[self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) {
|
|
|
|
|
[transaction removeObjectForKey:uniqueId inCollection:[OWSMessageProcessingJob collection]];
|
|
|
|
|
[transaction removeObjectsForKeys:uniqueIds inCollection:[OWSMessageDecryptJob collection]];
|
|
|
|
|
}];
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -161,17 +154,17 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces
|
|
|
|
|
NSString *key2,
|
|
|
|
|
id object2) {
|
|
|
|
|
|
|
|
|
|
if (![object1 isKindOfClass:[OWSMessageProcessingJob class]]) {
|
|
|
|
|
if (![object1 isKindOfClass:[OWSMessageDecryptJob class]]) {
|
|
|
|
|
OWSFail(@"Unexpected object: %@ in collection: %@", [object1 class], collection1);
|
|
|
|
|
return NSOrderedSame;
|
|
|
|
|
}
|
|
|
|
|
OWSMessageProcessingJob *job1 = (OWSMessageProcessingJob *)object1;
|
|
|
|
|
OWSMessageDecryptJob *job1 = (OWSMessageDecryptJob *)object1;
|
|
|
|
|
|
|
|
|
|
if (![object2 isKindOfClass:[OWSMessageProcessingJob class]]) {
|
|
|
|
|
if (![object2 isKindOfClass:[OWSMessageDecryptJob class]]) {
|
|
|
|
|
OWSFail(@"Unexpected object: %@ in collection: %@", [object2 class], collection2);
|
|
|
|
|
return NSOrderedSame;
|
|
|
|
|
}
|
|
|
|
|
OWSMessageProcessingJob *job2 = (OWSMessageProcessingJob *)object2;
|
|
|
|
|
OWSMessageDecryptJob *job2 = (OWSMessageDecryptJob *)object2;
|
|
|
|
|
|
|
|
|
|
return [job1.createdAt compare:job2.createdAt];
|
|
|
|
|
}];
|
|
|
|
@ -181,18 +174,18 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces
|
|
|
|
|
NSString *_Nonnull collection,
|
|
|
|
|
NSString *_Nonnull key,
|
|
|
|
|
id _Nonnull object) {
|
|
|
|
|
if (![object isKindOfClass:[OWSMessageProcessingJob class]]) {
|
|
|
|
|
if (![object isKindOfClass:[OWSMessageDecryptJob class]]) {
|
|
|
|
|
OWSFail(@"Unexpected object: %@ in collection: %@", object, collection);
|
|
|
|
|
return nil;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Arbitrary string - all in the same group. We're only using the view for sorting.
|
|
|
|
|
return OWSMessageProcessingJobFinderExtensionGroup;
|
|
|
|
|
return OWSMessageDecryptJobFinderExtensionGroup;
|
|
|
|
|
}];
|
|
|
|
|
|
|
|
|
|
YapDatabaseViewOptions *options = [YapDatabaseViewOptions new];
|
|
|
|
|
options.allowedCollections =
|
|
|
|
|
[[YapWhitelistBlacklist alloc] initWithWhitelist:[NSSet setWithObject:[OWSMessageProcessingJob collection]]];
|
|
|
|
|
[[YapWhitelistBlacklist alloc] initWithWhitelist:[NSSet setWithObject:[OWSMessageDecryptJob collection]]];
|
|
|
|
|
|
|
|
|
|
return [[YapDatabaseView alloc] initWithGrouping:grouping sorting:sorting versionTag:@"1" options:options];
|
|
|
|
|
}
|
|
|
|
@ -200,40 +193,40 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces
|
|
|
|
|
|
|
|
|
|
+ (void)syncRegisterDatabaseExtension:(YapDatabase *)database
|
|
|
|
|
{
|
|
|
|
|
YapDatabaseView *existingView = [database registeredExtension:OWSMessageProcessingJobFinderExtensionName];
|
|
|
|
|
YapDatabaseView *existingView = [database registeredExtension:OWSMessageDecryptJobFinderExtensionName];
|
|
|
|
|
if (existingView) {
|
|
|
|
|
OWSFail(@"%@ was already initialized.", OWSMessageProcessingJobFinderExtensionName);
|
|
|
|
|
OWSFail(@"%@ was already initialized.", OWSMessageDecryptJobFinderExtensionName);
|
|
|
|
|
// already initialized
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
[database registerExtension:[self databaseExtension] withName:OWSMessageProcessingJobFinderExtensionName];
|
|
|
|
|
[database registerExtension:[self databaseExtension] withName:OWSMessageDecryptJobFinderExtensionName];
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@end
|
|
|
|
|
|
|
|
|
|
#pragma mark - Queue Processing
|
|
|
|
|
|
|
|
|
|
@interface OWSMessageProcessingQueue : NSObject
|
|
|
|
|
@interface OWSMessageDecryptQueue : NSObject
|
|
|
|
|
|
|
|
|
|
@property (nonatomic, readonly) TSMessagesManager *messagesManager;
|
|
|
|
|
@property (nonatomic, readonly) OWSBatchMessageProcessor *batchMessageProcessor;
|
|
|
|
|
@property (nonatomic, readonly) OWSMessageProcessingJobFinder *finder;
|
|
|
|
|
@property (nonatomic, readonly) OWSMessageDecryptJobFinder *finder;
|
|
|
|
|
@property (nonatomic) BOOL isDrainingQueue;
|
|
|
|
|
|
|
|
|
|
- (instancetype)initWithMessagesManager:(TSMessagesManager *)messagesManager
|
|
|
|
|
batchMessageProcessor:(OWSBatchMessageProcessor *)batchMessageProcessor
|
|
|
|
|
finder:(OWSMessageProcessingJobFinder *)finder NS_DESIGNATED_INITIALIZER;
|
|
|
|
|
finder:(OWSMessageDecryptJobFinder *)finder NS_DESIGNATED_INITIALIZER;
|
|
|
|
|
- (instancetype)init NS_UNAVAILABLE;
|
|
|
|
|
|
|
|
|
|
@end
|
|
|
|
|
|
|
|
|
|
#pragma mark -
|
|
|
|
|
|
|
|
|
|
@implementation OWSMessageProcessingQueue
|
|
|
|
|
@implementation OWSMessageDecryptQueue
|
|
|
|
|
|
|
|
|
|
- (instancetype)initWithMessagesManager:(TSMessagesManager *)messagesManager
|
|
|
|
|
batchMessageProcessor:(OWSBatchMessageProcessor *)batchMessageProcessor
|
|
|
|
|
finder:(OWSMessageProcessingJobFinder *)finder
|
|
|
|
|
finder:(OWSMessageDecryptJobFinder *)finder
|
|
|
|
|
{
|
|
|
|
|
OWSSingletonAssert();
|
|
|
|
|
|
|
|
|
@ -294,7 +287,7 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces
|
|
|
|
|
{
|
|
|
|
|
AssertIsOnMainThread();
|
|
|
|
|
|
|
|
|
|
NSArray<OWSMessageProcessingJob *> *jobs = [self.finder nextJobsForBatchSize:kIncomingMessageBatchSize];
|
|
|
|
|
NSArray<OWSMessageDecryptJob *> *jobs = [self.finder nextJobsForBatchSize:kIncomingMessageBatchSize];
|
|
|
|
|
OWSAssert(jobs);
|
|
|
|
|
if (jobs.count < 1) {
|
|
|
|
|
self.isDrainingQueue = NO;
|
|
|
|
@ -305,19 +298,17 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces
|
|
|
|
|
[self processJobs:jobs
|
|
|
|
|
completion:^{
|
|
|
|
|
dispatch_async(dispatch_get_main_queue(), ^{
|
|
|
|
|
for (OWSMessageProcessingJob *job in jobs) {
|
|
|
|
|
[self.finder removeJobWithId:job.uniqueId];
|
|
|
|
|
}
|
|
|
|
|
[self.finder removeJobsWithIds:jobs.uniqueIds];
|
|
|
|
|
DDLogVerbose(@"%@ completed %zd jobs. %zd jobs left.",
|
|
|
|
|
self.tag,
|
|
|
|
|
jobs.count,
|
|
|
|
|
[OWSMessageProcessingJob numberOfKeysInCollection]);
|
|
|
|
|
[OWSMessageDecryptJob numberOfKeysInCollection]);
|
|
|
|
|
[self drainQueueWorkStep];
|
|
|
|
|
});
|
|
|
|
|
}];
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
- (void)processJobs:(NSArray<OWSMessageProcessingJob *> *)jobs completion:(void (^)())completion
|
|
|
|
|
- (void)processJobs:(NSArray<OWSMessageDecryptJob *> *)jobs completion:(void (^)())completion
|
|
|
|
|
{
|
|
|
|
|
[self processJobs:jobs
|
|
|
|
|
unprocessedJobs:[jobs mutableCopy]
|
|
|
|
@ -325,8 +316,8 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces
|
|
|
|
|
completion:completion];
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
- (void)processJobs:(NSArray<OWSMessageProcessingJob *> *)jobs
|
|
|
|
|
unprocessedJobs:(NSMutableArray<OWSMessageProcessingJob *> *)unprocessedJobs
|
|
|
|
|
- (void)processJobs:(NSArray<OWSMessageDecryptJob *> *)jobs
|
|
|
|
|
unprocessedJobs:(NSMutableArray<OWSMessageDecryptJob *> *)unprocessedJobs
|
|
|
|
|
plaintextDataMap:(NSMutableDictionary<NSString *, NSData *> *)plaintextDataMap
|
|
|
|
|
completion:(void (^)())completion
|
|
|
|
|
{
|
|
|
|
@ -334,7 +325,7 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces
|
|
|
|
|
OWSAssert(unprocessedJobs.count <= jobs.count);
|
|
|
|
|
|
|
|
|
|
if (unprocessedJobs.count < 1) {
|
|
|
|
|
for (OWSMessageProcessingJob *job in jobs) {
|
|
|
|
|
for (OWSMessageDecryptJob *job in jobs) {
|
|
|
|
|
NSData *_Nullable plaintextData = plaintextDataMap[job.uniqueId];
|
|
|
|
|
[self.batchMessageProcessor enqueueEnvelopeData:job.envelopeData plaintextData:plaintextData];
|
|
|
|
|
}
|
|
|
|
@ -344,7 +335,7 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces
|
|
|
|
|
|
|
|
|
|
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
|
|
|
|
|
OWSAssert(unprocessedJobs.count > 0);
|
|
|
|
|
OWSMessageProcessingJob *job = unprocessedJobs.firstObject;
|
|
|
|
|
OWSMessageDecryptJob *job = unprocessedJobs.firstObject;
|
|
|
|
|
[unprocessedJobs removeObjectAtIndex:0];
|
|
|
|
|
[self.messagesManager decryptEnvelope:job.envelopeProto
|
|
|
|
|
successBlock:^(NSData *_Nullable plaintextData) {
|
|
|
|
@ -383,7 +374,7 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces
|
|
|
|
|
|
|
|
|
|
@interface OWSMessageReceiver ()
|
|
|
|
|
|
|
|
|
|
@property (nonatomic, readonly) OWSMessageProcessingQueue *processingQueue;
|
|
|
|
|
@property (nonatomic, readonly) OWSMessageDecryptQueue *processingQueue;
|
|
|
|
|
@property (nonatomic, readonly) YapDatabaseConnection *dbConnection;
|
|
|
|
|
|
|
|
|
|
@end
|
|
|
|
@ -403,11 +394,11 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces
|
|
|
|
|
return self;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
OWSMessageProcessingJobFinder *finder = [[OWSMessageProcessingJobFinder alloc] initWithDBConnection:dbConnection];
|
|
|
|
|
OWSMessageProcessingQueue *processingQueue =
|
|
|
|
|
[[OWSMessageProcessingQueue alloc] initWithMessagesManager:messagesManager
|
|
|
|
|
batchMessageProcessor:batchMessageProcessor
|
|
|
|
|
finder:finder];
|
|
|
|
|
OWSMessageDecryptJobFinder *finder = [[OWSMessageDecryptJobFinder alloc] initWithDBConnection:dbConnection];
|
|
|
|
|
OWSMessageDecryptQueue *processingQueue =
|
|
|
|
|
[[OWSMessageDecryptQueue alloc] initWithMessagesManager:messagesManager
|
|
|
|
|
batchMessageProcessor:batchMessageProcessor
|
|
|
|
|
finder:finder];
|
|
|
|
|
|
|
|
|
|
_processingQueue = processingQueue;
|
|
|
|
|
|
|
|
|
@ -442,7 +433,7 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces
|
|
|
|
|
|
|
|
|
|
+ (void)syncRegisterDatabaseExtension:(YapDatabase *)database
|
|
|
|
|
{
|
|
|
|
|
[OWSMessageProcessingJobFinder syncRegisterDatabaseExtension:database];
|
|
|
|
|
[OWSMessageDecryptJobFinder syncRegisterDatabaseExtension:database];
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#pragma mark - instance methods
|
|
|
|
|