Merge pull request #366 from mpretty-cyro/fix/database-reentrancy-handling

Fixed a few issues related to database reentrancy
pull/1061/head
Morgan Pretty 2 months ago committed by GitHub
commit d200878162
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -387,7 +387,7 @@ public final class SessionCall: CurrentCallProtocol, WebRTCSessionDelegate {
using: dependencies
)
},
completion: { _, _ in
completion: { _ in
Singleton.callManager.suspendDatabaseIfCallEndedInBackground()
}
)

@ -269,7 +269,7 @@ extension ConversationVC:
updates: { db in
db[.isGiphyEnabled] = true
},
completion: { _, _ in
completion: { _ in
DispatchQueue.main.async {
self?.handleGIFButtonTapped()
}

@ -554,12 +554,15 @@ final class ConversationVC: BaseVC, LibSessionRespondingViewController, Conversa
) &&
viewModel.threadData.threadIsNoteToSelf == false &&
viewModel.threadData.threadShouldBeVisible == false &&
!LibSession.conversationInConfig(
threadId: threadId,
threadVariant: viewModel.threadData.threadVariant,
visibleOnly: false,
using: viewModel.dependencies
)
!Storage.shared.read({ [dependencies = viewModel.dependencies, threadVariant = viewModel.threadData.threadVariant] db in
LibSession.conversationInConfig(
db,
threadId: threadId,
threadVariant: threadVariant,
visibleOnly: false,
using: dependencies
)
}).defaulting(to: false)
{
Storage.shared.writeAsync { db in
_ = try SessionThread // Intentionally use `deleteAll` here instead of `deleteOrLeave`
@ -659,7 +662,7 @@ final class ConversationVC: BaseVC, LibSessionRespondingViewController, Conversa
// PagedDatabaseObserver won't have them so we need to force a re-fetch of the current
// data to ensure everything is up to date
if didReturnFromBackground {
DispatchQueue.global(qos: .background).asyncAfter(deadline: .now() + 0.01) {
DispatchQueue.global(qos: .background).async {
self?.viewModel.pagedDataObserver?.reload()
}
}

@ -198,7 +198,7 @@ public class ConversationViewModel: OWSAudioPlayerDelegate, NavigatableStateHold
)
// Run the initial query on a background thread so we don't block the push transition
DispatchQueue.global(qos: .userInitiated).asyncAfter(deadline: .now() + 0.01) { [weak self] in
DispatchQueue.global(qos: .userInitiated).async { [weak self] in
// If we don't have a `initialFocusedInfo` then default to `.pageBefore` (it'll query
// from a `0` offset)
switch (focusedInteractionInfo ?? initialData?.initialUnreadInteractionInfo) {

@ -567,7 +567,7 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD
/// There is a warning which can happen on launch because the Database read can be blocked by another database operation
/// which could result in this blocking the main thread, as a result we want to check the identity exists on a background thread
/// and then return to the main thread only when required
DispatchQueue.global(qos: .default).asyncAfter(deadline: .now() + 0.01) { [weak self] in
DispatchQueue.global(qos: .default).async { [weak self] in
guard Identity.userExists() else { return }
self?.enableBackgroundRefreshIfNecessary()
@ -682,7 +682,7 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD
/// We want to start observing the changes for the 'HomeVC' and want to wait until we actually get data back before we
/// continue as we don't want to show a blank home screen
DispatchQueue.global(qos: .userInitiated).asyncAfter(deadline: .now() + 0.01) {
DispatchQueue.global(qos: .userInitiated).async {
viewController.startObservingChanges() {
populateHomeScreenTimer.invalidate()
@ -722,7 +722,7 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD
/// On application startup the `Storage.read` can be slightly slow while GRDB spins up it's database
/// read pools (up to a few seconds), since this read is blocking we want to dispatch it to run async to ensure
/// we don't block user interaction while it's running
DispatchQueue.global(qos: .default).asyncAfter(deadline: .now() + 0.01) {
DispatchQueue.global(qos: .default).async {
let unreadCount: Int = Storage.shared
.read { db in try Interaction.fetchUnreadCount(db) }
.defaulting(to: 0)
@ -817,10 +817,7 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD
/// Start the pollers on a background thread so that any database queries they need to run don't
/// block the main thread
///
/// **Note:** We add a delay of `0.01` to prevent potential database re-entrancy if this is triggered
/// within the completion block of a database transaction, this gives it the time to complete the transaction
DispatchQueue.global(qos: .background).asyncAfter(deadline: .now() + 0.01) { [weak self] in
DispatchQueue.global(qos: .background).async { [weak self] in
self?.poller.start()
guard shouldStartGroupPollers else { return }

@ -292,11 +292,9 @@ public class PushRegistrationManager: NSObject, PKPushRegistryDelegate, PushRegi
dependencies.storage.resumeDatabaseAccess()
LibSession.resumeNetworkAccess()
let maybeCall: SessionCall? = Storage.shared.read { [dependencies = self.dependencies] db in
var call: SessionCall? = nil
let maybeCall: SessionCall? = Storage.shared.read { [dependencies = self.dependencies] db -> SessionCall? in
do {
call = SessionCall(
let call: SessionCall = SessionCall(
db,
for: caller,
uuid: uuid,
@ -309,12 +307,14 @@ public class PushRegistrationManager: NSObject, PKPushRegistryDelegate, PushRegi
.filter(Interaction.Columns.messageUuid == uuid)
.fetchOne(db)
call?.callInteractionId = interaction?.id
} catch {
call.callInteractionId = interaction?.id
return call
}
catch {
SNLog("[Calls] Failed to create call due to error: \(error)")
}
return call
return nil
}
guard let call: SessionCall = maybeCall else {

@ -343,12 +343,10 @@ public enum GarbageCollectionJob: JobExecutor {
.deleteAll(db)
}
},
completion: { _, _ in
completion: { _ in
// Dispatch async so we can swap from the write queue to a read one (we are done
// writing), this has to be done after a slight delay to ensure the transaction
// provided by the completion block completes first (ie. so we don't hit
// re-entrancy issues)
queue.asyncAfter(deadline: .now() + 0.01) {
// writing)
queue.async {
// Retrieve a list of all valid attachmnet and avatar file paths
struct FileInfo {
let attachmentLocalRelativePaths: Set<String>

@ -50,17 +50,16 @@ public enum UpdateProfilePictureJob: JobExecutor {
queue: queue,
displayPictureUpdate: (profilePictureData.map { .currentUserUploadImageData($0) } ?? .none),
success: { db in
// Need to call the 'success' closure asynchronously on the queue after a slight
// delay to prevent a reentrancy issue as it will write to the database and this
// closure is already called within another database write
queue.asyncAfter(deadline: .now() + 0.01) {
queue.async {
SNLog("[UpdateProfilePictureJob] Profile successfully updated")
success(job, false, dependencies)
}
},
failure: { error in
SNLog("[UpdateProfilePictureJob] Failed to update profile")
failure(job, error, false, dependencies)
queue.async {
SNLog("[UpdateProfilePictureJob] Failed to update profile")
failure(job, error, false, dependencies)
}
}
)
}

@ -443,8 +443,8 @@ public extension LibSession {
return contact.priority
case .community:
let maybeUrlInfo: OpenGroupUrlInfo? = Storage.shared
.read { db in try OpenGroupUrlInfo.fetchAll(db, ids: [threadId]) }?
let maybeUrlInfo: OpenGroupUrlInfo? = (try? OpenGroupUrlInfo
.fetchAll(db, ids: [threadId]))?
.first
guard
@ -454,7 +454,7 @@ public extension LibSession {
else { return LibSession.defaultNewThreadPriority }
var community: ugroups_community_info = ugroups_community_info()
let result: Bool = user_groups_get_community(conf, &community, &cBaseUrl, &cRoom)
_ = user_groups_get_community(conf, &community, &cBaseUrl, &cRoom)
LibSessionError.clear(conf)
return community.priority
@ -559,7 +559,7 @@ public extension LibSession {
}
static func conversationInConfig(
_ db: Database? = nil,
_ db: Database,
threadId: String,
threadVariant: SessionThread.Variant,
visibleOnly: Bool,
@ -585,7 +585,7 @@ public extension LibSession {
return dependencies.caches[.libSession]
.config(for: configVariant, publicKey: userPublicKey)
.wrappedValue
.map { conf in
.map { conf -> Bool in
guard var cThreadId: [CChar] = threadId.cString(using: .utf8) else { return false }
switch threadVariant {
@ -611,8 +611,8 @@ public extension LibSession {
return (!visibleOnly || LibSession.shouldBeVisible(priority: contact.priority))
case .community:
let maybeUrlInfo: OpenGroupUrlInfo? = Storage.shared
.read { db in try OpenGroupUrlInfo.fetchAll(db, ids: [threadId]) }?
let maybeUrlInfo: OpenGroupUrlInfo? = (try? OpenGroupUrlInfo
.fetchAll(db, ids: [threadId]))?
.first
guard

@ -1054,30 +1054,28 @@ public final class MessageSender {
guard !rowIds.isEmpty else { return error }
// Note: We need to dispatch this after a small 0.01 delay to prevent any potential
// re-entrancy issues since the 'asyncMigrate' returns a result containing a DB instance
// within a transaction
DispatchQueue.global(qos: .background).asyncAfter(deadline: .now() + 0.01, using: dependencies) {
dependencies.storage.write { db in
switch destination {
case .syncMessage:
try Interaction
.filter(rowIds.contains(Column.rowID))
.updateAll(
db,
Interaction.Columns.state.set(to: Interaction.State.failedToSync),
Interaction.Columns.mostRecentFailureText.set(to: "\(error)")
)
default:
try Interaction
.filter(rowIds.contains(Column.rowID))
.updateAll(
db,
Interaction.Columns.state.set(to: Interaction.State.failed),
Interaction.Columns.mostRecentFailureText.set(to: "\(error)")
)
}
/// If we have affected rows then we should update them with the latest error text
///
/// **Note:** We `writeAsync` here as performing a syncronous `write` results in a reentrancy assertion
dependencies.storage.writeAsync { db in
switch destination {
case .syncMessage:
try Interaction
.filter(rowIds.contains(Column.rowID))
.updateAll(
db,
Interaction.Columns.state.set(to: Interaction.State.failedToSync),
Interaction.Columns.mostRecentFailureText.set(to: "\(error)")
)
default:
try Interaction
.filter(rowIds.contains(Column.rowID))
.updateAll(
db,
Interaction.Columns.state.set(to: Interaction.State.failed),
Interaction.Columns.mostRecentFailureText.set(to: "\(error)")
)
}
}

@ -91,7 +91,7 @@ public struct ProfileManager {
.filter(id: profile.id)
.updateAll(db, Profile.Columns.profilePictureFileName.set(to: nil))
},
completion: { _, _ in
completion: { _ in
// Try to re-download the avatar if it has a URL
if let profilePictureUrl: String = profile.profilePictureUrl, !profilePictureUrl.isEmpty {
// FIXME: Refactor avatar downloading to be a proper Job so we can avoid this

@ -574,10 +574,9 @@ private extension LibSession {
return Log.error("[LibSession] CallbackWrapper called with null context.")
}
/// Dispatch async so we don't block libSession's internals with Swift logic (which can block other requests), we
/// add the `0.01` delay to ensure the closure isn't executed immediately
/// Dispatch async so we don't block libSession's internals with Swift logic (which can block other requests)
let wrapper: CallbackWrapper<Output> = Unmanaged<CallbackWrapper<Output>>.fromOpaque(ctx).takeRetainedValue()
DispatchQueue.global(qos: .default).asyncAfter(deadline: .now() + 0.01) { [wrapper] in
DispatchQueue.global(qos: .default).async { [wrapper] in
wrapper.resultPublisher.send(output)
}
}

@ -29,6 +29,9 @@ open class Storage {
/// When attempting to do a write the transaction will wait this long to acquite a lock before failing
private static let writeTransactionStartTimeout: TimeInterval = 5
/// If a transaction takes longer than this duration then we should fail the transaction rather than keep hanging
private static let transactionDeadlockTimeoutSeconds: Int = 5
private static var sharedDatabaseDirectoryPath: String { "\(FileManager.default.appSharedDataDirectoryPath)/database" }
private static var databasePath: String { "\(Storage.sharedDatabaseDirectoryPath)/\(Storage.dbFileName)" }
private static var databasePathShm: String { "\(Storage.sharedDatabaseDirectoryPath)/\(Storage.dbFileName)-shm" }
@ -322,17 +325,16 @@ open class Storage {
guard async else { return migrationCompleted(Result(try migrator.migrate(dbWriter))) }
migrator.asyncMigrate(dbWriter) { result in
let finalResult: Swift.Result<Void, Error> = {
let finalResult: Result<Void, Error> = {
switch result {
case .failure(let error): return .failure(error)
case .success: return .success(())
}
}()
// Note: We need to dispatch this after a small 0.01 delay to prevent any potential
// re-entrancy issues since the 'asyncMigrate' returns a result containing a DB instance
// within a transaction
DispatchQueue.global(qos: .userInitiated).asyncAfter(deadline: .now() + 0.01, using: dependencies) {
// Note: We need to dispatch this to the next run toop to prevent blocking if the callback
// performs subsequent database operations
DispatchQueue.global(qos: .userInitiated).async(using: dependencies) {
migrationCompleted(finalResult)
}
}
@ -535,13 +537,16 @@ open class Storage {
static func logIfNeeded(_ error: Error, isWrite: Bool) {
switch error {
case DatabaseError.SQLITE_ABORT, DatabaseError.SQLITE_INTERRUPT:
case DatabaseError.SQLITE_ABORT, DatabaseError.SQLITE_INTERRUPT, DatabaseError.SQLITE_ERROR:
let message: String = ((error as? DatabaseError)?.message ?? "Unknown")
Log.error("[Storage] Database \(isWrite ? "write" : "read") failed due to error: \(message)")
case StorageError.databaseSuspended:
Log.error("[Storage] Database \(isWrite ? "write" : "read") failed as the database is suspended.")
case StorageError.transactionDeadlockTimeout:
Log.critical("[Storage] Database \(isWrite ? "write" : "read") failed due to a potential synchronous query deadlock timeout.")
default: break
}
}
@ -557,71 +562,158 @@ open class Storage {
}
}
private static func perform<T>(
info: CallInfo,
updates: @escaping (Database) throws -> T
) -> (Database) throws -> T {
return { db in
guard info.storage?.isSuspended == false else { throw StorageError.databaseSuspended }
let timer: TransactionTimer = TransactionTimer.start(duration: Storage.slowTransactionThreshold, info: info)
defer { timer.stop() }
// Get the result
let result: T = try updates(db)
// Update the state flags
switch info.isWrite {
case true: info.storage?.hasSuccessfullyWritten = true
case false: info.storage?.hasSuccessfullyRead = true
// MARK: - Operations
private static func track<T>(
_ db: Database,
_ info: CallInfo,
_ operation: @escaping (Database) throws -> T
) throws -> T {
guard info.storage?.isSuspended == false else { throw StorageError.databaseSuspended }
let timer: TransactionTimer = TransactionTimer.start(
duration: Storage.slowTransactionThreshold,
info: info
)
defer { timer.stop() }
// Get the result
let result: T = try operation(db)
// Update the state flags
switch info.isWrite {
case true: info.storage?.hasSuccessfullyWritten = true
case false: info.storage?.hasSuccessfullyRead = true
}
return result
}
/// This function manually performs `read`/`write` operations in either a synchronous or asyncronous way using a semaphore to
/// block the syncrhonous version because `GRDB` has an internal assertion when using it's built-in synchronous `read`/`write`
/// functions to prevent reentrancy which is unsupported
///
/// Unfortunately this results in the code getting messy when trying to chain multiple database transactions (even
/// when using `db.afterNextTransaction`) which is somewhat unintuitive
///
/// The `async` variants don't need to worry about this reentrancy issue so instead we route we use those for all operations instead
/// and just block the thread when we want to perform a synchronous operation
@discardableResult private static func performOperation<T>(
_ info: CallInfo,
_ operation: @escaping (Database) throws -> T,
_ completion: ((Result<T, Error>) -> Void)? = nil
) -> Result<T, Error> {
var result: Result<T, Error> = .failure(StorageError.invalidQueryResult)
let semaphore: DispatchSemaphore? = (info.isAsync ? nil : DispatchSemaphore(value: 0))
let logErrorIfNeeded: (Result<T, Error>) -> () = { result in
switch result {
case .success: break
case .failure(let error): StorageState.logIfNeeded(error, isWrite: info.isWrite)
}
}
/// Perform the actual operation
switch (StorageState(info.storage), info.isWrite) {
case (.invalid(let error), _):
result = .failure(error)
semaphore?.signal()
return result
case (.valid(let dbWriter), true):
dbWriter.asyncWrite(
{ db in result = .success(try Storage.track(db, info, operation)) },
completion: { _, dbResult in
switch dbResult {
case .success: break
case .failure(let error): result = .failure(error)
}
semaphore?.signal()
if info.isAsync { logErrorIfNeeded(result) }
completion?(result)
}
)
case (.valid(let dbWriter), false):
dbWriter.asyncRead { dbResult in
do {
switch dbResult {
case .failure(let error): throw error
case .success(let db): result = .success(try Storage.track(db, info, operation))
}
} catch {
result = .failure(error)
}
semaphore?.signal()
if info.isAsync { logErrorIfNeeded(result) }
completion?(result)
}
}
/// If this is a synchronous operation then `semaphore` will exist and will block here waiting on the signal from one of the
/// above closures to be sent
let semaphoreResult: DispatchTimeoutResult? = semaphore?.wait(timeout: .now() + .seconds(Storage.transactionDeadlockTimeoutSeconds))
/// If the transaction timed out then log the error and report a failure
guard semaphoreResult != .timedOut else {
StorageState.logIfNeeded(StorageError.transactionDeadlockTimeout, isWrite: info.isWrite)
return .failure(StorageError.transactionDeadlockTimeout)
}
if !info.isAsync { logErrorIfNeeded(result) }
return result
}
private func performPublisherOperation<T>(
_ fileName: String,
_ functionName: String,
_ lineNumber: Int,
isWrite: Bool,
_ operation: @escaping (Database) throws -> T
) -> AnyPublisher<T, Error> {
switch StorageState(self) {
case .invalid(let error): return StorageState.logIfNeeded(error, isWrite: false)
case .valid:
/// **Note:** GRDB does have `readPublisher`/`writePublisher` functions but it appears to asynchronously
/// trigger both the `output` and `complete` closures at the same time which causes a lot of unexpected
/// behaviours (this behaviour is apparently expected but still causes a number of odd behaviours in our code
/// for more information see https://github.com/groue/GRDB.swift/issues/1334)
///
/// Instead of this we are just using `Deferred { Future {} }` which is executed on the specified scheduled
/// which behaves in a much more expected way than the GRDB `readPublisher`/`writePublisher` does
let info: CallInfo = CallInfo(self, fileName, functionName, lineNumber, .syncWrite)
return Deferred {
Future { resolver in
resolver(Storage.performOperation(info, operation))
}
}.eraseToAnyPublisher()
}
}
// MARK: - Functions
@discardableResult public func write<T>(
fileName: String = #file,
functionName: String = #function,
lineNumber: Int = #line,
fileName file: String = #file,
functionName funcN: String = #function,
lineNumber line: Int = #line,
using dependencies: Dependencies = Dependencies(),
updates: @escaping (Database) throws -> T?
) -> T? {
switch StorageState(self) {
case .invalid(let error): return StorageState.logIfNeeded(error, isWrite: true)
case .valid(let dbWriter):
let info: CallInfo = CallInfo(fileName, functionName, lineNumber, true, self)
do { return try dbWriter.write(Storage.perform(info: info, updates: updates)) }
catch { return StorageState.logIfNeeded(error, isWrite: true) }
switch Storage.performOperation(CallInfo(self, file, funcN, line, .syncWrite), updates) {
case .failure: return nil
case .success(let result): return result
}
}
open func writeAsync<T>(
fileName: String = #file,
functionName: String = #function,
lineNumber: Int = #line,
fileName file: String = #file,
functionName funcN: String = #function,
lineNumber line: Int = #line,
using dependencies: Dependencies = Dependencies(),
updates: @escaping (Database) throws -> T,
completion: @escaping (Database, Swift.Result<T, Error>) throws -> Void = { _, _ in }
completion: @escaping (Result<T, Error>) -> Void = { _ in }
) {
switch StorageState(self) {
case .invalid(let error): return StorageState.logIfNeeded(error, isWrite: true)
case .valid(let dbWriter):
let info: CallInfo = CallInfo(fileName, functionName, lineNumber, true, self)
dbWriter.asyncWrite(
Storage.perform(info: info, updates: updates),
completion: { db, result in
switch result {
case .failure(let error): StorageState.logIfNeeded(error, isWrite: true)
default: break
}
try? completion(db, result)
}
)
}
Storage.performOperation(CallInfo(self, file, funcN, line, .asyncWrite), updates, completion)
}
open func writePublisher<T>(
@ -631,50 +723,19 @@ open class Storage {
using dependencies: Dependencies = Dependencies(),
updates: @escaping (Database) throws -> T
) -> AnyPublisher<T, Error> {
switch StorageState(self) {
case .invalid(let error): return StorageState.logIfNeeded(error, isWrite: true)
case .valid:
/// **Note:** GRDB does have a `writePublisher` method but it appears to asynchronously trigger
/// both the `output` and `complete` closures at the same time which causes a lot of unexpected
/// behaviours (this behaviour is apparently expected but still causes a number of odd behaviours in our code
/// for more information see https://github.com/groue/GRDB.swift/issues/1334)
///
/// Instead of this we are just using `Deferred { Future {} }` which is executed on the specified scheduled
/// which behaves in a much more expected way than the GRDB `writePublisher` does
let info: CallInfo = CallInfo(fileName, functionName, lineNumber, true, self)
return Deferred {
Future { [weak self] resolver in
/// The `StorageState` may have changed between the creation of the publisher and it actually
/// being executed so we need to check again
switch StorageState(self) {
case .invalid(let error): return StorageState.logIfNeeded(error, isWrite: true)
case .valid(let dbWriter):
do {
resolver(Result.success(try dbWriter.write(Storage.perform(info: info, updates: updates))))
}
catch {
StorageState.logIfNeeded(error, isWrite: true)
resolver(Result.failure(error))
}
}
}
}.eraseToAnyPublisher()
}
return performPublisherOperation(fileName, functionName, lineNumber, isWrite: true, updates)
}
@discardableResult public func read<T>(
fileName: String = #file,
functionName: String = #function,
lineNumber: Int = #line,
fileName file: String = #file,
functionName funcN: String = #function,
lineNumber line: Int = #line,
using dependencies: Dependencies = Dependencies(),
_ value: @escaping (Database) throws -> T?
) -> T? {
switch StorageState(self) {
case .invalid(let error): return StorageState.logIfNeeded(error, isWrite: false)
case .valid(let dbWriter):
let info: CallInfo = CallInfo(fileName, functionName, lineNumber, false, self)
do { return try dbWriter.read(Storage.perform(info: info, updates: value)) }
catch { return StorageState.logIfNeeded(error, isWrite: false) }
switch Storage.performOperation(CallInfo(self, file, funcN, line, .syncRead), value) {
case .failure: return nil
case .success(let result): return result
}
}
@ -685,35 +746,7 @@ open class Storage {
using dependencies: Dependencies = Dependencies(),
value: @escaping (Database) throws -> T
) -> AnyPublisher<T, Error> {
switch StorageState(self) {
case .invalid(let error): return StorageState.logIfNeeded(error, isWrite: false)
case .valid:
/// **Note:** GRDB does have a `readPublisher` method but it appears to asynchronously trigger
/// both the `output` and `complete` closures at the same time which causes a lot of unexpected
/// behaviours (this behaviour is apparently expected but still causes a number of odd behaviours in our code
/// for more information see https://github.com/groue/GRDB.swift/issues/1334)
///
/// Instead of this we are just using `Deferred { Future {} }` which is executed on the specified scheduled
/// which behaves in a much more expected way than the GRDB `readPublisher` does
let info: CallInfo = CallInfo(fileName, functionName, lineNumber, false, self)
return Deferred {
Future { [weak self] resolver in
/// The `StorageState` may have changed between the creation of the publisher and it actually
/// being executed so we need to check again
switch StorageState(self) {
case .invalid(let error): return StorageState.logIfNeeded(error, isWrite: false)
case .valid(let dbWriter):
do {
resolver(Result.success(try dbWriter.read(Storage.perform(info: info, updates: value))))
}
catch {
StorageState.logIfNeeded(error, isWrite: false)
resolver(Result.failure(error))
}
}
}
}.eraseToAnyPublisher()
}
return performPublisherOperation(fileName, functionName, lineNumber, isWrite: false, value)
}
/// Rever to the `ValueObservation.start` method for full documentation
@ -904,11 +937,18 @@ public extension Storage {
private extension Storage {
class CallInfo {
enum Behaviour {
case syncRead
case asyncRead
case syncWrite
case asyncWrite
}
weak var storage: Storage?
let file: String
let function: String
let line: Int
let isWrite: Bool
weak var storage: Storage?
let behaviour: Behaviour
var callInfo: String {
let fileInfo: String = (file.components(separatedBy: "/").last.map { "\($0):\(line) - " } ?? "")
@ -916,18 +956,31 @@ private extension Storage {
return "\(fileInfo)\(function)"
}
var isWrite: Bool {
switch behaviour {
case .syncWrite, .asyncWrite: return true
case .syncRead, .asyncRead: return false
}
}
var isAsync: Bool {
switch behaviour {
case .asyncRead, .asyncWrite: return true
case .syncRead, .syncWrite: return false
}
}
init(
_ storage: Storage?,
_ file: String,
_ function: String,
_ line: Int,
_ isWrite: Bool,
_ storage: Storage?
_ behaviour: Behaviour
) {
self.storage = storage
self.file = file
self.function = function
self.line = line
self.isWrite = isWrite
self.storage = storage
self.behaviour = behaviour
}
}
}

@ -13,6 +13,8 @@ public enum StorageError: Error {
case keySpecCreationFailed
case keySpecInaccessible
case decodingFailed
case invalidQueryResult
case transactionDeadlockTimeout
case failedToSave
case objectNotFound

@ -180,12 +180,9 @@ public class PagedDatabaseObserver<ObservedTable, T>: TransactionObserver where
return []
}
// This looks odd but if we just use `commitProcessingQueue.async` then the code can
// get executed immediately wihch can result in a new transaction being started whilst
// we are still within the transaction wrapping `databaseDidCommit` (which we don't
// want), by adding this tiny 0.01 delay we should be giving it enough time to finish
// processing the current transaction
commitProcessingQueue.asyncAfter(deadline: .now() + 0.01) { [weak self] in
// Dispatch to the `commitProcessingQueue` so we don't block the database `write` queue
// when updatind the data
commitProcessingQueue.async { [weak self] in
self?.processDatabaseCommit(committedChanges: committedChanges)
}
}

@ -1298,10 +1298,7 @@ public final class JobQueue: Hashable {
// thread and do so by creating a number of background queues to run the jobs on, if this
// function was called on the wrong queue then we need to dispatch to the correct one
guard DispatchQueue.with(key: queueKey, matches: queueContext, using: dependencies) else {
// Note: We need to dispatch this after a small 0.01 delay to prevent any potential
// re-entrancy issues since the `start` function can be called within an existing
// database transaction (eg. via `db.afterNextTransactionNestedOnce`)
internalQueue.asyncAfter(deadline: .now() + 0.01, using: dependencies) { [weak self] in
internalQueue.async(using: dependencies) { [weak self] in
self?.start(forceWhenAlreadyRunning: forceWhenAlreadyRunning, using: dependencies)
}
return

@ -119,14 +119,14 @@ class SynchronousStorage: Storage {
lineNumber: Int = #line,
using dependencies: Dependencies = Dependencies(),
updates: @escaping (Database) throws -> T,
completion: @escaping (Database, Result<T, Error>) throws -> Void
completion: @escaping (Result<T, Error>) -> Void
) {
do {
let result: T = try write(using: dependencies, updates: updates) ?? { throw StorageError.failedToSave }()
write(using: dependencies) { db in try completion(db, Result.success(result)) }
completion(Result.success(result))
}
catch {
write(using: dependencies) { db in try completion(db, Result.failure(error)) }
completion(Result.failure(error))
}
}

Loading…
Cancel
Save