Added long polling.

pull/26/head
Mikunj 6 years ago
parent d0bed4b129
commit 04bdaff3c6

@ -1 +1 @@
Subproject commit bff3e9db302b7808bf99451c738138c326ad16e3
Subproject commit 0c0008c2306e7b5be87c2c943fade9bd5612ee17

@ -560,7 +560,6 @@
B8162F0522892C5F00D46544 /* FriendRequestViewDelegate.swift in Sources */ = {isa = PBXBuildFile; fileRef = B8162F0422892C5F00D46544 /* FriendRequestViewDelegate.swift */; };
B821F2F82272CED3002C88C0 /* OnboardingAccountDetailsViewController.swift in Sources */ = {isa = PBXBuildFile; fileRef = B821F2F72272CED3002C88C0 /* OnboardingAccountDetailsViewController.swift */; };
B821F2FA2272CEEE002C88C0 /* OnboardingKeyPairViewController.swift in Sources */ = {isa = PBXBuildFile; fileRef = B821F2F92272CEEE002C88C0 /* OnboardingKeyPairViewController.swift */; };
B843951A228510FB000563FE /* Poller.swift in Sources */ = {isa = PBXBuildFile; fileRef = B8439519228510FB000563FE /* Poller.swift */; };
B90418E6183E9DD40038554A /* DateUtil.m in Sources */ = {isa = PBXBuildFile; fileRef = B90418E5183E9DD40038554A /* DateUtil.m */; };
B9EB5ABD1884C002007CBB57 /* MessageUI.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = B9EB5ABC1884C002007CBB57 /* MessageUI.framework */; };
BFF3FB9730634F37D25903F4 /* Pods_Signal.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = D17BB5C25D615AB49813100C /* Pods_Signal.framework */; };
@ -1348,7 +1347,6 @@
B8162F0422892C5F00D46544 /* FriendRequestViewDelegate.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FriendRequestViewDelegate.swift; sourceTree = "<group>"; };
B821F2F72272CED3002C88C0 /* OnboardingAccountDetailsViewController.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = OnboardingAccountDetailsViewController.swift; sourceTree = "<group>"; };
B821F2F92272CEEE002C88C0 /* OnboardingKeyPairViewController.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = OnboardingKeyPairViewController.swift; sourceTree = "<group>"; };
B8439519228510FB000563FE /* Poller.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; name = Poller.swift; path = Signal/src/Loki/Poller.swift; sourceTree = SOURCE_ROOT; };
B90418E4183E9DD40038554A /* DateUtil.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = DateUtil.h; sourceTree = "<group>"; };
B90418E5183E9DD40038554A /* DateUtil.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = DateUtil.m; sourceTree = "<group>"; };
B97940251832BD2400BD66CB /* UIUtil.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = UIUtil.h; sourceTree = "<group>"; };
@ -2607,7 +2605,6 @@
B8162F0422892C5F00D46544 /* FriendRequestViewDelegate.swift */,
B821F2F72272CED3002C88C0 /* OnboardingAccountDetailsViewController.swift */,
B821F2F92272CEEE002C88C0 /* OnboardingKeyPairViewController.swift */,
B8439519228510FB000563FE /* Poller.swift */,
24A830A12293CD0100F4CAC0 /* LokiP2PServer.swift */,
);
path = Loki;
@ -3598,7 +3595,6 @@
454A84042059C787008B8C75 /* MediaTileViewController.swift in Sources */,
340FC8B4204DAC8D007AEB0F /* OWSBackupSettingsViewController.m in Sources */,
34D1F0871F8678AA0066283D /* ConversationViewItem.m in Sources */,
B843951A228510FB000563FE /* Poller.swift in Sources */,
451A13B11E13DED2000A50FD /* AppNotifications.swift in Sources */,
34D99CE4217509C2000AFB39 /* AppEnvironment.swift in Sources */,
348570A820F67575004FF32B /* OWSMessageHeaderView.m in Sources */,

@ -7,7 +7,7 @@
<key>CarthageVersion</key>
<string>0.33.0</string>
<key>OSXVersion</key>
<string>10.14.5</string>
<string>10.14.4</string>
<key>WebRTCCommit</key>
<string>1445d719bf05280270e9f77576f80f973fd847f8 M73</string>
</dict>

