Move contact intersection into batched operation

// FREEBIE
pull/1/head
Michael Kirk 7 years ago
parent f277ae877c
commit b7288b2565

@ -9,10 +9,18 @@
#import "OWSRequestFactory.h"
#import "PhoneNumber.h"
#import "TSNetworkManager.h"
#import <SignalServiceKit/SignalServiceKit-Swift.h>
#import <YapDatabase/YapDatabase.h>
NS_ASSUME_NONNULL_BEGIN
@interface ContactsUpdater ()
@property (nonatomic, readonly) NSOperationQueue *contactIntersectionQueue;
@end
@implementation ContactsUpdater
+ (instancetype)sharedUpdater {
@ -32,6 +40,8 @@ NS_ASSUME_NONNULL_BEGIN
return self;
}
_contactIntersectionQueue = [NSOperationQueue new];
OWSSingletonAssert();
return self;
@ -88,75 +98,36 @@ NS_ASSUME_NONNULL_BEGIN
- (void)contactIntersectionWithSet:(NSSet<NSString *> *)recipientIdsToLookup
success:(void (^)(NSSet<SignalRecipient *> *recipients))success
failure:(void (^)(NSError *error))failure {
failure:(void (^)(NSError *error))failure
{
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
NSMutableDictionary<NSString *, NSString *> *phoneNumbersByHashes = [NSMutableDictionary new];
for (NSString *recipientId in recipientIdsToLookup) {
NSString *hash = [Cryptography truncatedSHA1Base64EncodedWithoutPadding:recipientId];
phoneNumbersByHashes[hash] = recipientId;
}
NSArray<NSString *> *hashes = [phoneNumbersByHashes allKeys];
TSRequest *request = [OWSRequestFactory contactsIntersectionRequestWithHashesArray:hashes];
[[TSNetworkManager sharedManager] makeRequest:request
success:^(NSURLSessionDataTask *task, id responseDict) {
NSMutableSet<NSString *> *registeredRecipientIds = [NSMutableSet new];
if ([responseDict isKindOfClass:[NSDictionary class]]) {
NSArray<NSDictionary *> *_Nullable contactsArray = responseDict[@"contacts"];
if ([contactsArray isKindOfClass:[NSArray class]]) {
for (NSDictionary *contactDict in contactsArray) {
if (![contactDict isKindOfClass:[NSDictionary class]]) {
OWSProdLogAndFail(@"%@ invalid contact dictionary.", self.logTag);
continue;
}
NSString *_Nullable hash = contactDict[@"token"];
if (hash.length < 1) {
OWSProdLogAndFail(@"%@ contact missing hash.", self.logTag);
continue;
}
NSString *_Nullable recipientId = phoneNumbersByHashes[hash];
if (recipientId.length < 1) {
OWSProdLogAndFail(@"%@ An intersecting hash wasn't found in the mapping.", self.logTag);
continue;
}
if (![recipientIdsToLookup containsObject:recipientId]) {
OWSProdLogAndFail(@"%@ Intersection response included unexpected recipient.", self.logTag);
continue;
}
[registeredRecipientIds addObject:recipientId];
}
}
}
NSMutableSet<SignalRecipient *> *recipients = [NSMutableSet new];
[OWSPrimaryStorage.dbReadWriteConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
for (NSString *recipientId in recipientIdsToLookup) {
if ([registeredRecipientIds containsObject:recipientId]) {
SignalRecipient *recipient =
[SignalRecipient markRecipientAsRegisteredAndGet:recipientId transaction:transaction];
[recipients addObject:recipient];
} else {
[SignalRecipient removeUnregisteredRecipient:recipientId transaction:transaction];
}
}
}];
success([recipients copy]);
}
failure:^(NSURLSessionDataTask *task, NSError *error) {
if (!IsNSErrorNetworkFailure(error)) {
OWSProdError([OWSAnalyticsEvents contactsErrorContactsIntersectionFailed]);
}
NSHTTPURLResponse *response = (NSHTTPURLResponse *)task.response;
if (response.statusCode == 413) {
failure(OWSErrorWithCodeDescription(
OWSErrorCodeContactsUpdaterRateLimit, @"Contacts Intersection Rate Limit"));
} else {
failure(error);
}
}];
OWSContactDiscoveryOperation *operation =
[[OWSContactDiscoveryOperation alloc] initWithRecipientIdsToLookup:recipientIdsToLookup.allObjects];
NSArray<NSOperation *> *operationAndDependencies = [operation.dependencies arrayByAddingObject:operation];
[self.contactIntersectionQueue addOperations:operationAndDependencies waitUntilFinished:YES];
if (operation.failingError != nil) {
failure(operation.failingError);
return;
}
NSSet<NSString *> *registeredRecipientIds = operation.registeredRecipientIds;
NSMutableSet<SignalRecipient *> *recipients = [NSMutableSet new];
[OWSPrimaryStorage.dbReadWriteConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
for (NSString *recipientId in recipientIdsToLookup) {
if ([registeredRecipientIds containsObject:recipientId]) {
SignalRecipient *recipient =
[SignalRecipient markRecipientAsRegisteredAndGet:recipientId transaction:transaction];
[recipients addObject:recipient];
} else {
[SignalRecipient removeUnregisteredRecipient:recipientId transaction:transaction];
}
}
}];
success([recipients copy]);
});
}

