mirror of https://github.com/oxen-io/session-ios
Censorship circumvention in Egypt and UAE
* domain fronting * non-websocket message fetching // FREEBIEpull/1/head
parent
bcd371b96c
commit
ddba843d44
@ -0,0 +1,197 @@
|
||||
// Created by Michael Kirk on 12/19/16.
|
||||
// Copyright © 2016 Open Whisper Systems. All rights reserved.
|
||||
|
||||
import Foundation
|
||||
import PromiseKit
|
||||
|
||||
@objc(OWSMessageFetcherJob)
|
||||
class MessageFetcherJob: NSObject {
|
||||
|
||||
let TAG = "[MessageFetcherJob]"
|
||||
var timer: Timer?
|
||||
|
||||
// MARK: injected dependencies
|
||||
let networkManager: TSNetworkManager
|
||||
let messagesManager: TSMessagesManager
|
||||
let messageSender: MessageSender
|
||||
let signalService: OWSSignalService
|
||||
|
||||
// var fallbackTransport = false
|
||||
// ENABLED FOR DEBUG. DO NOT COMMIT!
|
||||
var fallbackTransport = true
|
||||
var runPromises = [Double: Promise<Void>]()
|
||||
|
||||
init(messagesManager: TSMessagesManager, messageSender: MessageSender, networkManager: TSNetworkManager, signalService: OWSSignalService) {
|
||||
self.messagesManager = messagesManager
|
||||
self.networkManager = networkManager
|
||||
self.messageSender = messageSender
|
||||
self.signalService = signalService
|
||||
}
|
||||
|
||||
func runAsync() {
|
||||
Logger.debug("\(TAG) \(#function)")
|
||||
guard signalService.isCensored else {
|
||||
Logger.debug("\(self.TAG) delegating message fetching to SocketManager since we're using normal transport.")
|
||||
TSSocketManager.becomeActive(fromBackgroundExpectMessage: true)
|
||||
return
|
||||
}
|
||||
|
||||
Logger.info("\(TAG) using fallback message fetching.")
|
||||
|
||||
let promiseId = NSDate().timeIntervalSince1970
|
||||
Logger.debug("\(self.TAG) starting promise: \(promiseId)")
|
||||
let runPromise = self.fetchUndeliveredMessages().then { (envelopes: [OWSSignalServiceProtosEnvelope], more: Bool) -> () in
|
||||
for envelope in envelopes {
|
||||
Logger.info("\(self.TAG) received envelope.")
|
||||
self.messagesManager.handleReceivedEnvelope(envelope);
|
||||
|
||||
self.acknowledgeDelivery(envelope: envelope)
|
||||
}
|
||||
if more {
|
||||
Logger.info("\(self.TAG) more messages, so recursing.")
|
||||
// recurse
|
||||
self.runAsync()
|
||||
}
|
||||
}.always {
|
||||
Logger.debug("\(self.TAG) cleaning up promise: \(promiseId)")
|
||||
self.runPromises[promiseId] = nil
|
||||
}
|
||||
|
||||
// maintain reference to make sure it's not de-alloced prematurely.
|
||||
runPromises[promiseId] = runPromise
|
||||
}
|
||||
|
||||
// use in DEBUG or wherever you can't receive push notifications to poll for messages.
|
||||
// Do not use in production.
|
||||
func startRunLoop(timeInterval: Double) {
|
||||
Logger.error("\(TAG) Starting message fetch polling. This should not be used in production.");
|
||||
timer = Timer.scheduledTimer(timeInterval: timeInterval, target: self, selector: #selector(runAsync), userInfo: nil, repeats: true)
|
||||
}
|
||||
|
||||
func stopRunLoop() {
|
||||
timer?.invalidate()
|
||||
timer = nil
|
||||
}
|
||||
|
||||
func parseMessagesResponse(responseObject: Any?) -> (envelopes: [OWSSignalServiceProtosEnvelope], more: Bool)? {
|
||||
guard let responseObject = responseObject else {
|
||||
Logger.error("\(self.TAG) response object was surpringly nil")
|
||||
return nil
|
||||
}
|
||||
|
||||
guard let responseDict = responseObject as? [String: Any] else {
|
||||
Logger.error("\(self.TAG) response object was not a dictionary")
|
||||
return nil
|
||||
}
|
||||
|
||||
guard let messageDicts = responseDict["messages"] as? [[String: Any]] else {
|
||||
Logger.error("\(self.TAG) messages object was not a list of dictionaries")
|
||||
return nil
|
||||
}
|
||||
|
||||
let moreMessages = { () -> Bool in
|
||||
if let responseMore = responseDict["more"] as? Bool {
|
||||
return responseMore
|
||||
} else {
|
||||
Logger.warn("\(self.TAG) more object was not a bool. Assuming no more")
|
||||
return false
|
||||
}
|
||||
}()
|
||||
|
||||
let envelopes = messageDicts.map { buildEnvelope(messageDict: $0) }.filter { $0 != nil }.map { $0! }
|
||||
|
||||
return (
|
||||
envelopes: envelopes,
|
||||
more: moreMessages
|
||||
)
|
||||
}
|
||||
|
||||
func buildEnvelope(messageDict: [String: Any]) -> OWSSignalServiceProtosEnvelope? {
|
||||
let builder = OWSSignalServiceProtosEnvelopeBuilder()
|
||||
|
||||
guard let typeInt = messageDict["type"] as? Int32 else {
|
||||
Logger.error("\(TAG) message body didn't have type")
|
||||
return nil
|
||||
}
|
||||
|
||||
guard let type = OWSSignalServiceProtosEnvelopeType(rawValue:typeInt) else {
|
||||
Logger.error("\(TAG) message body type was invalid")
|
||||
return nil
|
||||
}
|
||||
builder.setType(type)
|
||||
|
||||
if let relay = messageDict["relay"] as? String {
|
||||
builder.setRelay(relay)
|
||||
}
|
||||
|
||||
guard let timestamp = messageDict["timestamp"] as? UInt64 else {
|
||||
Logger.error("\(TAG) message body didn't have timestamp")
|
||||
return nil
|
||||
}
|
||||
builder.setTimestamp(timestamp)
|
||||
|
||||
guard let source = messageDict["source"] as? String else {
|
||||
Logger.error("\(TAG) message body didn't have source")
|
||||
return nil
|
||||
}
|
||||
builder.setSource(source)
|
||||
|
||||
guard let sourceDevice = messageDict["sourceDevice"] as? UInt32 else {
|
||||
Logger.error("\(TAG) message body didn't have sourceDevice")
|
||||
return nil
|
||||
}
|
||||
builder.setSourceDevice(sourceDevice)
|
||||
|
||||
if let encodedLegacyMessage = messageDict["message"] as? String {
|
||||
Logger.debug("\(TAG) message body had legacyMessage")
|
||||
if let legacyMessage = Data(base64Encoded: encodedLegacyMessage) {
|
||||
builder.setLegacyMessage(legacyMessage)
|
||||
}
|
||||
}
|
||||
|
||||
if let encodedContent = messageDict["content"] as? String {
|
||||
Logger.debug("\(TAG) message body had content")
|
||||
if let content = Data(base64Encoded: encodedContent) {
|
||||
builder.setContent(content)
|
||||
}
|
||||
}
|
||||
|
||||
return builder.build()
|
||||
}
|
||||
|
||||
func fetchUndeliveredMessages() -> Promise<(envelopes: [OWSSignalServiceProtosEnvelope], more: Bool)> {
|
||||
return Promise { fulfill, reject in
|
||||
let messagesRequest = OWSGetMessagesRequest()
|
||||
|
||||
self.networkManager.makeRequest(
|
||||
messagesRequest,
|
||||
success: { (task: URLSessionDataTask?, responseObject: Any?) -> () in
|
||||
guard let (envelopes, more) = self.parseMessagesResponse(responseObject: responseObject) else {
|
||||
Logger.error("\(self.TAG) response object had unexpected content")
|
||||
return reject(OWSErrorMakeUnableToProcessServerResponseError())
|
||||
}
|
||||
|
||||
fulfill((envelopes: envelopes, more: more))
|
||||
},
|
||||
failure: { (task: URLSessionDataTask?, error: Error?) in
|
||||
guard let error = error else {
|
||||
Logger.error("\(self.TAG) error was surpringly nil. sheesh rough day.")
|
||||
return reject(OWSErrorMakeUnableToProcessServerResponseError())
|
||||
}
|
||||
|
||||
reject(error)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func acknowledgeDelivery(envelope: OWSSignalServiceProtosEnvelope) {
|
||||
let request = OWSAcknowledgeMessageDeliveryRequest(source: envelope.source, timestamp: envelope.timestamp)
|
||||
self.networkManager.makeRequest(request,
|
||||
success: { (task: URLSessionDataTask?, responseObject: Any?) -> () in
|
||||
Logger.debug("\(self.TAG) acknowledged delivery for message at timestamp: \(envelope.timestamp)")
|
||||
},
|
||||
failure: { (task: URLSessionDataTask?, error: Error?) in
|
||||
Logger.debug("\(self.TAG) acknowledging delivery for message at timestamp: \(envelope.timestamp) failed with error: \(error)")
|
||||
})
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue