Merge pull request #26 from loki-project/long-polling

Long polling
pull/27/head
gamabuntan 5 years ago committed by GitHub
commit beab54ebf4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -560,7 +560,6 @@
B8162F0522892C5F00D46544 /* FriendRequestViewDelegate.swift in Sources */ = {isa = PBXBuildFile; fileRef = B8162F0422892C5F00D46544 /* FriendRequestViewDelegate.swift */; };
B821F2F82272CED3002C88C0 /* OnboardingAccountDetailsViewController.swift in Sources */ = {isa = PBXBuildFile; fileRef = B821F2F72272CED3002C88C0 /* OnboardingAccountDetailsViewController.swift */; };
B821F2FA2272CEEE002C88C0 /* OnboardingKeyPairViewController.swift in Sources */ = {isa = PBXBuildFile; fileRef = B821F2F92272CEEE002C88C0 /* OnboardingKeyPairViewController.swift */; };
B843951A228510FB000563FE /* Poller.swift in Sources */ = {isa = PBXBuildFile; fileRef = B8439519228510FB000563FE /* Poller.swift */; };
B90418E6183E9DD40038554A /* DateUtil.m in Sources */ = {isa = PBXBuildFile; fileRef = B90418E5183E9DD40038554A /* DateUtil.m */; };
B9EB5ABD1884C002007CBB57 /* MessageUI.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = B9EB5ABC1884C002007CBB57 /* MessageUI.framework */; };
BFF3FB9730634F37D25903F4 /* Pods_Signal.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = D17BB5C25D615AB49813100C /* Pods_Signal.framework */; };
@ -1348,7 +1347,6 @@
B8162F0422892C5F00D46544 /* FriendRequestViewDelegate.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FriendRequestViewDelegate.swift; sourceTree = "<group>"; };
B821F2F72272CED3002C88C0 /* OnboardingAccountDetailsViewController.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = OnboardingAccountDetailsViewController.swift; sourceTree = "<group>"; };
B821F2F92272CEEE002C88C0 /* OnboardingKeyPairViewController.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = OnboardingKeyPairViewController.swift; sourceTree = "<group>"; };
B8439519228510FB000563FE /* Poller.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; name = Poller.swift; path = Signal/src/Loki/Poller.swift; sourceTree = SOURCE_ROOT; };
B90418E4183E9DD40038554A /* DateUtil.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = DateUtil.h; sourceTree = "<group>"; };
B90418E5183E9DD40038554A /* DateUtil.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = DateUtil.m; sourceTree = "<group>"; };
B97940251832BD2400BD66CB /* UIUtil.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = UIUtil.h; sourceTree = "<group>"; };
@ -2607,7 +2605,6 @@
B8162F0422892C5F00D46544 /* FriendRequestViewDelegate.swift */,
B821F2F72272CED3002C88C0 /* OnboardingAccountDetailsViewController.swift */,
B821F2F92272CEEE002C88C0 /* OnboardingKeyPairViewController.swift */,
B8439519228510FB000563FE /* Poller.swift */,
24A830A12293CD0100F4CAC0 /* LokiP2PServer.swift */,
);
path = Loki;
@ -3598,7 +3595,6 @@
454A84042059C787008B8C75 /* MediaTileViewController.swift in Sources */,
340FC8B4204DAC8D007AEB0F /* OWSBackupSettingsViewController.m in Sources */,
34D1F0871F8678AA0066283D /* ConversationViewItem.m in Sources */,
B843951A228510FB000563FE /* Poller.swift in Sources */,
451A13B11E13DED2000A50FD /* AppNotifications.swift in Sources */,
34D99CE4217509C2000AFB39 /* AppEnvironment.swift in Sources */,
348570A820F67575004FF32B /* OWSMessageHeaderView.m in Sources */,

