From 94742c80ec1285033a9722ba4363a3fca31e4485 Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Tue, 26 Apr 2022 17:31:50 +1000 Subject: [PATCH] Further work on the JobRunner Fixed an issue where the hash retrieved when fetching messages from the service node might not be the latest one Updated the MessageReceiveJob to batch process messages (on failure only the failed messages will retry) --- Session/Utilities/BackgroundPoller.swift | 83 ++++++++++++---- .../Migrations/_003_YDBToGRDBMigration.swift | 8 +- .../Jobs/Types/MessageReceiveJob.swift | 99 ++++++++++++++----- .../_001_InitialSetupMigration.swift | 5 +- .../Migrations/_002_YDBToGRDBMigration.swift | 9 +- .../Models/SnodeReceivedMessageInfo.swift | 35 ++++++- 6 files changed, 183 insertions(+), 56 deletions(-) diff --git a/Session/Utilities/BackgroundPoller.swift b/Session/Utilities/BackgroundPoller.swift index bbdd9d9f8..64e2e36ce 100644 --- a/Session/Utilities/BackgroundPoller.swift +++ b/Session/Utilities/BackgroundPoller.swift @@ -45,33 +45,76 @@ public final class BackgroundPoller : NSObject { private static func getMessages(for publicKey: String) -> Promise { return SnodeAPI.getSwarm(for: publicKey).then(on: DispatchQueue.main) { swarm -> Promise in guard let snode = swarm.randomElement() else { throw SnodeAPI.Error.generic } + return attempt(maxRetryCount: 4, recoveringOn: DispatchQueue.main) { return SnodeAPI.getRawMessages(from: snode, associatedWith: publicKey).then(on: DispatchQueue.main) { rawResponse -> Promise in let messages: [SnodeReceivedMessage] = SnodeAPI.parseRawMessagesResponse(rawResponse, from: snode, associatedWith: publicKey) - let promises = messages.compactMap { message -> Promise? in - // Use a best attempt approach here; we don't want to fail the entire process - // if one of the messages failed to parse - guard - let envelope = SNProtoEnvelope.from(message), - let data = try? envelope.serializedData() - else { return nil } - - let job = MessageReceiveJob( - data: data, - serverHash: message.info.hash, - isBackgroundPoll: true - ) - return job.execute() - } - // Now that the MessageReceiveJob's have been created we can persist the received messages - if !messages.isEmpty { - GRDBStorage.shared.write { db in - messages.forEach { try? $0.info.save(db) } + guard !messages.isEmpty else { return Promise.value(()) } + + var jobsToRun: [Job] = [] + + GRDBStorage.shared.write { db in + var threadMessages: [String: [MessageReceiveJob.Details.MessageInfo]] = [:] + // TODO: Test this updated logic + messages.forEach { message in + guard let envelope = SNProtoEnvelope.from(message) else { return } + + // Extract the threadId and add that to the messageReceive job for + // multi-threading and garbage collection purposes + let threadId: String? = MessageReceiver.extractSenderPublicKey(db, from: envelope) + + do { + threadMessages[threadId ?? ""] = (threadMessages[threadId ?? ""] ?? []) + .appending( + MessageReceiveJob.Details.MessageInfo( + data: try envelope.serializedData(), + serverHash: message.info.hash + ) + ) + + // Persist the received message after the MessageReceiveJob is created + _ = try message.info.saved(db) + } + catch { + SNLog("Failed to deserialize envelope due to error: \(error).") + } } + + threadMessages + .forEach { threadId, threadMessages in + let maybeJob: Job? = Job( + variant: .messageReceive, + behaviour: .runOnce, + threadId: threadId, + details: MessageReceiveJob.Details( + messages: threadMessages, + isBackgroundPoll: false + ) + ) + + guard let job: Job = maybeJob else { return } + + JobRunner.add(db, job: job) + jobsToRun.append(job) + } } - return when(fulfilled: promises) // The promise returned by MessageReceiveJob never rejects + let promises = jobsToRun.compactMap { job -> Promise? in + let (promise, seal) = Promise.pending() + + // Note: In the background we just want jobs to fail silently + MessageReceiveJob.run( + job, + success: { _, _ in seal.fulfill(()) }, + failure: { _, _, _ in seal.fulfill(()) }, + deferred: { _ in seal.fulfill(()) } + ) + + return promise + } + + return when(fulfilled: promises) } } } diff --git a/SessionMessagingKit/Database/Migrations/_003_YDBToGRDBMigration.swift b/SessionMessagingKit/Database/Migrations/_003_YDBToGRDBMigration.swift index e14d89ff9..5fd482a8d 100644 --- a/SessionMessagingKit/Database/Migrations/_003_YDBToGRDBMigration.swift +++ b/SessionMessagingKit/Database/Migrations/_003_YDBToGRDBMigration.swift @@ -814,8 +814,12 @@ enum _003_YDBToGRDBMigration: Migration { nextRunTimestamp: 0, threadId: threadId, details: MessageReceiveJob.Details( - data: legacyJob.data, - serverHash: legacyJob.serverHash, + messages: [ + MessageReceiveJob.Details.MessageInfo( + data: legacyJob.data, + serverHash: legacyJob.serverHash + ) + ], isBackgroundPoll: legacyJob.isBackgroundPoll ) )?.inserted(db) diff --git a/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift b/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift index 59027e7e6..f95d476d3 100644 --- a/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift +++ b/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift @@ -23,33 +23,66 @@ public enum MessageReceiveJob: JobExecutor { return } - var processingError: Error? + var updatedJob: Job = job + var leastSevereError: Error? GRDBStorage.shared.write { db in - do { - let isRetry: Bool = (job.failureCount > 0) - let (message, proto) = try MessageReceiver.parse( - db, - data: details.data, - isRetry: isRetry - ) - message.serverHash = details.serverHash - - try MessageReceiver.handle( - db, - message: message, - associatedWithProto: proto, - openGroupId: nil, - isBackgroundPoll: details.isBackgroundPoll - ) - } - catch { - processingError = error + var remainingMessagesToProcess: [Details.MessageInfo] = [] + + for messageInfo in details.messages { + do { + // Note: The main reason why the 'MessageReceiver.parse' can fail but then succeed + // later on is when we get a closed group message which is signed using a new key + // but haven't received the key yet (the key gets sent directly to the user rather + // than via the closed group so this is unfortunately a possible case) + let isRetry: Bool = (job.failureCount > 0) + let (message, proto) = try MessageReceiver.parse( + db, + data: messageInfo.data, + isRetry: isRetry + ) + message.serverHash = messageInfo.serverHash + + try MessageReceiver.handle( + db, + message: message, + associatedWithProto: proto, + openGroupId: nil, + isBackgroundPoll: details.isBackgroundPoll + ) + } + catch { + // We failed to process this message so add it to the list to re-process + remainingMessagesToProcess.append(messageInfo) + + // If the current message is a permanent failure then override it with the new error (we want + // to retry if there is a single non-permanent error) + switch leastSevereError { + case let error as MessageReceiverError where !error.isRetryable: + leastSevereError = error + + default: break + } + } } + + // If any messages failed to process then we want to update the job to only include + // those failed messages + updatedJob = try job + .with( + details: Details( + messages: remainingMessagesToProcess, + isBackgroundPoll: details.isBackgroundPoll + ) + ) + .defaulting(to: job) + .saved(db) + } + } // Handle the result - switch processingError { + switch leastSevereError { case let error as MessageReceiverError where !error.isRetryable: SNLog("Message receive job permanently failed due to error: \(error)") failure(job, error, true) @@ -68,8 +101,28 @@ public enum MessageReceiveJob: JobExecutor { extension MessageReceiveJob { public struct Details: Codable { - public let data: Data - public let serverHash: String? + public struct MessageInfo: Codable { + public let data: Data + public let serverHash: String? + + public init( + data: Data, + serverHash: String? + ) { + self.data = data + self.serverHash = serverHash + } + } + + public let messages: [MessageInfo] public let isBackgroundPoll: Bool + + public init( + messages: [MessageInfo], + isBackgroundPoll: Bool + ) { + self.messages = messages + self.isBackgroundPoll = isBackgroundPoll + } } } diff --git a/SessionSnodeKit/Database/Migrations/_001_InitialSetupMigration.swift b/SessionSnodeKit/Database/Migrations/_001_InitialSetupMigration.swift index 37523b470..8740d8191 100644 --- a/SessionSnodeKit/Database/Migrations/_001_InitialSetupMigration.swift +++ b/SessionSnodeKit/Database/Migrations/_001_InitialSetupMigration.swift @@ -33,6 +33,9 @@ enum _001_InitialSetupMigration: Migration { } try db.create(table: SnodeReceivedMessageInfo.self) { t in + t.column(.id, .integer) + .notNull() + .primaryKey(autoincrement: true) t.column(.key, .text) .notNull() .indexed() @@ -41,7 +44,7 @@ enum _001_InitialSetupMigration: Migration { .notNull() .indexed() - t.primaryKey([.key, .hash]) + t.uniqueKey([.key, .hash]) } } } diff --git a/SessionSnodeKit/Database/Migrations/_002_YDBToGRDBMigration.swift b/SessionSnodeKit/Database/Migrations/_002_YDBToGRDBMigration.swift index 71bebacf4..444c5b03f 100644 --- a/SessionSnodeKit/Database/Migrations/_002_YDBToGRDBMigration.swift +++ b/SessionSnodeKit/Database/Migrations/_002_YDBToGRDBMigration.swift @@ -7,7 +7,6 @@ import SessionUtilitiesKit enum _002_YDBToGRDBMigration: Migration { static let identifier: String = "YDBToGRDBMigration" - // TODO: Autorelease pool??? static func migrate(_ db: Database) throws { // MARK: - OnionRequestPath, Snode Pool & Swarm @@ -126,20 +125,20 @@ enum _002_YDBToGRDBMigration: Migration { try autoreleasepool { try receivedMessageResults.forEach { key, hashes in try hashes.forEach { hash in - try SnodeReceivedMessageInfo( + _ = try SnodeReceivedMessageInfo( key: key, hash: hash, expirationDateMs: 0 - ).insert(db) + ).inserted(db) } } try lastMessageResults.forEach { key, data in - try SnodeReceivedMessageInfo( + _ = try SnodeReceivedMessageInfo( key: key, hash: data.hash, expirationDateMs: ((data.json["expirationDate"] as? Int64) ?? 0) - ).insert(db) + ).inserted(db) } } } diff --git a/SessionSnodeKit/Database/Models/SnodeReceivedMessageInfo.swift b/SessionSnodeKit/Database/Models/SnodeReceivedMessageInfo.swift index 7577ec8c0..8010df5d3 100644 --- a/SessionSnodeKit/Database/Models/SnodeReceivedMessageInfo.swift +++ b/SessionSnodeKit/Database/Models/SnodeReceivedMessageInfo.swift @@ -4,19 +4,39 @@ import Foundation import GRDB import SessionUtilitiesKit -public struct SnodeReceivedMessageInfo: Codable, FetchableRecord, PersistableRecord, TableRecord, ColumnExpressible { +public struct SnodeReceivedMessageInfo: Codable, FetchableRecord, MutablePersistableRecord, TableRecord, ColumnExpressible { public static var databaseTableName: String { "snodeReceivedMessageInfo" } public typealias Columns = CodingKeys public enum CodingKeys: String, CodingKey, ColumnExpression { + case id case key case hash case expirationDateMs } + /// The `id` value is auto incremented by the database, if the `Job` hasn't been inserted into + /// the database yet this value will be `nil` + public var id: Int64? = nil + + /// The key this message hash is associated to + /// + /// This will be a combination of {address}.{port}.{publicKey} for new rows and just the {publicKey} for legacy rows public let key: String + + /// The is the hash for the received message public let hash: String + + /// This is the timestamp (in milliseconds since epoch) when the message hash should expire + /// + /// **Note:** A value of `0` means this hash should not expire public let expirationDateMs: Int64 + + // MARK: - Custom Database Interaction + + public mutating func didInsert(with rowID: Int64, for column: String?) { + self.id = rowID + } } // MARK: - Convenience @@ -51,12 +71,17 @@ public extension SnodeReceivedMessageInfo { } } + /// This method fetches the last non-expired hash from the database for message retrieval + /// + /// **Note:** This method uses a `write` instead of a read because there is a single write queue for the database and it's very common for + /// this method to be called after the hash value has been updated but before the various `read` threads have been updated, resulting in a + /// pointless fetch for data the app has already received static func fetchLastNotExpired(for snode: Snode, associatedWith publicKey: String) -> SnodeReceivedMessageInfo? { - return GRDBStorage.shared.read { db in - try? SnodeReceivedMessageInfo + return GRDBStorage.shared.write { db in + try SnodeReceivedMessageInfo .filter(SnodeReceivedMessageInfo.Columns.key == key(for: snode, publicKey: publicKey)) - .order(SnodeReceivedMessageInfo.Columns.expirationDateMs) - .reversed() + .filter(SnodeReceivedMessageInfo.Columns.expirationDateMs <= (Date().timeIntervalSince1970 * 1000)) + .order(SnodeReceivedMessageInfo.Columns.id.desc) .fetchOne(db) } }