From 438bbccdfa7ca35da1c8f0ddc160a8c12bf97328 Mon Sep 17 00:00:00 2001 From: Niels Andriesse Date: Mon, 7 Dec 2020 11:21:24 +1100 Subject: [PATCH] Fix background polling --- Session/Utilities/BackgroundPoller.swift | 40 ++++++++++++++++--- .../Jobs/MessageReceiveJob.swift | 20 ++++++++-- .../MessageReceiver+Handling.swift | 8 ++-- .../Pollers/ClosedGroupPoller.swift | 2 +- .../Pollers/OpenGroupPoller.swift | 25 ++++++++++-- .../Sending & Receiving/Pollers/Poller.swift | 2 +- .../NotificationServiceExtension.swift | 2 +- 7 files changed, 80 insertions(+), 19 deletions(-) diff --git a/Session/Utilities/BackgroundPoller.swift b/Session/Utilities/BackgroundPoller.swift index 9ec7c4761..31fcabd66 100644 --- a/Session/Utilities/BackgroundPoller.swift +++ b/Session/Utilities/BackgroundPoller.swift @@ -1,23 +1,23 @@ import PromiseKit +import SessionSnodeKit @objc(LKBackgroundPoller) public final class BackgroundPoller : NSObject { private static var closedGroupPoller: ClosedGroupPoller! + private static var promises: [Promise] = [] private override init() { } @objc(pollWithCompletionHandler:) public static func poll(completionHandler: @escaping (UIBackgroundFetchResult) -> Void) { - var promises: [Promise] = [] - // TODO TODO TODO -// promises.append(AppEnvironment.shared.messageFetcherJob.run()) // FIXME: It'd be nicer to just use Poller directly - closedGroupPoller = ClosedGroupPoller() - promises.append(contentsOf: closedGroupPoller.pollOnce()) + promises = [] + promises.append(pollForMessages()) + promises.append(contentsOf: pollForClosedGroupMessages()) let openGroups: [String:OpenGroup] = Storage.shared.getAllUserOpenGroups() openGroups.values.forEach { openGroup in let poller = OpenGroupPoller(for: openGroup) poller.stop() - promises.append(poller.pollForNewMessages()) + promises.append(poller.pollForNewMessages(isBackgroundPoll: true)) } when(resolved: promises).done { _ in completionHandler(.newData) @@ -25,4 +25,32 @@ public final class BackgroundPoller : NSObject { completionHandler(.failed) } } + + private static func pollForMessages() -> Promise { + let userPublicKey = getUserHexEncodedPublicKey() + return getMessages(for: userPublicKey) + } + + private static func pollForClosedGroupMessages() -> [Promise] { + let publicKeys = Storage.shared.getUserClosedGroupPublicKeys() + return publicKeys.map { getMessages(for: $0) } + } + + private static func getMessages(for publicKey: String) -> Promise { + return SnodeAPI.getSwarm(for: publicKey).then2 { swarm -> Promise in + guard let snode = swarm.randomElement() else { throw SnodeAPI.Error.generic } + return SnodeAPI.getRawMessages(from: snode, associatedWith: publicKey).then(on: DispatchQueue.main) { rawResponse -> Promise in + let messages = SnodeAPI.parseRawMessagesResponse(rawResponse, from: snode, associatedWith: publicKey) + let promises = messages.compactMap { json -> Promise? in + // Use a best attempt approach here; we don't want to fail the entire process if one of the + // messages failed to parse. + guard let envelope = SNProtoEnvelope.from(json), + let data = try? envelope.serializedData() else { return nil } + let job = MessageReceiveJob(data: data, isBackgroundPoll: true) + return job.execute() + } + return when(fulfilled: promises) // The promise returned by MessageReceiveJob never rejects + } + } + } } diff --git a/SessionMessagingKit/Jobs/MessageReceiveJob.swift b/SessionMessagingKit/Jobs/MessageReceiveJob.swift index 74192a582..b8895d12f 100644 --- a/SessionMessagingKit/Jobs/MessageReceiveJob.swift +++ b/SessionMessagingKit/Jobs/MessageReceiveJob.swift @@ -1,9 +1,11 @@ import SessionUtilitiesKit +import PromiseKit public final class MessageReceiveJob : NSObject, Job, NSCoding { // NSObject/NSCoding conformance is needed for YapDatabase compatibility public let data: Data public let openGroupMessageServerID: UInt64? public let openGroupID: String? + public let isBackgroundPoll: Bool public var delegate: JobDelegate? public var id: String? public var failureCount: UInt = 0 @@ -13,10 +15,11 @@ public final class MessageReceiveJob : NSObject, Job, NSCoding { // NSObject/NSC public static let maxFailureCount: UInt = 10 // MARK: Initialization - public init(data: Data, openGroupMessageServerID: UInt64? = nil, openGroupID: String? = nil) { + public init(data: Data, openGroupMessageServerID: UInt64? = nil, openGroupID: String? = nil, isBackgroundPoll: Bool) { self.data = data self.openGroupMessageServerID = openGroupMessageServerID self.openGroupID = openGroupID + self.isBackgroundPoll = isBackgroundPoll #if DEBUG if openGroupMessageServerID != nil { assert(openGroupID != nil) } if openGroupID != nil { assert(openGroupMessageServerID != nil) } @@ -26,10 +29,12 @@ public final class MessageReceiveJob : NSObject, Job, NSCoding { // NSObject/NSC // MARK: Coding public init?(coder: NSCoder) { guard let data = coder.decodeObject(forKey: "data") as! Data?, - let id = coder.decodeObject(forKey: "id") as! String? else { return nil } + let id = coder.decodeObject(forKey: "id") as! String?, + let isBackgroundPoll = coder.decodeObject(forKey: "isBackgroundPoll") as! Bool? else { return nil } self.data = data self.openGroupMessageServerID = coder.decodeObject(forKey: "openGroupMessageServerID") as! UInt64? self.openGroupID = coder.decodeObject(forKey: "openGroupID") as! String? + self.isBackgroundPoll = isBackgroundPoll self.id = id self.failureCount = coder.decodeObject(forKey: "failureCount") as! UInt? ?? 0 } @@ -38,17 +43,24 @@ public final class MessageReceiveJob : NSObject, Job, NSCoding { // NSObject/NSC coder.encode(data, forKey: "data") coder.encode(openGroupMessageServerID, forKey: "openGroupMessageServerID") coder.encode(openGroupID, forKey: "openGroupID") + coder.encode(isBackgroundPoll, forKey: "isBackgroundPoll") coder.encode(id, forKey: "id") coder.encode(failureCount, forKey: "failureCount") } // MARK: Running public func execute() { + let _: Promise = execute() + } + + public func execute() -> Promise { + let (promise, seal) = Promise.pending() SNMessagingKitConfiguration.shared.storage.withAsync({ transaction in // Intentionally capture self do { let (message, proto) = try MessageReceiver.parse(self.data, openGroupMessageServerID: self.openGroupMessageServerID, using: transaction) - try MessageReceiver.handle(message, associatedWithProto: proto, openGroupID: self.openGroupID, using: transaction) + try MessageReceiver.handle(message, associatedWithProto: proto, openGroupID: self.openGroupID, isBackgroundPoll: self.isBackgroundPoll, using: transaction) self.handleSuccess() + seal.fulfill(()) } catch { SNLog("Couldn't receive message due to error: \(error).") if let error = error as? MessageReceiver.Error, !error.isRetryable { @@ -56,8 +68,10 @@ public final class MessageReceiveJob : NSObject, Job, NSCoding { // NSObject/NSC } else { self.handleFailure(error: error) } + seal.fulfill(()) // The promise is just used to keep track of when we're done } }, completion: { }) + return promise } private func handleSuccess() { diff --git a/SessionMessagingKit/Sending & Receiving/MessageReceiver+Handling.swift b/SessionMessagingKit/Sending & Receiving/MessageReceiver+Handling.swift index 31e093258..e72f002fc 100644 --- a/SessionMessagingKit/Sending & Receiving/MessageReceiver+Handling.swift +++ b/SessionMessagingKit/Sending & Receiving/MessageReceiver+Handling.swift @@ -7,13 +7,13 @@ extension MessageReceiver { return SSKEnvironment.shared.blockingManager.isRecipientIdBlocked(publicKey) } - public static func handle(_ message: Message, associatedWithProto proto: SNProtoContent, openGroupID: String?, using transaction: Any) throws { + public static func handle(_ message: Message, associatedWithProto proto: SNProtoContent, openGroupID: String?, isBackgroundPoll: Bool, using transaction: Any) throws { switch message { case let message as ReadReceipt: handleReadReceipt(message, using: transaction) case let message as TypingIndicator: handleTypingIndicator(message, using: transaction) case let message as ClosedGroupUpdate: handleClosedGroupUpdate(message, using: transaction) case let message as ExpirationTimerUpdate: handleExpirationTimerUpdate(message, using: transaction) - case let message as VisibleMessage: try handleVisibleMessage(message, associatedWithProto: proto, openGroupID: openGroupID, using: transaction) + case let message as VisibleMessage: try handleVisibleMessage(message, associatedWithProto: proto, openGroupID: openGroupID, isBackgroundPoll: isBackgroundPoll, using: transaction) default: fatalError() } } @@ -136,7 +136,7 @@ extension MessageReceiver { } @discardableResult - public static func handleVisibleMessage(_ message: VisibleMessage, associatedWithProto proto: SNProtoContent, openGroupID: String?, using transaction: Any) throws -> String { + public static func handleVisibleMessage(_ message: VisibleMessage, associatedWithProto proto: SNProtoContent, openGroupID: String?, isBackgroundPoll: Bool, using transaction: Any) throws -> String { let storage = SNMessagingKitConfiguration.shared.storage let transaction = transaction as! YapDatabaseReadWriteTransaction var isMainAppAndActive = false @@ -206,7 +206,7 @@ extension MessageReceiver { cancelTypingIndicatorsIfNeeded(for: message.sender!) } // Notify the user if needed - guard isMainAppAndActive, let tsIncomingMessage = TSIncomingMessage.fetch(uniqueId: tsIncomingMessageID, transaction: transaction), + guard (isMainAppAndActive || isBackgroundPoll), let tsIncomingMessage = TSIncomingMessage.fetch(uniqueId: tsIncomingMessageID, transaction: transaction), let thread = TSThread.fetch(uniqueId: threadID, transaction: transaction) else { return tsIncomingMessageID } SSKEnvironment.shared.notificationsManager!.notifyUser(for: tsIncomingMessage, in: thread, transaction: transaction) return tsIncomingMessageID diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift b/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift index ca2ae693e..4b339dd7c 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift @@ -67,7 +67,7 @@ public final class ClosedGroupPoller : NSObject { guard let envelope = SNProtoEnvelope.from(json) else { return } do { let data = try envelope.serializedData() - let job = MessageReceiveJob(data: data) + let job = MessageReceiveJob(data: data, isBackgroundPoll: false) Storage.write { transaction in SessionMessagingKit.JobQueue.shared.add(job, using: transaction) } diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPoller.swift b/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPoller.swift index 32f382e90..995c1cc9c 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPoller.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPoller.swift @@ -51,11 +51,18 @@ public final class OpenGroupPoller : NSObject { @discardableResult public func pollForNewMessages() -> Promise { + return pollForNewMessages(isBackgroundPoll: false) + } + + @discardableResult + public func pollForNewMessages(isBackgroundPoll: Bool) -> Promise { guard !self.isPolling else { return Promise.value(()) } self.isPolling = true let openGroup = self.openGroup let userPublicKey = getUserHexEncodedPublicKey() - return OpenGroupAPI.getMessages(for: openGroup.channel, on: openGroup.server).done(on: DispatchQueue.global(qos: .default)) { messages in + let (promise, seal) = Promise.pending() + promise.retainUntilComplete() + OpenGroupAPI.getMessages(for: openGroup.channel, on: openGroup.server).done(on: DispatchQueue.global(qos: .default)) { messages in self.isPolling = false // Sorting the messages by timestamp before importing them fixes an issue where messages that quote older messages can't find those older messages messages.sorted { $0.serverTimestamp < $1.serverTimestamp }.forEach { message in @@ -153,11 +160,23 @@ public final class OpenGroupPoller : NSObject { Storage.write { transaction in Storage.shared.setOpenGroupDisplayName(to: senderDisplayName, for: senderPublicKey, inOpenGroupWithID: openGroup.id, using: transaction) let messageServerID = message.serverID - let job = MessageReceiveJob(data: try! envelope.buildSerializedData(), openGroupMessageServerID: messageServerID, openGroupID: openGroup.id) - SessionMessagingKit.JobQueue.shared.add(job, using: transaction) + let job = MessageReceiveJob(data: try! envelope.buildSerializedData(), openGroupMessageServerID: messageServerID, openGroupID: openGroup.id, isBackgroundPoll: isBackgroundPoll) + if isBackgroundPoll { + job.execute().done(on: DispatchQueue.global(qos: .userInitiated)) { + seal.fulfill(()) + }.catch(on: DispatchQueue.global(qos: .userInitiated)) { _ in + seal.fulfill(()) // The promise is just used to keep track of when we're done + } + } else { + SessionMessagingKit.JobQueue.shared.add(job, using: transaction) + seal.fulfill(()) + } } } + }.catch(on: DispatchQueue.global(qos: .userInitiated)) { _ in + seal.fulfill(()) // The promise is just used to keep track of when we're done } + return promise } private func pollForDeletedMessages() { diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift b/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift index 082e3a7fc..f182c744e 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift @@ -97,7 +97,7 @@ public final class Poller : NSObject { guard let envelope = SNProtoEnvelope.from(json) else { return } do { let data = try envelope.serializedData() - let job = MessageReceiveJob(data: data) + let job = MessageReceiveJob(data: data, isBackgroundPoll: false) Storage.write { transaction in SessionMessagingKit.JobQueue.shared.add(job, using: transaction) } diff --git a/SessionNotificationServiceExtension/NotificationServiceExtension.swift b/SessionNotificationServiceExtension/NotificationServiceExtension.swift index 225957db8..4306575e9 100644 --- a/SessionNotificationServiceExtension/NotificationServiceExtension.swift +++ b/SessionNotificationServiceExtension/NotificationServiceExtension.swift @@ -43,7 +43,7 @@ public final class NotificationServiceExtension : UNNotificationServiceExtension var userInfo: [String:Any] = [ NotificationServiceExtension.isFromRemoteKey : true ] switch message { case let visibleMessage as VisibleMessage: - let tsIncomingMessageID = try MessageReceiver.handleVisibleMessage(visibleMessage, associatedWithProto: proto, openGroupID: nil, using: transaction) + let tsIncomingMessageID = try MessageReceiver.handleVisibleMessage(visibleMessage, associatedWithProto: proto, openGroupID: nil, isBackgroundPoll: false, using: transaction) guard let tsIncomingMessage = TSIncomingMessage.fetch(uniqueId: tsIncomingMessageID, transaction: transaction) else { return self.handleFailure(for: notificationContent) }