@ -0,0 +1,196 @@
//
// Copyright (c) 2018 Open Whisper Systems. All rights reserved.
//
import Foundation
extension Array {
func chunked(by chunkSize: Int) -> [[Element]] {
return stride(from: 0, to: self.count, by: chunkSize).map {
Array(self[$0..<Swift.min($0 + chunkSize, self.count)])
}
}
}
@objc
class OWSContactDiscoveryOperation: OWSOperation {
// TODO verify proper batch size
// let batchSize = 2048
let batchSize = 10
let recipientIdsToLookup: [String]
@objc
var registeredRecipientIds: Set<String>
@objc
required init(recipientIdsToLookup: [String]) {
self.recipientIdsToLookup = recipientIdsToLookup
self.registeredRecipientIds = Set()
super.init()
Logger.debug("\(logTag) in \(#function) with recipientIdsToLookup: \(recipientIdsToLookup.count)")
for batchIds in recipientIdsToLookup.chunked(by: batchSize) {
let batchOperation = OWSContactDiscoveryBatchOperation(recipientIdsToLookup: batchIds)
self.addDependency(batchOperation)
}
}
// MARK: Mandatory overrides
// Called every retry, this is where the bulk of the operation's work should go.
override func run() {
Logger.debug("\(logTag) in \(#function)")
for dependency in self.dependencies {
guard let batchOperation = dependency as? OWSContactDiscoveryBatchOperation else {
owsFail("\(self.logTag) in \(#function) unexpected dependency: \(dependency)")
continue
}
self.registeredRecipientIds.formUnion(batchOperation.registeredRecipientIds)
}
self.reportSuccess()
}
// MARK: Optional Overrides
// Called one time only
override func checkForPreconditionError() -> Error? {
return super.checkForPreconditionError()
}
// Called at most one time.
override func didSucceed() {
super.didSucceed()
}
// Called at most one time, once retry is no longer possible.
override func didFail(error: Error) {
super.didFail(error: error)
}
}
class OWSContactDiscoveryBatchOperation: OWSOperation {
private let recipientIdsToLookup: [String]
var registeredRecipientIds: Set<String>
required init(recipientIdsToLookup: [String]) {
self.recipientIdsToLookup = recipientIdsToLookup
self.registeredRecipientIds = Set()
super.init()
Logger.debug("\(logTag) in \(#function) with recipientIdsToLookup: \(recipientIdsToLookup.count)")
}
private var networkManager: TSNetworkManager {
return TSNetworkManager.shared()
}
private func parse(response: Any?, phoneNumbersByHashes: [String: String]) throws -> Set<String> {
guard let responseDict = response as? [String: AnyObject] else {
let responseError: NSError = OWSErrorMakeUnableToProcessServerResponseError() as NSError
responseError.isRetryable = true
throw responseError
}
guard let contactDicts = responseDict["contacts"] as? [[String: AnyObject]] else {
let responseError: NSError = OWSErrorMakeUnableToProcessServerResponseError() as NSError
responseError.isRetryable = true
throw responseError
}
var registeredRecipientIds: Set<String> = Set()
for contactDict in contactDicts {
guard let hash = contactDict["token"] as? String, hash.count > 0 else {
owsFail("\(self.logTag) in \(#function) hash was unexpectedly nil")
continue
}
guard let recipientId = phoneNumbersByHashes[hash], recipientId.count > 0 else {
owsFail("\(self.logTag) in \(#function) recipientId was unexpectedly nil")
continue
}
guard recipientIdsToLookup.contains(recipientId) else {
owsFail("\(self.logTag) in \(#function) unexpected recipientId")
continue
}
registeredRecipientIds.insert(recipientId)
}
return registeredRecipientIds
}
// MARK: Mandatory overrides
// Called every retry, this is where the bulk of the operation's work should go.
override func run() {
Logger.debug("\(logTag) in \(#function)")
var phoneNumbersByHashes: [String: String] = [:]
for recipientId in recipientIdsToLookup {
let hash = Cryptography.truncatedSHA1Base64EncodedWithoutPadding(recipientId)
phoneNumbersByHashes[hash] = recipientId
}
let hashes: [String] = Array(phoneNumbersByHashes.keys)
let request = OWSRequestFactory.contactsIntersectionRequest(withHashesArray: hashes)
self.networkManager.makeRequest(request,
success: { (task, responseDict) in
do {
self.registeredRecipientIds = try self.parse(response: responseDict, phoneNumbersByHashes: phoneNumbersByHashes)
self.reportSuccess()
} catch {
self.reportError(error)
}
},
failure: { (task, error) in
if (!IsNSErrorNetworkFailure(error)) {
// FIXME not accessible in swift for some reason.
// OWSProdError(OWSAnalyticsEvents.contactsErrorContactsIntersectionFailed)
}
guard let response = task.response as? HTTPURLResponse else {
let responseError: NSError = OWSErrorMakeUnableToProcessServerResponseError() as NSError
responseError.isRetryable = true
self.reportError(responseError)
return
}
if (response.statusCode == 413) {
let rateLimitError = OWSErrorWithCodeDescription(OWSErrorCode.contactsUpdaterRateLimit, "Contacts Intersection Rate Limit")
self.reportError(rateLimitError)
}
self.reportError(error)
})
}
// MARK: Optional Overrides
// Called one time only
override func checkForPreconditionError() -> Error? {
return super.checkForPreconditionError()
}
// Called at most one time.
override func didSucceed() {
super.didSucceed()
}
// Called at most one time, once retry is no longer possible.
override func didFail(error: Error) {
super.didFail(error: error)
}
}

