Merge commit '07fefc168fcadb5e96aa3a076a27b6756cf91b6c' into p2p

pull/20/head
Mikunj 6 years ago
commit 0566d94105

@ -1 +1 @@
Subproject commit 04f0c4baf6a53b9e2e2f161d5da3574b2dbd333d Subproject commit 6eacc62ee03ae19105782f2ea60ac8ae46814788

@ -1207,6 +1207,8 @@ static NSTimeInterval launchStartedAt;
OWSAssertIsOnMainThread(); OWSAssertIsOnMainThread();
OWSLogInfo(@"storageIsReady"); OWSLogInfo(@"storageIsReady");
[LokiAPI loadSwarmCache];
[self checkIfAppIsReady]; [self checkIfAppIsReady];
} }

@ -8,6 +8,7 @@ private extension GCDWebServerResponse {
} }
private extension GCDWebServerDataRequest { private extension GCDWebServerDataRequest {
var truncatedContentType: String? { var truncatedContentType: String? {
guard let contentType = contentType else { return nil } guard let contentType = contentType else { return nil }
guard let substring = contentType.split(separator: ";").first else { return contentType } guard let substring = contentType.split(separator: ";").first else { return contentType }
@ -90,7 +91,8 @@ final class LokiP2PServer : NSObject {
@objc public var serverURL: URL? { return webServer.serverURL } @objc public var serverURL: URL? { return webServer.serverURL }
@objc public var isRunning: Bool { return webServer.isRunning } @objc public var isRunning: Bool { return webServer.isRunning }
@objc @discardableResult func start(onPort port: UInt) -> Bool { @discardableResult
@objc func start(onPort port: UInt) -> Bool {
guard !webServer.isRunning else { return false } guard !webServer.isRunning else { return false }
webServer.start(withPort: port, bonjourName: nil) webServer.start(withPort: port, bonjourName: nil)
return webServer.isRunning return webServer.isRunning

@ -0,0 +1,45 @@
import PromiseKit
internal extension LokiAPI {
private static let receivedMessageHashValuesKey = "receivedMessageHashValuesKey"
private static let receivedMessageHashValuesCollection = "receivedMessageHashValuesCollection"
internal static func getLastMessageHashValue(for target: Target) -> String? {
var result: String? = nil
// Uses a read/write connection because getting the last message hash value also removes expired messages as needed
storage.dbReadWriteConnection.readWrite { transaction in
result = storage.getLastMessageHash(forServiceNode: target.address, transaction: transaction)
}
return result
}
internal static func setLastMessageHashValue(for target: Target, hashValue: String, expiresAt: UInt64) {
storage.dbReadWriteConnection.readWrite { transaction in
storage.setLastMessageHash(forServiceNode: target.address, hash: hashValue, expiresAt: expiresAt, transaction: transaction)
}
}
internal static func getReceivedMessageHashValues() -> Set<String>? {
var result: Set<String>? = nil
storage.dbReadConnection.read { transaction in
result = transaction.object(forKey: receivedMessageHashValuesKey, inCollection: receivedMessageHashValuesCollection) as! Set<String>?
}
return result
}
internal static func setReceivedMessageHashValues(to receivedMessageHashValues: Set<String>) {
storage.dbReadWriteConnection.readWrite { transaction in
transaction.setObject(receivedMessageHashValues, forKey: receivedMessageHashValuesKey, inCollection: receivedMessageHashValuesCollection)
}
}
}
internal extension AnyPromise {
internal static func from<T : Any>(_ promise: Promise<T>) -> AnyPromise {
let result = AnyPromise(promise)
result.retainUntilComplete()
return result
}
}

@ -1,13 +1,33 @@
import PromiseKit import PromiseKit
extension LokiAPI { public extension LokiAPI {
// MARK: Settings // MARK: Settings
private static let targetSnodeCount = 2 private static let minimumSnodeCount = 2 // TODO: For debugging purposes
private static let targetSnodeCount = 3 // TODO: For debugging purposes
private static let defaultSnodePort: UInt32 = 8080 private static let defaultSnodePort: UInt32 = 8080
// MARK: Caching // MARK: Caching
private static var swarmCache: [String:[Target]] = [:] private static let swarmCacheKey = "swarmCacheKey"
private static let swarmCacheCollection = "swarmCacheCollection"
fileprivate static var swarmCache: [String:[Target]] = [:]
@objc public static func loadSwarmCache() {
var result: [String:[Target]]? = nil
storage.dbReadConnection.read { transaction in
let intermediate = transaction.object(forKey: swarmCacheKey, inCollection: swarmCacheCollection) as! [String:[TargetWrapper]]?
result = intermediate?.mapValues { $0.map { Target(from: $0) } }
}
swarmCache = result ?? [:]
}
private static func saveSwarmCache() {
let intermediate = swarmCache.mapValues { $0.map { TargetWrapper(from: $0) } }
storage.dbReadWriteConnection.readWrite { transaction in
transaction.setObject(intermediate, forKey: swarmCacheKey, inCollection: swarmCacheCollection)
}
}
// MARK: Internal API // MARK: Internal API
private static func getRandomSnode() -> Promise<Target> { private static func getRandomSnode() -> Promise<Target> {
@ -17,11 +37,14 @@ extension LokiAPI {
} }
private static func getSwarm(for hexEncodedPublicKey: String) -> Promise<[Target]> { private static func getSwarm(for hexEncodedPublicKey: String) -> Promise<[Target]> {
if let cachedSwarm = swarmCache[hexEncodedPublicKey], cachedSwarm.count >= targetSnodeCount { if let cachedSwarm = swarmCache[hexEncodedPublicKey], cachedSwarm.count >= minimumSnodeCount {
return Promise<[Target]> { $0.fulfill(cachedSwarm) } return Promise<[Target]> { $0.fulfill(cachedSwarm) }
} else { } else {
let parameters: [String:Any] = [ "pubKey" : hexEncodedPublicKey ] let parameters: [String:Any] = [ "pubKey" : hexEncodedPublicKey ]
return getRandomSnode().then { invoke(.getSwarm, on: $0, with: parameters) }.map { parseTargets(from: $0) }.get { swarmCache[hexEncodedPublicKey] = $0 } return getRandomSnode().then { invoke(.getSwarm, on: $0, associatedWith: hexEncodedPublicKey, parameters: parameters) }.map { parseTargets(from: $0) }.get { swarm in
swarmCache[hexEncodedPublicKey] = swarm
saveSwarmCache()
}
} }
} }
@ -44,3 +67,28 @@ extension LokiAPI {
// return addresses.map { Target(address: $0, port: defaultSnodePort) } // return addresses.map { Target(address: $0, port: defaultSnodePort) }
} }
} }
// MARK: Error Handling
internal extension Promise {
internal func handlingSwarmSpecificErrorsIfNeeded(for target: LokiAPI.Target, associatedWith hexEncodedPublicKey: String) -> Promise<T> {
return recover { error -> Promise<T> in
if let error = error as? NetworkManagerError {
switch error.statusCode {
case 0:
// The snode is unreachable; usually a problem with LokiNet
Logger.warn("[Loki] There appears to be a problem with LokiNet.")
case 421:
// The snode isn't associated with the given public key anymore
let swarm = LokiAPI.swarmCache[hexEncodedPublicKey]
if var swarm = swarm, let index = swarm.firstIndex(of: target) {
swarm.remove(at: index)
LokiAPI.swarmCache[hexEncodedPublicKey] = swarm
}
default: break
}
}
throw error
}
}
}

@ -0,0 +1,26 @@
internal extension LokiAPI {
internal struct Target : Hashable {
internal let address: String
internal let port: UInt32
internal init(address: String, port: UInt32) {
self.address = address
self.port = port
}
internal init(from targetWrapper: TargetWrapper) {
self.address = targetWrapper.address
self.port = targetWrapper.port
}
internal enum Method : String {
/// Only supported by snode targets.
case getSwarm = "get_snodes_for_pubkey"
/// Only supported by snode targets.
case getMessages = "retrieve"
case sendMessage = "store"
}
}
}

@ -10,19 +10,6 @@ import PromiseKit
internal static let ourHexEncodedPubKey = OWSIdentityManager.shared().identityKeyPair()!.hexEncodedPublicKey internal static let ourHexEncodedPubKey = OWSIdentityManager.shared().identityKeyPair()!.hexEncodedPublicKey
// MARK: Types // MARK: Types
internal struct Target : Hashable {
let address: String
let port: UInt32
enum Method : String {
/// Only applicable to snode targets.
case getSwarm = "get_snodes_for_pubkey"
/// Only applicable to snode targets.
case getMessages = "retrieve"
case sendMessage = "store"
}
}
public typealias RawResponse = Any public typealias RawResponse = Any
public enum Error : LocalizedError { public enum Error : LocalizedError {
@ -44,10 +31,10 @@ import PromiseKit
override private init() { } override private init() { }
// MARK: Internal API // MARK: Internal API
internal static func invoke(_ method: Target.Method, on target: Target, with parameters: [String:Any] = [:]) -> Promise<RawResponse> { internal static func invoke(_ method: Target.Method, on target: Target, associatedWith hexEncodedPublicKey: String, parameters: [String:Any] = [:]) -> Promise<RawResponse> {
let url = URL(string: "\(target.address):\(target.port)/\(version)/storage_rpc")! let url = URL(string: "\(target.address):\(target.port)/\(version)/storage_rpc")!
let request = TSRequest(url: url, method: "POST", parameters: [ "method" : method.rawValue, "params" : parameters ]) let request = TSRequest(url: url, method: "POST", parameters: [ "method" : method.rawValue, "params" : parameters ])
return TSNetworkManager.shared().makePromise(request: request).map { $0.responseObject } return TSNetworkManager.shared().makePromise(request: request).map { $0.responseObject }.handlingSwarmSpecificErrorsIfNeeded(for: target, associatedWith: hexEncodedPublicKey)
} }
// MARK: Public API // MARK: Public API
@ -55,7 +42,7 @@ import PromiseKit
return getTargetSnodes(for: ourHexEncodedPubKey).mapValues { targetSnode in return getTargetSnodes(for: ourHexEncodedPubKey).mapValues { targetSnode in
let lastHash = getLastMessageHashValue(for: targetSnode) ?? "" let lastHash = getLastMessageHashValue(for: targetSnode) ?? ""
let parameters: [String:Any] = [ "pubKey" : ourHexEncodedPubKey, "lastHash" : lastHash ] let parameters: [String:Any] = [ "pubKey" : ourHexEncodedPubKey, "lastHash" : lastHash ]
return invoke(.getMessages, on: targetSnode, with: parameters).map { rawResponse in return invoke(.getMessages, on: targetSnode, associatedWith: ourHexEncodedPubKey, parameters: parameters).map { rawResponse in
guard let json = rawResponse as? JSON, let rawMessages = json["messages"] as? [JSON] else { return [] } guard let json = rawResponse as? JSON, let rawMessages = json["messages"] as? [JSON] else { return [] }
updateLastMessageHashValueIfPossible(for: targetSnode, from: rawMessages) updateLastMessageHashValueIfPossible(for: targetSnode, from: rawMessages)
let newRawMessages = removeDuplicates(from: rawMessages) let newRawMessages = removeDuplicates(from: rawMessages)
@ -98,7 +85,7 @@ import PromiseKit
internal static func sendMessage(_ lokiMessage: Message, targets: Promise<[Target]>) -> Promise<Set<Promise<RawResponse>>> { internal static func sendMessage(_ lokiMessage: Message, targets: Promise<[Target]>) -> Promise<Set<Promise<RawResponse>>> {
let parameters = lokiMessage.toJSON() let parameters = lokiMessage.toJSON()
return targets.mapValues { invoke(.sendMessage, on: $0, with: parameters).recoverNetworkErrorIfNeeded(on: DispatchQueue.global()) }.map { Set($0) } return targets.mapValues { invoke(.sendMessage, on: $0, associatedWith: lokiMessage.destination, parameters: parameters) }.map { Set($0) }
} }
public static func ping(_ hexEncodedPublicKey: String) -> Promise<Set<Promise<RawResponse>>> { public static func ping(_ hexEncodedPublicKey: String) -> Promise<Set<Promise<RawResponse>>> {
@ -107,7 +94,7 @@ import PromiseKit
// TODO: Send using P2P protocol // TODO: Send using P2P protocol
} else { } else {
let parameters: [String:Any] = [ "pubKey" : hexEncodedPublicKey ] // TODO: Figure out correct parameters let parameters: [String:Any] = [ "pubKey" : hexEncodedPublicKey ] // TODO: Figure out correct parameters
return getTargetSnodes(for: hexEncodedPublicKey).mapValues { invoke(.sendMessage, on: $0, with: parameters).recoverNetworkErrorIfNeeded(on: DispatchQueue.global()) }.map { Set($0) } return getTargetSnodes(for: hexEncodedPublicKey).mapValues { invoke(.sendMessage, on: $0, associatedWith: hexEncodedPublicKey, parameters: parameters) }.map { Set($0) }
} }
} }
@ -156,56 +143,4 @@ import PromiseKit
return envelope return envelope
} }
} }
// MARK: Convenience
private static func getLastMessageHashValue(for target: Target) -> String? {
var result: String? = nil
// Uses a read/write connection because getting the last message hash value also removes expired messages as needed
storage.dbReadWriteConnection.readWrite { transaction in
result = storage.getLastMessageHash(forServiceNode: target.address, transaction: transaction)
}
return result
}
private static func setLastMessageHashValue(for target: Target, hashValue: String, expiresAt: UInt64) {
storage.dbReadWriteConnection.readWrite { transaction in
storage.setLastMessageHash(forServiceNode: target.address, hash: hashValue, expiresAt: expiresAt, transaction: transaction)
}
}
private static func getReceivedMessageHashValues() -> Set<String>? {
var result: Set<String>? = nil
storage.dbReadConnection.read { transaction in
result = storage.getReceivedMessageHashes(with: transaction)
}
return result
}
private static func setReceivedMessageHashValues(to receivedMessageHashValues: Set<String>) {
storage.dbReadWriteConnection.readWrite { transaction in
storage.setReceivedMessageHashes(receivedMessageHashValues, with: transaction)
}
}
}
private extension AnyPromise {
static func from<T : Any>(_ promise: Promise<T>) -> AnyPromise {
let result = AnyPromise(promise)
result.retainUntilComplete()
return result
}
}
// MARK: Error Handling
private extension Promise {
func recoverNetworkErrorIfNeeded(on queue: DispatchQueue) -> Promise<T> {
return recover(on: queue) { error -> Promise<T> in
switch error {
case NetworkManagerError.taskError(_, let underlyingError): throw underlyingError
default: throw error
}
}
}
} }

@ -0,0 +1,22 @@
@objc internal final class TargetWrapper : NSObject, NSCoding {
internal let address: String
internal let port: UInt32
internal init(from target: LokiAPI.Target) {
address = target.address
port = target.port
super.init()
}
internal init?(coder: NSCoder) {
address = coder.decodeObject(forKey: "address") as! String
port = coder.decodeObject(forKey: "port") as! UInt32
super.init()
}
internal func encode(with coder: NSCoder) {
coder.encode(address, forKey: "address")
coder.encode(port, forKey: "port")
}
}

@ -1,11 +1,11 @@
public extension ECKeyPair { public extension ECKeyPair {
@objc var hexEncodedPrivateKey: String { @objc public var hexEncodedPrivateKey: String {
return privateKey.map { String(format: "%02hhx", $0) }.joined() return privateKey.map { String(format: "%02hhx", $0) }.joined()
} }
@objc var hexEncodedPublicKey: String { @objc public var hexEncodedPublicKey: String {
// Prefixing with "05" is necessary for what seems to be a sort of Signal public key versioning system // Prefixing with "05" is necessary for what seems to be a sort of Signal public key versioning system
// Ref: [NSData prependKeyType] in AxolotKit // Ref: [NSData prependKeyType] in AxolotKit
return "05" + publicKey.map { String(format: "%02hhx", $0) }.joined() return "05" + publicKey.map { String(format: "%02hhx", $0) }.joined()

@ -4,7 +4,7 @@ import Curve25519Kit
private extension String { private extension String {
// Convert hex string to Data // Convert hex string to Data
var hexData: Data { fileprivate var hexData: Data {
var hex = self var hex = self
var data = Data() var data = Data()
while(hex.count > 0) { while(hex.count > 0) {

@ -95,9 +95,6 @@ NS_ASSUME_NONNULL_BEGIN
*/ */
- (void)setLastMessageHashForServiceNode:(NSString *)serviceNode hash:(NSString *)hash expiresAt:(u_int64_t)expiresAt transaction:(YapDatabaseReadWriteTransaction *)transaction NS_SWIFT_NAME(setLastMessageHash(forServiceNode:hash:expiresAt:transaction:)); - (void)setLastMessageHashForServiceNode:(NSString *)serviceNode hash:(NSString *)hash expiresAt:(u_int64_t)expiresAt transaction:(YapDatabaseReadWriteTransaction *)transaction NS_SWIFT_NAME(setLastMessageHash(forServiceNode:hash:expiresAt:transaction:));
- (NSSet<NSString *> *_Nullable)getReceivedMessageHashesWithTransaction:(YapDatabaseReadTransaction *)transaction;
- (void)setReceivedMessageHashes:(NSSet<NSString *> *)receivedMessageHashes withTransaction:(YapDatabaseReadWriteTransaction *)transaction;
@end @end
NS_ASSUME_NONNULL_END NS_ASSUME_NONNULL_END

@ -153,12 +153,4 @@
[transaction removeObjectForKey:serviceNode inCollection:LKLastMessageHashCollection]; [transaction removeObjectForKey:serviceNode inCollection:LKLastMessageHashCollection];
} }
- (NSSet<NSString *> *_Nullable)getReceivedMessageHashesWithTransaction:(YapDatabaseReadTransaction *)transaction {
return (NSSet *)[[transaction objectForKey:LKReceivedMessageHashesKey inCollection:LKReceivedMessageHashesCollection] as:NSSet.class];
}
- (void)setReceivedMessageHashes:(NSSet<NSString *> *)receivedMessageHashes withTransaction:(YapDatabaseReadWriteTransaction *)transaction {
[transaction setObject:receivedMessageHashes forKey:LKReceivedMessageHashesKey inCollection:LKReceivedMessageHashesCollection];
}
@end @end

@ -2,12 +2,12 @@ import CryptoSwift
private extension UInt64 { private extension UInt64 {
init(_ decimal: Decimal) { fileprivate init(_ decimal: Decimal) {
self.init(truncating: decimal as NSDecimalNumber) self.init(truncating: decimal as NSDecimalNumber)
} }
// Convert a UInt8 array to a UInt64 // Convert a UInt8 array to a UInt64
init(_ bytes: [UInt8]) { fileprivate init(_ bytes: [UInt8]) {
precondition(bytes.count <= MemoryLayout<UInt64>.size) precondition(bytes.count <= MemoryLayout<UInt64>.size)
var value: UInt64 = 0 var value: UInt64 = 0
for byte in bytes { for byte in bytes {
@ -24,7 +24,7 @@ private extension MutableCollection where Element == UInt8, Index == Int {
/// ///
/// - Parameter amount: The amount to increment by /// - Parameter amount: The amount to increment by
/// - Returns: The incremented collection /// - Returns: The incremented collection
func increment(by amount: Int) -> Self { fileprivate func increment(by amount: Int) -> Self {
var result = self var result = self
var increment = amount var increment = amount
for i in (0..<result.count).reversed() { for i in (0..<result.count).reversed() {

@ -156,7 +156,7 @@ public final class FriendRequestExpirationJob : NSObject {
// MARK: Events // MARK: Events
private extension FriendRequestExpirationJob { private extension FriendRequestExpirationJob {
@objc func didBecomeActive() { @objc fileprivate func didBecomeActive() {
AssertIsOnMainThread() AssertIsOnMainThread()
AppReadiness.runNowOrWhenAppDidBecomeReady { AppReadiness.runNowOrWhenAppDidBecomeReady {
FriendRequestExpirationJob.serialQueue.async { FriendRequestExpirationJob.serialQueue.async {
@ -165,7 +165,7 @@ private extension FriendRequestExpirationJob {
} }
} }
@objc func willResignActive() { @objc fileprivate func willResignActive() {
AssertIsOnMainThread() AssertIsOnMainThread()
resetNextExpireTimer() resetNextExpireTimer()
} }
@ -175,7 +175,7 @@ private extension FriendRequestExpirationJob {
// MARK: Asserts // MARK: Asserts
private extension FriendRequestExpirationJob { private extension FriendRequestExpirationJob {
func AssertIsOnFriendRequestExpireQueue() { fileprivate func AssertIsOnFriendRequestExpireQueue() {
#if DEBUG #if DEBUG
guard #available(iOS 10.0, *) else { return } guard #available(iOS 10.0, *) else { return }
dispatchPrecondition(condition: .onQueue(FriendRequestExpirationJob.serialQueue)) dispatchPrecondition(condition: .onQueue(FriendRequestExpirationJob.serialQueue))

Loading…
Cancel
Save