@ -171,6 +171,8 @@ static NSTimeInterval launchStartedAt;
OWSLogInfo(@"applicationDidEnterBackground.");
[DDLog flushLog];
[LokiAPI stopLongPolling];
}
- (void)applicationWillEnterForeground:(UIApplication *)application
@ -189,6 +191,7 @@ static NSTimeInterval launchStartedAt;
[DDLog flushLog];
[LokiAPI stopLongPolling];
if (self.lokiP2PServer) { [self.lokiP2PServer stop]; }
}
@ -306,6 +309,9 @@ static NSTimeInterval launchStartedAt;
selector:@selector(registrationLockDidChange:)
name:NSNotificationName_2FAStateDidChange
object:nil];
// Loki Message received
[[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(receivedNewMessages:) name:NSNotification.receivedNewMessages object:nil];
OWSLogInfo(@"application: didFinishLaunchingWithOptions completed.");
@ -751,11 +757,8 @@ static NSTimeInterval launchStartedAt;
[self.socketManager requestSocketOpen];
[Environment.shared.contactsManager fetchSystemContactsOnceIfAlreadyAuthorized];
// Loki: Fetch immediately
[[AppEnvironment.shared.messageFetcherJob run] retainUntilComplete];
// Loki: Start poller
[Poller.shared startIfNeeded];
// Loki: Start long polling
[LokiAPI startLongPollingIfNecessary];
// Loki: Tell our friends that we are online
[LokiP2PManager broadcastOnlineStatus];
@ -1178,6 +1181,7 @@ static NSTimeInterval launchStartedAt;
{
OWSLogInfo(@"performing background fetch");
[AppReadiness runNowOrWhenAppDidBecomeReady:^{
// Loki: We don't want to spin up the long poller here as it is probably wasteful on resources
__block AnyPromise *job = [AppEnvironment.shared.messageFetcherJob run].then(^{
// HACK: Call completion handler after n seconds.
//
@ -1359,6 +1363,9 @@ static NSTimeInterval launchStartedAt;
// For non-legacy users, read receipts are on by default.
[self.readReceiptManager setAreReadReceiptsEnabled:YES];
// Start long polling
[LokiAPI startLongPollingIfNecessary];
}
}
@ -1404,6 +1411,24 @@ static NSTimeInterval launchStartedAt;
[UIViewController attemptRotationToDeviceOrientation];
}
#pragma mark - Long polling
- (void)receivedNewMessages:(NSNotification *)notification
{
NSArray *messages = (NSArray *)notification.userInfo[@"messages"];
for (SSKProtoEnvelope *envelope in messages) {
OWSLogInfo(@"[Loki] Received messages from long polling");
@try {
NSData *envelopeData = envelope.serializedDataIgnoringErrors;
if (envelopeData != nil) {
[SSKEnvironment.shared.messageReceiver handleReceivedEnvelopeData:envelopeData];
}
} @catch (NSException *exception) {
OWSFailDebug(@"Failed to serialize envelope");
}
}
}
#pragma mark - status bar touches
- (void)touchesBegan:(NSSet *)touches withEvent:(UIEvent *)event

@ -1,26 +0,0 @@
import PromiseKit
@objc final class Poller : NSObject {
private var isStarted = false
private var currentJob: Promise<Void>?
// MARK: Configuration
private static let interval: TimeInterval = 5 * 60
// MARK: Initialization
@objc static let shared = Poller()
private override init() { }
// MARK: General
@objc func startIfNeeded() {
guard !isStarted else { return }
Timer.scheduledTimer(timeInterval: Poller.interval, target: self, selector: #selector(poll), userInfo: nil, repeats: true)
isStarted = true
}
@objc private func poll() {
guard currentJob == nil else { return }
currentJob = AppEnvironment.shared.messageFetcherJob.run().ensure { [weak self] in self?.currentJob = nil }
}
}

@ -0,0 +1,117 @@
import PromiseKit
private typealias Callback = () -> Void
public extension LokiAPI {
private static var isLongPolling = false
private static var shouldStopPolling = false
private static var usedSnodes = [Target]()
private static var cancels = [Callback]()
private static let hexEncodedPublicKey = OWSIdentityManager.shared().identityKeyPair()!.hexEncodedPublicKey
/// Start long polling.
/// This will send a notification if new messages were received
@objc public static func startLongPollingIfNecessary() {
guard !isLongPolling else { return }
isLongPolling = true
shouldStopPolling = false
Logger.info("[Loki] Started long polling")
longPoll()
}
/// Stop long polling
@objc public static func stopLongPolling() {
shouldStopPolling = true
isLongPolling = false
usedSnodes.removeAll()
cancelAllPromises()
Logger.info("[Loki] Stopped long polling")
}
/// The long polling loop
private static func longPoll() {
// This is here so we can stop the infinite loop
guard !shouldStopPolling else { return }
fetchSwarmIfNeeded(for: hexEncodedPublicKey).then { _ -> Guarantee<[Result<Void>]> in
var promises = [Promise<Void>]()
let connections = 3
for i in 0..<connections {
let (promise, cancel) = openConnection()
promises.append(promise)
cancels.append(cancel)
}
return when(resolved: promises)
}.done { _ in
// Since all promises are complete, we can clear the cancels
cancelAllPromises()
// Keep long polling until it is stopped
longPoll()
}.retainUntilComplete()
}
private static func cancelAllPromises() {
cancels.forEach { cancel in cancel() }
cancels.removeAll()
}
private static func getUnusedSnodes() -> [Target] {
let snodes = getCachedSnodes(for: hexEncodedPublicKey)
return snodes.filter { !usedSnodes.contains($0) }
}
/// Open a connection to an unused snode and get messages from it
private static func openConnection() -> (Promise<Void>, cancel: Callback) {
var isCancelled = false
let cancel = {
isCancelled = true
}
func connectToNextSnode() -> Promise<Void> {
guard let nextSnode = getUnusedSnodes().first else {
// We don't have anymore unused snodes
return Promise.value(())
}
// Add the snode to the used array
usedSnodes.append(nextSnode)
func getMessagesInfinitely(from target: Target) -> Promise<Void> {
// The only way to exit the infinite loop is to throw an error 3 times or cancel
return getMessages(from: target).then { messages -> Promise<Void> in
// Send our messages as a notification
NotificationCenter.default.post(name: .receivedNewMessages, object: nil, userInfo: ["messages": messages])
// Check if we need to abort
guard !isCancelled else { throw PMKError.cancelled }
// Continue fetching if we haven't cancelled
return getMessagesInfinitely(from: target)
}.retryingIfNeeded(maxRetryCount: 3)
}
// Keep getting messages for this snode
// If we errored out then connect to the next snode
return getMessagesInfinitely(from: nextSnode).recover { _ -> Promise<Void> in
// Connect to the next snode if we haven't cancelled
if (!isCancelled) {
// We also need to remove the cached snode so we don't contact it again
removeCachedSnode(nextSnode, for: hexEncodedPublicKey)
return connectToNextSnode()
}
// Cancelled, so just return successfully
return Promise.value(())
}
}
// Keep connecting to snodes
return (connectToNextSnode(), cancel)
}
}

@ -35,7 +35,16 @@ public extension LokiAPI {
}
}
private static func getSwarm(for hexEncodedPublicKey: String) -> Promise<[Target]> {
internal static func getCachedSnodes(for hexEncodedPublicKey: String) -> [Target] {
return swarmCache[hexEncodedPublicKey] ?? []
}
internal static func removeCachedSnode(_ target: Target, for hexEncodedPublicKey: String) {
guard let cache = swarmCache[hexEncodedPublicKey] else { return }
swarmCache[hexEncodedPublicKey] = cache.filter { $0 != target }
}
internal static func fetchSwarmIfNeeded(for hexEncodedPublicKey: String) -> Promise<[Target]> {
if let cachedSwarm = swarmCache[hexEncodedPublicKey], cachedSwarm.count >= minimumSnodeCount {
return Promise<[Target]> { $0.fulfill(cachedSwarm) }
} else {
@ -47,7 +56,7 @@ public extension LokiAPI {
// MARK: Public API
internal static func getTargetSnodes(for hexEncodedPublicKey: String) -> Promise<[Target]> {
// shuffled() uses the system's default random generator, which is cryptographically secure
return getSwarm(for: hexEncodedPublicKey).map { Array($0.shuffled().prefix(targetSnodeCount)) }
return fetchSwarmIfNeeded(for: hexEncodedPublicKey).map { Array($0.shuffled().prefix(targetSnodeCount)) }
}
// MARK: Parsing

@ -31,9 +31,14 @@ import PromiseKit
override private init() { }
// MARK: Internal API
internal static func invoke(_ method: Target.Method, on target: Target, associatedWith hexEncodedPublicKey: String, parameters: [String:Any] = [:]) -> RawResponsePromise {
internal static func invoke(_ method: Target.Method, on target: Target, associatedWith hexEncodedPublicKey: String, parameters: [String:Any] = [:], headers: [String:String] = [:], timeout: TimeInterval? = nil) -> RawResponsePromise {
let url = URL(string: "\(target.address):\(target.port)/\(version)/storage_rpc")!
let request = TSRequest(url: url, method: "POST", parameters: [ "method" : method.rawValue, "params" : parameters ])
request.allHTTPHeaderFields = headers
if let timeout = timeout {
request.timeoutInterval = timeout
}
return TSNetworkManager.shared().makePromise(request: request).map { $0.responseObject }
.handlingSwarmSpecificErrorsIfNeeded(for: target, associatedWith: hexEncodedPublicKey).recoveringNetworkErrorsIfNeeded()
}
@ -42,17 +47,24 @@ import PromiseKit
public static func getMessages() -> Promise<Set<MessageListPromise>> {
let hexEncodedPublicKey = OWSIdentityManager.shared().identityKeyPair()!.hexEncodedPublicKey
return getTargetSnodes(for: hexEncodedPublicKey).mapValues { targetSnode in
let lastHashValue = getLastMessageHashValue(for: targetSnode) ?? ""
let parameters: [String:Any] = [ "pubKey" : hexEncodedPublicKey, "lastHash" : lastHashValue ]
return invoke(.getMessages, on: targetSnode, associatedWith: hexEncodedPublicKey, parameters: parameters).map { rawResponse in
guard let json = rawResponse as? JSON, let rawMessages = json["messages"] as? [JSON] else { return [] }
updateLastMessageHashValueIfPossible(for: targetSnode, from: rawMessages)
let newRawMessages = removeDuplicates(from: rawMessages)
return parseProtoEnvelopes(from: newRawMessages)
}
return getMessages(from: targetSnode, longPolling: false)
}.map { Set($0) }.retryingIfNeeded(maxRetryCount: maxRetryCount)
}
internal static func getMessages(from target: Target, longPolling: Bool = true) -> MessageListPromise {
let hexEncodedPublicKey = OWSIdentityManager.shared().identityKeyPair()!.hexEncodedPublicKey
let lastHashValue = getLastMessageHashValue(for: target) ?? ""
let parameters: [String:Any] = [ "pubKey" : hexEncodedPublicKey, "lastHash" : lastHashValue ]
let headers = longPolling ? ["X-Loki-Long-Poll" : "true"] : [:]
let timeout: TimeInterval? = longPolling ? 40 : nil // 40 second timeout
return invoke(.getMessages, on: target, associatedWith: hexEncodedPublicKey, parameters: parameters, headers: headers, timeout: timeout).map { rawResponse in
guard let json = rawResponse as? JSON, let rawMessages = json["messages"] as? [JSON] else { return [] }
updateLastMessageHashValueIfPossible(for: target, from: rawMessages)
let newRawMessages = removeDuplicates(from: rawMessages)
return parseProtoEnvelopes(from: newRawMessages)
}
}
public static func sendSignalMessage(_ signalMessage: SignalMessage, with timestamp: UInt64, onP2PSuccess: @escaping () -> Void) -> Promise<Set<RawResponsePromise>> {
guard let lokiMessage = Message.from(signalMessage: signalMessage, with: timestamp) else { return Promise(error: Error.messageConversionFailed) }
let destination = lokiMessage.destination
@ -103,7 +115,7 @@ import PromiseKit
private static func updateLastMessageHashValueIfPossible(for target: Target, from rawMessages: [JSON]) {
guard let lastMessage = rawMessages.last, let hashValue = lastMessage["hash"] as? String, let expiresAt = lastMessage["expiration"] as? Int else {
Logger.warn("[Loki] Failed to update last message hash value from: \(rawMessages).")
if rawMessages.count > 0 { Logger.warn("[Loki] Failed to update last message hash value from: \(rawMessages).") }
return
}
setLastMessageHashValue(for: target, hashValue: hashValue, expiresAt: UInt64(expiresAt))

@ -1,4 +0,0 @@
public extension Notification.Name {
public static let contactOnlineStatusChanged = Notification.Name("contactOnlineStatusChanged")
}

@ -1,5 +1,9 @@
public extension Notification.Name {
public static let contactOnlineStatusChanged = Notification.Name("contactOnlineStatusChanged")
public static let receivedNewMessages = Notification.Name("receivedNewMessages")
// Friend request
public static let threadFriendRequestStatusChanged = Notification.Name("threadFriendRequestStatusChanged")
public static let messageFriendRequestStatusChanged = Notification.Name("messageFriendRequestStatusChanged")
}
@ -7,6 +11,10 @@ public extension Notification.Name {
// MARK: - Obj-C
@objc public extension NSNotification {
@objc public static let contactOnlineStatusChanged = Notification.Name.contactOnlineStatusChanged.rawValue as NSString
@objc public static let receivedNewMessages = Notification.Name.receivedNewMessages.rawValue as NSString
// Friend request
@objc public static let threadFriendRequestStatusChanged = Notification.Name.threadFriendRequestStatusChanged.rawValue as NSString
@objc public static let messageFriendRequestStatusChanged = Notification.Name.messageFriendRequestStatusChanged.rawValue as NSString
}
Loading…
Cancel
Save