Moved P2P logic to LokiP2PManager.

pull/20/head
Mikunj 6 years ago
parent af6a969653
commit 88afca30c6

@ -1 +1 @@
Subproject commit 6eacc62ee03ae19105782f2ea60ac8ae46814788 Subproject commit 3343906944c5db1e40599f4d36c35fd6dc75da20

@ -320,7 +320,7 @@ static NSTimeInterval launchStartedAt;
if (self.lokiP2PServer.isRunning) { break; } if (self.lokiP2PServer.isRunning) { break; }
BOOL isStarted = [self.lokiP2PServer startOnPort:port.unsignedIntegerValue]; BOOL isStarted = [self.lokiP2PServer startOnPort:port.unsignedIntegerValue];
if (isStarted) { if (isStarted) {
[LokiAPI setOurP2PAddressWithUrl:self.lokiP2PServer.serverURL]; [LokiP2PManager setOurP2PAddressWithUrl:self.lokiP2PServer.serverURL];
OWSLogInfo(@"[Loki] Started server at %@.", self.lokiP2PServer.serverURL); OWSLogInfo(@"[Loki] Started server at %@.", self.lokiP2PServer.serverURL);
break; break;
} }
@ -753,7 +753,7 @@ static NSTimeInterval launchStartedAt;
[Poller.shared startIfNeeded]; [Poller.shared startIfNeeded];
// Loki: Tell our friends that we are online // Loki: Tell our friends that we are online
[LokiAPI broadcastOnlineStatus]; [LokiP2PManager broadcastOnlineStatus];
if (![UIApplication sharedApplication].isRegisteredForRemoteNotifications) { if (![UIApplication sharedApplication].isRegisteredForRemoteNotifications) {
OWSLogInfo(@"Retrying to register for remote notifications since user hasn't registered yet."); OWSLogInfo(@"Retrying to register for remote notifications since user hasn't registered yet.");

@ -7,7 +7,7 @@ import PromiseKit
private static let version = "v1" private static let version = "v1"
public static let defaultMessageTTL: UInt64 = 1 * 24 * 60 * 60 * 1000 public static let defaultMessageTTL: UInt64 = 1 * 24 * 60 * 60 * 1000
internal static let ourHexEncodedPubKey = OWSIdentityManager.shared().identityKeyPair()!.hexEncodedPublicKey private static let ourHexEncodedPubKey = OWSIdentityManager.shared().identityKeyPair()!.hexEncodedPublicKey
// MARK: Types // MARK: Types
public typealias RawResponse = Any public typealias RawResponse = Any
@ -74,18 +74,19 @@ import PromiseKit
// If we have the p2p details and we have marked the user as online OR we are pinging the user, then use peer to peer // If we have the p2p details and we have marked the user as online OR we are pinging the user, then use peer to peer
// If that failes then fallback to storage server // If that failes then fallback to storage server
if let p2pDetails = contactP2PDetails[destination], message.isPing || p2pDetails.isOnline { if let p2pDetails = LokiP2PManager.getDetails(forContact: destination), message.isPing || p2pDetails.isOnline {
let targets = Promise.wrap([p2pDetails.target]) let targets = Promise.wrap([p2pDetails.target])
return sendMessage(message, targets: targets).then { result -> Promise<Set<Promise<RawResponse>>> in return sendMessage(message, targets: targets).then { result -> Promise<Set<Promise<RawResponse>>> in
LokiAPI.setOnline(true, forContact: destination) LokiP2PManager.setOnline(true, forContact: destination)
return Promise.wrap(result) return Promise.wrap(result)
}.recover { error -> Promise<Set<Promise<RawResponse>>> in }.recover { error -> Promise<Set<Promise<RawResponse>>> in
// The user is not online // The user is not online
LokiAPI.setOnline(false, forContact: destination) LokiP2PManager.setOnline(false, forContact: destination)
// If it was a ping then don't send to the storage server // If it was a ping then don't send to the storage server
if (message.isPing) { if (message.isPing) {
Logger.warn("[Loki] Failed to ping \(destination) - Marking contact as offline.") Logger.warn("[Loki] Failed to ping \(destination) - Marking contact as offline.")
error.isRetryable = false
throw error throw error
} }

@ -1,6 +1,8 @@
extension LokiAPI { @objc public class LokiP2PManager : NSObject {
private static let storage = OWSPrimaryStorage.shared()
private static let messageSender: MessageSender = SSKEnvironment.shared.messageSender private static let messageSender: MessageSender = SSKEnvironment.shared.messageSender
private static let ourHexEncodedPubKey = OWSIdentityManager.shared().identityKeyPair()!.hexEncodedPublicKey
/// The amount of time before pinging when a user is set to offline /// The amount of time before pinging when a user is set to offline
private static let offlinePingTime = 2 * kMinuteInterval private static let offlinePingTime = 2 * kMinuteInterval
@ -13,15 +15,77 @@ extension LokiAPI {
var timerDuration: Double var timerDuration: Double
var pingTimer: Timer? = nil var pingTimer: Timer? = nil
var target: Target { var target: LokiAPI.Target {
return Target(address: address, port: port) return LokiAPI.Target(address: address, port: port)
} }
} }
internal static var ourP2PAddress: Target? = nil /// Our p2p address
private static var ourP2PAddress: LokiAPI.Target? = nil
/// This is where we store the p2p details of our contacts /// This is where we store the p2p details of our contacts
internal static var contactP2PDetails = [String: P2PDetails]() private static var contactP2PDetails = [String: P2PDetails]()
// MARK: - Public functions
/// Set our local P2P address
///
/// - Parameter url: The url to our local server
@objc public static func setOurP2PAddress(url: URL) {
guard let scheme = url.scheme, let host = url.host, let port = url.port else { return }
let target = LokiAPI.Target(address: "\(scheme)://\(host)", port: UInt32(port))
ourP2PAddress = target
}
/// Ping a contact
///
/// - Parameter pubKey: The contact hex pubkey
@objc(pingContact:)
public static func ping(contact pubKey: String) {
// Dispatch on the main queue so we escape any transaction blocks
DispatchQueue.main.async {
guard let thread = TSContactThread.fetch(uniqueId: pubKey) else {
Logger.warn("[Loki][Ping] Failed to fetch thread for \(pubKey)")
return
}
guard let message = lokiAddressMessage(for: thread, isPing: true) else {
Logger.warn("[Loki][Ping] Failed to build ping message for \(pubKey)")
return
}
messageSender.sendPromise(message: message).retainUntilComplete()
}
}
/// Broadcash an online message to all our friends.
/// This shouldn't be called inside a transaction.
@objc public static func broadcastOnlineStatus() {
// Escape any transaction blocks
DispatchQueue.main.async {
let friendThreads = getAllFriendThreads()
for thread in friendThreads {
sendOnlineBroadcastMessage(forThread: thread)
}
}
}
// MARK: - Internal functions
/// Get the P2P details for the given contact.
///
/// - Parameter pubKey: The contact hex pubkey
/// - Returns: The P2P Details or nil if they don't exist
internal static func getDetails(forContact pubKey: String) -> P2PDetails? {
return contactP2PDetails[pubKey]
}
/// Get the `LokiAddressMessage` for the given thread.
///
/// - Parameter thread: The contact thread.
/// - Returns: The `LokiAddressMessage` for that thread.
@objc public static func onlineBroadcastMessage(forThread thread: TSThread) -> LokiAddressMessage? {
return lokiAddressMessage(for: thread, isPing: false)
}
/// Handle P2P logic when we receive a `LokiAddressMessage` /// Handle P2P logic when we receive a `LokiAddressMessage`
/// ///
@ -30,8 +94,9 @@ extension LokiAPI {
/// - address: The pther users p2p address /// - address: The pther users p2p address
/// - port: The other users p2p port /// - port: The other users p2p port
/// - receivedThroughP2P: Wether we received the message through p2p /// - receivedThroughP2P: Wether we received the message through p2p
@objc public static func didReceiveLokiAddressMessage(forContact pubKey: String, address: String, port: UInt32, receivedThroughP2P: Bool, transaction: YapDatabaseReadTransaction) { @objc internal static func didReceiveLokiAddressMessage(forContact pubKey: String, address: String, port: UInt32, receivedThroughP2P: Bool) {
// Stagger the ping timers so that contacts don't ping each other at the same time // Stagger the ping timers so that contacts don't ping each other at the same time
let timerDuration = pubKey < ourHexEncodedPubKey ? 1 * kMinuteInterval : 2 * kMinuteInterval let timerDuration = pubKey < ourHexEncodedPubKey ? 1 * kMinuteInterval : 2 * kMinuteInterval
// Get out current contact details // Get out current contact details
@ -67,10 +132,15 @@ extension LokiAPI {
3. We had the contact marked as offline, we need to make sure that we can reach their server. 3. We had the contact marked as offline, we need to make sure that we can reach their server.
4. The other contact details have changed, we need to make sure that we can reach their new server. 4. The other contact details have changed, we need to make sure that we can reach their new server.
*/ */
ping(contact: pubKey, withTransaction: transaction) ping(contact: pubKey)
} }
@objc public static func setOnline(_ isOnline: Bool, forContact pubKey: String) { /// Mark a contact as online or offline.
///
/// - Parameters:
/// - isOnline: Whether to set the contact to online or offline.
/// - pubKey: The contact hexh pubKey
@objc internal static func setOnline(_ isOnline: Bool, forContact pubKey: String) {
// Make sure we are on the main thread // Make sure we are on the main thread
DispatchQueue.main.async { DispatchQueue.main.async {
guard var details = contactP2PDetails[pubKey] else { return } guard var details = contactP2PDetails[pubKey] else { return }
@ -86,67 +156,9 @@ extension LokiAPI {
} }
} }
@objc public static func ping(contact pubKey: String) { // MARK: - Private functions
AssertIsOnMainThread()
storage.dbReadConnection.read { transaction in
ping(contact: pubKey, withTransaction: transaction)
}
}
@objc public static func ping(contact pubKey: String, withTransaction transaction: YapDatabaseReadTransaction) {
guard let thread = TSContactThread.fetch(uniqueId: pubKey, transaction: transaction) else {
Logger.warn("[Loki][Ping] Failed to fetch thread for \(pubKey)")
return
}
guard let message = lokiAddressMessage(for: thread, isPing: true) else {
Logger.warn("[Loki][Ping] Failed to build ping message for \(pubKey)")
return
}
messageSender.sendPromise(message: message).retainUntilComplete()
}
/// Set our local P2P address private static func sendOnlineBroadcastMessage(forThread thread: TSContactThread) {
///
/// - Parameter url: The url to our local server
@objc public static func setOurP2PAddress(url: URL) {
guard let scheme = url.scheme, let host = url.host, let port = url.port else { return }
let target = Target(address: "\(scheme)://\(host)", port: UInt32(port))
ourP2PAddress = target
}
/// Broadcash an online message to all our friends.
/// This shouldn't be called inside a transaction.
@objc public static func broadcastOnlineStatus() {
AssertIsOnMainThread()
let friendThreads = getAllFriendThreads()
for thread in friendThreads {
sendOnlineBroadcastMessage(forThread: thread)
}
}
/// Get the `LokiAddressMessage` for the given thread.
///
/// - Parameter thread: The contact thread.
/// - Returns: The `LokiAddressMessage` for that thread.
@objc public static func onlineBroadcastMessage(forThread thread: TSThread) -> LokiAddressMessage? {
return lokiAddressMessage(for: thread, isPing: false)
}
private static func lokiAddressMessage(for thread: TSThread, isPing: Bool) -> LokiAddressMessage? {
guard let ourAddress = ourP2PAddress else {
Logger.error("P2P Address not set")
return nil
}
return LokiAddressMessage(in: thread, address: ourAddress.address, port: ourAddress.port, isPing: isPing)
}
/// Send a `Loki Address` message to the given thread
///
/// - Parameter thread: The contact thread to send the message to
@objc public static func sendOnlineBroadcastMessage(forThread thread: TSContactThread) {
AssertIsOnMainThread() AssertIsOnMainThread()
guard let message = onlineBroadcastMessage(forThread: thread) else { guard let message = onlineBroadcastMessage(forThread: thread) else {
@ -159,13 +171,6 @@ extension LokiAPI {
}.retainUntilComplete() }.retainUntilComplete()
} }
@objc public static func sendOnlineBroadcastMessage(forThread thread: TSContactThread, transaction: YapDatabaseReadWriteTransaction) {
guard let ourAddress = ourP2PAddress else {
owsFailDebug("P2P Address not set")
return
}
}
private static func getAllFriendThreads() -> [TSContactThread] { private static func getAllFriendThreads() -> [TSContactThread] {
var friendThreadIds = [String]() var friendThreadIds = [String]()
TSContactThread.enumerateCollectionObjects { (object, _) in TSContactThread.enumerateCollectionObjects { (object, _) in
@ -179,4 +184,12 @@ extension LokiAPI {
return friendThreadIds.compactMap { TSContactThread.fetch(uniqueId: $0) } return friendThreadIds.compactMap { TSContactThread.fetch(uniqueId: $0) }
} }
private static func lokiAddressMessage(for thread: TSThread, isPing: Bool) -> LokiAddressMessage? {
guard let ourAddress = ourP2PAddress else {
Logger.error("P2P Address not set")
return nil
}
return LokiAddressMessage(in: thread, address: ourAddress.address, port: ourAddress.port, isPing: isPing)
}
} }

@ -437,7 +437,7 @@ NS_ASSUME_NONNULL_BEGIN
if (contentProto.lokiAddressMessage) { if (contentProto.lokiAddressMessage) {
NSString *address = contentProto.lokiAddressMessage.ptpAddress; NSString *address = contentProto.lokiAddressMessage.ptpAddress;
uint32_t port = contentProto.lokiAddressMessage.ptpPort; uint32_t port = contentProto.lokiAddressMessage.ptpPort;
[LokiAPI didReceiveLokiAddressMessageForContact:envelope.source address:address port:port receivedThroughP2P:envelope.isPtpMessage transaction: transaction]; [LokiP2PManager didReceiveLokiAddressMessageForContact:envelope.source address:address port:port receivedThroughP2P:envelope.isPtpMessage];
} }
if (contentProto.syncMessage) { if (contentProto.syncMessage) {
@ -1460,6 +1460,16 @@ NS_ASSUME_NONNULL_BEGIN
return nil; return nil;
} }
// Loki
// If we received a message from a contact in the last 2 minues that was not p2p, then we need to ping them.
// We assume this occurred because they don't have our p2p details.
if (!envelope.isPtpMessage && envelope.source != nil) {
uint64_t timestamp = envelope.timestamp;
uint64_t now = NSDate.ows_millisecondTimeStamp;
uint64_t ageInSeconds = (now - timestamp) / 1000;
if (ageInSeconds <= 120) { [LokiP2PManager pingContact:envelope.source]; }
}
[self finalizeIncomingMessage:incomingMessage [self finalizeIncomingMessage:incomingMessage
thread:thread thread:thread
envelope:envelope envelope:envelope
@ -1509,7 +1519,7 @@ NS_ASSUME_NONNULL_BEGIN
} }
// Send our p2p details to the other user // Send our p2p details to the other user
LKAddressMessage *_Nullable onlineMessage = [LokiAPI onlineBroadcastMessageForThread:thread]; LKAddressMessage *_Nullable onlineMessage = [LokiP2PManager onlineBroadcastMessageForThread:thread];
if (onlineMessage != nil) { if (onlineMessage != nil) {
[self.messageSenderJobQueue addMessage:onlineMessage transaction:transaction]; [self.messageSenderJobQueue addMessage:onlineMessage transaction:transaction];
} }

Loading…
Cancel
Save