diff --git a/Session/Utilities/BackgroundPoller.swift b/Session/Utilities/BackgroundPoller.swift
index bbdd9d9f8..64e2e36ce 100644
--- a/Session/Utilities/BackgroundPoller.swift
+++ b/Session/Utilities/BackgroundPoller.swift
@@ -45,33 +45,76 @@ public final class BackgroundPoller : NSObject {
     private static func getMessages(for publicKey: String) -> Promise<Void> {
         return SnodeAPI.getSwarm(for: publicKey).then(on: DispatchQueue.main) { swarm -> Promise<Void> in
             guard let snode = swarm.randomElement() else { throw SnodeAPI.Error.generic }
+            return attempt(maxRetryCount: 4, recoveringOn: DispatchQueue.main) {
             return SnodeAPI.getRawMessages(from: snode, associatedWith: publicKey).then(on: DispatchQueue.main) { rawResponse -> Promise<Void> in
                 let messages: [SnodeReceivedMessage] = SnodeAPI.parseRawMessagesResponse(rawResponse, from: snode, associatedWith: publicKey)
-                let promises = messages.compactMap { message -> Promise<Void>? in
-                    // Use a best attempt approach here; we don't want to fail the entire process
-                    // if one of the messages failed to parse
-                    guard
-                        let envelope = SNProtoEnvelope.from(message),
-                        let data = try? envelope.serializedData()
-                    else { return nil }
-
-                    let job = MessageReceiveJob(
-                        data: data,
-                        serverHash: message.info.hash,
-                        isBackgroundPoll: true
-                    )
-                    return job.execute()
-                }
-                // Now that the MessageReceiveJob's have been created we can persist the received messages
-                if !messages.isEmpty {
-                    GRDBStorage.shared.write { db in
-                        messages.forEach { try? $0.info.save(db) }
+                guard !messages.isEmpty else { return Promise.value(()) }
+
+                var jobsToRun: [Job] = []
+
+                GRDBStorage.shared.write { db in
+                    var threadMessages: [String: [MessageReceiveJob.Details.MessageInfo]] = [:]
+                    // TODO: Test this updated logic
+                    messages.forEach { message in
+                        guard let envelope = SNProtoEnvelope.from(message) else { return }
+
+                        // Extract the threadId and add that to the messageReceive job for
+                        // multi-threading and garbage collection purposes
+                        let threadId: String? = MessageReceiver.extractSenderPublicKey(db, from: envelope)
+
+                        do {
+                            threadMessages[threadId ?? ""] = (threadMessages[threadId ?? ""] ?? [])
+                                .appending(
+                                    MessageReceiveJob.Details.MessageInfo(
+                                        data: try envelope.serializedData(),
+                                        serverHash: message.info.hash
+                                    )
+                                )
+
+                            // Persist the received message after the MessageReceiveJob is created
+                            _ = try message.info.saved(db)
+                        }
+                        catch {
+                            SNLog("Failed to deserialize envelope due to error: \(error).")
+                        }
                     }
+
+                    threadMessages
+                        .forEach { threadId, threadMessages in
+                            let maybeJob: Job? = Job(
+                                variant: .messageReceive,
+                                behaviour: .runOnce,
+                                threadId: threadId,
+                                details: MessageReceiveJob.Details(
+                                    messages: threadMessages,
+                                    isBackgroundPoll: false
+                                )
+                            )
+
+                            guard let job: Job = maybeJob else { return }
+
+                            JobRunner.add(db, job: job)
+                            jobsToRun.append(job)
+                        }
                 }
-                return when(fulfilled: promises) // The promise returned by MessageReceiveJob never rejects
+
+                let promises = jobsToRun.compactMap { job -> Promise<Void>? in
+                    let (promise, seal) = Promise<Void>.pending()
+
+                    // Note: In the background we just want jobs to fail silently
+                    MessageReceiveJob.run(
+                        job,
+                        success: { _, _ in seal.fulfill(()) },
+                        failure: { _, _, _ in seal.fulfill(()) },
+                        deferred: { _ in seal.fulfill(()) }
+                    )
+
+                    return promise
+                }
+
+                return when(fulfilled: promises)
             }
+            }
         }
     }
diff --git a/SessionMessagingKit/Database/Migrations/_003_YDBToGRDBMigration.swift b/SessionMessagingKit/Database/Migrations/_003_YDBToGRDBMigration.swift
index e14d89ff9..5fd482a8d 100644
--- a/SessionMessagingKit/Database/Migrations/_003_YDBToGRDBMigration.swift
+++ b/SessionMessagingKit/Database/Migrations/_003_YDBToGRDBMigration.swift
@@ -814,8 +814,12 @@ enum _003_YDBToGRDBMigration: Migration {
                         nextRunTimestamp: 0,
                         threadId: threadId,
                         details: MessageReceiveJob.Details(
-                            data: legacyJob.data,
-                            serverHash: legacyJob.serverHash,
+                            messages: [
+                                MessageReceiveJob.Details.MessageInfo(
+                                    data: legacyJob.data,
+                                    serverHash: legacyJob.serverHash
+                                )
+                            ],
                             isBackgroundPoll: legacyJob.isBackgroundPoll
                         )
                     )?.inserted(db)
diff --git a/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift b/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift
index 59027e7e6..f95d476d3 100644
--- a/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift
+++ b/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift
@@ -23,33 +23,66 @@ public enum MessageReceiveJob: JobExecutor {
             return
         }

-        var processingError: Error?
+        var updatedJob: Job = job
+        var leastSevereError: Error?

         GRDBStorage.shared.write { db in
-            do {
-                let isRetry: Bool = (job.failureCount > 0)
-                let (message, proto) = try MessageReceiver.parse(
-                    db,
-                    data: details.data,
-                    isRetry: isRetry
-                )
-                message.serverHash = details.serverHash
-
-                try MessageReceiver.handle(
-                    db,
-                    message: message,
-                    associatedWithProto: proto,
-                    openGroupId: nil,
-                    isBackgroundPoll: details.isBackgroundPoll
-                )
-            }
-            catch {
-                processingError = error
+            var remainingMessagesToProcess: [Details.MessageInfo] = []
+
+            for messageInfo in details.messages {
+                do {
+                    // Note: The main reason why 'MessageReceiver.parse' can fail but then succeed
+                    // later on is when we get a closed group message which is signed using a new key
+                    // but haven't received the key yet (the key gets sent directly to the user rather
+                    // than via the closed group, so this is unfortunately a possible case)
+                    let isRetry: Bool = (job.failureCount > 0)
+                    let (message, proto) = try MessageReceiver.parse(
+                        db,
+                        data: messageInfo.data,
+                        isRetry: isRetry
+                    )
+                    message.serverHash = messageInfo.serverHash
+
+                    try MessageReceiver.handle(
+                        db,
+                        message: message,
+                        associatedWithProto: proto,
+                        openGroupId: nil,
+                        isBackgroundPoll: details.isBackgroundPoll
+                    )
+                }
+                catch {
+                    // We failed to process this message so add it to the list to re-process
+                    remainingMessagesToProcess.append(messageInfo)
+
+                    // If the current message is a permanent failure then override it with the new error (we want
+                    // to retry if there is a single non-permanent error)
+                    switch leastSevereError {
+                        case let error as MessageReceiverError where !error.isRetryable:
+                            leastSevereError = error
+
+                        default: break
+                    }
+                }
             }
+
+            // If any messages failed to process then we want to update the job to only include
+            // those failed messages
+            updatedJob = try job
+                .with(
+                    details: Details(
+                        messages: remainingMessagesToProcess,
+                        isBackgroundPoll: details.isBackgroundPoll
+                    )
+                )
+                .defaulting(to: job)
+                .saved(db)
         }

         // Handle the result
-        switch processingError {
+        switch leastSevereError {
            case let error as MessageReceiverError where !error.isRetryable:
                SNLog("Message receive job permanently failed due to error: \(error)")
                failure(job, error, true)
@@ -68,8 +101,28 @@ extension MessageReceiveJob {
     public struct Details: Codable {
-        public let data: Data
-        public let serverHash: String?
+        public struct MessageInfo: Codable {
+            public let data: Data
+            public let serverHash: String?
+
+            public init(
+                data: Data,
+                serverHash: String?
+            ) {
+                self.data = data
+                self.serverHash = serverHash
+            }
+        }
+
+        public let messages: [MessageInfo]
         public let isBackgroundPoll: Bool
+
+        public init(
+            messages: [MessageInfo],
+            isBackgroundPoll: Bool
+        ) {
+            self.messages = messages
+            self.isBackgroundPoll = isBackgroundPoll
+        }
     }
 }
diff --git a/SessionSnodeKit/Database/Migrations/_001_InitialSetupMigration.swift b/SessionSnodeKit/Database/Migrations/_001_InitialSetupMigration.swift
index 37523b470..8740d8191 100644
--- a/SessionSnodeKit/Database/Migrations/_001_InitialSetupMigration.swift
+++ b/SessionSnodeKit/Database/Migrations/_001_InitialSetupMigration.swift
@@ -33,6 +33,9 @@ enum _001_InitialSetupMigration: Migration {
         }

         try db.create(table: SnodeReceivedMessageInfo.self) { t in
+            t.column(.id, .integer)
+                .notNull()
+                .primaryKey(autoincrement: true)
             t.column(.key, .text)
                 .notNull()
                 .indexed()
@@ -41,7 +44,7 @@
                 .notNull()
                 .indexed()

-            t.primaryKey([.key, .hash])
+            t.uniqueKey([.key, .hash])
         }
     }
 }
diff --git a/SessionSnodeKit/Database/Migrations/_002_YDBToGRDBMigration.swift b/SessionSnodeKit/Database/Migrations/_002_YDBToGRDBMigration.swift
index 71bebacf4..444c5b03f 100644
--- a/SessionSnodeKit/Database/Migrations/_002_YDBToGRDBMigration.swift
+++ b/SessionSnodeKit/Database/Migrations/_002_YDBToGRDBMigration.swift
@@ -7,7 +7,6 @@ import SessionUtilitiesKit

 enum _002_YDBToGRDBMigration: Migration {
     static let identifier: String = "YDBToGRDBMigration"

-    // TODO: Autorelease pool???
     static func migrate(_ db: Database) throws {
         // MARK: - OnionRequestPath, Snode Pool & Swarm
@@ -126,20 +125,20 @@ enum _002_YDBToGRDBMigration: Migration {
            try autoreleasepool {
                try receivedMessageResults.forEach { key, hashes in
                    try hashes.forEach { hash in
-                        try SnodeReceivedMessageInfo(
+                        _ = try SnodeReceivedMessageInfo(
                            key: key,
                            hash: hash,
                            expirationDateMs: 0
-                        ).insert(db)
+                        ).inserted(db)
                    }
                }
                try lastMessageResults.forEach { key, data in
-                    try SnodeReceivedMessageInfo(
+                    _ = try SnodeReceivedMessageInfo(
                        key: key,
                        hash: data.hash,
                        expirationDateMs: ((data.json["expirationDate"] as? Int64) ?? 0)
-                    ).insert(db)
+                    ).inserted(db)
                }
            }
        }
    }
diff --git a/SessionSnodeKit/Database/Models/SnodeReceivedMessageInfo.swift b/SessionSnodeKit/Database/Models/SnodeReceivedMessageInfo.swift
index 7577ec8c0..8010df5d3 100644
--- a/SessionSnodeKit/Database/Models/SnodeReceivedMessageInfo.swift
+++ b/SessionSnodeKit/Database/Models/SnodeReceivedMessageInfo.swift
@@ -4,19 +4,39 @@
 import Foundation
 import GRDB
 import SessionUtilitiesKit

-public struct SnodeReceivedMessageInfo: Codable, FetchableRecord, PersistableRecord, TableRecord, ColumnExpressible {
+public struct SnodeReceivedMessageInfo: Codable, FetchableRecord, MutablePersistableRecord, TableRecord, ColumnExpressible {
     public static var databaseTableName: String { "snodeReceivedMessageInfo" }

     public typealias Columns = CodingKeys
     public enum CodingKeys: String, CodingKey, ColumnExpression {
+        case id
         case key
         case hash
         case expirationDateMs
     }
+
+    /// The `id` value is auto-incremented by the database; if the `SnodeReceivedMessageInfo` hasn't been
+    /// inserted into the database yet this value will be `nil`
+    public var id: Int64? = nil
+
+    /// The key this message hash is associated with
+    ///
+    /// This will be a combination of {address}.{port}.{publicKey} for new rows and just the {publicKey} for legacy rows
     public let key: String
+
+    /// This is the hash for the received message
     public let hash: String
+
+    /// This is the timestamp (in milliseconds since epoch) when the message hash should expire
+    ///
+    /// **Note:** A value of `0` means this hash should not expire
     public let expirationDateMs: Int64
+
+    // MARK: - Custom Database Interaction
+
+    public mutating func didInsert(with rowID: Int64, for column: String?) {
+        self.id = rowID
+    }
 }

 // MARK: - Convenience
@@ -51,12 +71,17 @@ public extension SnodeReceivedMessageInfo {
         }
     }

+    /// This method fetches the last non-expired hash from the database for message retrieval
+    ///
+    /// **Note:** This method uses a `write` instead of a read because there is a single write queue for the database and it's very common for
+    /// this method to be called after the hash value has been updated but before the various `read` threads have been updated, resulting in a
+    /// pointless fetch for data the app has already received
     static func fetchLastNotExpired(for snode: Snode, associatedWith publicKey: String) -> SnodeReceivedMessageInfo? {
-        return GRDBStorage.shared.read { db in
-            try? SnodeReceivedMessageInfo
+        return GRDBStorage.shared.write { db in
+            try SnodeReceivedMessageInfo
                .filter(SnodeReceivedMessageInfo.Columns.key == key(for: snode, publicKey: publicKey))
-                .order(SnodeReceivedMessageInfo.Columns.expirationDateMs)
-                .reversed()
+                .filter(SnodeReceivedMessageInfo.Columns.expirationDateMs <= (Date().timeIntervalSince1970 * 1000))
+                .order(SnodeReceivedMessageInfo.Columns.id.desc)
                .fetchOne(db)
        }
    }
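
The core behavioural change in this patch is in BackgroundPoller: instead of creating one receive job per envelope, incoming messages are grouped by the thread they appear to belong to (falling back to an empty key when the sender cannot be resolved) and a single messageReceive job is enqueued per group, which is what lets the JobRunner process conversations independently and garbage-collect per-thread jobs. The standalone Swift sketch below (not part of the diff) models only that grouping step; ReceivedMessage, MessageBatch and extractThreadId are simplified stand-ins for illustration, not the Session APIs.

import Foundation

// Simplified stand-ins for the real Session types (hypothetical, illustration only)
struct ReceivedMessage {
    let data: Data
    let serverHash: String?
}

struct MessageBatch {
    let threadId: String            // "" when the thread could not be determined
    let messages: [ReceivedMessage]
}

// Stand-in for the thread lookup (MessageReceiver.extractSenderPublicKey in the real code);
// returns nil when the sender cannot be determined
func extractThreadId(from message: ReceivedMessage) -> String? {
    return message.serverHash.map { String($0.prefix(8)) } // placeholder logic
}

/// Groups received messages into one batch per thread, mirroring how the updated
/// BackgroundPoller builds one messageReceive job per conversation
func batchMessagesByThread(_ messages: [ReceivedMessage]) -> [MessageBatch] {
    var threadMessages: [String: [ReceivedMessage]] = [:]

    for message in messages {
        // Messages without a resolvable thread all share the "" bucket
        let threadId: String = extractThreadId(from: message) ?? ""
        threadMessages[threadId, default: []].append(message)
    }

    return threadMessages.map { threadId, messages in
        MessageBatch(threadId: threadId, messages: messages)
    }
}

// Example usage
let batches = batchMessagesByThread([
    ReceivedMessage(data: Data(), serverHash: "abc12345-hash-1"),
    ReceivedMessage(data: Data(), serverHash: "abc12345-hash-2"),
    ReceivedMessage(data: Data(), serverHash: nil)
])
print(batches.map { "\($0.threadId): \($0.messages.count) message(s)" })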