Clean up threading

pull/214/head
nielsandriesse 5 years ago
parent 146329627f
commit b8fb751b8d

@ -38,8 +38,8 @@ public extension LokiAPI {
]
print("[Loki] Populating snode pool using: \(target).")
let (promise, seal) = Promise<LokiAPITarget>.pending()
attempt(maxRetryCount: 4, recoveringOn: DispatchQueue.global()) {
HTTP.execute(.post, url, parameters: parameters).map(on: DispatchQueue.global()) { json -> LokiAPITarget in
attempt(maxRetryCount: 4) {
HTTP.execute(.post, url, parameters: parameters).map2 { json -> LokiAPITarget in
guard let intermediate = json["result"] as? JSON, let rawTargets = intermediate["service_node_states"] as? [JSON] else { throw LokiAPIError.randomSnodePoolUpdatingFailed }
snodePool = try Set(rawTargets.flatMap { rawTarget in
guard let address = rawTarget["public_ip"] as? String, let port = rawTarget["storage_port"] as? Int, let ed25519PublicKey = rawTarget["pubkey_ed25519"] as? String, let x25519PublicKey = rawTarget["pubkey_x25519"] as? String, address != "0.0.0.0" else {
@ -51,13 +51,13 @@ public extension LokiAPI {
// randomElement() uses the system's default random generator, which is cryptographically secure
return snodePool.randomElement()!
}
}.done(on: DispatchQueue.global()) { snode in
}.done2 { snode in
seal.fulfill(snode)
try! Storage.writeSync { transaction in
print("[Loki] Persisting snode pool to database.")
storage.setSnodePool(LokiAPI.snodePool, in: transaction)
}
}.catch(on: DispatchQueue.global()) { error in
}.catch2 { error in
print("[Loki] Failed to contact seed node at: \(target).")
seal.reject(error)
}
@ -81,11 +81,11 @@ public extension LokiAPI {
} else {
print("[Loki] Getting swarm for: \(hexEncodedPublicKey).")
let parameters: [String:Any] = [ "pubKey" : hexEncodedPublicKey ]
return getRandomSnode().then(on: workQueue) {
return getRandomSnode().then2 {
invoke(.getSwarm, on: $0, associatedWith: hexEncodedPublicKey, parameters: parameters)
}.map {
}.map2 {
parseTargets(from: $0)
}.get { swarm in
}.get2 { swarm in
swarmCache[hexEncodedPublicKey] = swarm
try! Storage.writeSync { transaction in
storage.setSwarm(swarm, for: hexEncodedPublicKey, in: transaction)
@ -96,7 +96,7 @@ public extension LokiAPI {
internal static func getTargetSnodes(for hexEncodedPublicKey: String) -> Promise<[LokiAPITarget]> {
// shuffled() uses the system's default random generator, which is cryptographically secure
return getSwarm(for: hexEncodedPublicKey).map { Array($0.shuffled().prefix(targetSwarmSnodeCount)) }
return getSwarm(for: hexEncodedPublicKey).map2 { Array($0.shuffled().prefix(targetSwarmSnodeCount)) }
}
internal static func dropSnodeFromSnodePool(_ target: LokiAPITarget) {
@ -145,7 +145,7 @@ public extension LokiAPI {
internal extension Promise {
internal func handlingSnodeErrorsIfNeeded(for target: LokiAPITarget, associatedWith hexEncodedPublicKey: String) -> Promise<T> {
return recover(on: LokiAPI.errorHandlingQueue) { error -> Promise<T> in
return recover2 { error -> Promise<T> in
if let error = error as? LokiHTTPClient.HTTPError {
switch error.statusCode {
case 0, 400, 500, 503:

@ -1,16 +1,8 @@
import PromiseKit
// TODO: We guarantee that things happen in-order through promise chaining. For performance we should be able to use different queues for everything as long
// as we always modify state from the same queue.
@objc(LKAPI)
public final class LokiAPI : NSObject {
/// All service node related errors must be handled on this queue to avoid race conditions maintaining e.g. failure counts.
internal static let errorHandlingQueue = DispatchQueue(label: "LokiAPI.errorHandlingQueue")
internal static let stateQueue = DispatchQueue(label: "LokiAPI.stateQueue")
internal static let workQueue = DispatchQueue(label: "LokiAPI.workQueue", qos: .userInitiated)
internal static var storage: OWSPrimaryStorage { OWSPrimaryStorage.shared() }
// MARK: Settings
@ -22,7 +14,7 @@ public final class LokiAPI : NSObject {
/// - Note: Changing this on the fly is not recommended.
internal static var useOnionRequests = true
// MARK: Nested Types
// MARK: Types
public typealias RawResponse = Any
@objc public class LokiAPIError : NSError { // Not called `Error` for Obj-C interoperablity
@ -46,13 +38,13 @@ public final class LokiAPI : NSObject {
parameters: JSON, headers: [String:String]? = nil, timeout: TimeInterval? = nil) -> RawResponsePromise {
let url = URL(string: "\(target.address):\(target.port)/storage_rpc/v1")!
if useOnionRequests {
return OnionRequestAPI.sendOnionRequest(invoking: method, on: target, with: parameters, associatedWith: hexEncodedPublicKey).map { $0 as Any }
return OnionRequestAPI.sendOnionRequest(invoking: method, on: target, with: parameters, associatedWith: hexEncodedPublicKey).map2 { $0 as Any }
} else {
let request = TSRequest(url: url, method: "POST", parameters: [ "method" : method.rawValue, "params" : parameters ])
if let headers = headers { request.allHTTPHeaderFields = headers }
request.timeoutInterval = timeout ?? defaultTimeout
return TSNetworkManager.shared().perform(request, withCompletionQueue: workQueue)
.map { $0.responseObject }
return TSNetworkManager.shared().perform(request, withCompletionQueue: DispatchQueue.global(qos: .userInitiated))
.map2 { $0.responseObject }
.handlingSnodeErrorsIfNeeded(for: target, associatedWith: hexEncodedPublicKey)
.recoveringNetworkErrorsIfNeeded()
}
@ -68,16 +60,16 @@ public final class LokiAPI : NSObject {
// MARK: Public API
public static func getMessages() -> Promise<Set<MessageListPromise>> {
return attempt(maxRetryCount: maxRetryCount, recoveringOn: workQueue) {
getTargetSnodes(for: getUserHexEncodedPublicKey()).mapValues { targetSnode in
getRawMessages(from: targetSnode, usingLongPolling: false).map { parseRawMessagesResponse($0, from: targetSnode) }
}.map { Set($0) }
return attempt(maxRetryCount: maxRetryCount) {
getTargetSnodes(for: getUserHexEncodedPublicKey()).mapValues2 { targetSnode in
getRawMessages(from: targetSnode, usingLongPolling: false).map2 { parseRawMessagesResponse($0, from: targetSnode) }
}.map2 { Set($0) }
}
}
@objc(sendSignalMessage:onP2PSuccess:)
public static func objc_sendSignalMessage(_ signalMessage: SignalMessage, onP2PSuccess: @escaping () -> Void) -> AnyPromise {
let promise = sendSignalMessage(signalMessage, onP2PSuccess: onP2PSuccess).mapValues { AnyPromise.from($0) }.map { Set($0) }
let promise = sendSignalMessage(signalMessage, onP2PSuccess: onP2PSuccess).mapValues2 { AnyPromise.from($0) }.map2 { Set($0) }
return AnyPromise.from(promise)
}
@ -87,18 +79,18 @@ public final class LokiAPI : NSObject {
let destination = lokiMessage.destination
func sendLokiMessage(_ lokiMessage: LokiMessage, to target: LokiAPITarget) -> RawResponsePromise {
let parameters = lokiMessage.toJSON()
return attempt(maxRetryCount: maxRetryCount, recoveringOn: workQueue) {
return attempt(maxRetryCount: maxRetryCount) {
invoke(.sendMessage, on: target, associatedWith: destination, parameters: parameters)
}
}
func sendLokiMessageUsingSwarmAPI() -> Promise<Set<RawResponsePromise>> {
notificationCenter.post(name: .calculatingPoW, object: NSNumber(value: signalMessage.timestamp))
return lokiMessage.calculatePoW().then { lokiMessageWithPoW -> Promise<Set<RawResponsePromise>> in
return lokiMessage.calculatePoW().then2 { lokiMessageWithPoW -> Promise<Set<RawResponsePromise>> in
notificationCenter.post(name: .routing, object: NSNumber(value: signalMessage.timestamp))
return getTargetSnodes(for: destination).map { snodes in
return getTargetSnodes(for: destination).map2 { snodes in
return Set(snodes.map { snode in
notificationCenter.post(name: .messageSending, object: NSNumber(value: signalMessage.timestamp))
return sendLokiMessage(lokiMessageWithPoW, to: snode).map { rawResponse in
return sendLokiMessage(lokiMessageWithPoW, to: snode).map2 { rawResponse in
if let json = rawResponse as? JSON, let powDifficulty = json["difficulty"] as? Int {
guard powDifficulty != LokiAPI.powDifficulty else { return rawResponse }
print("[Loki] Setting proof of work difficulty to \(powDifficulty).")
@ -115,10 +107,10 @@ public final class LokiAPI : NSObject {
if let peer = LokiP2PAPI.getInfo(for: destination), (lokiMessage.isPing || peer.isOnline) {
let target = LokiAPITarget(address: peer.address, port: peer.port, publicKeySet: nil)
// TODO: Retrying
return Promise.value([ target ]).mapValues { sendLokiMessage(lokiMessage, to: $0) }.map { Set($0) }.get { _ in
return Promise.value([ target ]).mapValues2 { sendLokiMessage(lokiMessage, to: $0) }.map2 { Set($0) }.get2 { _ in
LokiP2PAPI.markOnline(destination)
onP2PSuccess()
}.recover { error -> Promise<Set<RawResponsePromise>> in
}.recover2 { error -> Promise<Set<RawResponsePromise>> in
LokiP2PAPI.markOffline(destination)
if lokiMessage.isPing {
print("[Loki] Failed to ping \(destination); marking contact as offline.")
@ -228,7 +220,7 @@ public final class LokiAPI : NSObject {
private extension Promise {
fileprivate func recoveringNetworkErrorsIfNeeded() -> Promise<T> {
return recover { error -> Promise<T> in
return recover2 { error -> Promise<T> in
switch error {
case NetworkManagerError.taskError(_, let underlyingError): throw underlyingError
case LokiHTTPClient.HTTPError.networkError(_, _, let underlyingError): throw underlyingError ?? error

@ -37,7 +37,7 @@ public class LokiDotNetAPI : NSObject {
if let token = getAuthTokenFromDatabase(for: server) {
return Promise.value(token)
} else {
return requestNewAuthToken(for: server).then(on: DispatchQueue.global()) { submitAuthToken($0, for: server) }.map(on: DispatchQueue.global()) { token in
return requestNewAuthToken(for: server).then2 { submitAuthToken($0, for: server) }.map2 { token in
try! Storage.writeSync { transaction in
setAuthToken(for: server, to: token, in: transaction)
}
@ -65,7 +65,7 @@ public class LokiDotNetAPI : NSObject {
let queryParameters = "pubKey=\(getUserHexEncodedPublicKey())"
let url = URL(string: "\(server)/loki/v1/get_challenge?\(queryParameters)")!
let request = TSRequest(url: url)
return LokiFileServerProxy(for: server).perform(request, withCompletionQueue: LokiAPI.workQueue).map(on: LokiAPI.workQueue) { rawResponse in
return LokiFileServerProxy(for: server).perform(request, withCompletionQueue: DispatchQueue.global(qos: .userInitiated)).map2 { rawResponse in
guard let json = rawResponse as? JSON, let base64EncodedChallenge = json["cipherText64"] as? String, let base64EncodedServerPublicKey = json["serverPubKey64"] as? String,
let challenge = Data(base64Encoded: base64EncodedChallenge), var serverPublicKey = Data(base64Encoded: base64EncodedServerPublicKey) else {
throw LokiDotNetAPIError.parsingFailed
@ -89,7 +89,7 @@ public class LokiDotNetAPI : NSObject {
let url = URL(string: "\(server)/loki/v1/submit_challenge")!
let parameters = [ "pubKey" : getUserHexEncodedPublicKey(), "token" : token ]
let request = TSRequest(url: url, method: "POST", parameters: parameters)
return LokiFileServerProxy(for: server).perform(request, withCompletionQueue: DispatchQueue.global()).map(on: DispatchQueue.global()) { _ in token }
return LokiFileServerProxy(for: server).perform(request, withCompletionQueue: DispatchQueue.global(qos: .userInitiated)).map2 { _ in token }
}
// MARK: Public API
@ -157,9 +157,9 @@ public class LokiDotNetAPI : NSObject {
if isProxyingRequired {
attachment.isUploaded = false
attachment.save()
let _ = LokiFileServerProxy(for: server).performLokiFileServerNSURLRequest(request as NSURLRequest).done { responseObject in
let _ = LokiFileServerProxy(for: server).performLokiFileServerNSURLRequest(request as NSURLRequest).done2 { responseObject in
parseResponse(responseObject)
}.catch { error in
}.catch2 { error in
seal.reject(error)
}
} else {
@ -187,13 +187,13 @@ public class LokiDotNetAPI : NSObject {
}
}
if server == LokiFileServerAPI.server {
LokiAPI.workQueue.async {
DispatchQueue.global(qos: .userInitiated).async {
proceed(with: "loki") // Uploads to the Loki File Server shouldn't include any personally identifiable information so use a dummy auth token
}
} else {
getAuthToken(for: server).done(on: LokiAPI.workQueue) { token in
getAuthToken(for: server).done2 { token in
proceed(with: token)
}.catch { error in
}.catch2 { error in
print("[Loki] Couldn't upload attachment due to error: \(error).")
seal.reject(error)
}
@ -206,7 +206,7 @@ public class LokiDotNetAPI : NSObject {
internal extension Promise {
internal func handlingInvalidAuthTokenIfNeeded(for server: String) -> Promise<T> {
return recover(on: DispatchQueue.global()) { error -> Promise<T> in
return recover2 { error -> Promise<T> in
if let error = error as? NetworkManagerError, (error.statusCode == 401 || error.statusCode == 403) {
print("[Loki] Group chat auth token for: \(server) expired; dropping it.")
LokiDotNetAPI.clearAuthToken(for: server)

@ -34,11 +34,11 @@ public final class LokiFileServerAPI : LokiDotNetAPI {
public static func getDeviceLinks(associatedWith hexEncodedPublicKeys: Set<String>) -> Promise<Set<DeviceLink>> {
let hexEncodedPublicKeysDescription = "[ \(hexEncodedPublicKeys.joined(separator: ", ")) ]"
print("[Loki] Getting device links for: \(hexEncodedPublicKeysDescription).")
return getAuthToken(for: server).then(on: DispatchQueue.global()) { token -> Promise<Set<DeviceLink>> in
return getAuthToken(for: server).then2 { token -> Promise<Set<DeviceLink>> in
let queryParameters = "ids=\(hexEncodedPublicKeys.map { "@\($0)" }.joined(separator: ","))&include_user_annotations=1"
let url = URL(string: "\(server)/users?\(queryParameters)")!
let request = TSRequest(url: url)
return LokiFileServerProxy(for: server).perform(request, withCompletionQueue: DispatchQueue.global()).map(on: DispatchQueue.global(qos: .userInitiated)) { rawResponse -> Set<DeviceLink> in
return LokiFileServerProxy(for: server).perform(request, withCompletionQueue: DispatchQueue.global(qos: .userInitiated)).map2 { rawResponse -> Set<DeviceLink> in
guard let json = rawResponse as? JSON, let data = json["data"] as? [JSON] else {
print("[Loki] Couldn't parse device links for users: \(hexEncodedPublicKeys) from: \(rawResponse).")
throw LokiDotNetAPIError.parsingFailed
@ -80,7 +80,7 @@ public final class LokiFileServerAPI : LokiDotNetAPI {
return deviceLink
}
})
}.map(on: DispatchQueue.global()) { deviceLinks in
}.map2 { deviceLinks in
storage.setDeviceLinks(deviceLinks)
return deviceLinks
}
@ -89,7 +89,7 @@ public final class LokiFileServerAPI : LokiDotNetAPI {
public static func setDeviceLinks(_ deviceLinks: Set<DeviceLink>) -> Promise<Void> {
print("[Loki] Updating device links.")
return getAuthToken(for: server).then { token -> Promise<Void> in
return getAuthToken(for: server).then2 { token -> Promise<Void> in
let isMaster = deviceLinks.contains { $0.master.hexEncodedPublicKey == getUserHexEncodedPublicKey() }
let deviceLinksAsJSON = deviceLinks.map { $0.toJSON() }
let value = !deviceLinksAsJSON.isEmpty ? [ "isPrimary" : isMaster ? 1 : 0, "authorisations" : deviceLinksAsJSON ] : nil
@ -98,9 +98,9 @@ public final class LokiFileServerAPI : LokiDotNetAPI {
let url = URL(string: "\(server)/users/me")!
let request = TSRequest(url: url, method: "PATCH", parameters: parameters)
request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ]
return attempt(maxRetryCount: 8, recoveringOn: LokiAPI.workQueue) {
LokiFileServerProxy(for: server).perform(request).map { _ in }
}.handlingInvalidAuthTokenIfNeeded(for: server).recover { error in
return attempt(maxRetryCount: 8) {
LokiFileServerProxy(for: server).perform(request).map2 { _ in }
}.handlingInvalidAuthTokenIfNeeded(for: server).recover2 { error in
print("Couldn't update device links due to error: \(error).")
throw error
}
@ -114,7 +114,7 @@ public final class LokiFileServerAPI : LokiDotNetAPI {
deviceLinks = storage.getDeviceLinks(for: getUserHexEncodedPublicKey(), in: transaction)
}
deviceLinks.insert(deviceLink)
return setDeviceLinks(deviceLinks).map(on: LokiAPI.workQueue) { _ in
return setDeviceLinks(deviceLinks).map2 { _ in
storage.addDeviceLink(deviceLink)
}
}
@ -126,7 +126,7 @@ public final class LokiFileServerAPI : LokiDotNetAPI {
deviceLinks = storage.getDeviceLinks(for: getUserHexEncodedPublicKey(), in: transaction)
}
deviceLinks.remove(deviceLink)
return setDeviceLinks(deviceLinks).map(on: LokiAPI.workQueue) { _ in
return setDeviceLinks(deviceLinks).map2 { _ in
storage.removeDeviceLink(deviceLink)
}
}
@ -151,7 +151,7 @@ public final class LokiFileServerAPI : LokiDotNetAPI {
print("[Loki] Couldn't upload profile picture due to error: \(error).")
return Promise(error: error)
}
return LokiFileServerProxy(for: server).performLokiFileServerNSURLRequest(request as NSURLRequest).map { responseObject in
return LokiFileServerProxy(for: server).performLokiFileServerNSURLRequest(request as NSURLRequest).map2 { responseObject in
guard let json = responseObject as? JSON, let data = json["data"] as? JSON, let downloadURL = data["url"] as? String else {
print("[Loki] Couldn't parse profile picture from: \(responseObject).")
throw LokiDotNetAPIError.parsingFailed

@ -49,7 +49,7 @@ internal class LokiFileServerProxy : LokiHTTPClient {
DispatchQueue.global(qos: .userInitiated).async {
let uncheckedSymmetricKey = try? Curve25519.generateSharedSecret(fromPublicKey: LokiFileServerProxy.fileServerPublicKey, privateKey: keyPair.privateKey)
guard let symmetricKey = uncheckedSymmetricKey else { return seal.reject(Error.symmetricKeyGenerationFailed) }
LokiAPI.getRandomSnode().then(on: DispatchQueue.global()) { proxy -> Promise<Any> in
LokiAPI.getRandomSnode().then2 { proxy -> Promise<Any> in
let url = "\(proxy.address):\(proxy.port)/file_proxy"
guard let urlAsString = request.url?.absoluteString, let serverURLEndIndex = urlAsString.range(of: server)?.upperBound,
serverURLEndIndex < urlAsString.endIndex else { throw Error.endpointParsingFailed }
@ -102,7 +102,7 @@ internal class LokiFileServerProxy : LokiHTTPClient {
}
task.resume()
return promise
}.map(on: DispatchQueue.global(qos: .userInitiated)) { rawResponse in
}.map2 { rawResponse in
guard let responseAsData = rawResponse as? Data, let responseAsJSON = try? JSONSerialization.jsonObject(with: responseAsData, options: .allowFragments) as? JSON, let base64EncodedCipherText = responseAsJSON["data"] as? String,
let meta = responseAsJSON["meta"] as? JSON, let statusCode = meta["code"] as? Int, let cipherText = Data(base64Encoded: base64EncodedCipherText) else {
print("[Loki] Received an invalid response.")
@ -115,9 +115,9 @@ internal class LokiFileServerProxy : LokiHTTPClient {
let uncheckedJSON = try? JSONSerialization.jsonObject(with: uncheckedJSONAsData, options: .allowFragments) as? JSON
guard let json = uncheckedJSON else { throw HTTPError.networkError(code: -1, response: nil, underlyingError: Error.proxyResponseParsingFailed) }
return json
}.done(on: DispatchQueue.global()) { rawResponse in
}.done2 { rawResponse in
seal.fulfill(rawResponse)
}.catch(on: DispatchQueue.global()) { error in
}.catch2 { error in
print("[Loki] File server proxy request failed with error: \(error.localizedDescription).")
seal.reject(HTTPError.from(error: error) ?? error)
}

@ -10,12 +10,12 @@ public class LokiHTTPClient {
securityPolicy.validatesDomainName = false
result.securityPolicy = securityPolicy
result.responseSerializer = AFHTTPResponseSerializer()
result.completionQueue = DispatchQueue.global()
result.completionQueue = DispatchQueue.global(qos: .userInitiated)
return result
}()
internal func perform(_ request: TSRequest, withCompletionQueue queue: DispatchQueue = DispatchQueue.main) -> LokiAPI.RawResponsePromise {
return TSNetworkManager.shared().perform(request, withCompletionQueue: queue).map { $0.responseObject }.recover { error -> LokiAPI.RawResponsePromise in
return TSNetworkManager.shared().perform(request, withCompletionQueue: queue).map2 { $0.responseObject }.recover2 { error -> LokiAPI.RawResponsePromise in
throw HTTPError.from(error: error) ?? error
}
}

@ -54,13 +54,13 @@ public final class LokiPoller : NSObject {
// MARK: Private API
private func setUpPolling() {
guard !hasStopped else { return }
LokiAPI.getSwarm(for: getUserHexEncodedPublicKey()).then { [weak self] _ -> Promise<Void> in
LokiAPI.getSwarm(for: getUserHexEncodedPublicKey()).then2 { [weak self] _ -> Promise<Void> in
guard let strongSelf = self else { return Promise { $0.fulfill(()) } }
strongSelf.usedSnodes.removeAll()
let (promise, seal) = Promise<Void>.pending()
strongSelf.pollNextSnode(seal: seal)
return promise
}.ensure { [weak self] in
}.ensure2 { [weak self] in
guard let strongSelf = self, !strongSelf.hasStopped else { return }
Timer.scheduledTimer(withTimeInterval: LokiPoller.retryInterval, repeats: false) { _ in
guard let strongSelf = self else { return }
@ -77,9 +77,9 @@ public final class LokiPoller : NSObject {
// randomElement() uses the system's default random generator, which is cryptographically secure
let nextSnode = unusedSnodes.randomElement()!
usedSnodes.insert(nextSnode)
poll(nextSnode, seal: seal).done(on: LokiAPI.workQueue) {
poll(nextSnode, seal: seal).done2 {
seal.fulfill(())
}.catch(on: LokiAPI.errorHandlingQueue) { [weak self] error in
}.catch2 { [weak self] error in
if let error = error as? Error, error == .pollLimitReached {
self?.pollCount = 0
} else {
@ -94,7 +94,7 @@ public final class LokiPoller : NSObject {
}
private func poll(_ target: LokiAPITarget, seal longTermSeal: Resolver<Void>) -> Promise<Void> {
return LokiAPI.getRawMessages(from: target, usingLongPolling: false).then(on: LokiAPI.workQueue) { [weak self] rawResponse -> Promise<Void> in
return LokiAPI.getRawMessages(from: target, usingLongPolling: false).then2 { [weak self] rawResponse -> Promise<Void> in
guard let strongSelf = self, !strongSelf.hasStopped else { return Promise { $0.fulfill(()) } }
let messages = LokiAPI.parseRawMessagesResponse(rawResponse, from: target)
strongSelf.onMessagesReceived(messages)

@ -3,9 +3,7 @@ import PromiseKit
/// See the "Onion Requests" section of [The Session Whitepaper](https://arxiv.org/pdf/2002.04609.pdf) for more information.
public enum OnionRequestAPI {
/// - Note: Must only be modified from `LokiAPI.workQueue`.
public static var guardSnodes: Set<LokiAPITarget> = []
/// - Note: Must only be modified from `LokiAPI.workQueue`.
public static var paths: [Path] = [] // Not a set to ensure we consistently show the same path to the user
private static var snodePool: Set<LokiAPITarget> {
@ -51,11 +49,10 @@ public enum OnionRequestAPI {
/// Tests the given snode. The returned promise errors out if the snode is faulty; the promise is fulfilled otherwise.
private static func testSnode(_ snode: LokiAPITarget) -> Promise<Void> {
let (promise, seal) = Promise<Void>.pending()
let queue = DispatchQueue.global() // No need to block the work queue for this
queue.async {
DispatchQueue.global(qos: .userInitiated).async {
let url = "\(snode.address):\(snode.port)/get_stats/v1"
let timeout: TimeInterval = 3 // Use a shorter timeout for testing
HTTP.execute(.get, url, timeout: timeout).done(on: queue) { rawResponse in
HTTP.execute(.get, url, timeout: timeout).done2 { rawResponse in
guard let json = rawResponse as? JSON, let version = json["version"] as? String else { return seal.reject(Error.missingSnodeVersion) }
if version >= "2.0.0" {
seal.fulfill(())
@ -63,7 +60,7 @@ public enum OnionRequestAPI {
print("[Loki] [Onion Request API] Unsupported snode version: \(version).")
seal.reject(Error.unsupportedSnodeVersion(version))
}
}.catch(on: queue) { error in
}.catch2 { error in
seal.reject(error)
}
}
@ -77,7 +74,7 @@ public enum OnionRequestAPI {
return Promise<Set<LokiAPITarget>> { $0.fulfill(guardSnodes) }
} else {
print("[Loki] [Onion Request API] Populating guard snode cache.")
return LokiAPI.getRandomSnode().then(on: LokiAPI.workQueue) { _ -> Promise<Set<LokiAPITarget>> in // Just used to populate the snode pool
return LokiAPI.getRandomSnode().then2 { _ -> Promise<Set<LokiAPITarget>> in // Just used to populate the snode pool
var unusedSnodes = snodePool // Sync on LokiAPI.workQueue
guard unusedSnodes.count >= guardSnodeCount else { throw Error.insufficientSnodes }
func getGuardSnode() -> Promise<LokiAPITarget> {
@ -86,10 +83,10 @@ public enum OnionRequestAPI {
unusedSnodes.remove(candidate) // All used snodes should be unique
print("[Loki] [Onion Request API] Testing guard snode: \(candidate).")
// Loop until a reliable guard snode is found
return testSnode(candidate).map(on: LokiAPI.workQueue) { candidate }.recover(on: LokiAPI.workQueue) { _ in getGuardSnode() }
return testSnode(candidate).map2 { candidate }.recover2 { _ in getGuardSnode() }
}
let promises = (0..<guardSnodeCount).map { _ in getGuardSnode() }
return when(fulfilled: promises).map(on: LokiAPI.workQueue) { guardSnodes in
return when(fulfilled: promises).map2 { guardSnodes in
let guardSnodesAsSet = Set(guardSnodes)
OnionRequestAPI.guardSnodes = guardSnodesAsSet
return guardSnodesAsSet
@ -105,8 +102,8 @@ public enum OnionRequestAPI {
DispatchQueue.main.async {
NotificationCenter.default.post(name: .buildingPaths, object: nil)
}
return LokiAPI.getRandomSnode().then(on: LokiAPI.workQueue) { _ -> Promise<[Path]> in // Just used to populate the snode pool
return getGuardSnodes().map(on: LokiAPI.workQueue) { guardSnodes -> [Path] in
return LokiAPI.getRandomSnode().then2 { _ -> Promise<[Path]> in // Just used to populate the snode pool
return getGuardSnodes().map2 { guardSnodes -> [Path] in
var unusedSnodes = snodePool.subtracting(guardSnodes)
let pathSnodeCount = guardSnodeCount * pathSize - guardSnodeCount
guard unusedSnodes.count >= pathSnodeCount else { throw Error.insufficientSnodes }
@ -121,7 +118,7 @@ public enum OnionRequestAPI {
print("[Loki] [Onion Request API] Built new onion request path: \(result.prettifiedDescription).")
return result
}
}.map(on: LokiAPI.workQueue) { paths in
}.map2 { paths in
OnionRequestAPI.paths = paths
try! Storage.writeSync { transaction in
print("[Loki] Persisting onion request paths to database.")
@ -155,7 +152,7 @@ public enum OnionRequestAPI {
seal.fulfill(paths.filter { !$0.contains(snode) }.randomElement()!)
}
} else {
return buildPaths().map(on: LokiAPI.workQueue) { paths in
return buildPaths().map2 { paths in
return paths.filter { !$0.contains(snode) }.randomElement()!
}
}
@ -177,10 +174,10 @@ public enum OnionRequestAPI {
var guardSnode: LokiAPITarget!
var targetSnodeSymmetricKey: Data! // Needed by invoke(_:on:with:) to decrypt the response sent back by the target snode
var encryptionResult: EncryptionResult!
return getPath(excluding: snode).then(on: LokiAPI.workQueue) { path -> Promise<EncryptionResult> in
return getPath(excluding: snode).then2 { path -> Promise<EncryptionResult> in
guardSnode = path.first!
// Encrypt in reverse order, i.e. the target snode first
return encrypt(payload, forTargetSnode: snode).then(on: LokiAPI.workQueue) { r -> Promise<EncryptionResult> in
return encrypt(payload, forTargetSnode: snode).then2 { r -> Promise<EncryptionResult> in
targetSnodeSymmetricKey = r.symmetricKey
// Recursively encrypt the layers of the onion (again in reverse order)
encryptionResult = r
@ -191,7 +188,7 @@ public enum OnionRequestAPI {
return Promise<EncryptionResult> { $0.fulfill(encryptionResult) }
} else {
let lhs = path.removeLast()
return OnionRequestAPI.encryptHop(from: lhs, to: rhs, using: encryptionResult).then(on: LokiAPI.workQueue) { r -> Promise<EncryptionResult> in
return OnionRequestAPI.encryptHop(from: lhs, to: rhs, using: encryptionResult).then2 { r -> Promise<EncryptionResult> in
encryptionResult = r
rhs = lhs
return addLayer()
@ -200,7 +197,7 @@ public enum OnionRequestAPI {
}
return addLayer()
}
}.map(on: LokiAPI.workQueue) { _ in (guardSnode, encryptionResult, targetSnodeSymmetricKey) }
}.map2 { _ in (guardSnode, encryptionResult, targetSnodeSymmetricKey) }
}
// MARK: Internal API
@ -208,9 +205,9 @@ public enum OnionRequestAPI {
internal static func sendOnionRequest(invoking method: LokiAPITarget.Method, on snode: LokiAPITarget, with parameters: JSON, associatedWith hexEncodedPublicKey: String) -> Promise<JSON> {
let (promise, seal) = Promise<JSON>.pending()
var guardSnode: LokiAPITarget!
LokiAPI.workQueue.async {
DispatchQueue.global(qos: .userInitiated).async {
let payload: JSON = [ "method" : method.rawValue, "params" : parameters ]
buildOnion(around: payload, targetedAt: snode).done(on: LokiAPI.workQueue) { intermediate in
buildOnion(around: payload, targetedAt: snode).done2 { intermediate in
guardSnode = intermediate.guardSnode
let url = "\(guardSnode.address):\(guardSnode.port)/onion_req"
let finalEncryptionResult = intermediate.finalEncryptionResult
@ -220,7 +217,7 @@ public enum OnionRequestAPI {
"ephemeral_key" : finalEncryptionResult.ephemeralPublicKey.toHexString()
]
let targetSnodeSymmetricKey = intermediate.targetSnodeSymmetricKey
HTTP.execute(.post, url, parameters: parameters).done(on: LokiAPI.workQueue) { rawResponse in
HTTP.execute(.post, url, parameters: parameters).done2 { rawResponse in
guard let json = rawResponse as? JSON, let base64EncodedIVAndCiphertext = json["result"] as? String,
let ivAndCiphertext = Data(base64Encoded: base64EncodedIVAndCiphertext) else { return seal.reject(HTTP.Error.invalidJSON) }
let iv = ivAndCiphertext[0..<Int(ivSize)]
@ -238,14 +235,14 @@ public enum OnionRequestAPI {
} catch (let error) {
seal.reject(error)
}
}.catch(on: LokiAPI.workQueue) { error in
}.catch2 { error in
seal.reject(error)
}
}.catch(on: LokiAPI.workQueue) { error in
}.catch2 { error in
seal.reject(error)
}
}
promise.catch(on: LokiAPI.workQueue) { error in // Must be invoked on LokiAPI.workQueue
promise.catch2 { error in // Must be invoked on LokiAPI.workQueue
guard case HTTP.Error.httpRequestFailed(_, _) = error else { return }
dropAllPaths() // A snode in the path is bad; retry with a different path
dropGuardSnode(guardSnode)
@ -259,7 +256,7 @@ public enum OnionRequestAPI {
private extension Promise where T == JSON {
func handlingErrorsIfNeeded(forTargetSnode snode: LokiAPITarget, associatedWith hexEncodedPublicKey: String) -> Promise<JSON> {
return recover(on: LokiAPI.errorHandlingQueue) { error -> Promise<JSON> in // Must be invoked on LokiAPI.errorHandlingQueue
return recover2 { error -> Promise<JSON> in // Must be invoked on LokiAPI.errorHandlingQueue
// The code below is very similar to that in LokiAPI.handlingSnodeErrorsIfNeeded(for:associatedWith:), but unfortunately slightly
// different due to the fact that OnionRequestAPI uses the newer HTTP API, whereas LokiAPI still uses TSNetworkManager
guard case OnionRequestAPI.Error.httpRequestFailedAtTargetSnode(let statusCode, let json) = error else { throw error }

@ -86,11 +86,11 @@ public final class LokiPublicChatAPI : LokiDotNetAPI {
} else {
queryParameters += "&count=\(fallbackBatchCount)&include_deleted=0"
}
return getAuthToken(for: server).then { token -> Promise<[LokiPublicChatMessage]> in
return getAuthToken(for: server).then2 { token -> Promise<[LokiPublicChatMessage]> in
let url = URL(string: "\(server)/channels/\(channel)/messages?\(queryParameters)")!
let request = TSRequest(url: url)
request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ]
return LokiFileServerProxy(for: server).perform(request).map(on: DispatchQueue.global()) { rawResponse in
return LokiFileServerProxy(for: server).perform(request).map2 { rawResponse in
guard let json = rawResponse as? JSON, let rawMessages = json["data"] as? [JSON] else {
print("[Loki] Couldn't parse messages for public chat channel with ID: \(channel) on server: \(server) from: \(rawResponse).")
throw LokiDotNetAPIError.parsingFailed
@ -172,17 +172,16 @@ public final class LokiPublicChatAPI : LokiDotNetAPI {
public static func sendMessage(_ message: LokiPublicChatMessage, to channel: UInt64, on server: String) -> Promise<LokiPublicChatMessage> {
print("[Loki] Sending message to public chat channel with ID: \(channel) on server: \(server).")
let (promise, seal) = Promise<LokiPublicChatMessage>.pending()
let queue = DispatchQueue.global()
queue.async { [privateKey = userKeyPair.privateKey] in
DispatchQueue.global(qos: .userInitiated).async { [privateKey = userKeyPair.privateKey] in
guard let signedMessage = message.sign(with: privateKey) else { return seal.reject(LokiDotNetAPIError.signingFailed) }
attempt(maxRetryCount: maxRetryCount, recoveringOn: queue) {
getAuthToken(for: server).then { token -> Promise<LokiPublicChatMessage> in
attempt(maxRetryCount: maxRetryCount) {
getAuthToken(for: server).then2 { token -> Promise<LokiPublicChatMessage> in
let url = URL(string: "\(server)/channels/\(channel)/messages")!
let parameters = signedMessage.toJSON()
let request = TSRequest(url: url, method: "POST", parameters: parameters)
request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ]
let displayName = userDisplayName
return LokiFileServerProxy(for: server).perform(request).map { rawResponse in
return LokiFileServerProxy(for: server).perform(request).map2 { rawResponse in
// ISO8601DateFormatter doesn't support milliseconds before iOS 11
let dateFormatter = DateFormatter()
dateFormatter.dateFormat = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
@ -195,9 +194,9 @@ public final class LokiPublicChatAPI : LokiDotNetAPI {
return LokiPublicChatMessage(serverID: serverID, hexEncodedPublicKey: getUserHexEncodedPublicKey(), displayName: displayName, profilePicture: signedMessage.profilePicture, body: body, type: publicChatMessageType, timestamp: timestamp, quote: signedMessage.quote, attachments: signedMessage.attachments, signature: signedMessage.signature)
}
}.handlingInvalidAuthTokenIfNeeded(for: server)
}.done { message in
}.done2 { message in
seal.fulfill(message)
}.catch { error in
}.catch2 { error in
seal.reject(error)
}
}
@ -213,11 +212,11 @@ public final class LokiPublicChatAPI : LokiDotNetAPI {
} else {
queryParameters = "count=\(fallbackBatchCount)"
}
return getAuthToken(for: server).then { token -> Promise<[UInt64]> in
return getAuthToken(for: server).then2 { token -> Promise<[UInt64]> in
let url = URL(string: "\(server)/loki/v1/channel/\(channel)/deletes?\(queryParameters)")!
let request = TSRequest(url: url)
request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ]
return LokiFileServerProxy(for: server).perform(request).map { rawResponse in
return LokiFileServerProxy(for: server).perform(request).map2 { rawResponse in
guard let json = rawResponse as? JSON, let deletions = json["data"] as? [JSON] else {
print("[Loki] Couldn't parse deleted messages for public chat channel with ID: \(channel) on server: \(server) from: \(rawResponse).")
throw LokiDotNetAPIError.parsingFailed
@ -244,12 +243,12 @@ public final class LokiPublicChatAPI : LokiDotNetAPI {
let isModerationRequest = !isSentByUser
print("[Loki] Deleting message with ID: \(messageID) for public chat channel with ID: \(channel) on server: \(server) (isModerationRequest = \(isModerationRequest)).")
let urlAsString = isSentByUser ? "\(server)/channels/\(channel)/messages/\(messageID)" : "\(server)/loki/v1/moderation/message/\(messageID)"
return attempt(maxRetryCount: maxRetryCount, recoveringOn: DispatchQueue.global()) {
getAuthToken(for: server).then { token -> Promise<Void> in
return attempt(maxRetryCount: maxRetryCount) {
getAuthToken(for: server).then2 { token -> Promise<Void> in
let url = URL(string: urlAsString)!
let request = TSRequest(url: url, method: "DELETE", parameters: [:])
request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ]
return LokiFileServerProxy(for: server).perform(request).done { result -> Void in
return LokiFileServerProxy(for: server).perform(request).done2 { result -> Void in
print("[Loki] Deleted message with ID: \(messageID) on server: \(server).")
}
}.handlingInvalidAuthTokenIfNeeded(for: server)
@ -262,11 +261,11 @@ public final class LokiPublicChatAPI : LokiDotNetAPI {
guard let hexEncodedPublicKeys = displayNameUpdatees[publicChatID] else { return Promise.value(()) }
displayNameUpdatees[publicChatID] = []
print("[Loki] Getting display names for: \(hexEncodedPublicKeys).")
return getAuthToken(for: server).then { token -> Promise<Void> in
return getAuthToken(for: server).then2 { token -> Promise<Void> in
let queryParameters = "ids=\(hexEncodedPublicKeys.map { "@\($0)" }.joined(separator: ","))&include_user_annotations=1"
let url = URL(string: "\(server)/users?\(queryParameters)")!
let request = TSRequest(url: url)
return LokiFileServerProxy(for: server).perform(request).map { rawResponse in
return LokiFileServerProxy(for: server).perform(request).map2 { rawResponse in
guard let json = rawResponse as? JSON, let data = json["data"] as? [JSON] else {
print("[Loki] Couldn't parse display names for users: \(hexEncodedPublicKeys) from: \(rawResponse).")
throw LokiDotNetAPIError.parsingFailed
@ -292,12 +291,12 @@ public final class LokiPublicChatAPI : LokiDotNetAPI {
public static func setDisplayName(to newDisplayName: String?, on server: String) -> Promise<Void> {
print("[Loki] Updating display name on server: \(server).")
let parameters: JSON = [ "name" : (newDisplayName ?? "") ]
return attempt(maxRetryCount: maxRetryCount, recoveringOn: DispatchQueue.global()) {
getAuthToken(for: server).then { token -> Promise<Void> in
return attempt(maxRetryCount: maxRetryCount) {
getAuthToken(for: server).then2 { token -> Promise<Void> in
let url = URL(string: "\(server)/users/me")!
let request = TSRequest(url: url, method: "PATCH", parameters: parameters)
request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ]
return LokiFileServerProxy(for: server).perform(request).map { _ in }.recover { error in
return LokiFileServerProxy(for: server).perform(request).map2 { _ in }.recover2 { error in
print("Couldn't update display name due to error: \(error).")
throw error
}
@ -317,12 +316,12 @@ public final class LokiPublicChatAPI : LokiDotNetAPI {
annotation["value"] = [ "profileKey" : profileKey.base64EncodedString(), "url" : url ]
}
let parameters: JSON = [ "annotations" : [ annotation ] ]
return attempt(maxRetryCount: maxRetryCount, recoveringOn: DispatchQueue.global()) {
getAuthToken(for: server).then { token -> Promise<Void> in
return attempt(maxRetryCount: maxRetryCount) {
getAuthToken(for: server).then2 { token -> Promise<Void> in
let url = URL(string: "\(server)/users/me")!
let request = TSRequest(url: url, method: "PATCH", parameters: parameters)
request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ]
return LokiFileServerProxy(for: server).perform(request).map { _ in }.recover { error in
return LokiFileServerProxy(for: server).perform(request).map2 { _ in }.recover2 { error in
print("[Loki] Couldn't update profile picture due to error: \(error).")
throw error
}
@ -337,12 +336,12 @@ public final class LokiPublicChatAPI : LokiDotNetAPI {
}
public static func getInfo(for channel: UInt64, on server: String) -> Promise<LokiPublicChatInfo> {
return attempt(maxRetryCount: maxRetryCount, recoveringOn: DispatchQueue.global()) {
getAuthToken(for: server).then { token -> Promise<LokiPublicChatInfo> in
return attempt(maxRetryCount: maxRetryCount) {
getAuthToken(for: server).then2 { token -> Promise<LokiPublicChatInfo> in
let url = URL(string: "\(server)/channels/\(channel)?include_annotations=1")!
let request = TSRequest(url: url)
request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ]
return LokiFileServerProxy(for: server).perform(request).map { rawResponse in
return LokiFileServerProxy(for: server).perform(request).map2 { rawResponse in
guard let json = rawResponse as? JSON,
let data = json["data"] as? JSON,
let annotations = data["annotations"] as? [JSON],
@ -366,12 +365,12 @@ public final class LokiPublicChatAPI : LokiDotNetAPI {
}
public static func join(_ channel: UInt64, on server: String) -> Promise<Void> {
return attempt(maxRetryCount: maxRetryCount, recoveringOn: DispatchQueue.global()) {
getAuthToken(for: server).then { token -> Promise<Void> in
return attempt(maxRetryCount: maxRetryCount) {
getAuthToken(for: server).then2 { token -> Promise<Void> in
let url = URL(string: "\(server)/channels/\(channel)/subscribe")!
let request = TSRequest(url: url, method: "POST", parameters: [:])
request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ]
return LokiFileServerProxy(for: server).perform(request).done { result -> Void in
return LokiFileServerProxy(for: server).perform(request).done2 { result -> Void in
print("[Loki] Joined channel with ID: \(channel) on server: \(server).")
}
}.handlingInvalidAuthTokenIfNeeded(for: server)
@ -379,12 +378,12 @@ public final class LokiPublicChatAPI : LokiDotNetAPI {
}
public static func leave(_ channel: UInt64, on server: String) -> Promise<Void> {
return attempt(maxRetryCount: maxRetryCount, recoveringOn: DispatchQueue.global()) {
getAuthToken(for: server).then { token -> Promise<Void> in
return attempt(maxRetryCount: maxRetryCount) {
getAuthToken(for: server).then2 { token -> Promise<Void> in
let url = URL(string: "\(server)/channels/\(channel)/subscribe")!
let request = TSRequest(url: url, method: "DELETE", parameters: [:])
request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ]
return LokiFileServerProxy(for: server).perform(request).done { result -> Void in
return LokiFileServerProxy(for: server).perform(request).done2 { result -> Void in
print("[Loki] Left channel with ID: \(channel) on server: \(server).")
}
}.handlingInvalidAuthTokenIfNeeded(for: server)
@ -401,16 +400,16 @@ public final class LokiPublicChatAPI : LokiDotNetAPI {
let url = URL(string: "\(server)/loki/v1/channels/\(channel)/messages/\(messageID)/report")!
let request = TSRequest(url: url, method: "POST", parameters: [:])
// Only used for the Loki Public Chat which doesn't require authentication
return LokiFileServerProxy(for: server).perform(request).map { _ in }
return LokiFileServerProxy(for: server).perform(request).map2 { _ in }
}
// MARK: Moderators
public static func getModerators(for channel: UInt64, on server: String) -> Promise<Set<String>> {
return getAuthToken(for: server).then { token -> Promise<Set<String>> in
return getAuthToken(for: server).then2 { token -> Promise<Set<String>> in
let url = URL(string: "\(server)/loki/v1/channel/\(channel)/get_moderators")!
let request = TSRequest(url: url)
request.allHTTPHeaderFields = [ "Content-Type" : "application/json", "Authorization" : "Bearer \(token)" ]
return LokiFileServerProxy(for: server).perform(request).map { rawResponse in
return LokiFileServerProxy(for: server).perform(request).map2 { rawResponse in
guard let json = rawResponse as? JSON, let moderators = json["moderators"] as? [String] else {
print("[Loki] Couldn't parse moderators for public chat channel with ID: \(channel) on server: \(server) from: \(rawResponse).")
throw LokiDotNetAPIError.parsingFailed

@ -56,9 +56,9 @@ public final class LokiPublicChatManager : NSObject {
return Promise(error: Error.chatCreationFailed)
}
}
return LokiPublicChatAPI.getAuthToken(for: server).then { token in
return LokiPublicChatAPI.getAuthToken(for: server).then2 { token in
return LokiPublicChatAPI.getInfo(for: channel, on: server)
}.map { channelInfo -> LokiPublicChat in
}.map2 { channelInfo -> LokiPublicChat in
guard let chat = self.addChat(server: server, channel: channel, name: channelInfo.displayName) else { throw Error.chatCreationFailed }
return chat
}

@ -54,7 +54,7 @@ public final class LokiPublicChatPoller : NSObject {
public func pollForNewMessages() -> Promise<Void> {
let publicChat = self.publicChat
let userHexEncodedPublicKey = self.userHexEncodedPublicKey
return LokiPublicChatAPI.getMessages(for: publicChat.channel, on: publicChat.server).done(on: DispatchQueue.global()) { messages in
return LokiPublicChatAPI.getMessages(for: publicChat.channel, on: publicChat.server).done2 { messages in
let uniqueHexEncodedPublicKeys = Set(messages.map { $0.hexEncodedPublicKey })
func proceed() {
let storage = OWSPrimaryStorage.shared()
@ -188,12 +188,12 @@ public final class LokiPublicChatPoller : NSObject {
return timeSinceLastUpdate > MultiDeviceProtocol.deviceLinkUpdateInterval
}
if !hexEncodedPublicKeysToUpdate.isEmpty {
LokiFileServerAPI.getDeviceLinks(associatedWith: hexEncodedPublicKeysToUpdate).done(on: DispatchQueue.global()) { _ in
LokiFileServerAPI.getDeviceLinks(associatedWith: hexEncodedPublicKeysToUpdate).done2 { _ in
proceed()
hexEncodedPublicKeysToUpdate.forEach {
MultiDeviceProtocol.lastDeviceLinkUpdate[$0] = Date() // TODO: Doing this from a global queue seems a bit iffy
}
}.catch(on: DispatchQueue.global()) { error in
}.catch2 { error in
if (error as? LokiDotNetAPI.LokiDotNetAPIError) == LokiDotNetAPI.LokiDotNetAPIError.parsingFailed {
// Don't immediately re-fetch in case of failure due to a parsing error
hexEncodedPublicKeysToUpdate.forEach {
@ -203,7 +203,7 @@ public final class LokiPublicChatPoller : NSObject {
proceed()
}
} else {
DispatchQueue.global().async {
DispatchQueue.global(qos: .userInitiated).async {
proceed()
}
}
@ -212,7 +212,7 @@ public final class LokiPublicChatPoller : NSObject {
private func pollForDeletedMessages() {
let publicChat = self.publicChat
let _ = LokiPublicChatAPI.getDeletedMessageServerIDs(for: publicChat.channel, on: publicChat.server).done(on: DispatchQueue.global()) { deletedMessageServerIDs in
let _ = LokiPublicChatAPI.getDeletedMessageServerIDs(for: publicChat.channel, on: publicChat.server).done2 { deletedMessageServerIDs in
try! Storage.writeSync { transaction in
let deletedMessageIDs = deletedMessageServerIDs.compactMap { OWSPrimaryStorage.shared().getIDForMessage(withServerID: UInt($0), in: transaction) }
deletedMessageIDs.forEach { messageID in

@ -18,7 +18,7 @@ public enum LokiRSSFeedProxy {
let endpoint = endpoints.first { url.lowercased().contains($0.key) }!.value
let url = URL(string: server + "/" + endpoint)!
let request = TSRequest(url: url)
return LokiFileServerProxy(for: server).perform(request).map { response -> String in
return LokiFileServerProxy(for: server).perform(request).map2 { response -> String in
guard let json = response as? JSON, let xml = json["data"] as? String else { throw Error.proxyResponseParsingFailed }
return xml
}

@ -2,14 +2,8 @@
@objc(LKMentionsManager)
public final class MentionsManager : NSObject {
private static var _userHexEncodedPublicKeyCache: [String:Set<String>] = [:]
/// A mapping from thread ID to set of user hex encoded public keys.
@objc public static var userPublicKeyCache: [String:Set<String>] {
get { LokiAPI.stateQueue.sync { _userHexEncodedPublicKeyCache } }
set { LokiAPI.stateQueue.sync { _userHexEncodedPublicKeyCache = newValue } }
}
// TODO: I don't think stateQueue actually helps avoid race conditions
@objc public static var userPublicKeyCache: [String:Set<String>] = [:]
internal static var storage: OWSPrimaryStorage { OWSPrimaryStorage.shared() }

@ -15,12 +15,8 @@ import PromiseKit
@objc(LKMultiDeviceProtocol)
public final class MultiDeviceProtocol : NSObject {
private static var _lastDeviceLinkUpdate: [String:Date] = [:]
/// A mapping from hex encoded public key to date updated.
public static var lastDeviceLinkUpdate: [String:Date] {
get { LokiAPI.stateQueue.sync { _lastDeviceLinkUpdate } }
set { LokiAPI.stateQueue.sync { _lastDeviceLinkUpdate = newValue } }
}
public static var lastDeviceLinkUpdate: [String:Date] = [:]
// TODO: I don't think stateQueue actually helps avoid race conditions

@ -22,7 +22,7 @@ public class LokiSessionResetImplementation : NSObject, SessionResetProtocol {
guard let preKeyMessage = whisperMessage as? PreKeyWhisperMessage else { return }
guard let storedPreKey = storage.getPreKeyRecord(forContact: recipientID, transaction: transaction) else {
print("[Loki] Received a friend request accepted message from a public key for which no pre key bundle was created.")
return // FIXME: This is causing trouble when it shouldn't...
return
}
guard storedPreKey.id == preKeyMessage.prekeyID else {
print("[Loki] Received a `PreKeyWhisperMessage` (friend request accepted message) from an unknown source.")

@ -48,7 +48,7 @@ public final class SyncMessagesProtocol : NSObject {
let friends = Set(hepks).map { SignalAccount(recipientId: $0) }
let syncManager = SSKEnvironment.shared.syncManager
let promises = friends.chunked(by: 3).map { friends -> Promise<Void> in // TODO: Does this always fit?
return Promise(syncManager.syncContacts(for: friends)).map { _ in }
return Promise(syncManager.syncContacts(for: friends)).map2 { _ in }
}
return when(fulfilled: promises)
}
@ -67,7 +67,7 @@ public final class SyncMessagesProtocol : NSObject {
}
let syncManager = SSKEnvironment.shared.syncManager
let promises = groups.map { group -> Promise<Void> in
return Promise(syncManager.syncGroup(for: group)).map { _ in }
return Promise(syncManager.syncGroup(for: group)).map2 { _ in }
}
return when(fulfilled: promises)
}

@ -36,7 +36,7 @@ public final class LokiPushNotificationManager : NSObject {
let url = URL(string: server + "register")!
let request = TSRequest(url: url, method: "POST", parameters: parameters)
request.allHTTPHeaderFields = [ "Content-Type" : "application/json" ]
let promise = TSNetworkManager.shared().makePromise(request: request).map { _, response in
let promise = TSNetworkManager.shared().makePromise(request: request).map2 { _, response in
guard let json = response as? JSON else {
return print("[Loki] Couldn't register device token.")
}
@ -48,7 +48,7 @@ public final class LokiPushNotificationManager : NSObject {
userDefaults[.isUsingFullAPNs] = false
return
}
promise.catch { error in
promise.catch2 { error in
print("[Loki] Couldn't register device token.")
}
return promise
@ -77,7 +77,7 @@ public final class LokiPushNotificationManager : NSObject {
let url = URL(string: server + "register")!
let request = TSRequest(url: url, method: "POST", parameters: parameters)
request.allHTTPHeaderFields = [ "Content-Type" : "application/json" ]
let promise = TSNetworkManager.shared().makePromise(request: request).map { _, response in
let promise = TSNetworkManager.shared().makePromise(request: request).map2 { _, response in
guard let json = response as? JSON else {
return print("[Loki] Couldn't register device token.")
}
@ -89,7 +89,7 @@ public final class LokiPushNotificationManager : NSObject {
userDefaults[.isUsingFullAPNs] = true
return
}
promise.catch { error in
promise.catch2 { error in
print("[Loki] Couldn't register device token.")
}
return promise

@ -204,7 +204,7 @@ public class LokiP2PAPI : NSObject {
return
}
messageSender.sendPromise(message: message).catch { error in
messageSender.sendPromise(message: message).catch2 { error in
Logger.warn("Failed to send online status to \(thread.contactIdentifier()).")
}.retainUntilComplete()
}

@ -3,7 +3,7 @@ import PromiseKit
/// Retry the promise constructed in `body` up to `maxRetryCount` times.
///
/// - Note: Intentionally explicit about the recovery queue at the call site.
internal func attempt<T>(maxRetryCount: UInt, recoveringOn queue: DispatchQueue, body: @escaping () -> Promise<T>) -> Promise<T> {
internal func attempt<T>(maxRetryCount: UInt, recoveringOn queue: DispatchQueue = .global(qos: .userInitiated), body: @escaping () -> Promise<T>) -> Promise<T> {
var retryCount = 0
func attempt() -> Promise<T> {
return body().recover(on: queue) { error -> Promise<T> in

@ -0,0 +1,76 @@
import PromiseKit
public extension Thenable {
func then2<U>(_ body: @escaping (T) throws -> U) -> Promise<U.T> where U : Thenable {
return then(on: DispatchQueue.global(qos: .userInitiated), body)
}
func map2<U>(_ transform: @escaping (T) throws -> U) -> Promise<U> {
return map(on: DispatchQueue.global(qos: .userInitiated), transform)
}
func done2(_ body: @escaping (T) throws -> Void) -> Promise<Void> {
return done(on: DispatchQueue.global(qos: .userInitiated), body)
}
func get2(_ body: @escaping (T) throws -> Void) -> Promise<T> {
return get(on: DispatchQueue.global(qos: .userInitiated), body)
}
}
public extension Thenable where T: Sequence {
func mapValues2<U>(_ transform: @escaping (T.Iterator.Element) throws -> U) -> Promise<[U]> {
return mapValues(on: DispatchQueue.global(qos: .userInitiated), transform)
}
}
public extension Guarantee {
func then2<U>(_ body: @escaping (T) -> Guarantee<U>) -> Guarantee<U> {
return then(on: DispatchQueue.global(qos: .userInitiated), body)
}
func map2<U>(_ body: @escaping (T) -> U) -> Guarantee<U> {
return map(on: DispatchQueue.global(qos: .userInitiated), body)
}
func done2(_ body: @escaping (T) -> Void) -> Guarantee<Void> {
return done(on: DispatchQueue.global(qos: .userInitiated), body)
}
func get2(_ body: @escaping (T) -> Void) -> Guarantee<T> {
return get(on: DispatchQueue.global(qos: .userInitiated), body)
}
}
public extension CatchMixin {
func catch2(_ body: @escaping (Error) -> Void) -> PMKFinalizer {
return self.catch(on: DispatchQueue.global(qos: .userInitiated), body)
}
func recover2<U: Thenable>(_ body: @escaping(Error) throws -> U) -> Promise<T> where U.T == T {
return recover(on: DispatchQueue.global(qos: .userInitiated), body)
}
func recover2(_ body: @escaping(Error) -> Guarantee<T>) -> Guarantee<T> {
return recover(on: DispatchQueue.global(qos: .userInitiated), body)
}
func ensure2(_ body: @escaping () -> Void) -> Promise<T> {
return ensure(on: DispatchQueue.global(qos: .userInitiated), body)
}
}
public extension CatchMixin where T == Void {
func recover2(_ body: @escaping(Error) -> Void) -> Guarantee<Void> {
return recover(on: DispatchQueue.global(qos: .userInitiated), body)
}
func recover2(_ body: @escaping(Error) throws -> Void) -> Promise<Void> {
return recover(on: DispatchQueue.global(qos: .userInitiated), body)
}
}
Loading…
Cancel
Save