@ -132,21 +132,9 @@ void AssertIsOnSendingQueue()
- (nullable NSError *)checkForPreconditionError
{
for (NSOperation *dependency in self.dependencies) {
if (![dependency isKindOfClass:[OWSOperation class]]) {
NSString *errorDescription =
[NSString stringWithFormat:@"%@ unknown dependency: %@", self.logTag, dependency.class];
NSError *assertionError = OWSErrorMakeAssertionError(errorDescription);
return assertionError;
}
OWSOperation *upload = (OWSOperation *)dependency;
// Cannot proceed if dependency failed - surface the dependency's error.
NSError *_Nullable dependencyError = upload.failingError;
if (dependencyError) {
return dependencyError;
}
NSError *_Nullable error = [super checkForPreconditionError];
if (error) {
return error;
}
// Sanity check preconditions

@ -41,7 +41,7 @@ typedef NS_ENUM(NSUInteger, TSVerificationTransport) { TSVerificationTransportVo
+ (TSRequest *)availablePreKeysCountRequest;
+ (TSRequest *)contactsIntersectionRequestWithHashesArray:(NSArray *)hashes;
+ (TSRequest *)contactsIntersectionRequestWithHashesArray:(NSArray<NSString *> *)hashes;
+ (TSRequest *)currentSignedPreKeyRequest;

@ -116,7 +116,7 @@ NS_ASSUME_NONNULL_BEGIN
return [TSRequest requestWithUrl:[NSURL URLWithString:path] method:@"GET" parameters:@{}];
}
+ (TSRequest *)contactsIntersectionRequestWithHashesArray:(NSArray *)hashes
+ (TSRequest *)contactsIntersectionRequestWithHashesArray:(NSArray<NSString *> *)hashes
{
OWSAssert(hashes.count > 0);

@ -0,0 +1,6 @@
//
// Copyright (c) 2018 Open Whisper Systems. All rights reserved.
//
// ObjC classes from which Swift classes inherit must be included in this framework header.
#import "OWSOperation.h"

@ -5,6 +5,7 @@
#import "OWSOperation.h"
#import "NSError+MessageSending.h"
#import "OWSBackgroundTask.h"
#import "OWSError.h"
NS_ASSUME_NONNULL_BEGIN
@ -47,8 +48,23 @@ NSString *const OWSOperationKeyIsFinished = @"isFinished";
// Called one time only
- (nullable NSError *)checkForPreconditionError
{
// no-op
// Override in subclass if necessary
for (NSOperation *dependency in self.dependencies) {
if (![dependency isKindOfClass:[OWSOperation class]]) {
NSString *errorDescription =
[NSString stringWithFormat:@"%@ unknown dependency: %@", self.logTag, dependency.class];
return OWSErrorMakeAssertionError(errorDescription);
}
OWSOperation *dependentOperation = (OWSOperation *)dependency;
// Don't proceed if dependency failed - surface the dependency's error.
NSError *_Nullable dependencyError = dependentOperation.failingError;
if (dependencyError != nil) {
return dependencyError;
}
}
return nil;
}

Loading…
Cancel
Save