From 88afca30c6be8dbf339d1c4df279ad9f4d58fe61 Mon Sep 17 00:00:00 2001 From: Mikunj Date: Mon, 27 May 2019 09:50:37 +1000 Subject: [PATCH] Moved P2P logic to LokiP2PManager. --- Pods | 2 +- Signal/src/AppDelegate.m | 4 +- SignalServiceKit/src/Loki/API/LokiAPI.swift | 9 +- ...LokiAPI+P2P.swift => LokiP2PManager.swift} | 181 ++++++++++-------- .../src/Messages/OWSMessageManager.m | 14 +- 5 files changed, 117 insertions(+), 93 deletions(-) rename SignalServiceKit/src/Loki/API/{LokiAPI+P2P.swift => LokiP2PManager.swift} (61%) diff --git a/Pods b/Pods index 6eacc62ee..334390694 160000 --- a/Pods +++ b/Pods @@ -1 +1 @@ -Subproject commit 6eacc62ee03ae19105782f2ea60ac8ae46814788 +Subproject commit 3343906944c5db1e40599f4d36c35fd6dc75da20 diff --git a/Signal/src/AppDelegate.m b/Signal/src/AppDelegate.m index 6cfb6395b..4ec18a72b 100644 --- a/Signal/src/AppDelegate.m +++ b/Signal/src/AppDelegate.m @@ -320,7 +320,7 @@ static NSTimeInterval launchStartedAt; if (self.lokiP2PServer.isRunning) { break; } BOOL isStarted = [self.lokiP2PServer startOnPort:port.unsignedIntegerValue]; if (isStarted) { - [LokiAPI setOurP2PAddressWithUrl:self.lokiP2PServer.serverURL]; + [LokiP2PManager setOurP2PAddressWithUrl:self.lokiP2PServer.serverURL]; OWSLogInfo(@"[Loki] Started server at %@.", self.lokiP2PServer.serverURL); break; } @@ -753,7 +753,7 @@ static NSTimeInterval launchStartedAt; [Poller.shared startIfNeeded]; // Loki: Tell our friends that we are online - [LokiAPI broadcastOnlineStatus]; + [LokiP2PManager broadcastOnlineStatus]; if (![UIApplication sharedApplication].isRegisteredForRemoteNotifications) { OWSLogInfo(@"Retrying to register for remote notifications since user hasn't registered yet."); diff --git a/SignalServiceKit/src/Loki/API/LokiAPI.swift b/SignalServiceKit/src/Loki/API/LokiAPI.swift index 706818ed0..8cd2e3cfe 100644 --- a/SignalServiceKit/src/Loki/API/LokiAPI.swift +++ b/SignalServiceKit/src/Loki/API/LokiAPI.swift @@ -7,7 +7,7 @@ import PromiseKit private static let version = "v1" public static let defaultMessageTTL: UInt64 = 1 * 24 * 60 * 60 * 1000 - internal static let ourHexEncodedPubKey = OWSIdentityManager.shared().identityKeyPair()!.hexEncodedPublicKey + private static let ourHexEncodedPubKey = OWSIdentityManager.shared().identityKeyPair()!.hexEncodedPublicKey // MARK: Types public typealias RawResponse = Any @@ -74,18 +74,19 @@ import PromiseKit // If we have the p2p details and we have marked the user as online OR we are pinging the user, then use peer to peer // If that failes then fallback to storage server - if let p2pDetails = contactP2PDetails[destination], message.isPing || p2pDetails.isOnline { + if let p2pDetails = LokiP2PManager.getDetails(forContact: destination), message.isPing || p2pDetails.isOnline { let targets = Promise.wrap([p2pDetails.target]) return sendMessage(message, targets: targets).then { result -> Promise>> in - LokiAPI.setOnline(true, forContact: destination) + LokiP2PManager.setOnline(true, forContact: destination) return Promise.wrap(result) }.recover { error -> Promise>> in // The user is not online - LokiAPI.setOnline(false, forContact: destination) + LokiP2PManager.setOnline(false, forContact: destination) // If it was a ping then don't send to the storage server if (message.isPing) { Logger.warn("[Loki] Failed to ping \(destination) - Marking contact as offline.") + error.isRetryable = false throw error } diff --git a/SignalServiceKit/src/Loki/API/LokiAPI+P2P.swift b/SignalServiceKit/src/Loki/API/LokiP2PManager.swift similarity index 61% rename from SignalServiceKit/src/Loki/API/LokiAPI+P2P.swift rename to SignalServiceKit/src/Loki/API/LokiP2PManager.swift index 8711fdf7a..9d5ce3747 100644 --- a/SignalServiceKit/src/Loki/API/LokiAPI+P2P.swift +++ b/SignalServiceKit/src/Loki/API/LokiP2PManager.swift @@ -1,6 +1,8 @@ -extension LokiAPI { +@objc public class LokiP2PManager : NSObject { + private static let storage = OWSPrimaryStorage.shared() private static let messageSender: MessageSender = SSKEnvironment.shared.messageSender + private static let ourHexEncodedPubKey = OWSIdentityManager.shared().identityKeyPair()!.hexEncodedPublicKey /// The amount of time before pinging when a user is set to offline private static let offlinePingTime = 2 * kMinuteInterval @@ -13,15 +15,77 @@ extension LokiAPI { var timerDuration: Double var pingTimer: Timer? = nil - var target: Target { - return Target(address: address, port: port) + var target: LokiAPI.Target { + return LokiAPI.Target(address: address, port: port) } } - internal static var ourP2PAddress: Target? = nil + /// Our p2p address + private static var ourP2PAddress: LokiAPI.Target? = nil /// This is where we store the p2p details of our contacts - internal static var contactP2PDetails = [String: P2PDetails]() + private static var contactP2PDetails = [String: P2PDetails]() + + // MARK: - Public functions + + /// Set our local P2P address + /// + /// - Parameter url: The url to our local server + @objc public static func setOurP2PAddress(url: URL) { + guard let scheme = url.scheme, let host = url.host, let port = url.port else { return } + let target = LokiAPI.Target(address: "\(scheme)://\(host)", port: UInt32(port)) + ourP2PAddress = target + } + + /// Ping a contact + /// + /// - Parameter pubKey: The contact hex pubkey + @objc(pingContact:) + public static func ping(contact pubKey: String) { + // Dispatch on the main queue so we escape any transaction blocks + DispatchQueue.main.async { + guard let thread = TSContactThread.fetch(uniqueId: pubKey) else { + Logger.warn("[Loki][Ping] Failed to fetch thread for \(pubKey)") + return + } + guard let message = lokiAddressMessage(for: thread, isPing: true) else { + Logger.warn("[Loki][Ping] Failed to build ping message for \(pubKey)") + return + } + + messageSender.sendPromise(message: message).retainUntilComplete() + } + } + + /// Broadcash an online message to all our friends. + /// This shouldn't be called inside a transaction. + @objc public static func broadcastOnlineStatus() { + // Escape any transaction blocks + DispatchQueue.main.async { + let friendThreads = getAllFriendThreads() + for thread in friendThreads { + sendOnlineBroadcastMessage(forThread: thread) + } + } + } + + // MARK: - Internal functions + + /// Get the P2P details for the given contact. + /// + /// - Parameter pubKey: The contact hex pubkey + /// - Returns: The P2P Details or nil if they don't exist + internal static func getDetails(forContact pubKey: String) -> P2PDetails? { + return contactP2PDetails[pubKey] + } + + /// Get the `LokiAddressMessage` for the given thread. + /// + /// - Parameter thread: The contact thread. + /// - Returns: The `LokiAddressMessage` for that thread. + @objc public static func onlineBroadcastMessage(forThread thread: TSThread) -> LokiAddressMessage? { + return lokiAddressMessage(for: thread, isPing: false) + } /// Handle P2P logic when we receive a `LokiAddressMessage` /// @@ -30,8 +94,9 @@ extension LokiAPI { /// - address: The pther users p2p address /// - port: The other users p2p port /// - receivedThroughP2P: Wether we received the message through p2p - @objc public static func didReceiveLokiAddressMessage(forContact pubKey: String, address: String, port: UInt32, receivedThroughP2P: Bool, transaction: YapDatabaseReadTransaction) { + @objc internal static func didReceiveLokiAddressMessage(forContact pubKey: String, address: String, port: UInt32, receivedThroughP2P: Bool) { // Stagger the ping timers so that contacts don't ping each other at the same time + let timerDuration = pubKey < ourHexEncodedPubKey ? 1 * kMinuteInterval : 2 * kMinuteInterval // Get out current contact details @@ -49,10 +114,10 @@ extension LokiAPI { /* We need to check if we should ping the user. We don't ping the user IF: - - We had old contact details - - We got a P2P message - - The old contact was set as `Online` - - The new p2p details match the old one + - We had old contact details + - We got a P2P message + - The old contact was set as `Online` + - The new p2p details match the old one */ if oldContactExists && receivedThroughP2P && wasOnline && p2pDetailsMatch { setOnline(true, forContact: pubKey) @@ -62,15 +127,20 @@ extension LokiAPI { /* Ping the contact. This happens in the following scenarios: - 1. We didn't have the contact, we need to ping them to let them know our details. - 2. wasP2PMessage = false, so we assume the contact doesn't have our details. - 3. We had the contact marked as offline, we need to make sure that we can reach their server. - 4. The other contact details have changed, we need to make sure that we can reach their new server. + 1. We didn't have the contact, we need to ping them to let them know our details. + 2. wasP2PMessage = false, so we assume the contact doesn't have our details. + 3. We had the contact marked as offline, we need to make sure that we can reach their server. + 4. The other contact details have changed, we need to make sure that we can reach their new server. */ - ping(contact: pubKey, withTransaction: transaction) + ping(contact: pubKey) } - @objc public static func setOnline(_ isOnline: Bool, forContact pubKey: String) { + /// Mark a contact as online or offline. + /// + /// - Parameters: + /// - isOnline: Whether to set the contact to online or offline. + /// - pubKey: The contact hexh pubKey + @objc internal static func setOnline(_ isOnline: Bool, forContact pubKey: String) { // Make sure we are on the main thread DispatchQueue.main.async { guard var details = contactP2PDetails[pubKey] else { return } @@ -86,67 +156,9 @@ extension LokiAPI { } } - @objc public static func ping(contact pubKey: String) { - AssertIsOnMainThread() - storage.dbReadConnection.read { transaction in - ping(contact: pubKey, withTransaction: transaction) - } - } - - @objc public static func ping(contact pubKey: String, withTransaction transaction: YapDatabaseReadTransaction) { - guard let thread = TSContactThread.fetch(uniqueId: pubKey, transaction: transaction) else { - Logger.warn("[Loki][Ping] Failed to fetch thread for \(pubKey)") - return - } - guard let message = lokiAddressMessage(for: thread, isPing: true) else { - Logger.warn("[Loki][Ping] Failed to build ping message for \(pubKey)") - return - } - - messageSender.sendPromise(message: message).retainUntilComplete() - } - - /// Set our local P2P address - /// - /// - Parameter url: The url to our local server - @objc public static func setOurP2PAddress(url: URL) { - guard let scheme = url.scheme, let host = url.host, let port = url.port else { return } - let target = Target(address: "\(scheme)://\(host)", port: UInt32(port)) - ourP2PAddress = target - } - - /// Broadcash an online message to all our friends. - /// This shouldn't be called inside a transaction. - @objc public static func broadcastOnlineStatus() { - AssertIsOnMainThread() - - let friendThreads = getAllFriendThreads() - for thread in friendThreads { - sendOnlineBroadcastMessage(forThread: thread) - } - } - - /// Get the `LokiAddressMessage` for the given thread. - /// - /// - Parameter thread: The contact thread. - /// - Returns: The `LokiAddressMessage` for that thread. - @objc public static func onlineBroadcastMessage(forThread thread: TSThread) -> LokiAddressMessage? { - return lokiAddressMessage(for: thread, isPing: false) - } + // MARK: - Private functions - private static func lokiAddressMessage(for thread: TSThread, isPing: Bool) -> LokiAddressMessage? { - guard let ourAddress = ourP2PAddress else { - Logger.error("P2P Address not set") - return nil - } - - return LokiAddressMessage(in: thread, address: ourAddress.address, port: ourAddress.port, isPing: isPing) - } - - /// Send a `Loki Address` message to the given thread - /// - /// - Parameter thread: The contact thread to send the message to - @objc public static func sendOnlineBroadcastMessage(forThread thread: TSContactThread) { + private static func sendOnlineBroadcastMessage(forThread thread: TSContactThread) { AssertIsOnMainThread() guard let message = onlineBroadcastMessage(forThread: thread) else { @@ -156,14 +168,7 @@ extension LokiAPI { messageSender.sendPromise(message: message).catch { error in Logger.warn("Failed to send online status to \(thread.contactIdentifier())") - }.retainUntilComplete() - } - - @objc public static func sendOnlineBroadcastMessage(forThread thread: TSContactThread, transaction: YapDatabaseReadWriteTransaction) { - guard let ourAddress = ourP2PAddress else { - owsFailDebug("P2P Address not set") - return - } + }.retainUntilComplete() } private static func getAllFriendThreads() -> [TSContactThread] { @@ -179,4 +184,12 @@ extension LokiAPI { return friendThreadIds.compactMap { TSContactThread.fetch(uniqueId: $0) } } + private static func lokiAddressMessage(for thread: TSThread, isPing: Bool) -> LokiAddressMessage? { + guard let ourAddress = ourP2PAddress else { + Logger.error("P2P Address not set") + return nil + } + + return LokiAddressMessage(in: thread, address: ourAddress.address, port: ourAddress.port, isPing: isPing) + } } diff --git a/SignalServiceKit/src/Messages/OWSMessageManager.m b/SignalServiceKit/src/Messages/OWSMessageManager.m index 2c21590ce..4d6eb034f 100644 --- a/SignalServiceKit/src/Messages/OWSMessageManager.m +++ b/SignalServiceKit/src/Messages/OWSMessageManager.m @@ -437,7 +437,7 @@ NS_ASSUME_NONNULL_BEGIN if (contentProto.lokiAddressMessage) { NSString *address = contentProto.lokiAddressMessage.ptpAddress; uint32_t port = contentProto.lokiAddressMessage.ptpPort; - [LokiAPI didReceiveLokiAddressMessageForContact:envelope.source address:address port:port receivedThroughP2P:envelope.isPtpMessage transaction: transaction]; + [LokiP2PManager didReceiveLokiAddressMessageForContact:envelope.source address:address port:port receivedThroughP2P:envelope.isPtpMessage]; } if (contentProto.syncMessage) { @@ -1459,6 +1459,16 @@ NS_ASSUME_NONNULL_BEGIN (unsigned long)timestamp); return nil; } + + // Loki + // If we received a message from a contact in the last 2 minues that was not p2p, then we need to ping them. + // We assume this occurred because they don't have our p2p details. + if (!envelope.isPtpMessage && envelope.source != nil) { + uint64_t timestamp = envelope.timestamp; + uint64_t now = NSDate.ows_millisecondTimeStamp; + uint64_t ageInSeconds = (now - timestamp) / 1000; + if (ageInSeconds <= 120) { [LokiP2PManager pingContact:envelope.source]; } + } [self finalizeIncomingMessage:incomingMessage thread:thread @@ -1509,7 +1519,7 @@ NS_ASSUME_NONNULL_BEGIN } // Send our p2p details to the other user - LKAddressMessage *_Nullable onlineMessage = [LokiAPI onlineBroadcastMessageForThread:thread]; + LKAddressMessage *_Nullable onlineMessage = [LokiP2PManager onlineBroadcastMessageForThread:thread]; if (onlineMessage != nil) { [self.messageSenderJobQueue addMessage:onlineMessage transaction:transaction]; }