From e29758e401e3e7bc644963905435f98ae204a00b Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Thu, 13 Mar 2025 16:45:45 +1100 Subject: [PATCH 1/5] Updated GRDB and refactored internal Storage operations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Updated to GRDB 7.3.0 (from 6.29.3) • Updated the ConfigMessageReceiveJob and MessageReceiveJob to use the `writeAsync` function (instead of the blocking `write` function) so that they aren't subject to the `Storage.transactionDeadlockTimeoutSeconds` • Refactored the `Storage.performOperation` and `Storage.performPublisherOperation` to rely on the new cancellable async/await `Task` logic that GRDB 7 supports (as apparently the other async methods don't support cancellation...) • Cleaned up some "Sendable" related warnings • Minor tweaks to `Log.assertOnMainThread` to make it a little more readable --- Session.xcodeproj/project.pbxproj | 2 +- .../xcshareddata/swiftpm/Package.resolved | 4 +- Session/Conversations/ConversationVC.swift | 15 +- Session/Home/HomeVC.swift | 5 +- .../DocumentTitleViewController.swift | 6 +- .../MediaTileViewController.swift | 6 +- .../Shared/Types/PagedObservationSource.swift | 5 +- .../Jobs/ConfigMessageReceiveJob.swift | 31 +- .../Jobs/MessageReceiveJob.swift | 119 +++---- SessionUtilitiesKit/Database/Storage.swift | 300 ++++++++---------- .../Database/StorageError.swift | 5 + SessionUtilitiesKit/General/Logging.swift | 29 +- 12 files changed, 278 insertions(+), 249 deletions(-) diff --git a/Session.xcodeproj/project.pbxproj b/Session.xcodeproj/project.pbxproj index d36be785d..ec830e84f 100644 --- a/Session.xcodeproj/project.pbxproj +++ b/Session.xcodeproj/project.pbxproj @@ -10262,7 +10262,7 @@ repositoryURL = "https://github.com/session-foundation/session-grdb-swift.git"; requirement = { kind = upToNextMajorVersion; - minimumVersion = 106.29.3; + minimumVersion = 107.3.0; }; }; FD756BEE2D06686500BD7199 /* XCRemoteSwiftPackageReference "session-lucide" */ = { diff --git a/Session.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved b/Session.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved index e3a76718a..3d5e300d3 100644 --- a/Session.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved +++ b/Session.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved @@ -96,8 +96,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/session-foundation/session-grdb-swift.git", "state" : { - "revision" : "b3643613f1e0f392fa41072ee499da93b4c06b67", - "version" : "106.29.3" + "revision" : "c69f8bf8a7ede8727c20f7c36eeffd3f55598487", + "version" : "107.3.0" } }, { diff --git a/Session/Conversations/ConversationVC.swift b/Session/Conversations/ConversationVC.swift index 829b67f6b..2f303b22f 100644 --- a/Session/Conversations/ConversationVC.swift +++ b/Session/Conversations/ConversationVC.swift @@ -422,7 +422,10 @@ final class ConversationVC: BaseVC, LibSessionRespondingViewController, Conversa using: dependencies ) - dependencies[singleton: .storage].addObserver(viewModel.pagedDataObserver) + /// Dispatch adding the database observation to a background thread + DispatchQueue.global(qos: .userInitiated).async { [weak viewModel] in + dependencies[singleton: .storage].addObserver(viewModel?.pagedDataObserver) + } super.init(nibName: nil, bundle: nil) } @@ -704,14 +707,18 @@ final class ConversationVC: BaseVC, LibSessionRespondingViewController, Conversa // Stop observing changes self?.stopObservingChanges() - dependencies[singleton: .storage].removeObserver(self?.viewModel.pagedDataObserver) + DispatchQueue.global(qos: .userInitiated).async { + dependencies[singleton: .storage].removeObserver(self?.viewModel.pagedDataObserver) + } // Swap the observing to the updated thread let newestVisibleMessageId: Int64? = self?.fullyVisibleCellViewModels()?.last?.id self?.viewModel.swapToThread(updatedThreadId: unblindedId, focussedMessageId: newestVisibleMessageId) - // Start observing changes again - dependencies[singleton: .storage].addObserver(self?.viewModel.pagedDataObserver) + /// Start observing changes again (on a background thread) + DispatchQueue.global(qos: .userInitiated).async { + dependencies[singleton: .storage].addObserver(self?.viewModel.pagedDataObserver) + } self?.startObservingChanges() return } diff --git a/Session/Home/HomeVC.swift b/Session/Home/HomeVC.swift index 5419046d5..f6646f519 100644 --- a/Session/Home/HomeVC.swift +++ b/Session/Home/HomeVC.swift @@ -31,7 +31,10 @@ public final class HomeVC: BaseVC, LibSessionRespondingViewController, UITableVi init(using dependencies: Dependencies) { self.viewModel = HomeViewModel(using: dependencies) - dependencies[singleton: .storage].addObserver(viewModel.pagedDataObserver) + /// Dispatch adding the database observation to a background thread + DispatchQueue.global(qos: .userInitiated).async { [weak viewModel] in + dependencies[singleton: .storage].addObserver(viewModel?.pagedDataObserver) + } super.init(nibName: nil, bundle: nil) } diff --git a/Session/Media Viewing & Editing/DocumentTitleViewController.swift b/Session/Media Viewing & Editing/DocumentTitleViewController.swift index 9faa3d9df..450f26619 100644 --- a/Session/Media Viewing & Editing/DocumentTitleViewController.swift +++ b/Session/Media Viewing & Editing/DocumentTitleViewController.swift @@ -34,7 +34,11 @@ public class DocumentTileViewController: UIViewController, UITableViewDelegate, init(viewModel: MediaGalleryViewModel, using dependencies: Dependencies) { self.dependencies = dependencies self.viewModel = viewModel - dependencies[singleton: .storage].addObserver(viewModel.pagedDataObserver) + + /// Dispatch adding the database observation to a background thread + DispatchQueue.global(qos: .userInitiated).async { [weak viewModel] in + dependencies[singleton: .storage].addObserver(viewModel?.pagedDataObserver) + } super.init(nibName: nil, bundle: nil) } diff --git a/Session/Media Viewing & Editing/MediaTileViewController.swift b/Session/Media Viewing & Editing/MediaTileViewController.swift index 384ae688d..497356645 100644 --- a/Session/Media Viewing & Editing/MediaTileViewController.swift +++ b/Session/Media Viewing & Editing/MediaTileViewController.swift @@ -44,7 +44,11 @@ public class MediaTileViewController: UIViewController, UICollectionViewDataSour init(viewModel: MediaGalleryViewModel, using dependencies: Dependencies) { self.dependencies = dependencies self.viewModel = viewModel - dependencies[singleton: .storage].addObserver(viewModel.pagedDataObserver) + + /// Dispatch adding the database observation to a background thread + DispatchQueue.global(qos: .userInitiated).async { [weak viewModel] in + dependencies[singleton: .storage].addObserver(viewModel?.pagedDataObserver) + } super.init(nibName: nil, bundle: nil) } diff --git a/Session/Shared/Types/PagedObservationSource.swift b/Session/Shared/Types/PagedObservationSource.swift index 33c0da78e..9337cc2c8 100644 --- a/Session/Shared/Types/PagedObservationSource.swift +++ b/Session/Shared/Types/PagedObservationSource.swift @@ -17,7 +17,10 @@ protocol PagedObservationSource { extension PagedObservationSource { public func didInit(using dependencies: Dependencies) { - dependencies[singleton: .storage].addObserver(pagedDataObserver) + /// Dispatch adding the database observation to a background thread + DispatchQueue.global(qos: .userInitiated).async { [weak pagedDataObserver] in + dependencies[singleton: .storage].addObserver(pagedDataObserver) + } } } diff --git a/SessionMessagingKit/Jobs/ConfigMessageReceiveJob.swift b/SessionMessagingKit/Jobs/ConfigMessageReceiveJob.swift index ce94a9409..db74dfa73 100644 --- a/SessionMessagingKit/Jobs/ConfigMessageReceiveJob.swift +++ b/SessionMessagingKit/Jobs/ConfigMessageReceiveJob.swift @@ -55,11 +55,8 @@ public enum ConfigMessageReceiveJob: JobExecutor { return failure(job, JobRunnerError.missingRequiredDetails, true) } - var lastError: Error? - - dependencies[singleton: .storage].write { db in - // Send any SharedConfigMessages to the LibSession to handle it - do { + dependencies[singleton: .storage].writeAsync( + updates: { db in try dependencies.mutate(cache: .libSession) { cache in try cache.handleConfigMessages( db, @@ -67,19 +64,19 @@ public enum ConfigMessageReceiveJob: JobExecutor { messages: details.messages ) } - } - catch { lastError = error } - } - - // Handle the result - switch lastError { - case .some(let error): - Log.error(.cat, "Couldn't receive config message due to error: \(error)") - removeDependencyOnMessageReceiveJobs() - failure(job, error, true) + }, + completion: { result in + // Handle the result + switch result { + case .failure(let error): + Log.error(.cat, "Couldn't receive config message due to error: \(error)") + removeDependencyOnMessageReceiveJobs() + failure(job, error, true) - case .none: success(job, false) - } + case .success: success(job, false) + } + } + ) } } diff --git a/SessionMessagingKit/Jobs/MessageReceiveJob.swift b/SessionMessagingKit/Jobs/MessageReceiveJob.swift index b4de9ba33..53e7f2f0c 100644 --- a/SessionMessagingKit/Jobs/MessageReceiveJob.swift +++ b/SessionMessagingKit/Jobs/MessageReceiveJob.swift @@ -51,65 +51,74 @@ public enum MessageReceiveJob: JobExecutor { } } - dependencies[singleton: .storage].write { db in - for (messageInfo, protoContent) in messageData { - do { - try MessageReceiver.handle( - db, - threadId: threadId, - threadVariant: messageInfo.threadVariant, - message: messageInfo.message, - serverExpirationTimestamp: messageInfo.serverExpirationTimestamp, - associatedWithProto: protoContent, - using: dependencies - ) + dependencies[singleton: .storage].writeAsync( + updates: { db -> Error? in + for (messageInfo, protoContent) in messageData { + do { + try MessageReceiver.handle( + db, + threadId: threadId, + threadVariant: messageInfo.threadVariant, + message: messageInfo.message, + serverExpirationTimestamp: messageInfo.serverExpirationTimestamp, + associatedWithProto: protoContent, + using: dependencies + ) + } + catch { + // If the current message is a permanent failure then override it with the + // new error (we want to retry if there is a single non-permanent error) + switch error { + // Ignore duplicate and self-send errors (these will usually be caught during + // parsing but sometimes can get past and conflict at database insertion - eg. + // for open group messages) we also don't bother logging as it results in + // excessive logging which isn't useful) + case DatabaseError.SQLITE_CONSTRAINT_UNIQUE, + DatabaseError.SQLITE_CONSTRAINT, // Sometimes thrown for UNIQUE + MessageReceiverError.duplicateMessage, + MessageReceiverError.duplicateControlMessage, + MessageReceiverError.selfSend: + break + + case let receiverError as MessageReceiverError where !receiverError.isRetryable: + Log.error(.cat, "Permanently failed message due to error: \(error)") + continue + + default: + Log.error(.cat, "Couldn't receive message due to error: \(error)") + lastError = error + + // We failed to process this message but it is a retryable error + // so add it to the list to re-process + remainingMessagesToProcess.append(messageInfo) + } + } } - catch { - // If the current message is a permanent failure then override it with the - // new error (we want to retry if there is a single non-permanent error) - switch error { - // Ignore duplicate and self-send errors (these will usually be caught during - // parsing but sometimes can get past and conflict at database insertion - eg. - // for open group messages) we also don't bother logging as it results in - // excessive logging which isn't useful) - case DatabaseError.SQLITE_CONSTRAINT_UNIQUE, - DatabaseError.SQLITE_CONSTRAINT, // Sometimes thrown for UNIQUE - MessageReceiverError.duplicateMessage, - MessageReceiverError.duplicateControlMessage, - MessageReceiverError.selfSend: - break - - case let receiverError as MessageReceiverError where !receiverError.isRetryable: - Log.error(.cat, "Permanently failed message due to error: \(error)") - continue + + // If any messages failed to process then we want to update the job to only include + // those failed messages + guard !remainingMessagesToProcess.isEmpty else { return nil } + + updatedJob = try job + .with(details: Details(messages: remainingMessagesToProcess)) + .defaulting(to: job) + .upserted(db) + + return lastError + }, + completion: { result in + // TODO: [REFACTOR] Need to test this!!! + // Handle the result + switch result { + case .failure(let error): failure(updatedJob, error, false) + case .success(.some(let error as MessageReceiverError)) where !error.isRetryable: + failure(updatedJob, error, true) - default: - Log.error(.cat, "Couldn't receive message due to error: \(error)") - lastError = error - - // We failed to process this message but it is a retryable error - // so add it to the list to re-process - remainingMessagesToProcess.append(messageInfo) - } + case .success(.some(let error)): failure(updatedJob, error, false) + case .success: success(updatedJob, false) } } - - // If any messages failed to process then we want to update the job to only include - // those failed messages - guard !remainingMessagesToProcess.isEmpty else { return } - - updatedJob = try job - .with(details: Details(messages: remainingMessagesToProcess)) - .defaulting(to: job) - .upserted(db) - } - - // Handle the result - switch lastError { - case let error as MessageReceiverError where !error.isRetryable: failure(updatedJob, error, true) - case .some(let error): failure(updatedJob, error, false) - case .none: success(updatedJob, false) - } + ) } } diff --git a/SessionUtilitiesKit/Database/Storage.swift b/SessionUtilitiesKit/Database/Storage.swift index 42b9f2a71..b81fa1edf 100644 --- a/SessionUtilitiesKit/Database/Storage.swift +++ b/SessionUtilitiesKit/Database/Storage.swift @@ -53,6 +53,9 @@ open class Storage { private static let writeTransactionStartTimeout: TimeInterval = 5 /// If a transaction takes longer than this duration then we should fail the transaction rather than keep hanging + /// + /// **Note:** This timeout only applies to synchronous operations (the assumption being that if we know an operation is going to + /// take a long time then we should probably be handling it asynchronously rather than a synchronous way) private static let transactionDeadlockTimeoutSeconds: Int = 5 private static var sharedDatabaseDirectoryPath: String { "\(SessionFileManager.nonInjectedAppSharedDataDirectoryPath)/database" } @@ -142,11 +145,6 @@ open class Storage { var config = Configuration() config.label = Storage.queuePrefix config.maximumReaderCount = 10 /// Increase the max read connection limit - Default is 5 - - /// It seems we should do this per https://github.com/groue/GRDB.swift/pull/1485 but with this change - /// we then need to define how long a write transaction should wait for before timing out (read transactions always run - /// in`DEFERRED` mode so won't be affected by these settings) - config.defaultTransactionKind = .immediate config.busyMode = .timeout(Storage.writeTransactionStartTimeout) /// Load in the SQLCipher keys @@ -551,6 +549,13 @@ open class Storage { case valid(DatabaseWriter) case invalid(Error) + var forcedError: Error { + switch self { + case .valid: return StorageError.validStorageIncorrectlyHandledAsError + case .invalid(let error): return error + } + } + init(_ storage: Storage?) { switch (storage?.isSuspended, storage?.isValid, storage?.dbWriter) { case (true, _, _): self = .invalid(StorageError.databaseSuspended) @@ -559,38 +564,46 @@ open class Storage { } } - static func logIfNeeded(_ error: Error, isWrite: Bool) { + fileprivate static func logIfNeeded(_ error: Error, info: Storage.CallInfo) { + let action: String = (info.isWrite ? "write" : "read") + switch error { case DatabaseError.SQLITE_ABORT, DatabaseError.SQLITE_INTERRUPT, DatabaseError.SQLITE_ERROR: let message: String = ((error as? DatabaseError)?.message ?? "Unknown") - Log.error(.storage, "Database \(isWrite ? "write" : "read") failed due to error: \(message)") + Log.error(.storage, "Database \(action) failed due to error: \(message) - [ \(info.callInfo) ]") case StorageError.databaseInvalid: - Log.error(.storage, "Database \(isWrite ? "write" : "read") failed as the database is invalid.") + Log.error(.storage, "Database \(action) failed as the database is invalid - [ \(info.callInfo) ]") case StorageError.databaseSuspended: - Log.error(.storage, "Database \(isWrite ? "write" : "read") failed as the database is suspended.") + Log.error(.storage, "Database \(action) failed as the database is suspended - [ \(info.callInfo) ]") case StorageError.transactionDeadlockTimeout: - Log.critical("[Storage] Database \(isWrite ? "write" : "read") failed due to a potential synchronous query deadlock timeout.") + Log.critical(.storage, "Database \(action) failed due to a potential synchronous query deadlock timeout - [ \(info.callInfo) ]") default: break } } - static func logIfNeeded(_ error: Error, isWrite: Bool) -> T? { - logIfNeeded(error, isWrite: isWrite) + fileprivate static func logIfNeeded(_ error: Error, info: Storage.CallInfo) -> T? { + logIfNeeded(error, info: info) return nil } - static func logIfNeeded(_ error: Error, isWrite: Bool) -> AnyPublisher { - logIfNeeded(error, isWrite: isWrite) + fileprivate static func logIfNeeded(_ error: Error, info: Storage.CallInfo) -> AnyPublisher { + logIfNeeded(error, info: info) return Fail(error: error).eraseToAnyPublisher() } } // MARK: - Operations + /// Internal type to wrap the database operation `Task` so it can be cancelled when used with `Combine` (since GRDB doesn't + /// actually handle publishers publishers) + final class TaskHolder { + var task: Task<(), Never>? + } + private static func track( _ db: Database, _ info: CallInfo, @@ -634,122 +647,108 @@ open class Storage { _ dependencies: Dependencies, _ operation: @escaping (Database) throws -> T, _ asyncCompletion: ((Result) -> Void)? = nil - ) -> Result { - // A serial queue for synchronizing completion updates. - let syncQueue = DispatchQueue(label: "com.session.performOperation.syncQueue") + ) -> (result: Result, task: Task<(), Never>?) { + /// Ensure we are in a valid state + let storageState: StorageState = StorageState(info.storage) + + guard case .valid(let dbWriter) = storageState else { + if info.isAsync { asyncCompletion?(.failure(storageState.forcedError)) } + return (.failure(storageState.forcedError), nil) + } - weak var queryDb: Database? - var didTimeout: Bool = false + /// Setup required variables + let syncQueue = DispatchQueue(label: "com.session.performOperation.syncQueue") + let semaphore: DispatchSemaphore = DispatchSemaphore(value: 0) var operationResult: Result? - let semaphore: DispatchSemaphore? = (info.isAsync ? nil : DispatchSemaphore(value: 0)) let logErrorIfNeeded: (Result) -> Result = { result in switch result { case .success: break - case .failure(let error): StorageState.logIfNeeded(error, isWrite: info.isWrite) + case .failure(let error): StorageState.logIfNeeded(error, info: info) } return result } + /// Convenience function to remove duplication func completeOperation(with result: Result) { syncQueue.sync { - guard !didTimeout && operationResult == nil else { return } + guard operationResult == nil else { return } operationResult = result - semaphore?.signal() + semaphore.signal() - // For async operations, log and invoke the completion closure. + /// For async operations, log and invoke the completion closure. if info.isAsync { asyncCompletion?(logErrorIfNeeded(result)) } } } - /// Perform the actual operation - switch (StorageState(info.storage), info.isWrite) { - case (.invalid(let error), _): completeOperation(with: .failure(error)) - case (.valid(let dbWriter), true): - dbWriter.asyncWrite( - { db in - syncQueue.sync { queryDb = db } - defer { syncQueue.sync { queryDb = nil } } - + let task: Task<(), Never> = Task { + return await withThrowingTaskGroup(of: T.self) { group in + /// Add the task to perform the actual database operation + group.addTask { + let trackedOperation: @Sendable (Database) throws -> T = { db in if dependencies[feature: .forceSlowDatabaseQueries] { Thread.sleep(forTimeInterval: 1) } return try Storage.track(db, info, operation) - }, - completion: { _, dbResult in completeOperation(with: dbResult) } - ) + } + + return (info.isWrite ? + try await dbWriter.write(trackedOperation) : + try await dbWriter.read(trackedOperation) + ) + } - case (.valid(let dbWriter), false): - dbWriter.asyncRead { dbResult in - do { - switch dbResult { - case .failure(let error): throw error - case .success(let db): - syncQueue.sync { queryDb = db } - defer { syncQueue.sync { queryDb = nil } } - - if dependencies[feature: .forceSlowDatabaseQueries] { - Thread.sleep(forTimeInterval: 1) - } - - completeOperation(with: .success(try Storage.track(db, info, operation))) + /// If this is a syncronous task then we want to the operation to timeout to ensure we don't unintentionally + /// create a deadlock + if !info.isAsync { + group.addTask { + let timeoutNanoseconds: UInt64 = UInt64(Storage.transactionDeadlockTimeoutSeconds * 1_000_000_000) + + /// If the debugger is attached then we want to have a lot of shorter sleep iterations as the clock doesn't get + /// paused when stopped on a breakpoint (and we don't want to end up having a bunch of false positive + /// database timeouts while debugging code) + /// + /// **Note:** `isDebuggerAttached` will always return `false` in production builds + if isDebuggerAttached() { + let numIterations: UInt64 = 50 + + for _ in (0..? = await group.nextResult() + group.cancelAll() + completeOperation(with: result ?? .failure(StorageError.invalidQueryResult)) } - - return logErrorIfNeeded(operationResult ?? .failure(StorageError.transactionDeadlockTimeout)) } /// For the `async` operation the returned value should be ignored so just return the `invalidQueryResult` error - return .failure(StorageError.invalidQueryResult) + guard !info.isAsync else { + return (.failure(StorageError.invalidQueryResult), task) + } + + /// Block until we have a result + semaphore.wait() + return (logErrorIfNeeded(operationResult ?? .failure(StorageError.transactionDeadlockTimeout)), task) } private func performPublisherOperation( @@ -759,59 +758,33 @@ open class Storage { isWrite: Bool, _ operation: @escaping (Database) throws -> T ) -> AnyPublisher { + let info: CallInfo = CallInfo(self, fileName, functionName, lineNumber, (isWrite ? .asyncWrite : .asyncRead)) + switch StorageState(self) { - case .invalid(let error): return StorageState.logIfNeeded(error, isWrite: false) + case .invalid(let error): return StorageState.logIfNeeded(error, info: info) case .valid: /// **Note:** GRDB does have `readPublisher`/`writePublisher` functions but it appears to asynchronously /// trigger both the `output` and `complete` closures at the same time which causes a lot of unexpected /// behaviours (this behaviour is apparently expected but still causes a number of odd behaviours in our code /// for more information see https://github.com/groue/GRDB.swift/issues/1334) /// - /// Instead of this we are just using `Deferred { Future {} }` which is executed on the specified scheduled - /// which behaves in a much more expected way than the GRDB `readPublisher`/`writePublisher` does - let info: CallInfo = CallInfo(self, fileName, functionName, lineNumber, .syncWrite) + /// Instead of this we are just using `Deferred { Future {} }` which is executed on the specified scheduler + /// (which behaves in a much more expected way than the GRDB `readPublisher`/`writePublisher` does) + /// and hooking that into our `performOperation` function which uses the GRDB async/await functions that support + /// cancellation (as we want to support cancellation as well) + let holder: TaskHolder = TaskHolder() + return Deferred { [dependencies] in Future { resolver in - resolver(Storage.performOperation(info, dependencies, operation)) + let (_, task) = Storage.performOperation(info, dependencies, operation) { result in + resolver(result) + } + holder.task = task } - }.eraseToAnyPublisher() - } - } - - private static func debugWait(semaphore: DispatchSemaphore, info: CallInfo) -> DispatchTimeoutResult { - let pollQueue: DispatchQueue = DispatchQueue(label: "com.session.debugWaitTimer.\(UUID().uuidString)") - let standardPollInterval: DispatchTimeInterval = .milliseconds(100) - var iterations: Int = 0 - let maxIterations: Int = ((Storage.transactionDeadlockTimeoutSeconds * 1000) / standardPollInterval.milliseconds) - let pollCompletionSemaphore: DispatchSemaphore = DispatchSemaphore(value: 0) - - /// Stagger the size of the `pollIntervals` to avoid holding up the thread in case the query resolves very quickly (this - /// means the timeout will occur ~500ms early but helps prevent false main thread lag appearing when debugging that wouldn't - /// affect production) - let pollIntervals: [DispatchTimeInterval] = [ - .milliseconds(5), .milliseconds(5), .milliseconds(10), .milliseconds(10), .milliseconds(10), - standardPollInterval - ] - - func pollSemaphore() { - iterations += 1 - - guard iterations < maxIterations && semaphore.wait(timeout: .now()) != .success else { - pollCompletionSemaphore.signal() - return - } - - let nextInterval: DispatchTimeInterval = pollIntervals[min(iterations, pollIntervals.count - 1)] - pollQueue.asyncAfter(deadline: .now() + nextInterval) { - pollSemaphore() - } + } + .handleEvents(receiveCancel: { holder.task?.cancel() }) + .eraseToAnyPublisher() } - - /// Poll the semaphore in a background queue - pollQueue.asyncAfter(deadline: .now() + pollIntervals[0]) { pollSemaphore() } - pollCompletionSemaphore.wait() // Wait indefinitely for the timer semaphore - - return (iterations >= 50 ? .timedOut : .success) } // MARK: - Functions @@ -823,8 +796,8 @@ open class Storage { updates: @escaping (Database) throws -> T? ) -> T? { switch Storage.performOperation(CallInfo(self, file, funcN, line, .syncWrite), dependencies, updates) { - case .failure: return nil - case .success(let result): return result + case (.failure, _): return nil + case (.success(let result), _): return result } } @@ -854,8 +827,8 @@ open class Storage { _ value: @escaping (Database) throws -> T? ) -> T? { switch Storage.performOperation(CallInfo(self, file, funcN, line, .syncRead), dependencies, value) { - case .failure: return nil - case .success(let result): return result + case (.failure, _): return nil + case (.success(let result), _): return result } } @@ -896,30 +869,32 @@ open class Storage { ) } + /// Add a database observation + /// + /// **Note:** This function **MUST NOT** be called from the main thread public func addObserver(_ observer: TransactionObserver?) { guard isValid, let dbWriter: DatabaseWriter = dbWriter else { return } guard let observer: TransactionObserver = observer else { return } - // Note: This actually triggers a write to the database so can be blocked by other - // writes, since it's usually called on the main thread when creating a view controller - // this can result in the UI hanging - to avoid this we dispatch (and hope there isn't - // negative impact) - DispatchQueue.global(qos: .default).async { - dbWriter.add(transactionObserver: observer) - } + /// This actually triggers a write to the database so can be blocked by other writes so shouldn't be called on the main thread, + /// we don't dispatch to an async thread in here because `TransactionObserver` isn't `Sendable` so instead just require + /// that it isn't called on the main thread + Log.assertNotOnMainThread() + dbWriter.add(transactionObserver: observer) } + /// Remove a database observation + /// + /// **Note:** This function **MUST NOT** be called from the main thread public func removeObserver(_ observer: TransactionObserver?) { guard isValid, let dbWriter: DatabaseWriter = dbWriter else { return } guard let observer: TransactionObserver = observer else { return } - // Note: This actually triggers a write to the database so can be blocked by other - // writes, since it's usually called on the main thread when creating a view controller - // this can result in the UI hanging - to avoid this we dispatch (and hope there isn't - // negative impact) - DispatchQueue.global(qos: .default).async { - dbWriter.remove(transactionObserver: observer) - } + /// This actually triggers a write to the database so can be blocked by other writes so shouldn't be called on the main thread, + /// we don't dispatch to an async thread in here because `TransactionObserver` isn't `Sendable` so instead just require + /// that it isn't called on the main thread + Log.assertNotOnMainThread() + dbWriter.remove(transactionObserver: observer) } } @@ -1028,7 +1003,7 @@ private extension Storage { result?.timer = nil let action: String = (info.isWrite ? "write" : "read") - Log.warn("[Storage] Slow \(action) taking longer than \(Storage.slowTransactionThreshold, format: ".2", omitZeroDecimal: true)s - [ \(info.callInfo) ]") + Log.warn(.storage, "Slow \(action) taking longer than \(Storage.slowTransactionThreshold, format: ".2", omitZeroDecimal: true)s - [ \(info.callInfo) ]") result?.wasSlowTransaction = true } result.timer?.resume() @@ -1044,7 +1019,7 @@ private extension Storage { let end: CFTimeInterval = CACurrentMediaTime() let action: String = (info.isWrite ? "write" : "read") - Log.warn("[Storage] Slow \(action) completed after \(end - start, format: ".2", omitZeroDecimal: true)s - [ \(info.callInfo) ]") + Log.warn(.storage, "Slow \(action) completed after \(end - start, format: ".2", omitZeroDecimal: true)s - [ \(info.callInfo) ]") } } } @@ -1159,13 +1134,18 @@ public extension Storage { } } -#if DEBUG +/// Function to determine if the debugger is attached +/// +/// **Note:** Only contains logic when `DEBUG` is defined, otherwise it always returns false func isDebuggerAttached() -> Bool { +#if DEBUG var info = kinfo_proc() var size = MemoryLayout.stride var mib: [Int32] = [CTL_KERN, KERN_PROC, KERN_PROC_PID, getpid()] let sysctlResult = sysctl(&mib, UInt32(mib.count), &info, &size, nil, 0) guard sysctlResult == 0 else { return false } return (info.kp_proc.p_flag & P_TRACED) != 0 -} +#else + return false #endif +} diff --git a/SessionUtilitiesKit/Database/StorageError.swift b/SessionUtilitiesKit/Database/StorageError.swift index 31981be8d..058456750 100644 --- a/SessionUtilitiesKit/Database/StorageError.swift +++ b/SessionUtilitiesKit/Database/StorageError.swift @@ -14,7 +14,12 @@ public enum StorageError: Error { case keySpecInaccessible case decodingFailed case invalidQueryResult + + /// This error is thrown when a synchronous operation takes longer than `Storage.transactionDeadlockTimeoutSeconds`, + /// the assumption being that if we know an operation is going to take a long time then we should probably be handling it asynchronously + /// rather than a synchronous way case transactionDeadlockTimeout + case validStorageIncorrectlyHandledAsError case failedToSave case objectNotFound diff --git a/SessionUtilitiesKit/General/Logging.swift b/SessionUtilitiesKit/General/Logging.swift index 92a552064..110ade7bb 100644 --- a/SessionUtilitiesKit/General/Logging.swift +++ b/SessionUtilitiesKit/General/Logging.swift @@ -335,12 +335,29 @@ public enum Log { function: StaticString = #function, line: UInt = #line ) { - guard !Thread.isMainThread else { return } - - let filename: String = URL(fileURLWithPath: "\(file)").lastPathComponent - let formattedMessage: String = "[\(filename):\(line) \(function)] Must be on main thread." - custom(.critical, [], formattedMessage, file: file, function: function, line: line) - assertionFailure(formattedMessage) + switch Thread.isMainThread { + case true: return + case false: + let filename: String = URL(fileURLWithPath: "\(file)").lastPathComponent + let formattedMessage: String = "[\(filename):\(line) \(function)] Must be on main thread." + custom(.critical, [], formattedMessage, file: file, function: function, line: line) + assertionFailure(formattedMessage) + } + } + + public static func assertNotOnMainThread( + file: StaticString = #file, + function: StaticString = #function, + line: UInt = #line + ) { + switch Thread.isMainThread { + case false: return + case true: + let filename: String = URL(fileURLWithPath: "\(file)").lastPathComponent + let formattedMessage: String = "[\(filename):\(line) \(function)] Must NOT be on main thread." + custom(.critical, [], formattedMessage, file: file, function: function, line: line) + assertionFailure(formattedMessage) + } } public static func custom( From e11375fdebf6fc36af190577336b1622fb96e824 Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Fri, 14 Mar 2025 09:30:29 +1100 Subject: [PATCH 2/5] Fixed the broken unit tests --- SessionMessagingKit/Open Groups/OpenGroupAPI.swift | 4 ++-- SessionMessagingKitTests/Open Groups/OpenGroupAPISpec.swift | 1 + SessionSnodeKit/Types/PreparedRequest.swift | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/SessionMessagingKit/Open Groups/OpenGroupAPI.swift b/SessionMessagingKit/Open Groups/OpenGroupAPI.swift index 7afb36218..e274a5941 100644 --- a/SessionMessagingKit/Open Groups/OpenGroupAPI.swift +++ b/SessionMessagingKit/Open Groups/OpenGroupAPI.swift @@ -321,7 +321,7 @@ public enum OpenGroupAPI { using: dependencies ) .signed(db, with: OpenGroupAPI.signRequest, using: dependencies) - .map { (info: ResponseInfoType, response: Network.BatchResponseMap) -> CapabilitiesAndRoomResponse in + .tryMap { (info: ResponseInfoType, response: Network.BatchResponseMap) -> CapabilitiesAndRoomResponse in let maybeCapabilities: Network.BatchSubResponse? = (response[.capabilities] as? Network.BatchSubResponse) let maybeRoomResponse: Any? = response.data .first(where: { key, _ in @@ -372,7 +372,7 @@ public enum OpenGroupAPI { using: dependencies ) .signed(db, with: OpenGroupAPI.signRequest, using: dependencies) - .map { (info: ResponseInfoType, response: Network.BatchResponseMap) -> CapabilitiesAndRoomsResponse in + .tryMap { (info: ResponseInfoType, response: Network.BatchResponseMap) -> CapabilitiesAndRoomsResponse in let maybeCapabilities: Network.BatchSubResponse? = (response[.capabilities] as? Network.BatchSubResponse) let maybeRooms: Network.BatchSubResponse<[Room]>? = response.data .first(where: { key, _ in diff --git a/SessionMessagingKitTests/Open Groups/OpenGroupAPISpec.swift b/SessionMessagingKitTests/Open Groups/OpenGroupAPISpec.swift index 595dbcde2..87f362c45 100644 --- a/SessionMessagingKitTests/Open Groups/OpenGroupAPISpec.swift +++ b/SessionMessagingKitTests/Open Groups/OpenGroupAPISpec.swift @@ -17,6 +17,7 @@ class OpenGroupAPISpec: QuickSpec { @TestState var dependencies: TestDependencies! = TestDependencies { dependencies in dependencies.dateNow = Date(timeIntervalSince1970: 1234567890) + dependencies.forceSynchronous = true } @TestState(singleton: .storage, in: dependencies) var mockStorage: Storage! = SynchronousStorage( customWriter: try! DatabaseQueue(), diff --git a/SessionSnodeKit/Types/PreparedRequest.swift b/SessionSnodeKit/Types/PreparedRequest.swift index c6cf77171..224e8340f 100644 --- a/SessionSnodeKit/Types/PreparedRequest.swift +++ b/SessionSnodeKit/Types/PreparedRequest.swift @@ -488,7 +488,7 @@ public extension Network.PreparedRequest { /// Due to the way prepared requests work we need to cast between different types and as a result can't avoid potentially /// throwing when mapping so the `map` function just calls through to the `tryMap` function, but we have both to make /// the interface more consistent for dev use - func map(transform: @escaping (ResponseInfoType, R) throws -> O) -> Network.PreparedRequest { + func map(transform: @escaping (ResponseInfoType, R) -> O) -> Network.PreparedRequest { return tryMap(transform: transform) } From 846aa695c26cb692d987477d4aeff75408d9bbf0 Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Fri, 14 Mar 2025 10:14:40 +1100 Subject: [PATCH 3/5] Updated searching to use a publisher and cancel it (instead of db interrupt) --- .../Conversations/ConversationSearch.swift | 49 ++++----- Session/Conversations/ConversationVC.swift | 4 +- .../GlobalSearchViewController.swift | 100 ++++++++---------- .../Utilities/Database+Utilities.swift | 6 -- 4 files changed, 70 insertions(+), 89 deletions(-) diff --git a/Session/Conversations/ConversationSearch.swift b/Session/Conversations/ConversationSearch.swift index 1e2e59c11..ca6515ca8 100644 --- a/Session/Conversations/ConversationSearch.swift +++ b/Session/Conversations/ConversationSearch.swift @@ -1,6 +1,7 @@ // Copyright © 2022 Rangeproof Pty Ltd. All rights reserved. import UIKit +import Combine import GRDB import SignalUtilitiesKit import SessionUIKit @@ -85,31 +86,25 @@ extension ConversationSearchController: UISearchResultsUpdating { } let threadId: String = self.threadId - - DispatchQueue.global(qos: .default).async { [weak self] in - let results: [Interaction.TimestampInfo]? = dependencies[singleton: .storage].read { db -> [Interaction.TimestampInfo] in - self?.resultsBar.willStartSearching(readConnection: db) - - return try Interaction.idsForTermWithin( + let searchCancellable: AnyCancellable = dependencies[singleton: .storage] + .readPublisher { db -> [Interaction.TimestampInfo] in + try Interaction.idsForTermWithin( threadId: threadId, pattern: try SessionThreadViewModel.pattern(db, searchTerm: searchText) ) .fetchAll(db) } - - // If we didn't get results back then we most likely interrupted the query so - // should ignore the results (if there are no results we would succeed and get - // an empty array back) - guard let results: [Interaction.TimestampInfo] = results else { return } - - DispatchQueue.main.async { - guard let strongSelf = self else { return } - - self?.resultsBar.stopLoading() - self?.resultsBar.updateResults(results: results, visibleItemIds: self?.delegate?.currentVisibleIds()) - self?.delegate?.conversationSearchController(strongSelf, didUpdateSearchResults: results, searchText: searchText) - } - } + .subscribe(on: DispatchQueue.global(qos: .default), using: dependencies) + .receive(on: DispatchQueue.main, using: dependencies) + .sink( + receiveCompletion: { _ in }, + receiveValue: { [weak self] results in + self?.resultsBar.stopLoading() + self?.resultsBar.updateResults(results: results, visibleItemIds: self?.delegate?.currentVisibleIds()) + self?.delegate?.conversationSearchController(self, didUpdateSearchResults: results, searchText: searchText) + } + ) + self.resultsBar.willStartSearching(searchCancellable: searchCancellable) } } @@ -138,7 +133,7 @@ protocol SearchResultsBarDelegate: AnyObject { public final class SearchResultsBar: UIView { @ThreadSafe private var hasResults: Bool = false @ThreadSafeObject private var results: [Interaction.TimestampInfo] = [] - @ThreadSafeObject private var readConnection: Database? = nil + @ThreadSafeObject private var currentSearchCancellable: AnyCancellable? = nil var currentIndex: Int? weak var resultsBarDelegate: SearchResultsBarDelegate? @@ -275,8 +270,7 @@ public final class SearchResultsBar: UIView { // MARK: - Content - /// This method will be called within a DB read block - func willStartSearching(readConnection: Database) { + func willStartSearching(searchCancellable: AnyCancellable) { let hasNoExistingResults: Bool = hasResults DispatchQueue.main.async { [weak self] in @@ -287,8 +281,8 @@ public final class SearchResultsBar: UIView { self?.startLoading() } - self.readConnection?.interrupt() - self._readConnection.set(to: readConnection) + currentSearchCancellable?.cancel() + _currentSearchCancellable.set(to: searchCancellable) } func updateResults(results: [Interaction.TimestampInfo]?, visibleItemIds: [Int64]?) { @@ -311,7 +305,6 @@ public final class SearchResultsBar: UIView { return 0 }() - self._readConnection.set(to: nil) self._results.performUpdate { _ in (results ?? []) } self.hasResults = (results != nil) @@ -366,6 +359,6 @@ public final class SearchResultsBar: UIView { public protocol ConversationSearchControllerDelegate: UISearchControllerDelegate { func conversationSearchControllerDependencies() -> Dependencies func currentVisibleIds() -> [Int64] - func conversationSearchController(_ conversationSearchController: ConversationSearchController, didUpdateSearchResults results: [Interaction.TimestampInfo]?, searchText: String?) - func conversationSearchController(_ conversationSearchController: ConversationSearchController, didSelectInteractionInfo: Interaction.TimestampInfo) + func conversationSearchController(_ conversationSearchController: ConversationSearchController?, didUpdateSearchResults results: [Interaction.TimestampInfo]?, searchText: String?) + func conversationSearchController(_ conversationSearchController: ConversationSearchController?, didSelectInteractionInfo: Interaction.TimestampInfo) } diff --git a/Session/Conversations/ConversationVC.swift b/Session/Conversations/ConversationVC.swift index 2f303b22f..54c77a844 100644 --- a/Session/Conversations/ConversationVC.swift +++ b/Session/Conversations/ConversationVC.swift @@ -1968,12 +1968,12 @@ final class ConversationVC: BaseVC, LibSessionRespondingViewController, Conversa func conversationSearchControllerDependencies() -> Dependencies { return viewModel.dependencies } func currentVisibleIds() -> [Int64] { return (fullyVisibleCellViewModels() ?? []).map { $0.id } } - func conversationSearchController(_ conversationSearchController: ConversationSearchController, didUpdateSearchResults results: [Interaction.TimestampInfo]?, searchText: String?) { + func conversationSearchController(_ conversationSearchController: ConversationSearchController?, didUpdateSearchResults results: [Interaction.TimestampInfo]?, searchText: String?) { viewModel.lastSearchedText = searchText tableView.reloadRows(at: tableView.indexPathsForVisibleRows ?? [], with: UITableView.RowAnimation.none) } - func conversationSearchController(_ conversationSearchController: ConversationSearchController, didSelectInteractionInfo interactionInfo: Interaction.TimestampInfo) { + func conversationSearchController(_ conversationSearchController: ConversationSearchController?, didSelectInteractionInfo interactionInfo: Interaction.TimestampInfo) { scrollToInteractionIfNeeded(with: interactionInfo, focusBehaviour: .highlight) } diff --git a/Session/Home/GlobalSearch/GlobalSearchViewController.swift b/Session/Home/GlobalSearch/GlobalSearchViewController.swift index 7b8fe65fa..91163761c 100644 --- a/Session/Home/GlobalSearch/GlobalSearchViewController.swift +++ b/Session/Home/GlobalSearch/GlobalSearchViewController.swift @@ -1,6 +1,7 @@ // Copyright © 2022 Rangeproof Pty Ltd. All rights reserved. import UIKit +import Combine import GRDB import DifferenceKit import SessionUIKit @@ -106,7 +107,7 @@ class GlobalSearchViewController: BaseVC, LibSessionRespondingViewController, UI ) }() - @ThreadSafeObject private var readConnection: Database? = nil + @ThreadSafeObject private var currentSearchCancellable: AnyCancellable? = nil private lazy var searchResultSet: SearchResultData = defaultSearchResults private var termForCurrentSearchResultSet: String = "" private var lastSearchText: String? @@ -256,61 +257,54 @@ class GlobalSearchViewController: BaseVC, LibSessionRespondingViewController, UI guard force || lastSearchText != searchText else { return } lastSearchText = searchText - - DispatchQueue.global(qos: .default).async { [weak self, dependencies] in - self?.readConnection?.interrupt() - - let result: Result<[SectionModel], Error>? = dependencies[singleton: .storage].read { db -> Result<[SectionModel], Error> in - self?._readConnection.set(to: db) + currentSearchCancellable?.cancel() + + _currentSearchCancellable.set(to: dependencies[singleton: .storage] + .readPublisher { [dependencies] db -> [SectionModel] in + let userSessionId: SessionId = dependencies[cache: .general].sessionId + let contactsAndGroupsResults: [SessionThreadViewModel] = try SessionThreadViewModel + .contactsAndGroupsQuery( + userSessionId: userSessionId, + pattern: try SessionThreadViewModel.pattern(db, searchTerm: searchText), + searchTerm: searchText + ) + .fetchAll(db) + Thread.sleep(forTimeInterval: 1) + let messageResults: [SessionThreadViewModel] = try SessionThreadViewModel + .messagesQuery( + userSessionId: userSessionId, + pattern: try SessionThreadViewModel.pattern(db, searchTerm: searchText) + ) + .fetchAll(db) - do { - let userSessionId: SessionId = dependencies[cache: .general].sessionId - let contactsAndGroupsResults: [SessionThreadViewModel] = try SessionThreadViewModel - .contactsAndGroupsQuery( - userSessionId: userSessionId, - pattern: try SessionThreadViewModel.pattern(db, searchTerm: searchText), - searchTerm: searchText - ) - .fetchAll(db) - let messageResults: [SessionThreadViewModel] = try SessionThreadViewModel - .messagesQuery( - userSessionId: userSessionId, - pattern: try SessionThreadViewModel.pattern(db, searchTerm: searchText) - ) - .fetchAll(db) - - return .success([ - ArraySection(model: .contactsAndGroups, elements: contactsAndGroupsResults), - ArraySection(model: .messages, elements: messageResults) - ]) - } - catch { - // Don't log the 'interrupt' error as that's just the user typing too fast - if (error as? DatabaseError)?.resultCode != DatabaseError.SQLITE_INTERRUPT { - SNLog("[GlobalSearch] Failed to find results due to error: \(error)") - } - - return .failure(error) - } + return [ + ArraySection(model: .contactsAndGroups, elements: contactsAndGroupsResults), + ArraySection(model: .messages, elements: messageResults) + ] } - self?._readConnection.set(to: nil) - - DispatchQueue.main.async { - switch result { - case .success(let sections): - self?.termForCurrentSearchResultSet = searchText - self?.searchResultSet = SearchResultData( - state: (sections.map { $0.elements.count }.reduce(0, +) > 0) ? .results : .none, - data: sections - ) - self?.isLoading = false - self?.tableView.reloadData() - self?.refreshTimer = nil - - default: break + .subscribe(on: DispatchQueue.global(qos: .default), using: dependencies) + .receive(on: DispatchQueue.main, using: dependencies) + .sink( + receiveCompletion: { result in + /// Cancelling the search results in `receiveCompletion` not getting called so we can just log any + /// errors we get without needing to filter out "cancelled search" cases + switch result { + case .finished: break + case .failure(let error): + SNLog("[GlobalSearch] Failed to find results due to error: \(error)") + } + }, + receiveValue: { [weak self] sections in + self?.termForCurrentSearchResultSet = searchText + self?.searchResultSet = SearchResultData( + state: (sections.map { $0.elements.count }.reduce(0, +) > 0) ? .results : .none, + data: sections + ) + self?.isLoading = false + self?.tableView.reloadData() + self?.refreshTimer = nil } - } - } + )) } @objc func cancel() { diff --git a/SessionUtilitiesKit/Database/Utilities/Database+Utilities.swift b/SessionUtilitiesKit/Database/Utilities/Database+Utilities.swift index 1c9a606e1..add7923f4 100644 --- a/SessionUtilitiesKit/Database/Utilities/Database+Utilities.swift +++ b/SessionUtilitiesKit/Database/Utilities/Database+Utilities.swift @@ -21,12 +21,6 @@ public extension Database { return try makeFTS5Pattern(rawPattern: rawPattern, forTable: table.databaseTableName) } - func interrupt() { - guard sqliteConnection != nil else { return } - - sqlite3_interrupt(sqliteConnection) - } - /// This is a custom implementation of the `afterNextTransaction` method which executes the closures within their own /// transactions to allow for nesting of 'afterNextTransaction' actions /// From 69c60b0090aba04b5e9f6a8f977e31fd86d14b91 Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Fri, 14 Mar 2025 10:42:05 +1100 Subject: [PATCH 4/5] Track current db tasks and cancel when suspending --- SessionUtilitiesKit/Database/Storage.swift | 29 ++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/SessionUtilitiesKit/Database/Storage.swift b/SessionUtilitiesKit/Database/Storage.swift index b81fa1edf..8a2da7c24 100644 --- a/SessionUtilitiesKit/Database/Storage.swift +++ b/SessionUtilitiesKit/Database/Storage.swift @@ -91,6 +91,9 @@ open class Storage { /// This property gets set the first time we successfully write to the database public private(set) var hasSuccessfullyWritten: Bool = false + /// This property keeps track of all current database tasks and can be used when suspending the database to explicitly + /// cancel any currently running tasks + @ThreadSafeObject private var currentTasks: Set> = [] // MARK: - Initialization @@ -483,7 +486,12 @@ open class Storage { guard !isSuspended else { return } isSuspended = true - Log.info(.storage, "Database access suspended.") + Log.info(.storage, "Database access suspended - cancelling \(currentTasks.count) running task(s).") + + /// Before triggering an `interrupt` (which will forcibly kill in-progress database queries) we want to try to cancel all + /// database tasks to give them a small chance to resolve cleanly before we take a brute-force approach + currentTasks.forEach { $0.cancel() } + _currentTasks.performUpdate { _ in [] } /// Interrupt any open transactions (if this function is called then we are expecting that all processes have finished running /// and don't actually want any more transactions to occur) @@ -660,6 +668,7 @@ open class Storage { let syncQueue = DispatchQueue(label: "com.session.performOperation.syncQueue") let semaphore: DispatchSemaphore = DispatchSemaphore(value: 0) var operationResult: Result? + var operationTask: Task<(), Never>? let logErrorIfNeeded: (Result) -> Result = { result in switch result { case .success: break @@ -673,6 +682,7 @@ open class Storage { func completeOperation(with result: Result) { syncQueue.sync { guard operationResult == nil else { return } + info.storage?.removeTask(operationTask) operationResult = result semaphore.signal() @@ -741,6 +751,10 @@ open class Storage { } } + /// Store the task in case we want to + info.storage?.addTask(task) + operationTask = task + /// For the `async` operation the returned value should be ignored so just return the `invalidQueryResult` error guard !info.isAsync else { return (.failure(StorageError.invalidQueryResult), task) @@ -782,13 +796,24 @@ open class Storage { holder.task = task } } - .handleEvents(receiveCancel: { holder.task?.cancel() }) + .handleEvents(receiveCancel: { [weak self] in + holder.task?.cancel() + self?.removeTask(holder.task) + }) .eraseToAnyPublisher() } } // MARK: - Functions + private func addTask(_ task: Task<(), Never>) { + _currentTasks.performUpdate { $0.inserting(task) } + } + + private func removeTask(_ task: Task<(), Never>?) { + _currentTasks.performUpdate { $0.removing(task) } + } + @discardableResult public func write( fileName file: String = #file, functionName funcN: String = #function, From 975fc52b5dcc98ca7b0dd57a5f8ff8a32c1d876c Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Mon, 17 Mar 2025 11:07:32 +1100 Subject: [PATCH 5/5] Removed a left over TODO --- SessionMessagingKit/Jobs/MessageReceiveJob.swift | 1 - 1 file changed, 1 deletion(-) diff --git a/SessionMessagingKit/Jobs/MessageReceiveJob.swift b/SessionMessagingKit/Jobs/MessageReceiveJob.swift index 53e7f2f0c..df2980c8a 100644 --- a/SessionMessagingKit/Jobs/MessageReceiveJob.swift +++ b/SessionMessagingKit/Jobs/MessageReceiveJob.swift @@ -107,7 +107,6 @@ public enum MessageReceiveJob: JobExecutor { return lastError }, completion: { result in - // TODO: [REFACTOR] Need to test this!!! // Handle the result switch result { case .failure(let error): failure(updatedJob, error, false)