|
|
|
@ -8,15 +8,14 @@ import PromiseKit
|
|
|
|
|
@objc(OWSMessageFetcherJob)
|
|
|
|
|
class MessageFetcherJob: NSObject {
|
|
|
|
|
|
|
|
|
|
let TAG = "[MessageFetcherJob]"
|
|
|
|
|
var timer: Timer?
|
|
|
|
|
private let TAG = "[MessageFetcherJob]"
|
|
|
|
|
|
|
|
|
|
// MARK: injected dependencies
|
|
|
|
|
let networkManager: TSNetworkManager
|
|
|
|
|
let messageReceiver: OWSMessageReceiver
|
|
|
|
|
let signalService: OWSSignalService
|
|
|
|
|
private var timer: Timer?
|
|
|
|
|
|
|
|
|
|
var runPromises = [Double: Promise<Void>]()
|
|
|
|
|
// MARK: injected dependencies
|
|
|
|
|
private let networkManager: TSNetworkManager
|
|
|
|
|
private let messageReceiver: OWSMessageReceiver
|
|
|
|
|
private let signalService: OWSSignalService
|
|
|
|
|
|
|
|
|
|
init(messageReceiver: OWSMessageReceiver, networkManager: TSNetworkManager, signalService: OWSSignalService) {
|
|
|
|
|
self.messageReceiver = messageReceiver
|
|
|
|
@ -24,53 +23,58 @@ class MessageFetcherJob: NSObject {
|
|
|
|
|
self.signalService = signalService
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func runAsync() {
|
|
|
|
|
Logger.debug("\(TAG) \(#function)")
|
|
|
|
|
guard signalService.isCensorshipCircumventionActive else {
|
|
|
|
|
public func run() -> Promise<Void> {
|
|
|
|
|
Logger.debug("\(TAG) in \(#function)")
|
|
|
|
|
|
|
|
|
|
guard signalService.isCensorshipCircumventionActive else {
|
|
|
|
|
Logger.debug("\(self.TAG) delegating message fetching to SocketManager since we're using normal transport.")
|
|
|
|
|
TSSocketManager.requestSocketOpen()
|
|
|
|
|
return
|
|
|
|
|
return Promise(value: ())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Logger.info("\(TAG) using fallback message fetching.")
|
|
|
|
|
Logger.info("\(TAG) fetching messages via REST.")
|
|
|
|
|
|
|
|
|
|
let promiseId = NSDate().timeIntervalSince1970
|
|
|
|
|
Logger.debug("\(self.TAG) starting promise: \(promiseId)")
|
|
|
|
|
let runPromise = self.fetchUndeliveredMessages().then { (envelopes: [OWSSignalServiceProtosEnvelope], more: Bool) -> Void in
|
|
|
|
|
let promise = self.fetchUndeliveredMessages().then { (envelopes: [OWSSignalServiceProtosEnvelope], more: Bool) -> Promise<Void> in
|
|
|
|
|
for envelope in envelopes {
|
|
|
|
|
Logger.info("\(self.TAG) received envelope.")
|
|
|
|
|
self.messageReceiver.handleReceivedEnvelope(envelope)
|
|
|
|
|
self.acknowledgeDelivery(envelope: envelope)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if more {
|
|
|
|
|
Logger.info("\(self.TAG) more messages, so recursing.")
|
|
|
|
|
// recurse
|
|
|
|
|
self.runAsync()
|
|
|
|
|
Logger.info("\(self.TAG) fetching more messages.")
|
|
|
|
|
return self.run()
|
|
|
|
|
} else {
|
|
|
|
|
// All finished
|
|
|
|
|
return Promise(value: ())
|
|
|
|
|
}
|
|
|
|
|
}.always {
|
|
|
|
|
Logger.debug("\(self.TAG) cleaning up promise: \(promiseId)")
|
|
|
|
|
self.runPromises[promiseId] = nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// maintain reference to make sure it's not de-alloced prematurely.
|
|
|
|
|
runPromises[promiseId] = runPromise
|
|
|
|
|
promise.retainUntilComplete()
|
|
|
|
|
|
|
|
|
|
return promise
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@objc func run() -> AnyPromise {
|
|
|
|
|
return AnyPromise(run())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// use in DEBUG or wherever you can't receive push notifications to poll for messages.
|
|
|
|
|
// Do not use in production.
|
|
|
|
|
func startRunLoop(timeInterval: Double) {
|
|
|
|
|
public func startRunLoop(timeInterval: Double) {
|
|
|
|
|
Logger.error("\(TAG) Starting message fetch polling. This should not be used in production.")
|
|
|
|
|
timer = WeakTimer.scheduledTimer(timeInterval: timeInterval, target: self, userInfo: nil, repeats: true) {[weak self] _ in
|
|
|
|
|
self?.runAsync()
|
|
|
|
|
let _: Promise<Void>? = self?.run()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func stopRunLoop() {
|
|
|
|
|
public func stopRunLoop() {
|
|
|
|
|
timer?.invalidate()
|
|
|
|
|
timer = nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func parseMessagesResponse(responseObject: Any?) -> (envelopes: [OWSSignalServiceProtosEnvelope], more: Bool)? {
|
|
|
|
|
private func parseMessagesResponse(responseObject: Any?) -> (envelopes: [OWSSignalServiceProtosEnvelope], more: Bool)? {
|
|
|
|
|
guard let responseObject = responseObject else {
|
|
|
|
|
Logger.error("\(self.TAG) response object was surpringly nil")
|
|
|
|
|
return nil
|
|
|
|
@ -103,7 +107,7 @@ class MessageFetcherJob: NSObject {
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func buildEnvelope(messageDict: [String: Any]) -> OWSSignalServiceProtosEnvelope? {
|
|
|
|
|
private func buildEnvelope(messageDict: [String: Any]) -> OWSSignalServiceProtosEnvelope? {
|
|
|
|
|
let builder = OWSSignalServiceProtosEnvelopeBuilder()
|
|
|
|
|
|
|
|
|
|
guard let typeInt = messageDict["type"] as? Int32 else {
|
|
|
|
@ -156,7 +160,7 @@ class MessageFetcherJob: NSObject {
|
|
|
|
|
return builder.build()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func fetchUndeliveredMessages() -> Promise<(envelopes: [OWSSignalServiceProtosEnvelope], more: Bool)> {
|
|
|
|
|
private func fetchUndeliveredMessages() -> Promise<(envelopes: [OWSSignalServiceProtosEnvelope], more: Bool)> {
|
|
|
|
|
return Promise { fulfill, reject in
|
|
|
|
|
let messagesRequest = OWSGetMessagesRequest()
|
|
|
|
|
|
|
|
|
@ -181,7 +185,7 @@ class MessageFetcherJob: NSObject {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func acknowledgeDelivery(envelope: OWSSignalServiceProtosEnvelope) {
|
|
|
|
|
private func acknowledgeDelivery(envelope: OWSSignalServiceProtosEnvelope) {
|
|
|
|
|
let request = OWSAcknowledgeMessageDeliveryRequest(source: envelope.source, timestamp: envelope.timestamp)
|
|
|
|
|
self.networkManager.makeRequest(request,
|
|
|
|
|
success: { (_: URLSessionDataTask?, _: Any?) -> Void in
|
|
|
|
|