pull/156/head
gmbnt 5 years ago
parent e55e0784e3
commit 90933c7856

@ -92,11 +92,11 @@ public extension LokiAPI {
// All of this has to happen on DispatchQueue.global() due to the way OWSMessageManager works // All of this has to happen on DispatchQueue.global() due to the way OWSMessageManager works
let (promise, seal) = Promise<LokiAPITarget>.pending() let (promise, seal) = Promise<LokiAPITarget>.pending()
func getVersion(for snode: LokiAPITarget) -> Promise<String> { func getVersion(for snode: LokiAPITarget) -> Promise<String> {
let url = URL(string: "\(snode.address):\(snode.port)/get_stats/v1")!
let request = TSRequest(url: url)
if let version = snodeVersion[snode] { if let version = snodeVersion[snode] {
return Promise { $0.fulfill(version) } return Promise { $0.fulfill(version) }
} else { } else {
let url = URL(string: "\(snode.address):\(snode.port)/get_stats/v1")!
let request = TSRequest(url: url)
return TSNetworkManager.shared().perform(request, withCompletionQueue: DispatchQueue.global()).map(on: DispatchQueue.global()) { intermediate in return TSNetworkManager.shared().perform(request, withCompletionQueue: DispatchQueue.global()).map(on: DispatchQueue.global()) { intermediate in
let rawResponse = intermediate.responseObject let rawResponse = intermediate.responseObject
guard let json = rawResponse as? JSON, let version = json["version"] as? String else { throw LokiAPIError.missingSnodeVersion } guard let json = rawResponse as? JSON, let version = json["version"] as? String else { throw LokiAPIError.missingSnodeVersion }
@ -105,8 +105,8 @@ public extension LokiAPI {
} }
} }
} }
getRandomSnode().done(on: DispatchQueue.global()) { snode in getRandomSnode().then(on: DispatchQueue.global()) { snode -> Promise<LokiAPITarget> in
getVersion(for: snode).then(on: DispatchQueue.global()) { version -> Promise<LokiAPITarget> in return getVersion(for: snode).then(on: DispatchQueue.global()) { version -> Promise<LokiAPITarget> in
if version >= "2.0.2" { if version >= "2.0.2" {
print("[Loki] Using file server proxy with version number \(version).") print("[Loki] Using file server proxy with version number \(version).")
return Promise { $0.fulfill(snode) } return Promise { $0.fulfill(snode) }
@ -117,6 +117,8 @@ public extension LokiAPI {
}.recover(on: DispatchQueue.global()) { error in }.recover(on: DispatchQueue.global()) { error in
return getFileServerProxy() return getFileServerProxy()
} }
}.done(on: DispatchQueue.global()) { snode in
seal.fulfill(snode)
}.catch(on: DispatchQueue.global()) { error in }.catch(on: DispatchQueue.global()) { error in
seal.reject(error) seal.reject(error)
} }

@ -9,7 +9,6 @@ public final class LokiPoller : NSObject {
private var usedSnodes = Set<LokiAPITarget>() private var usedSnodes = Set<LokiAPITarget>()
// MARK: Settings // MARK: Settings
private static let pollInterval: TimeInterval = 1
private static let retryInterval: TimeInterval = 4 private static let retryInterval: TimeInterval = 4
// MARK: Initialization // MARK: Initialization
@ -24,7 +23,7 @@ public final class LokiPoller : NSObject {
print("[Loki] Started polling.") print("[Loki] Started polling.")
hasStarted = true hasStarted = true
hasStopped = false hasStopped = false
openConnections() setUpPolling()
} }
@objc public func stopIfNeeded() { @objc public func stopIfNeeded() {
@ -36,7 +35,7 @@ public final class LokiPoller : NSObject {
} }
// MARK: Private API // MARK: Private API
private func openConnections() { private func setUpPolling() {
guard !hasStopped else { return } guard !hasStopped else { return }
LokiAPI.getSwarm(for: getUserHexEncodedPublicKey()).then { [weak self] _ -> Promise<Void> in LokiAPI.getSwarm(for: getUserHexEncodedPublicKey()).then { [weak self] _ -> Promise<Void> in
guard let strongSelf = self else { return Promise { $0.fulfill(()) } } guard let strongSelf = self else { return Promise { $0.fulfill(()) } }
@ -45,10 +44,10 @@ public final class LokiPoller : NSObject {
strongSelf.pollNextSnode(seal: seal) strongSelf.pollNextSnode(seal: seal)
return promise return promise
}.ensure { [weak self] in }.ensure { [weak self] in
guard let strongSelf = self else { return } guard let strongSelf = self, !strongSelf.hasStopped else { return }
Timer.scheduledTimer(withTimeInterval: LokiPoller.retryInterval, repeats: false) { _ in Timer.scheduledTimer(withTimeInterval: LokiPoller.retryInterval, repeats: false) { _ in
guard let strongSelf = self else { return } guard let strongSelf = self else { return }
strongSelf.openConnections() strongSelf.setUpPolling()
} }
} }
} }
@ -62,7 +61,9 @@ public final class LokiPoller : NSObject {
let nextSnode = unusedSnodes.randomElement()! let nextSnode = unusedSnodes.randomElement()!
usedSnodes.insert(nextSnode) usedSnodes.insert(nextSnode)
print("[Loki] Polling \(nextSnode).") print("[Loki] Polling \(nextSnode).")
poll(nextSnode, seal: seal).catch(on: LokiAPI.errorHandlingQueue) { [weak self] error in poll(nextSnode, seal: seal).done(on: DispatchQueue.global()) {
seal.fulfill(())
}.catch(on: LokiAPI.errorHandlingQueue) { [weak self] error in
print("[Loki] Polling \(nextSnode) failed; dropping it and switching to next snode.") print("[Loki] Polling \(nextSnode) failed; dropping it and switching to next snode.")
LokiAPI.dropIfNeeded(nextSnode, hexEncodedPublicKey: userHexEncodedPublicKey) LokiAPI.dropIfNeeded(nextSnode, hexEncodedPublicKey: userHexEncodedPublicKey)
self?.pollNextSnode(seal: seal) self?.pollNextSnode(seal: seal)
@ -72,12 +73,12 @@ public final class LokiPoller : NSObject {
} }
} }
private func poll(_ target: LokiAPITarget, seal: Resolver<Void>) -> Promise<Void> { private func poll(_ target: LokiAPITarget, seal longTermSeal: Resolver<Void>) -> Promise<Void> {
return LokiAPI.getRawMessages(from: target, usingLongPolling: false).then(on: DispatchQueue.global()) { [weak self] rawResponse -> Promise<Void> in return LokiAPI.getRawMessages(from: target, usingLongPolling: false).then(on: DispatchQueue.global()) { [weak self] rawResponse -> Promise<Void> in
guard let strongSelf = self, !strongSelf.hasStopped else { return Promise.value(()) } guard let strongSelf = self, !strongSelf.hasStopped else { return Promise { $0.fulfill(()) } }
let messages = LokiAPI.parseRawMessagesResponse(rawResponse, from: target) let messages = LokiAPI.parseRawMessagesResponse(rawResponse, from: target)
strongSelf.onMessagesReceived(messages) strongSelf.onMessagesReceived(messages)
return strongSelf.poll(target, seal: seal) return strongSelf.poll(target, seal: longTermSeal)
} }
} }
} }

Loading…
Cancel
Save