diff --git a/Session/Calls/Call Management/SessionCall.swift b/Session/Calls/Call Management/SessionCall.swift index 30d60b92b..42fcdbdb3 100644 --- a/Session/Calls/Call Management/SessionCall.swift +++ b/Session/Calls/Call Management/SessionCall.swift @@ -387,7 +387,7 @@ public final class SessionCall: CurrentCallProtocol, WebRTCSessionDelegate { using: dependencies ) }, - completion: { _, _ in + completion: { _ in Singleton.callManager.suspendDatabaseIfCallEndedInBackground() } ) diff --git a/Session/Conversations/ConversationVC+Interaction.swift b/Session/Conversations/ConversationVC+Interaction.swift index eff54ee4f..0fbd1d514 100644 --- a/Session/Conversations/ConversationVC+Interaction.swift +++ b/Session/Conversations/ConversationVC+Interaction.swift @@ -269,7 +269,7 @@ extension ConversationVC: updates: { db in db[.isGiphyEnabled] = true }, - completion: { _, _ in + completion: { _ in DispatchQueue.main.async { self?.handleGIFButtonTapped() } diff --git a/Session/Conversations/ConversationVC.swift b/Session/Conversations/ConversationVC.swift index b255442e2..0eb05a8fd 100644 --- a/Session/Conversations/ConversationVC.swift +++ b/Session/Conversations/ConversationVC.swift @@ -554,12 +554,15 @@ final class ConversationVC: BaseVC, LibSessionRespondingViewController, Conversa ) && viewModel.threadData.threadIsNoteToSelf == false && viewModel.threadData.threadShouldBeVisible == false && - !LibSession.conversationInConfig( - threadId: threadId, - threadVariant: viewModel.threadData.threadVariant, - visibleOnly: false, - using: viewModel.dependencies - ) + !Storage.shared.read({ [dependencies = viewModel.dependencies, threadVariant = viewModel.threadData.threadVariant] db in + LibSession.conversationInConfig( + db, + threadId: threadId, + threadVariant: threadVariant, + visibleOnly: false, + using: dependencies + ) + }).defaulting(to: false) { Storage.shared.writeAsync { db in _ = try SessionThread // Intentionally use `deleteAll` here instead of `deleteOrLeave` @@ -659,7 +662,7 @@ final class ConversationVC: BaseVC, LibSessionRespondingViewController, Conversa // PagedDatabaseObserver won't have them so we need to force a re-fetch of the current // data to ensure everything is up to date if didReturnFromBackground { - DispatchQueue.global(qos: .background).asyncAfter(deadline: .now() + 0.01) { + DispatchQueue.global(qos: .background).async { self?.viewModel.pagedDataObserver?.reload() } } diff --git a/Session/Conversations/ConversationViewModel.swift b/Session/Conversations/ConversationViewModel.swift index be83298a5..5a18e72cd 100644 --- a/Session/Conversations/ConversationViewModel.swift +++ b/Session/Conversations/ConversationViewModel.swift @@ -198,7 +198,7 @@ public class ConversationViewModel: OWSAudioPlayerDelegate, NavigatableStateHold ) // Run the initial query on a background thread so we don't block the push transition - DispatchQueue.global(qos: .userInitiated).asyncAfter(deadline: .now() + 0.01) { [weak self] in + DispatchQueue.global(qos: .userInitiated).async { [weak self] in // If we don't have a `initialFocusedInfo` then default to `.pageBefore` (it'll query // from a `0` offset) switch (focusedInteractionInfo ?? initialData?.initialUnreadInteractionInfo) { diff --git a/Session/Meta/AppDelegate.swift b/Session/Meta/AppDelegate.swift index 6415a28c5..06286f526 100644 --- a/Session/Meta/AppDelegate.swift +++ b/Session/Meta/AppDelegate.swift @@ -567,7 +567,7 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD /// There is a warning which can happen on launch because the Database read can be blocked by another database operation /// which could result in this blocking the main thread, as a result we want to check the identity exists on a background thread /// and then return to the main thread only when required - DispatchQueue.global(qos: .default).asyncAfter(deadline: .now() + 0.01) { [weak self] in + DispatchQueue.global(qos: .default).async { [weak self] in guard Identity.userExists() else { return } self?.enableBackgroundRefreshIfNecessary() @@ -682,7 +682,7 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD /// We want to start observing the changes for the 'HomeVC' and want to wait until we actually get data back before we /// continue as we don't want to show a blank home screen - DispatchQueue.global(qos: .userInitiated).asyncAfter(deadline: .now() + 0.01) { + DispatchQueue.global(qos: .userInitiated).async { viewController.startObservingChanges() { populateHomeScreenTimer.invalidate() @@ -722,7 +722,7 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD /// On application startup the `Storage.read` can be slightly slow while GRDB spins up it's database /// read pools (up to a few seconds), since this read is blocking we want to dispatch it to run async to ensure /// we don't block user interaction while it's running - DispatchQueue.global(qos: .default).asyncAfter(deadline: .now() + 0.01) { + DispatchQueue.global(qos: .default).async { let unreadCount: Int = Storage.shared .read { db in try Interaction.fetchUnreadCount(db) } .defaulting(to: 0) @@ -817,10 +817,7 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD /// Start the pollers on a background thread so that any database queries they need to run don't /// block the main thread - /// - /// **Note:** We add a delay of `0.01` to prevent potential database re-entrancy if this is triggered - /// within the completion block of a database transaction, this gives it the time to complete the transaction - DispatchQueue.global(qos: .background).asyncAfter(deadline: .now() + 0.01) { [weak self] in + DispatchQueue.global(qos: .background).async { [weak self] in self?.poller.start() guard shouldStartGroupPollers else { return } diff --git a/Session/Notifications/PushRegistrationManager.swift b/Session/Notifications/PushRegistrationManager.swift index 4384fc9a4..2ea1c1c67 100644 --- a/Session/Notifications/PushRegistrationManager.swift +++ b/Session/Notifications/PushRegistrationManager.swift @@ -292,11 +292,9 @@ public class PushRegistrationManager: NSObject, PKPushRegistryDelegate, PushRegi dependencies.storage.resumeDatabaseAccess() LibSession.resumeNetworkAccess() - let maybeCall: SessionCall? = Storage.shared.read { [dependencies = self.dependencies] db in - var call: SessionCall? = nil - + let maybeCall: SessionCall? = Storage.shared.read { [dependencies = self.dependencies] db -> SessionCall? in do { - call = SessionCall( + let call: SessionCall = SessionCall( db, for: caller, uuid: uuid, @@ -309,12 +307,14 @@ public class PushRegistrationManager: NSObject, PKPushRegistryDelegate, PushRegi .filter(Interaction.Columns.messageUuid == uuid) .fetchOne(db) - call?.callInteractionId = interaction?.id - } catch { + call.callInteractionId = interaction?.id + return call + } + catch { SNLog("[Calls] Failed to create call due to error: \(error)") } - return call + return nil } guard let call: SessionCall = maybeCall else { diff --git a/SessionMessagingKit/Jobs/Types/GarbageCollectionJob.swift b/SessionMessagingKit/Jobs/Types/GarbageCollectionJob.swift index 5e74c2e4e..6c1294bac 100644 --- a/SessionMessagingKit/Jobs/Types/GarbageCollectionJob.swift +++ b/SessionMessagingKit/Jobs/Types/GarbageCollectionJob.swift @@ -343,12 +343,10 @@ public enum GarbageCollectionJob: JobExecutor { .deleteAll(db) } }, - completion: { _, _ in + completion: { _ in // Dispatch async so we can swap from the write queue to a read one (we are done - // writing), this has to be done after a slight delay to ensure the transaction - // provided by the completion block completes first (ie. so we don't hit - // re-entrancy issues) - queue.asyncAfter(deadline: .now() + 0.01) { + // writing) + queue.async { // Retrieve a list of all valid attachmnet and avatar file paths struct FileInfo { let attachmentLocalRelativePaths: Set diff --git a/SessionMessagingKit/Jobs/Types/UpdateProfilePictureJob.swift b/SessionMessagingKit/Jobs/Types/UpdateProfilePictureJob.swift index 72a0f3dad..965962635 100644 --- a/SessionMessagingKit/Jobs/Types/UpdateProfilePictureJob.swift +++ b/SessionMessagingKit/Jobs/Types/UpdateProfilePictureJob.swift @@ -50,17 +50,16 @@ public enum UpdateProfilePictureJob: JobExecutor { queue: queue, displayPictureUpdate: (profilePictureData.map { .currentUserUploadImageData($0) } ?? .none), success: { db in - // Need to call the 'success' closure asynchronously on the queue after a slight - // delay to prevent a reentrancy issue as it will write to the database and this - // closure is already called within another database write - queue.asyncAfter(deadline: .now() + 0.01) { + queue.async { SNLog("[UpdateProfilePictureJob] Profile successfully updated") success(job, false, dependencies) } }, failure: { error in - SNLog("[UpdateProfilePictureJob] Failed to update profile") - failure(job, error, false, dependencies) + queue.async { + SNLog("[UpdateProfilePictureJob] Failed to update profile") + failure(job, error, false, dependencies) + } } ) } diff --git a/SessionMessagingKit/LibSession/Config Handling/LibSession+Shared.swift b/SessionMessagingKit/LibSession/Config Handling/LibSession+Shared.swift index c2cefa26f..b78a2dc7d 100644 --- a/SessionMessagingKit/LibSession/Config Handling/LibSession+Shared.swift +++ b/SessionMessagingKit/LibSession/Config Handling/LibSession+Shared.swift @@ -443,8 +443,8 @@ public extension LibSession { return contact.priority case .community: - let maybeUrlInfo: OpenGroupUrlInfo? = Storage.shared - .read { db in try OpenGroupUrlInfo.fetchAll(db, ids: [threadId]) }? + let maybeUrlInfo: OpenGroupUrlInfo? = (try? OpenGroupUrlInfo + .fetchAll(db, ids: [threadId]))? .first guard @@ -454,7 +454,7 @@ public extension LibSession { else { return LibSession.defaultNewThreadPriority } var community: ugroups_community_info = ugroups_community_info() - let result: Bool = user_groups_get_community(conf, &community, &cBaseUrl, &cRoom) + _ = user_groups_get_community(conf, &community, &cBaseUrl, &cRoom) LibSessionError.clear(conf) return community.priority @@ -559,7 +559,7 @@ public extension LibSession { } static func conversationInConfig( - _ db: Database? = nil, + _ db: Database, threadId: String, threadVariant: SessionThread.Variant, visibleOnly: Bool, @@ -585,7 +585,7 @@ public extension LibSession { return dependencies.caches[.libSession] .config(for: configVariant, publicKey: userPublicKey) .wrappedValue - .map { conf in + .map { conf -> Bool in guard var cThreadId: [CChar] = threadId.cString(using: .utf8) else { return false } switch threadVariant { @@ -611,8 +611,8 @@ public extension LibSession { return (!visibleOnly || LibSession.shouldBeVisible(priority: contact.priority)) case .community: - let maybeUrlInfo: OpenGroupUrlInfo? = Storage.shared - .read { db in try OpenGroupUrlInfo.fetchAll(db, ids: [threadId]) }? + let maybeUrlInfo: OpenGroupUrlInfo? = (try? OpenGroupUrlInfo + .fetchAll(db, ids: [threadId]))? .first guard diff --git a/SessionMessagingKit/Sending & Receiving/MessageSender.swift b/SessionMessagingKit/Sending & Receiving/MessageSender.swift index d8b8405c6..1b6c5f931 100644 --- a/SessionMessagingKit/Sending & Receiving/MessageSender.swift +++ b/SessionMessagingKit/Sending & Receiving/MessageSender.swift @@ -1054,30 +1054,28 @@ public final class MessageSender { guard !rowIds.isEmpty else { return error } - // Note: We need to dispatch this after a small 0.01 delay to prevent any potential - // re-entrancy issues since the 'asyncMigrate' returns a result containing a DB instance - // within a transaction - DispatchQueue.global(qos: .background).asyncAfter(deadline: .now() + 0.01, using: dependencies) { - dependencies.storage.write { db in - switch destination { - case .syncMessage: - try Interaction - .filter(rowIds.contains(Column.rowID)) - .updateAll( - db, - Interaction.Columns.state.set(to: Interaction.State.failedToSync), - Interaction.Columns.mostRecentFailureText.set(to: "\(error)") - ) - - default: - try Interaction - .filter(rowIds.contains(Column.rowID)) - .updateAll( - db, - Interaction.Columns.state.set(to: Interaction.State.failed), - Interaction.Columns.mostRecentFailureText.set(to: "\(error)") - ) - } + /// If we have affected rows then we should update them with the latest error text + /// + /// **Note:** We `writeAsync` here as performing a syncronous `write` results in a reentrancy assertion + dependencies.storage.writeAsync { db in + switch destination { + case .syncMessage: + try Interaction + .filter(rowIds.contains(Column.rowID)) + .updateAll( + db, + Interaction.Columns.state.set(to: Interaction.State.failedToSync), + Interaction.Columns.mostRecentFailureText.set(to: "\(error)") + ) + + default: + try Interaction + .filter(rowIds.contains(Column.rowID)) + .updateAll( + db, + Interaction.Columns.state.set(to: Interaction.State.failed), + Interaction.Columns.mostRecentFailureText.set(to: "\(error)") + ) } } diff --git a/SessionMessagingKit/Utilities/ProfileManager.swift b/SessionMessagingKit/Utilities/ProfileManager.swift index 248865c90..5f6f72b3f 100644 --- a/SessionMessagingKit/Utilities/ProfileManager.swift +++ b/SessionMessagingKit/Utilities/ProfileManager.swift @@ -91,7 +91,7 @@ public struct ProfileManager { .filter(id: profile.id) .updateAll(db, Profile.Columns.profilePictureFileName.set(to: nil)) }, - completion: { _, _ in + completion: { _ in // Try to re-download the avatar if it has a URL if let profilePictureUrl: String = profile.profilePictureUrl, !profilePictureUrl.isEmpty { // FIXME: Refactor avatar downloading to be a proper Job so we can avoid this diff --git a/SessionSnodeKit/LibSession/LibSession+Networking.swift b/SessionSnodeKit/LibSession/LibSession+Networking.swift index a65813518..98d8b9e07 100644 --- a/SessionSnodeKit/LibSession/LibSession+Networking.swift +++ b/SessionSnodeKit/LibSession/LibSession+Networking.swift @@ -574,10 +574,9 @@ private extension LibSession { return Log.error("[LibSession] CallbackWrapper called with null context.") } - /// Dispatch async so we don't block libSession's internals with Swift logic (which can block other requests), we - /// add the `0.01` delay to ensure the closure isn't executed immediately + /// Dispatch async so we don't block libSession's internals with Swift logic (which can block other requests) let wrapper: CallbackWrapper = Unmanaged>.fromOpaque(ctx).takeRetainedValue() - DispatchQueue.global(qos: .default).asyncAfter(deadline: .now() + 0.01) { [wrapper] in + DispatchQueue.global(qos: .default).async { [wrapper] in wrapper.resultPublisher.send(output) } } diff --git a/SessionUtilitiesKit/Database/Storage.swift b/SessionUtilitiesKit/Database/Storage.swift index 4bd7ef0e2..2323566e4 100644 --- a/SessionUtilitiesKit/Database/Storage.swift +++ b/SessionUtilitiesKit/Database/Storage.swift @@ -29,6 +29,9 @@ open class Storage { /// When attempting to do a write the transaction will wait this long to acquite a lock before failing private static let writeTransactionStartTimeout: TimeInterval = 5 + /// If a transaction takes longer than this duration then we should fail the transaction rather than keep hanging + private static let transactionDeadlockTimeoutSeconds: Int = 5 + private static var sharedDatabaseDirectoryPath: String { "\(FileManager.default.appSharedDataDirectoryPath)/database" } private static var databasePath: String { "\(Storage.sharedDatabaseDirectoryPath)/\(Storage.dbFileName)" } private static var databasePathShm: String { "\(Storage.sharedDatabaseDirectoryPath)/\(Storage.dbFileName)-shm" } @@ -322,17 +325,16 @@ open class Storage { guard async else { return migrationCompleted(Result(try migrator.migrate(dbWriter))) } migrator.asyncMigrate(dbWriter) { result in - let finalResult: Swift.Result = { + let finalResult: Result = { switch result { case .failure(let error): return .failure(error) case .success: return .success(()) } }() - // Note: We need to dispatch this after a small 0.01 delay to prevent any potential - // re-entrancy issues since the 'asyncMigrate' returns a result containing a DB instance - // within a transaction - DispatchQueue.global(qos: .userInitiated).asyncAfter(deadline: .now() + 0.01, using: dependencies) { + // Note: We need to dispatch this to the next run toop to prevent blocking if the callback + // performs subsequent database operations + DispatchQueue.global(qos: .userInitiated).async(using: dependencies) { migrationCompleted(finalResult) } } @@ -535,13 +537,16 @@ open class Storage { static func logIfNeeded(_ error: Error, isWrite: Bool) { switch error { - case DatabaseError.SQLITE_ABORT, DatabaseError.SQLITE_INTERRUPT: + case DatabaseError.SQLITE_ABORT, DatabaseError.SQLITE_INTERRUPT, DatabaseError.SQLITE_ERROR: let message: String = ((error as? DatabaseError)?.message ?? "Unknown") Log.error("[Storage] Database \(isWrite ? "write" : "read") failed due to error: \(message)") case StorageError.databaseSuspended: Log.error("[Storage] Database \(isWrite ? "write" : "read") failed as the database is suspended.") + case StorageError.transactionDeadlockTimeout: + Log.critical("[Storage] Database \(isWrite ? "write" : "read") failed due to a potential synchronous query deadlock timeout.") + default: break } } @@ -557,71 +562,158 @@ open class Storage { } } - private static func perform( - info: CallInfo, - updates: @escaping (Database) throws -> T - ) -> (Database) throws -> T { - return { db in - guard info.storage?.isSuspended == false else { throw StorageError.databaseSuspended } - - let timer: TransactionTimer = TransactionTimer.start(duration: Storage.slowTransactionThreshold, info: info) - defer { timer.stop() } - - // Get the result - let result: T = try updates(db) - - // Update the state flags - switch info.isWrite { - case true: info.storage?.hasSuccessfullyWritten = true - case false: info.storage?.hasSuccessfullyRead = true + // MARK: - Operations + + private static func track( + _ db: Database, + _ info: CallInfo, + _ operation: @escaping (Database) throws -> T + ) throws -> T { + guard info.storage?.isSuspended == false else { throw StorageError.databaseSuspended } + + let timer: TransactionTimer = TransactionTimer.start( + duration: Storage.slowTransactionThreshold, + info: info + ) + defer { timer.stop() } + + // Get the result + let result: T = try operation(db) + + // Update the state flags + switch info.isWrite { + case true: info.storage?.hasSuccessfullyWritten = true + case false: info.storage?.hasSuccessfullyRead = true + } + + return result + } + + /// This function manually performs `read`/`write` operations in either a synchronous or asyncronous way using a semaphore to + /// block the syncrhonous version because `GRDB` has an internal assertion when using it's built-in synchronous `read`/`write` + /// functions to prevent reentrancy which is unsupported + /// + /// Unfortunately this results in the code getting messy when trying to chain multiple database transactions (even + /// when using `db.afterNextTransaction`) which is somewhat unintuitive + /// + /// The `async` variants don't need to worry about this reentrancy issue so instead we route we use those for all operations instead + /// and just block the thread when we want to perform a synchronous operation + @discardableResult private static func performOperation( + _ info: CallInfo, + _ operation: @escaping (Database) throws -> T, + _ completion: ((Result) -> Void)? = nil + ) -> Result { + var result: Result = .failure(StorageError.invalidQueryResult) + let semaphore: DispatchSemaphore? = (info.isAsync ? nil : DispatchSemaphore(value: 0)) + let logErrorIfNeeded: (Result) -> () = { result in + switch result { + case .success: break + case .failure(let error): StorageState.logIfNeeded(error, isWrite: info.isWrite) } + } + + /// Perform the actual operation + switch (StorageState(info.storage), info.isWrite) { + case (.invalid(let error), _): + result = .failure(error) + semaphore?.signal() - return result + case (.valid(let dbWriter), true): + dbWriter.asyncWrite( + { db in result = .success(try Storage.track(db, info, operation)) }, + completion: { _, dbResult in + switch dbResult { + case .success: break + case .failure(let error): result = .failure(error) + } + semaphore?.signal() + + if info.isAsync { logErrorIfNeeded(result) } + completion?(result) + } + ) + + case (.valid(let dbWriter), false): + dbWriter.asyncRead { dbResult in + do { + switch dbResult { + case .failure(let error): throw error + case .success(let db): result = .success(try Storage.track(db, info, operation)) + } + } catch { + result = .failure(error) + } + semaphore?.signal() + + if info.isAsync { logErrorIfNeeded(result) } + completion?(result) + } + } + + /// If this is a synchronous operation then `semaphore` will exist and will block here waiting on the signal from one of the + /// above closures to be sent + let semaphoreResult: DispatchTimeoutResult? = semaphore?.wait(timeout: .now() + .seconds(Storage.transactionDeadlockTimeoutSeconds)) + + /// If the transaction timed out then log the error and report a failure + guard semaphoreResult != .timedOut else { + StorageState.logIfNeeded(StorageError.transactionDeadlockTimeout, isWrite: info.isWrite) + return .failure(StorageError.transactionDeadlockTimeout) + } + + if !info.isAsync { logErrorIfNeeded(result) } + return result + } + + private func performPublisherOperation( + _ fileName: String, + _ functionName: String, + _ lineNumber: Int, + isWrite: Bool, + _ operation: @escaping (Database) throws -> T + ) -> AnyPublisher { + switch StorageState(self) { + case .invalid(let error): return StorageState.logIfNeeded(error, isWrite: false) + case .valid: + /// **Note:** GRDB does have `readPublisher`/`writePublisher` functions but it appears to asynchronously + /// trigger both the `output` and `complete` closures at the same time which causes a lot of unexpected + /// behaviours (this behaviour is apparently expected but still causes a number of odd behaviours in our code + /// for more information see https://github.com/groue/GRDB.swift/issues/1334) + /// + /// Instead of this we are just using `Deferred { Future {} }` which is executed on the specified scheduled + /// which behaves in a much more expected way than the GRDB `readPublisher`/`writePublisher` does + let info: CallInfo = CallInfo(self, fileName, functionName, lineNumber, .syncWrite) + return Deferred { + Future { resolver in + resolver(Storage.performOperation(info, operation)) + } + }.eraseToAnyPublisher() } } // MARK: - Functions @discardableResult public func write( - fileName: String = #file, - functionName: String = #function, - lineNumber: Int = #line, + fileName file: String = #file, + functionName funcN: String = #function, + lineNumber line: Int = #line, using dependencies: Dependencies = Dependencies(), updates: @escaping (Database) throws -> T? ) -> T? { - switch StorageState(self) { - case .invalid(let error): return StorageState.logIfNeeded(error, isWrite: true) - case .valid(let dbWriter): - let info: CallInfo = CallInfo(fileName, functionName, lineNumber, true, self) - do { return try dbWriter.write(Storage.perform(info: info, updates: updates)) } - catch { return StorageState.logIfNeeded(error, isWrite: true) } + switch Storage.performOperation(CallInfo(self, file, funcN, line, .syncWrite), updates) { + case .failure: return nil + case .success(let result): return result } } open func writeAsync( - fileName: String = #file, - functionName: String = #function, - lineNumber: Int = #line, + fileName file: String = #file, + functionName funcN: String = #function, + lineNumber line: Int = #line, using dependencies: Dependencies = Dependencies(), updates: @escaping (Database) throws -> T, - completion: @escaping (Database, Swift.Result) throws -> Void = { _, _ in } + completion: @escaping (Result) -> Void = { _ in } ) { - switch StorageState(self) { - case .invalid(let error): return StorageState.logIfNeeded(error, isWrite: true) - case .valid(let dbWriter): - let info: CallInfo = CallInfo(fileName, functionName, lineNumber, true, self) - dbWriter.asyncWrite( - Storage.perform(info: info, updates: updates), - completion: { db, result in - switch result { - case .failure(let error): StorageState.logIfNeeded(error, isWrite: true) - default: break - } - - try? completion(db, result) - } - ) - } + Storage.performOperation(CallInfo(self, file, funcN, line, .asyncWrite), updates, completion) } open func writePublisher( @@ -631,50 +723,19 @@ open class Storage { using dependencies: Dependencies = Dependencies(), updates: @escaping (Database) throws -> T ) -> AnyPublisher { - switch StorageState(self) { - case .invalid(let error): return StorageState.logIfNeeded(error, isWrite: true) - case .valid: - /// **Note:** GRDB does have a `writePublisher` method but it appears to asynchronously trigger - /// both the `output` and `complete` closures at the same time which causes a lot of unexpected - /// behaviours (this behaviour is apparently expected but still causes a number of odd behaviours in our code - /// for more information see https://github.com/groue/GRDB.swift/issues/1334) - /// - /// Instead of this we are just using `Deferred { Future {} }` which is executed on the specified scheduled - /// which behaves in a much more expected way than the GRDB `writePublisher` does - let info: CallInfo = CallInfo(fileName, functionName, lineNumber, true, self) - return Deferred { - Future { [weak self] resolver in - /// The `StorageState` may have changed between the creation of the publisher and it actually - /// being executed so we need to check again - switch StorageState(self) { - case .invalid(let error): return StorageState.logIfNeeded(error, isWrite: true) - case .valid(let dbWriter): - do { - resolver(Result.success(try dbWriter.write(Storage.perform(info: info, updates: updates)))) - } - catch { - StorageState.logIfNeeded(error, isWrite: true) - resolver(Result.failure(error)) - } - } - } - }.eraseToAnyPublisher() - } + return performPublisherOperation(fileName, functionName, lineNumber, isWrite: true, updates) } @discardableResult public func read( - fileName: String = #file, - functionName: String = #function, - lineNumber: Int = #line, + fileName file: String = #file, + functionName funcN: String = #function, + lineNumber line: Int = #line, using dependencies: Dependencies = Dependencies(), _ value: @escaping (Database) throws -> T? ) -> T? { - switch StorageState(self) { - case .invalid(let error): return StorageState.logIfNeeded(error, isWrite: false) - case .valid(let dbWriter): - let info: CallInfo = CallInfo(fileName, functionName, lineNumber, false, self) - do { return try dbWriter.read(Storage.perform(info: info, updates: value)) } - catch { return StorageState.logIfNeeded(error, isWrite: false) } + switch Storage.performOperation(CallInfo(self, file, funcN, line, .syncRead), value) { + case .failure: return nil + case .success(let result): return result } } @@ -685,35 +746,7 @@ open class Storage { using dependencies: Dependencies = Dependencies(), value: @escaping (Database) throws -> T ) -> AnyPublisher { - switch StorageState(self) { - case .invalid(let error): return StorageState.logIfNeeded(error, isWrite: false) - case .valid: - /// **Note:** GRDB does have a `readPublisher` method but it appears to asynchronously trigger - /// both the `output` and `complete` closures at the same time which causes a lot of unexpected - /// behaviours (this behaviour is apparently expected but still causes a number of odd behaviours in our code - /// for more information see https://github.com/groue/GRDB.swift/issues/1334) - /// - /// Instead of this we are just using `Deferred { Future {} }` which is executed on the specified scheduled - /// which behaves in a much more expected way than the GRDB `readPublisher` does - let info: CallInfo = CallInfo(fileName, functionName, lineNumber, false, self) - return Deferred { - Future { [weak self] resolver in - /// The `StorageState` may have changed between the creation of the publisher and it actually - /// being executed so we need to check again - switch StorageState(self) { - case .invalid(let error): return StorageState.logIfNeeded(error, isWrite: false) - case .valid(let dbWriter): - do { - resolver(Result.success(try dbWriter.read(Storage.perform(info: info, updates: value)))) - } - catch { - StorageState.logIfNeeded(error, isWrite: false) - resolver(Result.failure(error)) - } - } - } - }.eraseToAnyPublisher() - } + return performPublisherOperation(fileName, functionName, lineNumber, isWrite: false, value) } /// Rever to the `ValueObservation.start` method for full documentation @@ -904,11 +937,18 @@ public extension Storage { private extension Storage { class CallInfo { + enum Behaviour { + case syncRead + case asyncRead + case syncWrite + case asyncWrite + } + + weak var storage: Storage? let file: String let function: String let line: Int - let isWrite: Bool - weak var storage: Storage? + let behaviour: Behaviour var callInfo: String { let fileInfo: String = (file.components(separatedBy: "/").last.map { "\($0):\(line) - " } ?? "") @@ -916,18 +956,31 @@ private extension Storage { return "\(fileInfo)\(function)" } + var isWrite: Bool { + switch behaviour { + case .syncWrite, .asyncWrite: return true + case .syncRead, .asyncRead: return false + } + } + var isAsync: Bool { + switch behaviour { + case .asyncRead, .asyncWrite: return true + case .syncRead, .syncWrite: return false + } + } + init( + _ storage: Storage?, _ file: String, _ function: String, _ line: Int, - _ isWrite: Bool, - _ storage: Storage? + _ behaviour: Behaviour ) { + self.storage = storage self.file = file self.function = function self.line = line - self.isWrite = isWrite - self.storage = storage + self.behaviour = behaviour } } } diff --git a/SessionUtilitiesKit/Database/StorageError.swift b/SessionUtilitiesKit/Database/StorageError.swift index 6e3f63de9..31981be8d 100644 --- a/SessionUtilitiesKit/Database/StorageError.swift +++ b/SessionUtilitiesKit/Database/StorageError.swift @@ -13,6 +13,8 @@ public enum StorageError: Error { case keySpecCreationFailed case keySpecInaccessible case decodingFailed + case invalidQueryResult + case transactionDeadlockTimeout case failedToSave case objectNotFound diff --git a/SessionUtilitiesKit/Database/Types/PagedDatabaseObserver.swift b/SessionUtilitiesKit/Database/Types/PagedDatabaseObserver.swift index cd540ee66..a8841305d 100644 --- a/SessionUtilitiesKit/Database/Types/PagedDatabaseObserver.swift +++ b/SessionUtilitiesKit/Database/Types/PagedDatabaseObserver.swift @@ -180,12 +180,9 @@ public class PagedDatabaseObserver: TransactionObserver where return [] } - // This looks odd but if we just use `commitProcessingQueue.async` then the code can - // get executed immediately wihch can result in a new transaction being started whilst - // we are still within the transaction wrapping `databaseDidCommit` (which we don't - // want), by adding this tiny 0.01 delay we should be giving it enough time to finish - // processing the current transaction - commitProcessingQueue.asyncAfter(deadline: .now() + 0.01) { [weak self] in + // Dispatch to the `commitProcessingQueue` so we don't block the database `write` queue + // when updatind the data + commitProcessingQueue.async { [weak self] in self?.processDatabaseCommit(committedChanges: committedChanges) } } diff --git a/SessionUtilitiesKit/JobRunner/JobRunner.swift b/SessionUtilitiesKit/JobRunner/JobRunner.swift index 4247e8cf1..7f42bba46 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunner.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunner.swift @@ -1298,10 +1298,7 @@ public final class JobQueue: Hashable { // thread and do so by creating a number of background queues to run the jobs on, if this // function was called on the wrong queue then we need to dispatch to the correct one guard DispatchQueue.with(key: queueKey, matches: queueContext, using: dependencies) else { - // Note: We need to dispatch this after a small 0.01 delay to prevent any potential - // re-entrancy issues since the `start` function can be called within an existing - // database transaction (eg. via `db.afterNextTransactionNestedOnce`) - internalQueue.asyncAfter(deadline: .now() + 0.01, using: dependencies) { [weak self] in + internalQueue.async(using: dependencies) { [weak self] in self?.start(forceWhenAlreadyRunning: forceWhenAlreadyRunning, using: dependencies) } return diff --git a/_SharedTestUtilities/SynchronousStorage.swift b/_SharedTestUtilities/SynchronousStorage.swift index e6ee0fb78..c1da90c7f 100644 --- a/_SharedTestUtilities/SynchronousStorage.swift +++ b/_SharedTestUtilities/SynchronousStorage.swift @@ -119,14 +119,14 @@ class SynchronousStorage: Storage { lineNumber: Int = #line, using dependencies: Dependencies = Dependencies(), updates: @escaping (Database) throws -> T, - completion: @escaping (Database, Result) throws -> Void + completion: @escaping (Result) -> Void ) { do { let result: T = try write(using: dependencies, updates: updates) ?? { throw StorageError.failedToSave }() - write(using: dependencies) { db in try completion(db, Result.success(result)) } + completion(Result.success(result)) } catch { - write(using: dependencies) { db in try completion(db, Result.failure(error)) } + completion(Result.failure(error)) } }