@ -171,6 +171,8 @@ static NSTimeInterval launchStartedAt;
OWSLogInfo(@"applicationDidEnterBackground.");
[DDLog flushLog];
[LokiAPI stopLongPolling];
}
- (void)applicationWillEnterForeground:(UIApplication *)application
@ -189,6 +191,7 @@ static NSTimeInterval launchStartedAt;
[DDLog flushLog];
[LokiAPI stopLongPolling];
if (self.lokiP2PServer) { [self.lokiP2PServer stop]; }
}
@ -306,6 +309,9 @@ static NSTimeInterval launchStartedAt;
selector:@selector(registrationLockDidChange:)
name:NSNotificationName_2FAStateDidChange
object:nil];
// Loki - Observe messages received notifications
[[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(handleNewMessagesReceived:) name:NSNotification.newMessagesReceived object:nil];
OWSLogInfo(@"application: didFinishLaunchingWithOptions completed.");
@ -751,11 +757,8 @@ static NSTimeInterval launchStartedAt;
[self.socketManager requestSocketOpen];
[Environment.shared.contactsManager fetchSystemContactsOnceIfAlreadyAuthorized];
// Loki: Fetch immediately
[[AppEnvironment.shared.messageFetcherJob run] retainUntilComplete];
// Loki: Start poller
[Poller.shared startIfNeeded];
// Loki: Start long polling
[LokiAPI startLongPollingIfNecessary];
// Loki: Tell our friends that we are online
[LokiP2PManager broadcastOnlineStatus];
@ -1178,6 +1181,7 @@ static NSTimeInterval launchStartedAt;
{
OWSLogInfo(@"performing background fetch");
[AppReadiness runNowOrWhenAppDidBecomeReady:^{
// Loki: We don't want to spin up the long poller here as it is probably wasteful on resources
__block AnyPromise *job = [AppEnvironment.shared.messageFetcherJob run].then(^{
// HACK: Call completion handler after n seconds.
//
@ -1359,6 +1363,9 @@ static NSTimeInterval launchStartedAt;
// For non-legacy users, read receipts are on by default.
[self.readReceiptManager setAreReadReceiptsEnabled:YES];
// Start long polling
[LokiAPI startLongPollingIfNecessary];
}
}
@ -1404,6 +1411,23 @@ static NSTimeInterval launchStartedAt;
[UIViewController attemptRotationToDeviceOrientation];
}
#pragma mark - Long polling
- (void)handleNewMessagesReceived:(NSNotification *)notification
{
NSArray *messages = (NSArray *)notification.userInfo[@"messages"];
OWSLogInfo(@"[Loki] Received %lu messages through long polling.", messages.count);
for (SSKProtoEnvelope *envelope in messages) {
NSData *envelopeData = envelope.serializedDataIgnoringErrors;
if (envelopeData != nil) {
[SSKEnvironment.shared.messageReceiver handleReceivedEnvelopeData:envelopeData];
} else {
OWSFailDebug(@"Failed to deserialize envelope.");
}
}
}
#pragma mark - status bar touches
- (void)touchesBegan:(NSSet *)touches withEvent:(UIEvent *)event

@ -1,26 +0,0 @@
import PromiseKit
@objc final class Poller : NSObject {
private var isStarted = false
private var currentJob: Promise<Void>?
// MARK: Configuration
private static let interval: TimeInterval = 5 * 60
// MARK: Initialization
@objc static let shared = Poller()
private override init() { }
// MARK: General
@objc func startIfNeeded() {
guard !isStarted else { return }
Timer.scheduledTimer(timeInterval: Poller.interval, target: self, selector: #selector(poll), userInfo: nil, repeats: true)
isStarted = true
}
@objc private func poll() {
guard currentJob == nil else { return }
currentJob = AppEnvironment.shared.messageFetcherJob.run().ensure { [weak self] in self?.currentJob = nil }
}
}

@ -0,0 +1,118 @@
import PromiseKit
private typealias Callback = () -> Void
public extension LokiAPI {
private static var isLongPolling = false
private static var shouldStopPolling = false
private static var usedSnodes = [LokiAPITarget]()
private static var cancels = [Callback]()
private static let hexEncodedPublicKey = OWSIdentityManager.shared().identityKeyPair()!.hexEncodedPublicKey
/// Start long polling.
/// This will send a notification if new messages were received
@objc public static func startLongPollingIfNecessary() {
guard !isLongPolling else { return }
isLongPolling = true
shouldStopPolling = false
Logger.info("[Loki] Started long polling")
longPoll()
}
/// Stop long polling
@objc public static func stopLongPolling() {
shouldStopPolling = true
isLongPolling = false
usedSnodes.removeAll()
cancelAllPromises()
Logger.info("[Loki] Stopped long polling")
}
/// The long polling loop
private static func longPoll() {
// This is here so we can stop the infinite loop
guard !shouldStopPolling else { return }
getSwarm(for: hexEncodedPublicKey).then { _ -> Guarantee<[Result<Void>]> in
var promises = [Promise<Void>]()
let connections = 3
for i in 0..<connections {
let (promise, cancel) = openConnection()
promises.append(promise)
cancels.append(cancel)
}
return when(resolved: promises)
}.done { _ in
// Since all promises are complete, we can clear the cancels
cancelAllPromises()
// Keep long polling until it is stopped
longPoll()
}.retainUntilComplete()
}
private static func cancelAllPromises() {
cancels.forEach { cancel in cancel() }
cancels.removeAll()
}
private static func getUnusedSnodes() -> [LokiAPITarget] {
let snodes = LokiAPI.swarmCache[hexEncodedPublicKey] ?? []
return snodes.filter { !usedSnodes.contains($0) }
}
/// Open a connection to an unused snode and get messages from it
private static func openConnection() -> (Promise<Void>, cancel: Callback) {
var isCancelled = false
let cancel = {
isCancelled = true
}
func connectToNextSnode() -> Promise<Void> {
guard let nextSnode = getUnusedSnodes().first else {
// We don't have anymore unused snodes
return Promise.value(())
}
// Add the snode to the used array
usedSnodes.append(nextSnode)
func getMessagesInfinitely(from target: LokiAPITarget) -> Promise<Void> {
// The only way to exit the infinite loop is to throw an error 3 times or cancel
return getRawMessages(from: target, useLongPolling: true).then { rawResponse -> Promise<Void> in
// Check if we need to abort
guard !isCancelled else { throw PMKError.cancelled }
// Process the messages
let messages = parseRawMessagesResponse(rawResponse, from: target)
// Send our messages as a notification
NotificationCenter.default.post(name: .newMessagesReceived, object: nil, userInfo: ["messages": messages])
// Continue fetching if we haven't cancelled
return getMessagesInfinitely(from: target)
}.retryingIfNeeded(maxRetryCount: 3)
}
// Keep getting messages for this snode
// If we errored out then connect to the next snode
return getMessagesInfinitely(from: nextSnode).recover { _ -> Promise<Void> in
// Cancelled, so just return successfully
guard !isCancelled else { return Promise.value(()) }
// Connect to the next snode if we haven't cancelled
// We also need to remove the cached snode so we don't contact it again
dropIfNeeded(nextSnode, hexEncodedPublicKey: hexEncodedPublicKey)
return connectToNextSnode()
}
}
// Keep connecting to snodes
return (connectToNextSnode(), cancel)
}
}

@ -11,7 +11,7 @@ public extension LokiAPI {
private static let swarmCacheKey = "swarmCacheKey"
private static let swarmCacheCollection = "swarmCacheCollection"
fileprivate static var swarmCache: [String:[LokiAPITarget]] {
internal static var swarmCache: [String:[LokiAPITarget]] {
get {
var result: [String:[LokiAPITarget]]? = nil
storage.dbReadConnection.read { transaction in
@ -26,6 +26,14 @@ public extension LokiAPI {
}
}
internal static func dropIfNeeded(_ target: LokiAPITarget, hexEncodedPublicKey: String) {
let swarm = LokiAPI.swarmCache[hexEncodedPublicKey]
if var swarm = swarm, let index = swarm.firstIndex(of: target) {
swarm.remove(at: index)
LokiAPI.swarmCache[hexEncodedPublicKey] = swarm
}
}
// MARK: Internal API
private static func getRandomSnode() -> Promise<LokiAPITarget> {
return Promise<LokiAPITarget> { seal in
@ -33,7 +41,7 @@ public extension LokiAPI {
}
}
private static func getSwarm(for hexEncodedPublicKey: String) -> Promise<[LokiAPITarget]> {
internal static func getSwarm(for hexEncodedPublicKey: String) -> Promise<[LokiAPITarget]> {
if let cachedSwarm = swarmCache[hexEncodedPublicKey], cachedSwarm.count >= minimumSnodeCount {
return Promise<[LokiAPITarget]> { $0.fulfill(cachedSwarm) }
} else {
@ -76,11 +84,7 @@ internal extension Promise {
case 421:
// The snode isn't associated with the given public key anymore
Logger.warn("[Loki] Invalidating swarm for: \(hexEncodedPublicKey).")
let swarm = LokiAPI.swarmCache[hexEncodedPublicKey]
if var swarm = swarm, let index = swarm.firstIndex(of: target) {
swarm.remove(at: index)
LokiAPI.swarmCache[hexEncodedPublicKey] = swarm
}
LokiAPI.dropIfNeeded(target, hexEncodedPublicKey: hexEncodedPublicKey)
default: break
}
}

@ -6,7 +6,8 @@ import PromiseKit
// MARK: Settings
private static let version = "v1"
private static let maxRetryCount: UInt = 3
public static let defaultMessageTTL: UInt64 = 1 * 24 * 60 * 60 * 1000
private static let longPollingTimeout: TimeInterval = 40
public static let defaultMessageTTL: UInt64 = 24 * 60 * 60 * 1000
// MARK: Types
public typealias RawResponse = Any
@ -31,25 +32,30 @@ import PromiseKit
override private init() { }
// MARK: Internal API
internal static func invoke(_ method: LokiAPITarget.Method, on target: LokiAPITarget, associatedWith hexEncodedPublicKey: String, parameters: [String:Any] = [:]) -> RawResponsePromise {
internal static func invoke(_ method: LokiAPITarget.Method, on target: LokiAPITarget, associatedWith hexEncodedPublicKey: String,
parameters: [String:Any], headers: [String:String]? = nil, timeout: TimeInterval? = nil) -> RawResponsePromise {
let url = URL(string: "\(target.address):\(target.port)/\(version)/storage_rpc")!
let request = TSRequest(url: url, method: "POST", parameters: [ "method" : method.rawValue, "params" : parameters ])
if let headers = headers { request.allHTTPHeaderFields = headers }
if let timeout = timeout { request.timeoutInterval = timeout }
return TSNetworkManager.shared().makePromise(request: request).map { $0.responseObject }
.handlingSwarmSpecificErrorsIfNeeded(for: target, associatedWith: hexEncodedPublicKey).recoveringNetworkErrorsIfNeeded()
}
internal static func getRawMessages(from target: LokiAPITarget, useLongPolling: Bool) -> RawResponsePromise {
let hexEncodedPublicKey = OWSIdentityManager.shared().identityKeyPair()!.hexEncodedPublicKey
let lastHashValue = getLastMessageHashValue(for: target) ?? ""
let parameters = [ "pubKey" : hexEncodedPublicKey, "lastHash" : lastHashValue ]
let headers: [String:String]? = useLongPolling ? [ "X-Loki-Long-Poll" : "true" ] : nil
let timeout: TimeInterval? = useLongPolling ? longPollingTimeout : nil
return invoke(.getMessages, on: target, associatedWith: hexEncodedPublicKey, parameters: parameters, headers: headers, timeout: timeout)
}
// MARK: Public API
public static func getMessages() -> Promise<Set<MessageListPromise>> {
let hexEncodedPublicKey = OWSIdentityManager.shared().identityKeyPair()!.hexEncodedPublicKey
return getTargetSnodes(for: hexEncodedPublicKey).mapValues { targetSnode in
let lastHashValue = getLastMessageHashValue(for: targetSnode) ?? ""
let parameters: [String:Any] = [ "pubKey" : hexEncodedPublicKey, "lastHash" : lastHashValue ]
return invoke(.getMessages, on: targetSnode, associatedWith: hexEncodedPublicKey, parameters: parameters).map { rawResponse in
guard let json = rawResponse as? JSON, let rawMessages = json["messages"] as? [JSON] else { return [] }
updateLastMessageHashValueIfPossible(for: targetSnode, from: rawMessages)
let newRawMessages = removeDuplicates(from: rawMessages)
return parseProtoEnvelopes(from: newRawMessages)
}
return getRawMessages(from: targetSnode, useLongPolling: false).map { parseRawMessagesResponse($0, from: targetSnode) }
}.map { Set($0) }.retryingIfNeeded(maxRetryCount: maxRetryCount)
}
@ -101,12 +107,19 @@ import PromiseKit
// The parsing utilities below use a best attempt approach to parsing; they warn for parsing failures but don't throw exceptions.
internal static func parseRawMessagesResponse(_ rawResponse: Any, from target: LokiAPITarget) -> [SSKProtoEnvelope] {
guard let json = rawResponse as? JSON, let rawMessages = json["messages"] as? [JSON] else { return [] }
updateLastMessageHashValueIfPossible(for: target, from: rawMessages)
let newRawMessages = removeDuplicates(from: rawMessages)
return parseProtoEnvelopes(from: newRawMessages)
}
private static func updateLastMessageHashValueIfPossible(for target: LokiAPITarget, from rawMessages: [JSON]) {
guard let lastMessage = rawMessages.last, let hashValue = lastMessage["hash"] as? String, let expiresAt = lastMessage["expiration"] as? Int else {
if let lastMessage = rawMessages.last, let hashValue = lastMessage["hash"] as? String, let expirationDate = lastMessage["expiration"] as? Int {
setLastMessageHashValue(for: target, hashValue: hashValue, expiresAt: UInt64(expirationDate))
} else if (!rawMessages.isEmpty) {
Logger.warn("[Loki] Failed to update last message hash value from: \(rawMessages).")
return
}
setLastMessageHashValue(for: target, hashValue: hashValue, expiresAt: UInt64(expiresAt))
}
private static func removeDuplicates(from rawMessages: [JSON]) -> [JSON] {

@ -174,12 +174,12 @@
AssertIsOnMainThread()
guard let message = onlineBroadcastMessage(forThread: thread) else {
owsFailDebug("P2P Address not set")
Logger.warn("[Loki] P2P address not set.")
return
}
messageSender.sendPromise(message: message).catch { error in
Logger.warn("Failed to send online status to \(thread.contactIdentifier())")
Logger.warn("Failed to send online status to \(thread.contactIdentifier()).")
}.retainUntilComplete()
}
@ -198,7 +198,7 @@
private static func createLokiAddressMessage(for thread: TSThread, isPing: Bool) -> LokiAddressMessage? {
guard let ourAddress = ourP2PAddress else {
Logger.error("P2P Address not set")
Logger.warn("[Loki] P2P address not set.")
return nil
}

@ -1,4 +0,0 @@
public extension Notification.Name {
public static let contactOnlineStatusChanged = Notification.Name("contactOnlineStatusChanged")
}

@ -1,5 +1,7 @@
public extension Notification.Name {
public static let contactOnlineStatusChanged = Notification.Name("contactOnlineStatusChanged")
public static let newMessagesReceived = Notification.Name("newMessagesReceived")
public static let threadFriendRequestStatusChanged = Notification.Name("threadFriendRequestStatusChanged")
public static let messageFriendRequestStatusChanged = Notification.Name("messageFriendRequestStatusChanged")
}
@ -7,6 +9,8 @@ public extension Notification.Name {
// MARK: - Obj-C
@objc public extension NSNotification {
@objc public static let contactOnlineStatusChanged = Notification.Name.contactOnlineStatusChanged.rawValue as NSString
@objc public static let newMessagesReceived = Notification.Name.newMessagesReceived.rawValue as NSString
@objc public static let threadFriendRequestStatusChanged = Notification.Name.threadFriendRequestStatusChanged.rawValue as NSString
@objc public static let messageFriendRequestStatusChanged = Notification.Name.messageFriendRequestStatusChanged.rawValue as NSString
}
Loading…
Cancel
Save