mirror of https://github.com/oxen-io/session-ios
commit
a19315cfeb
@ -1 +1 @@
|
||||
Subproject commit 960a820c76a95a64cfb1c6ad721c68f73a8f27b9
|
||||
Subproject commit 8b30c2d91fe7f9743350dd30521b5ca74e78766c
|
@ -1,49 +0,0 @@
|
||||
import PromiseKit
|
||||
|
||||
internal extension LokiAPI {
|
||||
|
||||
private static let receivedMessageHashValuesKey = "receivedMessageHashValuesKey"
|
||||
private static let receivedMessageHashValuesCollection = "receivedMessageHashValuesCollection"
|
||||
|
||||
internal static func getLastMessageHashValue(for target: LokiAPITarget) -> String? {
|
||||
var result: String? = nil
|
||||
// Uses a read/write connection because getting the last message hash value also removes expired messages as needed
|
||||
// TODO: This shouldn't be the case; a getter shouldn't have an unexpected side effect
|
||||
storage.dbReadWriteConnection.readWrite { transaction in
|
||||
result = storage.getLastMessageHash(forServiceNode: target.address, transaction: transaction)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
internal static func setLastMessageHashValue(for target: LokiAPITarget, hashValue: String, expirationDate: UInt64) {
|
||||
storage.dbReadWriteConnection.readWrite { transaction in
|
||||
storage.setLastMessageHash(forServiceNode: target.address, hash: hashValue, expiresAt: expirationDate, transaction: transaction)
|
||||
}
|
||||
}
|
||||
|
||||
internal static func getReceivedMessageHashValues() -> Set<String>? {
|
||||
var result: Set<String>? = nil
|
||||
storage.dbReadConnection.read { transaction in
|
||||
result = transaction.object(forKey: receivedMessageHashValuesKey, inCollection: receivedMessageHashValuesCollection) as! Set<String>?
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
internal static func setReceivedMessageHashValues(to receivedMessageHashValues: Set<String>) {
|
||||
storage.dbReadWriteConnection.readWrite { transaction in
|
||||
transaction.setObject(receivedMessageHashValues, forKey: receivedMessageHashValuesKey, inCollection: receivedMessageHashValuesCollection)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal extension Promise {
|
||||
|
||||
internal func recoveringNetworkErrorsIfNeeded() -> Promise<T> {
|
||||
return recover() { error -> Promise<T> in
|
||||
switch error {
|
||||
case NetworkManagerError.taskError(_, let underlyingError): throw underlyingError
|
||||
default: throw error
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,116 +0,0 @@
|
||||
import PromiseKit
|
||||
|
||||
private typealias Callback = () -> Void
|
||||
|
||||
public extension LokiAPI {
|
||||
private static var isLongPolling = false
|
||||
private static var shouldStopPolling = false
|
||||
private static var usedSnodes = [LokiAPITarget]()
|
||||
private static var cancels = [Callback]()
|
||||
|
||||
/// Start long polling.
|
||||
/// This will send a notification if new messages were received
|
||||
@objc public static func startLongPollingIfNeeded() {
|
||||
guard !isLongPolling else { return }
|
||||
isLongPolling = true
|
||||
shouldStopPolling = false
|
||||
|
||||
print("[Loki] Started long polling.")
|
||||
|
||||
longPoll()
|
||||
}
|
||||
|
||||
/// Stop long polling
|
||||
@objc public static func stopLongPolling() {
|
||||
shouldStopPolling = true
|
||||
isLongPolling = false
|
||||
usedSnodes.removeAll()
|
||||
cancelAllPromises()
|
||||
|
||||
print("[Loki] Stopped long polling.")
|
||||
}
|
||||
|
||||
/// The long polling loop
|
||||
private static func longPoll() {
|
||||
// This is here so we can stop the infinite loop
|
||||
guard !shouldStopPolling else { return }
|
||||
|
||||
getSwarm(for: userHexEncodedPublicKey).then { _ -> Guarantee<[Result<Void>]> in
|
||||
var promises = [Promise<Void>]()
|
||||
let connections = 3
|
||||
for i in 0..<connections {
|
||||
let (promise, cancel) = openConnection()
|
||||
promises.append(promise)
|
||||
cancels.append(cancel)
|
||||
}
|
||||
return when(resolved: promises)
|
||||
}.done { _ in
|
||||
// Since all promises are complete, we can clear the cancels
|
||||
cancelAllPromises()
|
||||
|
||||
// Keep long polling until it is stopped
|
||||
longPoll()
|
||||
}.retainUntilComplete()
|
||||
}
|
||||
|
||||
private static func cancelAllPromises() {
|
||||
cancels.forEach { cancel in cancel() }
|
||||
cancels.removeAll()
|
||||
}
|
||||
|
||||
private static func getUnusedSnodes() -> [LokiAPITarget] {
|
||||
let snodes = LokiAPI.swarmCache[userHexEncodedPublicKey] ?? []
|
||||
return snodes.filter { !usedSnodes.contains($0) }
|
||||
}
|
||||
|
||||
/// Open a connection to an unused snode and get messages from it
|
||||
private static func openConnection() -> (Promise<Void>, cancel: Callback) {
|
||||
var isCancelled = false
|
||||
|
||||
let cancel = {
|
||||
isCancelled = true
|
||||
}
|
||||
|
||||
func connectToNextSnode() -> Promise<Void> {
|
||||
guard let nextSnode = getUnusedSnodes().first else {
|
||||
// We don't have anymore unused snodes
|
||||
return Promise.value(())
|
||||
}
|
||||
|
||||
// Add the snode to the used array
|
||||
usedSnodes.append(nextSnode)
|
||||
|
||||
func getMessagesInfinitely(from target: LokiAPITarget) -> Promise<Void> {
|
||||
// The only way to exit the infinite loop is to throw an error 3 times or cancel
|
||||
return getRawMessages(from: target, usingLongPolling: true).then { rawResponse -> Promise<Void> in
|
||||
// Check if we need to abort
|
||||
guard !isCancelled else { throw PMKError.cancelled }
|
||||
|
||||
// Process the messages
|
||||
let messages = parseRawMessagesResponse(rawResponse, from: target)
|
||||
|
||||
// Send our messages as a notification
|
||||
NotificationCenter.default.post(name: .newMessagesReceived, object: nil, userInfo: ["messages": messages])
|
||||
|
||||
// Continue fetching if we haven't cancelled
|
||||
return getMessagesInfinitely(from: target)
|
||||
}.retryingIfNeeded(maxRetryCount: 3)
|
||||
}
|
||||
|
||||
// Keep getting messages for this snode
|
||||
// If we errored out then connect to the next snode
|
||||
return getMessagesInfinitely(from: nextSnode).recover { _ -> Promise<Void> in
|
||||
// Cancelled, so just return successfully
|
||||
guard !isCancelled else { return Promise.value(()) }
|
||||
|
||||
// Connect to the next snode if we haven't cancelled
|
||||
// We also need to remove the cached snode so we don't contact it again
|
||||
dropIfNeeded(nextSnode, hexEncodedPublicKey: userHexEncodedPublicKey)
|
||||
return connectToNextSnode()
|
||||
}
|
||||
}
|
||||
|
||||
// Keep connecting to snodes
|
||||
return (connectToNextSnode(), cancel)
|
||||
}
|
||||
}
|
@ -0,0 +1,90 @@
|
||||
import PromiseKit
|
||||
|
||||
@objc(LKLongPoller)
|
||||
public final class LokiLongPoller : NSObject {
|
||||
private let onMessagesReceived: ([SSKProtoEnvelope]) -> Void
|
||||
private let storage = OWSPrimaryStorage.shared()
|
||||
private var hasStarted = false
|
||||
private var hasStopped = false
|
||||
private var connections = Set<Promise<Void>>()
|
||||
private var usedSnodes = Set<LokiAPITarget>()
|
||||
|
||||
// MARK: Settings
|
||||
private let connectionCount = 3
|
||||
private let retryInterval: TimeInterval = 4
|
||||
|
||||
// MARK: Convenience
|
||||
private var userHexEncodedPublicKey: String { return OWSIdentityManager.shared().identityKeyPair()!.hexEncodedPublicKey }
|
||||
|
||||
// MARK: Initialization
|
||||
@objc public init(onMessagesReceived: @escaping ([SSKProtoEnvelope]) -> Void) {
|
||||
self.onMessagesReceived = onMessagesReceived
|
||||
super.init()
|
||||
}
|
||||
|
||||
// MARK: Public API
|
||||
@objc public func startIfNeeded() {
|
||||
guard !hasStarted else { return }
|
||||
print("[Loki] Started long polling.")
|
||||
hasStarted = true
|
||||
hasStopped = false
|
||||
openConnections()
|
||||
}
|
||||
|
||||
@objc public func stopIfNeeded() {
|
||||
guard !hasStopped else { return }
|
||||
print("[Loki] Stopped long polling.")
|
||||
hasStarted = false
|
||||
hasStopped = true
|
||||
usedSnodes.removeAll()
|
||||
}
|
||||
|
||||
// MARK: Private API
|
||||
private func openConnections() {
|
||||
guard !hasStopped else { return }
|
||||
LokiAPI.getSwarm(for: userHexEncodedPublicKey).then { [weak self] _ -> Guarantee<[Result<Void>]> in
|
||||
guard let strongSelf = self else { return Guarantee.value([Result<Void>]()) }
|
||||
strongSelf.usedSnodes.removeAll()
|
||||
let connections: [Promise<Void>] = (0..<strongSelf.connectionCount).map { _ in
|
||||
let (promise, seal) = Promise<Void>.pending()
|
||||
strongSelf.openConnectionToNextSnode(seal: seal)
|
||||
return promise
|
||||
}
|
||||
strongSelf.connections = Set(connections)
|
||||
return when(resolved: connections)
|
||||
}.ensure { [weak self] in
|
||||
guard let strongSelf = self else { return }
|
||||
Timer.scheduledTimer(withTimeInterval: strongSelf.retryInterval, repeats: false) { _ in
|
||||
guard let strongSelf = self else { return }
|
||||
strongSelf.openConnections()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func openConnectionToNextSnode(seal: Resolver<Void>) {
|
||||
let swarm = LokiAPI.swarmCache[userHexEncodedPublicKey] ?? []
|
||||
let userHexEncodedPublicKey = self.userHexEncodedPublicKey
|
||||
let unusedSnodes = Set(swarm).subtracting(usedSnodes)
|
||||
if !unusedSnodes.isEmpty {
|
||||
let nextSnode = unusedSnodes.randomElement()!
|
||||
usedSnodes.insert(nextSnode)
|
||||
print("[Loki] Opening long polling connection to \(nextSnode).")
|
||||
longPoll(nextSnode, seal: seal).catch { [weak self] error in
|
||||
print("[Loki] Long polling connection to \(nextSnode) failed; dropping it and switching to next snode.")
|
||||
LokiAPI.dropIfNeeded(nextSnode, hexEncodedPublicKey: userHexEncodedPublicKey)
|
||||
self?.openConnectionToNextSnode(seal: seal)
|
||||
}
|
||||
} else {
|
||||
seal.fulfill(())
|
||||
}
|
||||
}
|
||||
|
||||
private func longPoll(_ target: LokiAPITarget, seal: Resolver<Void>) -> Promise<Void> {
|
||||
return LokiAPI.getRawMessages(from: target, usingLongPolling: true).then { [weak self] rawResponse -> Promise<Void> in
|
||||
guard let strongSelf = self, !strongSelf.hasStopped else { return Promise.value(()) }
|
||||
let messages = LokiAPI.parseRawMessagesResponse(rawResponse, from: target)
|
||||
strongSelf.onMessagesReceived(messages)
|
||||
return strongSelf.longPoll(target, seal: seal)
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue