|
|
|
@ -53,6 +53,9 @@ open class Storage {
|
|
|
|
|
private static let writeTransactionStartTimeout: TimeInterval = 5
|
|
|
|
|
|
|
|
|
|
/// If a transaction takes longer than this duration then we should fail the transaction rather than keep hanging
|
|
|
|
|
///
|
|
|
|
|
/// **Note:** This timeout only applies to synchronous operations (the assumption being that if we know an operation is going to
|
|
|
|
|
/// take a long time then we should probably be handling it asynchronously rather than a synchronous way)
|
|
|
|
|
private static let transactionDeadlockTimeoutSeconds: Int = 5
|
|
|
|
|
|
|
|
|
|
private static var sharedDatabaseDirectoryPath: String { "\(SessionFileManager.nonInjectedAppSharedDataDirectoryPath)/database" }
|
|
|
|
@ -88,6 +91,9 @@ open class Storage {
|
|
|
|
|
/// This property gets set the first time we successfully write to the database
|
|
|
|
|
public private(set) var hasSuccessfullyWritten: Bool = false
|
|
|
|
|
|
|
|
|
|
/// This property keeps track of all current database tasks and can be used when suspending the database to explicitly
|
|
|
|
|
/// cancel any currently running tasks
|
|
|
|
|
@ThreadSafeObject private var currentTasks: Set<Task<(), Never>> = []
|
|
|
|
|
|
|
|
|
|
// MARK: - Initialization
|
|
|
|
|
|
|
|
|
@ -142,11 +148,6 @@ open class Storage {
|
|
|
|
|
var config = Configuration()
|
|
|
|
|
config.label = Storage.queuePrefix
|
|
|
|
|
config.maximumReaderCount = 10 /// Increase the max read connection limit - Default is 5
|
|
|
|
|
|
|
|
|
|
/// It seems we should do this per https://github.com/groue/GRDB.swift/pull/1485 but with this change
|
|
|
|
|
/// we then need to define how long a write transaction should wait for before timing out (read transactions always run
|
|
|
|
|
/// in`DEFERRED` mode so won't be affected by these settings)
|
|
|
|
|
config.defaultTransactionKind = .immediate
|
|
|
|
|
config.busyMode = .timeout(Storage.writeTransactionStartTimeout)
|
|
|
|
|
|
|
|
|
|
/// Load in the SQLCipher keys
|
|
|
|
@ -485,7 +486,12 @@ open class Storage {
|
|
|
|
|
guard !isSuspended else { return }
|
|
|
|
|
|
|
|
|
|
isSuspended = true
|
|
|
|
|
Log.info(.storage, "Database access suspended.")
|
|
|
|
|
Log.info(.storage, "Database access suspended - cancelling \(currentTasks.count) running task(s).")
|
|
|
|
|
|
|
|
|
|
/// Before triggering an `interrupt` (which will forcibly kill in-progress database queries) we want to try to cancel all
|
|
|
|
|
/// database tasks to give them a small chance to resolve cleanly before we take a brute-force approach
|
|
|
|
|
currentTasks.forEach { $0.cancel() }
|
|
|
|
|
_currentTasks.performUpdate { _ in [] }
|
|
|
|
|
|
|
|
|
|
/// Interrupt any open transactions (if this function is called then we are expecting that all processes have finished running
|
|
|
|
|
/// and don't actually want any more transactions to occur)
|
|
|
|
@ -551,6 +557,13 @@ open class Storage {
|
|
|
|
|
case valid(DatabaseWriter)
|
|
|
|
|
case invalid(Error)
|
|
|
|
|
|
|
|
|
|
var forcedError: Error {
|
|
|
|
|
switch self {
|
|
|
|
|
case .valid: return StorageError.validStorageIncorrectlyHandledAsError
|
|
|
|
|
case .invalid(let error): return error
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
init(_ storage: Storage?) {
|
|
|
|
|
switch (storage?.isSuspended, storage?.isValid, storage?.dbWriter) {
|
|
|
|
|
case (true, _, _): self = .invalid(StorageError.databaseSuspended)
|
|
|
|
@ -559,38 +572,46 @@ open class Storage {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static func logIfNeeded(_ error: Error, isWrite: Bool) {
|
|
|
|
|
fileprivate static func logIfNeeded(_ error: Error, info: Storage.CallInfo) {
|
|
|
|
|
let action: String = (info.isWrite ? "write" : "read")
|
|
|
|
|
|
|
|
|
|
switch error {
|
|
|
|
|
case DatabaseError.SQLITE_ABORT, DatabaseError.SQLITE_INTERRUPT, DatabaseError.SQLITE_ERROR:
|
|
|
|
|
let message: String = ((error as? DatabaseError)?.message ?? "Unknown")
|
|
|
|
|
Log.error(.storage, "Database \(isWrite ? "write" : "read") failed due to error: \(message)")
|
|
|
|
|
Log.error(.storage, "Database \(action) failed due to error: \(message) - [ \(info.callInfo) ]")
|
|
|
|
|
|
|
|
|
|
case StorageError.databaseInvalid:
|
|
|
|
|
Log.error(.storage, "Database \(isWrite ? "write" : "read") failed as the database is invalid.")
|
|
|
|
|
Log.error(.storage, "Database \(action) failed as the database is invalid - [ \(info.callInfo) ]")
|
|
|
|
|
|
|
|
|
|
case StorageError.databaseSuspended:
|
|
|
|
|
Log.error(.storage, "Database \(isWrite ? "write" : "read") failed as the database is suspended.")
|
|
|
|
|
Log.error(.storage, "Database \(action) failed as the database is suspended - [ \(info.callInfo) ]")
|
|
|
|
|
|
|
|
|
|
case StorageError.transactionDeadlockTimeout:
|
|
|
|
|
Log.critical("[Storage] Database \(isWrite ? "write" : "read") failed due to a potential synchronous query deadlock timeout.")
|
|
|
|
|
Log.critical(.storage, "Database \(action) failed due to a potential synchronous query deadlock timeout - [ \(info.callInfo) ]")
|
|
|
|
|
|
|
|
|
|
default: break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static func logIfNeeded<T>(_ error: Error, isWrite: Bool) -> T? {
|
|
|
|
|
logIfNeeded(error, isWrite: isWrite)
|
|
|
|
|
fileprivate static func logIfNeeded<T>(_ error: Error, info: Storage.CallInfo) -> T? {
|
|
|
|
|
logIfNeeded(error, info: info)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static func logIfNeeded<T>(_ error: Error, isWrite: Bool) -> AnyPublisher<T, Error> {
|
|
|
|
|
logIfNeeded(error, isWrite: isWrite)
|
|
|
|
|
fileprivate static func logIfNeeded<T>(_ error: Error, info: Storage.CallInfo) -> AnyPublisher<T, Error> {
|
|
|
|
|
logIfNeeded(error, info: info)
|
|
|
|
|
return Fail<T, Error>(error: error).eraseToAnyPublisher()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// MARK: - Operations
|
|
|
|
|
|
|
|
|
|
/// Internal type to wrap the database operation `Task` so it can be cancelled when used with `Combine` (since GRDB doesn't
|
|
|
|
|
/// actually handle publishers publishers)
|
|
|
|
|
final class TaskHolder {
|
|
|
|
|
var task: Task<(), Never>?
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static func track<T>(
|
|
|
|
|
_ db: Database,
|
|
|
|
|
_ info: CallInfo,
|
|
|
|
@ -634,122 +655,114 @@ open class Storage {
|
|
|
|
|
_ dependencies: Dependencies,
|
|
|
|
|
_ operation: @escaping (Database) throws -> T,
|
|
|
|
|
_ asyncCompletion: ((Result<T, Error>) -> Void)? = nil
|
|
|
|
|
) -> Result<T, Error> {
|
|
|
|
|
// A serial queue for synchronizing completion updates.
|
|
|
|
|
let syncQueue = DispatchQueue(label: "com.session.performOperation.syncQueue")
|
|
|
|
|
) -> (result: Result<T, Error>, task: Task<(), Never>?) {
|
|
|
|
|
/// Ensure we are in a valid state
|
|
|
|
|
let storageState: StorageState = StorageState(info.storage)
|
|
|
|
|
|
|
|
|
|
guard case .valid(let dbWriter) = storageState else {
|
|
|
|
|
if info.isAsync { asyncCompletion?(.failure(storageState.forcedError)) }
|
|
|
|
|
return (.failure(storageState.forcedError), nil)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
weak var queryDb: Database?
|
|
|
|
|
var didTimeout: Bool = false
|
|
|
|
|
/// Setup required variables
|
|
|
|
|
let syncQueue = DispatchQueue(label: "com.session.performOperation.syncQueue")
|
|
|
|
|
let semaphore: DispatchSemaphore = DispatchSemaphore(value: 0)
|
|
|
|
|
var operationResult: Result<T, Error>?
|
|
|
|
|
let semaphore: DispatchSemaphore? = (info.isAsync ? nil : DispatchSemaphore(value: 0))
|
|
|
|
|
var operationTask: Task<(), Never>?
|
|
|
|
|
let logErrorIfNeeded: (Result<T, Error>) -> Result<T, Error> = { result in
|
|
|
|
|
switch result {
|
|
|
|
|
case .success: break
|
|
|
|
|
case .failure(let error): StorageState.logIfNeeded(error, isWrite: info.isWrite)
|
|
|
|
|
case .failure(let error): StorageState.logIfNeeded(error, info: info)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return result
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Convenience function to remove duplication
|
|
|
|
|
func completeOperation(with result: Result<T, Error>) {
|
|
|
|
|
syncQueue.sync {
|
|
|
|
|
guard !didTimeout && operationResult == nil else { return }
|
|
|
|
|
guard operationResult == nil else { return }
|
|
|
|
|
info.storage?.removeTask(operationTask)
|
|
|
|
|
operationResult = result
|
|
|
|
|
semaphore?.signal()
|
|
|
|
|
semaphore.signal()
|
|
|
|
|
|
|
|
|
|
// For async operations, log and invoke the completion closure.
|
|
|
|
|
/// For async operations, log and invoke the completion closure.
|
|
|
|
|
if info.isAsync {
|
|
|
|
|
asyncCompletion?(logErrorIfNeeded(result))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Perform the actual operation
|
|
|
|
|
switch (StorageState(info.storage), info.isWrite) {
|
|
|
|
|
case (.invalid(let error), _): completeOperation(with: .failure(error))
|
|
|
|
|
case (.valid(let dbWriter), true):
|
|
|
|
|
dbWriter.asyncWrite(
|
|
|
|
|
{ db in
|
|
|
|
|
syncQueue.sync { queryDb = db }
|
|
|
|
|
defer { syncQueue.sync { queryDb = nil } }
|
|
|
|
|
|
|
|
|
|
let task: Task<(), Never> = Task {
|
|
|
|
|
return await withThrowingTaskGroup(of: T.self) { group in
|
|
|
|
|
/// Add the task to perform the actual database operation
|
|
|
|
|
group.addTask {
|
|
|
|
|
let trackedOperation: @Sendable (Database) throws -> T = { db in
|
|
|
|
|
if dependencies[feature: .forceSlowDatabaseQueries] {
|
|
|
|
|
Thread.sleep(forTimeInterval: 1)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return try Storage.track(db, info, operation)
|
|
|
|
|
},
|
|
|
|
|
completion: { _, dbResult in completeOperation(with: dbResult) }
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return (info.isWrite ?
|
|
|
|
|
try await dbWriter.write(trackedOperation) :
|
|
|
|
|
try await dbWriter.read(trackedOperation)
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
case (.valid(let dbWriter), false):
|
|
|
|
|
dbWriter.asyncRead { dbResult in
|
|
|
|
|
do {
|
|
|
|
|
switch dbResult {
|
|
|
|
|
case .failure(let error): throw error
|
|
|
|
|
case .success(let db):
|
|
|
|
|
syncQueue.sync { queryDb = db }
|
|
|
|
|
defer { syncQueue.sync { queryDb = nil } }
|
|
|
|
|
|
|
|
|
|
if dependencies[feature: .forceSlowDatabaseQueries] {
|
|
|
|
|
Thread.sleep(forTimeInterval: 1)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
completeOperation(with: .success(try Storage.track(db, info, operation)))
|
|
|
|
|
/// If this is a syncronous task then we want to the operation to timeout to ensure we don't unintentionally
|
|
|
|
|
/// create a deadlock
|
|
|
|
|
if !info.isAsync {
|
|
|
|
|
group.addTask {
|
|
|
|
|
let timeoutNanoseconds: UInt64 = UInt64(Storage.transactionDeadlockTimeoutSeconds * 1_000_000_000)
|
|
|
|
|
|
|
|
|
|
/// If the debugger is attached then we want to have a lot of shorter sleep iterations as the clock doesn't get
|
|
|
|
|
/// paused when stopped on a breakpoint (and we don't want to end up having a bunch of false positive
|
|
|
|
|
/// database timeouts while debugging code)
|
|
|
|
|
///
|
|
|
|
|
/// **Note:** `isDebuggerAttached` will always return `false` in production builds
|
|
|
|
|
if isDebuggerAttached() {
|
|
|
|
|
let numIterations: UInt64 = 50
|
|
|
|
|
|
|
|
|
|
for _ in (0..<numIterations) {
|
|
|
|
|
try await Task.sleep(nanoseconds: (timeoutNanoseconds / numIterations))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} catch {
|
|
|
|
|
completeOperation(with: .failure(error))
|
|
|
|
|
else if info.isWrite {
|
|
|
|
|
/// This if statement is redundant **but** it means when we get symbolicated crash logs we can distinguish
|
|
|
|
|
/// between the database threads which are reading and writing
|
|
|
|
|
try await Task.sleep(nanoseconds: timeoutNanoseconds)
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
try await Task.sleep(nanoseconds: timeoutNanoseconds)
|
|
|
|
|
}
|
|
|
|
|
throw StorageError.transactionDeadlockTimeout
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// If this is a synchronous operation then `semaphore` will exist and will block here waiting on the signal from one of the
|
|
|
|
|
/// above closures to be sent
|
|
|
|
|
///
|
|
|
|
|
/// **Note:** Unfortunately this timeout can be really annoying when debugging because the semaphore timeout is based on
|
|
|
|
|
/// system time which doesn't get paused when stopping on a breakpoint (which means if you break in the middle of a database
|
|
|
|
|
/// query it's pretty much guaranteed to timeout)
|
|
|
|
|
///
|
|
|
|
|
/// To try to avoid this we have the below code to try to replicate the behaviour of the proper semaphore timeout while the debugger
|
|
|
|
|
/// is attached as this approach does seem to get paused (or at least only perform a single iteration per debugger step)
|
|
|
|
|
if let semaphore: DispatchSemaphore = semaphore {
|
|
|
|
|
var semaphoreResult: DispatchTimeoutResult
|
|
|
|
|
|
|
|
|
|
#if DEBUG
|
|
|
|
|
if isDebuggerAttached() {
|
|
|
|
|
semaphoreResult = debugWait(semaphore: semaphore, info: info)
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
semaphoreResult = semaphore.wait(timeout: .now() + .seconds(Storage.transactionDeadlockTimeoutSeconds))
|
|
|
|
|
}
|
|
|
|
|
#else
|
|
|
|
|
/// This if statement is redundant **but** it means when we get symbolicated crash logs we can distinguish
|
|
|
|
|
/// between the database threads which are reading and writing
|
|
|
|
|
if info.isWrite {
|
|
|
|
|
semaphoreResult = semaphore.wait(timeout: .now() + .seconds(Storage.transactionDeadlockTimeoutSeconds))
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
semaphoreResult = semaphore.wait(timeout: .now() + .seconds(Storage.transactionDeadlockTimeoutSeconds))
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
/// Check if the query timed out in the `syncQueue` to ensure that we don't run into a race condition between handling
|
|
|
|
|
/// the timeout and handling the query completion
|
|
|
|
|
///
|
|
|
|
|
/// If it did timeout then we should interrupt the query (don't want the query thread to remain blocked when we've
|
|
|
|
|
/// already handled it as a failure)
|
|
|
|
|
syncQueue.sync {
|
|
|
|
|
guard semaphoreResult == .timedOut && operationResult == nil else { return }
|
|
|
|
|
|
|
|
|
|
didTimeout = true
|
|
|
|
|
queryDb?.interrupt()
|
|
|
|
|
/// Wait for the first task to finish
|
|
|
|
|
///
|
|
|
|
|
/// **Note:** THe case where `nextResult` returns `nil` is only meant to happen when the group has no
|
|
|
|
|
/// tasks, so shouldn't be considered a valid case (hence the `invalidQueryResult` fallback)
|
|
|
|
|
let result: Result<T, Error>? = await group.nextResult()
|
|
|
|
|
group.cancelAll()
|
|
|
|
|
completeOperation(with: result ?? .failure(StorageError.invalidQueryResult))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return logErrorIfNeeded(operationResult ?? .failure(StorageError.transactionDeadlockTimeout))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Store the task in case we want to
|
|
|
|
|
info.storage?.addTask(task)
|
|
|
|
|
operationTask = task
|
|
|
|
|
|
|
|
|
|
/// For the `async` operation the returned value should be ignored so just return the `invalidQueryResult` error
|
|
|
|
|
return .failure(StorageError.invalidQueryResult)
|
|
|
|
|
guard !info.isAsync else {
|
|
|
|
|
return (.failure(StorageError.invalidQueryResult), task)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Block until we have a result
|
|
|
|
|
semaphore.wait()
|
|
|
|
|
return (logErrorIfNeeded(operationResult ?? .failure(StorageError.transactionDeadlockTimeout)), task)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private func performPublisherOperation<T>(
|
|
|
|
@ -759,62 +772,47 @@ open class Storage {
|
|
|
|
|
isWrite: Bool,
|
|
|
|
|
_ operation: @escaping (Database) throws -> T
|
|
|
|
|
) -> AnyPublisher<T, Error> {
|
|
|
|
|
let info: CallInfo = CallInfo(self, fileName, functionName, lineNumber, (isWrite ? .asyncWrite : .asyncRead))
|
|
|
|
|
|
|
|
|
|
switch StorageState(self) {
|
|
|
|
|
case .invalid(let error): return StorageState.logIfNeeded(error, isWrite: false)
|
|
|
|
|
case .invalid(let error): return StorageState.logIfNeeded(error, info: info)
|
|
|
|
|
case .valid:
|
|
|
|
|
/// **Note:** GRDB does have `readPublisher`/`writePublisher` functions but it appears to asynchronously
|
|
|
|
|
/// trigger both the `output` and `complete` closures at the same time which causes a lot of unexpected
|
|
|
|
|
/// behaviours (this behaviour is apparently expected but still causes a number of odd behaviours in our code
|
|
|
|
|
/// for more information see https://github.com/groue/GRDB.swift/issues/1334)
|
|
|
|
|
///
|
|
|
|
|
/// Instead of this we are just using `Deferred { Future {} }` which is executed on the specified scheduled
|
|
|
|
|
/// which behaves in a much more expected way than the GRDB `readPublisher`/`writePublisher` does
|
|
|
|
|
let info: CallInfo = CallInfo(self, fileName, functionName, lineNumber, .syncWrite)
|
|
|
|
|
/// Instead of this we are just using `Deferred { Future {} }` which is executed on the specified scheduler
|
|
|
|
|
/// (which behaves in a much more expected way than the GRDB `readPublisher`/`writePublisher` does)
|
|
|
|
|
/// and hooking that into our `performOperation` function which uses the GRDB async/await functions that support
|
|
|
|
|
/// cancellation (as we want to support cancellation as well)
|
|
|
|
|
let holder: TaskHolder = TaskHolder()
|
|
|
|
|
|
|
|
|
|
return Deferred { [dependencies] in
|
|
|
|
|
Future { resolver in
|
|
|
|
|
resolver(Storage.performOperation(info, dependencies, operation))
|
|
|
|
|
let (_, task) = Storage.performOperation(info, dependencies, operation) { result in
|
|
|
|
|
resolver(result)
|
|
|
|
|
}
|
|
|
|
|
holder.task = task
|
|
|
|
|
}
|
|
|
|
|
}.eraseToAnyPublisher()
|
|
|
|
|
}
|
|
|
|
|
.handleEvents(receiveCancel: { [weak self] in
|
|
|
|
|
holder.task?.cancel()
|
|
|
|
|
self?.removeTask(holder.task)
|
|
|
|
|
})
|
|
|
|
|
.eraseToAnyPublisher()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static func debugWait(semaphore: DispatchSemaphore, info: CallInfo) -> DispatchTimeoutResult {
|
|
|
|
|
let pollQueue: DispatchQueue = DispatchQueue(label: "com.session.debugWaitTimer.\(UUID().uuidString)")
|
|
|
|
|
let standardPollInterval: DispatchTimeInterval = .milliseconds(100)
|
|
|
|
|
var iterations: Int = 0
|
|
|
|
|
let maxIterations: Int = ((Storage.transactionDeadlockTimeoutSeconds * 1000) / standardPollInterval.milliseconds)
|
|
|
|
|
let pollCompletionSemaphore: DispatchSemaphore = DispatchSemaphore(value: 0)
|
|
|
|
|
|
|
|
|
|
/// Stagger the size of the `pollIntervals` to avoid holding up the thread in case the query resolves very quickly (this
|
|
|
|
|
/// means the timeout will occur ~500ms early but helps prevent false main thread lag appearing when debugging that wouldn't
|
|
|
|
|
/// affect production)
|
|
|
|
|
let pollIntervals: [DispatchTimeInterval] = [
|
|
|
|
|
.milliseconds(5), .milliseconds(5), .milliseconds(10), .milliseconds(10), .milliseconds(10),
|
|
|
|
|
standardPollInterval
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
func pollSemaphore() {
|
|
|
|
|
iterations += 1
|
|
|
|
|
|
|
|
|
|
guard iterations < maxIterations && semaphore.wait(timeout: .now()) != .success else {
|
|
|
|
|
pollCompletionSemaphore.signal()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let nextInterval: DispatchTimeInterval = pollIntervals[min(iterations, pollIntervals.count - 1)]
|
|
|
|
|
pollQueue.asyncAfter(deadline: .now() + nextInterval) {
|
|
|
|
|
pollSemaphore()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Poll the semaphore in a background queue
|
|
|
|
|
pollQueue.asyncAfter(deadline: .now() + pollIntervals[0]) { pollSemaphore() }
|
|
|
|
|
pollCompletionSemaphore.wait() // Wait indefinitely for the timer semaphore
|
|
|
|
|
|
|
|
|
|
return (iterations >= 50 ? .timedOut : .success)
|
|
|
|
|
// MARK: - Functions
|
|
|
|
|
|
|
|
|
|
private func addTask(_ task: Task<(), Never>) {
|
|
|
|
|
_currentTasks.performUpdate { $0.inserting(task) }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// MARK: - Functions
|
|
|
|
|
private func removeTask(_ task: Task<(), Never>?) {
|
|
|
|
|
_currentTasks.performUpdate { $0.removing(task) }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@discardableResult public func write<T>(
|
|
|
|
|
fileName file: String = #file,
|
|
|
|
@ -823,8 +821,8 @@ open class Storage {
|
|
|
|
|
updates: @escaping (Database) throws -> T?
|
|
|
|
|
) -> T? {
|
|
|
|
|
switch Storage.performOperation(CallInfo(self, file, funcN, line, .syncWrite), dependencies, updates) {
|
|
|
|
|
case .failure: return nil
|
|
|
|
|
case .success(let result): return result
|
|
|
|
|
case (.failure, _): return nil
|
|
|
|
|
case (.success(let result), _): return result
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -854,8 +852,8 @@ open class Storage {
|
|
|
|
|
_ value: @escaping (Database) throws -> T?
|
|
|
|
|
) -> T? {
|
|
|
|
|
switch Storage.performOperation(CallInfo(self, file, funcN, line, .syncRead), dependencies, value) {
|
|
|
|
|
case .failure: return nil
|
|
|
|
|
case .success(let result): return result
|
|
|
|
|
case (.failure, _): return nil
|
|
|
|
|
case (.success(let result), _): return result
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -896,30 +894,32 @@ open class Storage {
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Add a database observation
|
|
|
|
|
///
|
|
|
|
|
/// **Note:** This function **MUST NOT** be called from the main thread
|
|
|
|
|
public func addObserver(_ observer: TransactionObserver?) {
|
|
|
|
|
guard isValid, let dbWriter: DatabaseWriter = dbWriter else { return }
|
|
|
|
|
guard let observer: TransactionObserver = observer else { return }
|
|
|
|
|
|
|
|
|
|
// Note: This actually triggers a write to the database so can be blocked by other
|
|
|
|
|
// writes, since it's usually called on the main thread when creating a view controller
|
|
|
|
|
// this can result in the UI hanging - to avoid this we dispatch (and hope there isn't
|
|
|
|
|
// negative impact)
|
|
|
|
|
DispatchQueue.global(qos: .default).async {
|
|
|
|
|
dbWriter.add(transactionObserver: observer)
|
|
|
|
|
}
|
|
|
|
|
/// This actually triggers a write to the database so can be blocked by other writes so shouldn't be called on the main thread,
|
|
|
|
|
/// we don't dispatch to an async thread in here because `TransactionObserver` isn't `Sendable` so instead just require
|
|
|
|
|
/// that it isn't called on the main thread
|
|
|
|
|
Log.assertNotOnMainThread()
|
|
|
|
|
dbWriter.add(transactionObserver: observer)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Remove a database observation
|
|
|
|
|
///
|
|
|
|
|
/// **Note:** This function **MUST NOT** be called from the main thread
|
|
|
|
|
public func removeObserver(_ observer: TransactionObserver?) {
|
|
|
|
|
guard isValid, let dbWriter: DatabaseWriter = dbWriter else { return }
|
|
|
|
|
guard let observer: TransactionObserver = observer else { return }
|
|
|
|
|
|
|
|
|
|
// Note: This actually triggers a write to the database so can be blocked by other
|
|
|
|
|
// writes, since it's usually called on the main thread when creating a view controller
|
|
|
|
|
// this can result in the UI hanging - to avoid this we dispatch (and hope there isn't
|
|
|
|
|
// negative impact)
|
|
|
|
|
DispatchQueue.global(qos: .default).async {
|
|
|
|
|
dbWriter.remove(transactionObserver: observer)
|
|
|
|
|
}
|
|
|
|
|
/// This actually triggers a write to the database so can be blocked by other writes so shouldn't be called on the main thread,
|
|
|
|
|
/// we don't dispatch to an async thread in here because `TransactionObserver` isn't `Sendable` so instead just require
|
|
|
|
|
/// that it isn't called on the main thread
|
|
|
|
|
Log.assertNotOnMainThread()
|
|
|
|
|
dbWriter.remove(transactionObserver: observer)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1028,7 +1028,7 @@ private extension Storage {
|
|
|
|
|
result?.timer = nil
|
|
|
|
|
|
|
|
|
|
let action: String = (info.isWrite ? "write" : "read")
|
|
|
|
|
Log.warn("[Storage] Slow \(action) taking longer than \(Storage.slowTransactionThreshold, format: ".2", omitZeroDecimal: true)s - [ \(info.callInfo) ]")
|
|
|
|
|
Log.warn(.storage, "Slow \(action) taking longer than \(Storage.slowTransactionThreshold, format: ".2", omitZeroDecimal: true)s - [ \(info.callInfo) ]")
|
|
|
|
|
result?.wasSlowTransaction = true
|
|
|
|
|
}
|
|
|
|
|
result.timer?.resume()
|
|
|
|
@ -1044,7 +1044,7 @@ private extension Storage {
|
|
|
|
|
|
|
|
|
|
let end: CFTimeInterval = CACurrentMediaTime()
|
|
|
|
|
let action: String = (info.isWrite ? "write" : "read")
|
|
|
|
|
Log.warn("[Storage] Slow \(action) completed after \(end - start, format: ".2", omitZeroDecimal: true)s - [ \(info.callInfo) ]")
|
|
|
|
|
Log.warn(.storage, "Slow \(action) completed after \(end - start, format: ".2", omitZeroDecimal: true)s - [ \(info.callInfo) ]")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -1159,13 +1159,18 @@ public extension Storage {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#if DEBUG
|
|
|
|
|
/// Function to determine if the debugger is attached
|
|
|
|
|
///
|
|
|
|
|
/// **Note:** Only contains logic when `DEBUG` is defined, otherwise it always returns false
|
|
|
|
|
func isDebuggerAttached() -> Bool {
|
|
|
|
|
#if DEBUG
|
|
|
|
|
var info = kinfo_proc()
|
|
|
|
|
var size = MemoryLayout<kinfo_proc>.stride
|
|
|
|
|
var mib: [Int32] = [CTL_KERN, KERN_PROC, KERN_PROC_PID, getpid()]
|
|
|
|
|
let sysctlResult = sysctl(&mib, UInt32(mib.count), &info, &size, nil, 0)
|
|
|
|
|
guard sysctlResult == 0 else { return false }
|
|
|
|
|
return (info.kp_proc.p_flag & P_TRACED) != 0
|
|
|
|
|
}
|
|
|
|
|
#else
|
|
|
|
|
return false
|
|
|
|
|
#endif
|
|
|
|
|
}
|
|
|
|
|