From 6f4bdcdccbed1080810de0764a2673564986a539 Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Fri, 7 Jul 2023 13:20:32 +1000 Subject: [PATCH] Moved some logic outside of DBWrite closures to prevent hangs Updated the SessionApp.presentConversation function from using the DBWrite thread if it didn't need to Updated the PagedDatabaseObserver to process database commits async on a serial queue to avoid holding up the DBWrite thread Moved another Atomic mutation outside of a DBWrite closure Refactored the PagedDatabaseObserver 'databaseDidCommit' logic to be much more straightforward Tweaked a couple of flaky unit tests --- Session.xcodeproj/project.pbxproj | 12 +- Session/Closed Groups/NewClosedGroupVC.swift | 8 +- .../New Conversation/NewConversationVC.swift | 15 +- Session/Home/New Conversation/NewDMVC.swift | 16 +- Session/Meta/SessionApp.swift | 105 ++-- Session/Notifications/AppNotifications.swift | 38 +- Session/Open Groups/JoinOpenGroupVC.swift | 8 +- Session/Settings/QRCodeVC.swift | 16 +- .../Database/Models/SessionThread.swift | 46 +- .../Utilities/ProfileManager.swift | 5 +- .../Open Groups/OpenGroupManagerSpec.swift | 8 - ...eadDisappearingMessagesViewModelSpec.swift | 11 +- .../Types/PagedDatabaseObserver.swift | 505 +++++++++--------- _SharedTestUtilities/SynchronousStorage.swift | 5 +- 14 files changed, 426 insertions(+), 372 deletions(-) diff --git a/Session.xcodeproj/project.pbxproj b/Session.xcodeproj/project.pbxproj index a37a16a23..e929c5a0f 100644 --- a/Session.xcodeproj/project.pbxproj +++ b/Session.xcodeproj/project.pbxproj @@ -6599,7 +6599,7 @@ "CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer"; CODE_SIGN_STYLE = Automatic; COPY_PHASE_STRIP = NO; - CURRENT_PROJECT_VERSION = 416; + CURRENT_PROJECT_VERSION = 417; DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym"; DEVELOPMENT_TEAM = SUQ8J2PCT7; FRAMEWORK_SEARCH_PATHS = "$(inherited)"; @@ -6671,7 +6671,7 @@ "CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer"; CODE_SIGN_STYLE = Automatic; COPY_PHASE_STRIP = NO; - CURRENT_PROJECT_VERSION = 416; + CURRENT_PROJECT_VERSION = 417; DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym"; DEVELOPMENT_TEAM = SUQ8J2PCT7; ENABLE_NS_ASSERTIONS = NO; @@ -6736,7 +6736,7 @@ "CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer"; CODE_SIGN_STYLE = Automatic; COPY_PHASE_STRIP = NO; - CURRENT_PROJECT_VERSION = 416; + CURRENT_PROJECT_VERSION = 417; DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym"; DEVELOPMENT_TEAM = SUQ8J2PCT7; FRAMEWORK_SEARCH_PATHS = "$(inherited)"; @@ -6810,7 +6810,7 @@ "CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer"; CODE_SIGN_STYLE = Automatic; COPY_PHASE_STRIP = NO; - CURRENT_PROJECT_VERSION = 416; + CURRENT_PROJECT_VERSION = 417; DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym"; DEVELOPMENT_TEAM = SUQ8J2PCT7; ENABLE_NS_ASSERTIONS = NO; @@ -7718,7 +7718,7 @@ CODE_SIGN_ENTITLEMENTS = Session/Meta/Signal.entitlements; CODE_SIGN_IDENTITY = "iPhone Developer"; "CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer"; - CURRENT_PROJECT_VERSION = 416; + CURRENT_PROJECT_VERSION = 417; DEVELOPMENT_TEAM = SUQ8J2PCT7; FRAMEWORK_SEARCH_PATHS = ( "$(inherited)", @@ -7789,7 +7789,7 @@ CODE_SIGN_ENTITLEMENTS = Session/Meta/Signal.entitlements; CODE_SIGN_IDENTITY = "iPhone Developer"; "CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer"; - CURRENT_PROJECT_VERSION = 416; + CURRENT_PROJECT_VERSION = 417; DEVELOPMENT_TEAM = SUQ8J2PCT7; FRAMEWORK_SEARCH_PATHS = ( "$(inherited)", diff --git a/Session/Closed Groups/NewClosedGroupVC.swift b/Session/Closed Groups/NewClosedGroupVC.swift index deb2165f7..7e4a1eb51 100644 --- a/Session/Closed Groups/NewClosedGroupVC.swift +++ b/Session/Closed Groups/NewClosedGroupVC.swift @@ -356,8 +356,12 @@ final class NewClosedGroupVC: BaseVC, UITableViewDataSource, UITableViewDelegate } }, receiveValue: { thread in - self?.presentingViewController?.dismiss(animated: true, completion: nil) - SessionApp.presentConversation(for: thread.id, action: .compose, animated: false) + SessionApp.presentConversationCreatingIfNeeded( + for: thread.id, + variant: thread.variant, + dismissing: self?.presentingViewController, + animated: false + ) } ) } diff --git a/Session/Home/New Conversation/NewConversationVC.swift b/Session/Home/New Conversation/NewConversationVC.swift index 0b40a49c1..34b78e82f 100644 --- a/Session/Home/New Conversation/NewConversationVC.swift +++ b/Session/Home/New Conversation/NewConversationVC.swift @@ -179,16 +179,13 @@ final class NewConversationVC: BaseVC, ThemedNavigation, UITableViewDelegate, UI tableView.deselectRow(at: indexPath, animated: true) let sessionId = newConversationViewModel.sectionData[indexPath.section].contacts[indexPath.row].id - let maybeThread: SessionThread? = Storage.shared.write { db in - try SessionThread - .fetchOrCreate(db, id: sessionId, variant: .contact, shouldBeVisible: nil) - } - - guard maybeThread != nil else { return } - self.navigationController?.dismiss(animated: true, completion: nil) - - SessionApp.presentConversation(for: sessionId, action: .compose, animated: false) + SessionApp.presentConversationCreatingIfNeeded( + for: sessionId, + variant: .contact, + dismissing: navigationController, + animated: false + ) } func tableView(_ tableView: UITableView, willDisplayHeaderView view: UIView, forSection section: Int) { diff --git a/Session/Home/New Conversation/NewDMVC.swift b/Session/Home/New Conversation/NewDMVC.swift index 2ce3b92a1..2d900083d 100644 --- a/Session/Home/New Conversation/NewDMVC.swift +++ b/Session/Home/New Conversation/NewDMVC.swift @@ -260,16 +260,12 @@ final class NewDMVC: BaseVC, UIPageViewControllerDataSource, UIPageViewControlle } private func startNewDM(with sessionId: String) { - let maybeThread: SessionThread? = Storage.shared.write { db in - try SessionThread - .fetchOrCreate(db, id: sessionId, variant: .contact, shouldBeVisible: nil) - } - - guard maybeThread != nil else { return } - - presentingViewController?.dismiss(animated: true, completion: nil) - - SessionApp.presentConversation(for: sessionId, action: .compose, animated: false) + SessionApp.presentConversationCreatingIfNeeded( + for: sessionId, + variant: .contact, + dismissing: presentingViewController, + animated: false + ) } } diff --git a/Session/Meta/SessionApp.swift b/Session/Meta/SessionApp.swift index 319c82f7e..7ffa9292c 100644 --- a/Session/Meta/SessionApp.swift +++ b/Session/Meta/SessionApp.swift @@ -35,59 +35,78 @@ public struct SessionApp { // MARK: - View Convenience Methods - public static func presentConversation(for threadId: String, action: ConversationViewModel.Action = .none, animated: Bool) { - let maybeThreadInfo: (thread: SessionThread, isMessageRequest: Bool)? = Storage.shared.write { db in - let thread: SessionThread = try SessionThread - .fetchOrCreate(db, id: threadId, variant: .contact, shouldBeVisible: nil) + public static func presentConversationCreatingIfNeeded( + for threadId: String, + variant: SessionThread.Variant, + action: ConversationViewModel.Action = .none, + dismissing presentingViewController: UIViewController?, + animated: Bool + ) { + let threadInfo: (threadExists: Bool, isMessageRequest: Bool)? = Storage.shared.read { db in + let isMessageRequest: Bool = { + switch variant { + case .contact: + return SessionThread + .isMessageRequest( + id: threadId, + variant: .contact, + currentUserPublicKey: getUserHexEncodedPublicKey(db), + shouldBeVisible: nil, + contactIsApproved: (try? Contact + .filter(id: threadId) + .select(.isApproved) + .asRequest(of: Bool.self) + .fetchOne(db)) + .defaulting(to: false), + includeNonVisible: true + ) + + default: return false + } + }() - return (thread, thread.isMessageRequest(db)) + return (SessionThread.filter(id: threadId).isNotEmpty(db), isMessageRequest) } - guard - let variant: SessionThread.Variant = maybeThreadInfo?.thread.variant, - let isMessageRequest: Bool = maybeThreadInfo?.isMessageRequest - else { return } + // Store the post-creation logic in a closure to avoid duplication + let afterThreadCreated: () -> () = { + presentingViewController?.dismiss(animated: true, completion: nil) + + homeViewController.wrappedValue?.show( + threadId, + variant: variant, + isMessageRequest: (threadInfo?.isMessageRequest == true), + with: action, + focusedInteractionInfo: nil, + animated: animated + ) + } - self.presentConversation( - for: threadId, - threadVariant: variant, - isMessageRequest: isMessageRequest, - action: action, - focusInteractionInfo: nil, - animated: animated - ) - } - - public static func presentConversation( - for threadId: String, - threadVariant: SessionThread.Variant, - isMessageRequest: Bool, - action: ConversationViewModel.Action, - focusInteractionInfo: Interaction.TimestampInfo?, - animated: Bool - ) { + /// The thread should generally exist at the time of calling this method, but on the off change it doesn't then we need to `fetchOrCreate` it and + /// should do it on a background thread just in case something is keeping the DBWrite thread busy as in the past this could cause the app to hang + guard threadInfo?.threadExists == true else { + DispatchQueue.global(qos: .userInitiated).async { + Storage.shared.write { db in + try SessionThread.fetchOrCreate(db, id: threadId, variant: variant, shouldBeVisible: nil) + } + + // Send back to main thread for UI transitions + DispatchQueue.main.async { + afterThreadCreated() + } + } + return + } + + // Send to main thread if needed guard Thread.isMainThread else { DispatchQueue.main.async { - self.presentConversation( - for: threadId, - threadVariant: threadVariant, - isMessageRequest: isMessageRequest, - action: action, - focusInteractionInfo: focusInteractionInfo, - animated: animated - ) + afterThreadCreated() } return } - homeViewController.wrappedValue?.show( - threadId, - variant: threadVariant, - isMessageRequest: isMessageRequest, - with: action, - focusedInteractionInfo: focusInteractionInfo, - animated: animated - ) + afterThreadCreated() } // MARK: - Functions diff --git a/Session/Notifications/AppNotifications.swift b/Session/Notifications/AppNotifications.swift index 314fee3b5..31ea7d6b0 100644 --- a/Session/Notifications/AppNotifications.swift +++ b/Session/Notifications/AppNotifications.swift @@ -37,6 +37,7 @@ enum AppNotificationAction: CaseIterable { struct AppNotificationUserInfoKey { static let threadId = "Signal.AppNotificationsUserInfoKey.threadId" + static let threadVariantRaw = "Signal.AppNotificationsUserInfoKey.threadVariantRaw" static let callBackNumber = "Signal.AppNotificationsUserInfoKey.callBackNumber" static let localCallId = "Signal.AppNotificationsUserInfoKey.localCallId" static let threadNotificationCounter = "Session.AppNotificationsUserInfoKey.threadNotificationCounter" @@ -232,8 +233,9 @@ public class NotificationPresenter: NotificationsProtocol { // "no longer verified". let category = AppNotificationCategory.incomingMessage - let userInfo = [ - AppNotificationUserInfoKey.threadId: thread.id + let userInfo: [AnyHashable: Any] = [ + AppNotificationUserInfoKey.threadId: thread.id, + AppNotificationUserInfoKey.threadVariantRaw: thread.variant.rawValue ] let userPublicKey: String = getUserHexEncodedPublicKey(db) @@ -301,8 +303,9 @@ public class NotificationPresenter: NotificationsProtocol { let previewType: Preferences.NotificationPreviewType = db[.preferencesNotificationPreviewType] .defaulting(to: .nameAndPreview) - let userInfo = [ - AppNotificationUserInfoKey.threadId: thread.id + let userInfo: [AnyHashable: Any] = [ + AppNotificationUserInfoKey.threadId: thread.id, + AppNotificationUserInfoKey.threadVariantRaw: thread.variant.rawValue ] let notificationTitle: String = "Session" @@ -378,8 +381,9 @@ public class NotificationPresenter: NotificationsProtocol { let category = AppNotificationCategory.incomingMessage - let userInfo = [ - AppNotificationUserInfoKey.threadId: thread.id + let userInfo: [AnyHashable: Any] = [ + AppNotificationUserInfoKey.threadId: thread.id, + AppNotificationUserInfoKey.threadVariantRaw: thread.variant.rawValue ] let threadName: String = SessionThread.displayName( @@ -440,8 +444,9 @@ public class NotificationPresenter: NotificationsProtocol { let notificationBody = NotificationStrings.failedToSendBody - let userInfo = [ - AppNotificationUserInfoKey.threadId: thread.id + let userInfo: [AnyHashable: Any] = [ + AppNotificationUserInfoKey.threadId: thread.id, + AppNotificationUserInfoKey.threadVariantRaw: thread.variant.rawValue ] let fallbackSound: Preferences.Sound = db[.defaultNotificationSound] .defaulting(to: Preferences.Sound.defaultNotificationSound) @@ -609,15 +614,22 @@ class NotificationActionHandler { } func showThread(userInfo: [AnyHashable: Any]) -> AnyPublisher { - guard let threadId = userInfo[AppNotificationUserInfoKey.threadId] as? String else { - return showHomeVC() - } + guard + let threadId = userInfo[AppNotificationUserInfoKey.threadId] as? String, + let threadVariantRaw = userInfo[AppNotificationUserInfoKey.threadVariantRaw] as? Int, + let threadVariant: SessionThread.Variant = SessionThread.Variant(rawValue: threadVariantRaw) + else { return showHomeVC() } // If this happens when the the app is not, visible we skip the animation so the thread // can be visible to the user immediately upon opening the app, rather than having to watch // it animate in from the homescreen. - let shouldAnimate: Bool = (UIApplication.shared.applicationState == .active) - SessionApp.presentConversation(for: threadId, animated: shouldAnimate) + SessionApp.presentConversationCreatingIfNeeded( + for: threadId, + variant: threadVariant, + dismissing: nil, + animated: (UIApplication.shared.applicationState == .active) + ) + return Just(()) .eraseToAnyPublisher() } diff --git a/Session/Open Groups/JoinOpenGroupVC.swift b/Session/Open Groups/JoinOpenGroupVC.swift index 5c7e19df5..71d39ec73 100644 --- a/Session/Open Groups/JoinOpenGroupVC.swift +++ b/Session/Open Groups/JoinOpenGroupVC.swift @@ -217,12 +217,10 @@ final class JoinOpenGroupVC: BaseVC, UIPageViewControllerDataSource, UIPageViewC self?.presentingViewController?.dismiss(animated: true, completion: nil) if shouldOpenCommunity { - SessionApp.presentConversation( + SessionApp.presentConversationCreatingIfNeeded( for: OpenGroup.idFor(roomToken: roomToken, server: server), - threadVariant: .community, - isMessageRequest: false, - action: .compose, - focusInteractionInfo: nil, + variant: .community, + dismissing: nil, animated: false ) } diff --git a/Session/Settings/QRCodeVC.swift b/Session/Settings/QRCodeVC.swift index 18abb9f46..7f16b4bc7 100644 --- a/Session/Settings/QRCodeVC.swift +++ b/Session/Settings/QRCodeVC.swift @@ -138,16 +138,12 @@ final class QRCodeVC : BaseVC, UIPageViewControllerDataSource, UIPageViewControl self.present(modal, animated: true) } else { - let maybeThread: SessionThread? = Storage.shared.write { db in - try SessionThread - .fetchOrCreate(db, id: hexEncodedPublicKey, variant: .contact, shouldBeVisible: nil) - } - - guard maybeThread != nil else { return } - - presentingViewController?.dismiss(animated: true, completion: nil) - - SessionApp.presentConversation(for: hexEncodedPublicKey, action: .compose, animated: false) + SessionApp.presentConversationCreatingIfNeeded( + for: hexEncodedPublicKey, + variant: .contact, + dismissing: presentingViewController, + animated: false + ) } } } diff --git a/SessionMessagingKit/Database/Models/SessionThread.swift b/SessionMessagingKit/Database/Models/SessionThread.swift index 0e5df42e5..baefc44d1 100644 --- a/SessionMessagingKit/Database/Models/SessionThread.swift +++ b/SessionMessagingKit/Database/Models/SessionThread.swift @@ -192,20 +192,6 @@ public extension SessionThread { ) } - func isMessageRequest(_ db: Database, includeNonVisible: Bool = false) -> Bool { - return ( - (includeNonVisible || shouldBeVisible) && - variant == .contact && - id != getUserHexEncodedPublicKey(db) && // Note to self - (try? Contact - .filter(id: id) - .select(.isApproved) - .asRequest(of: Bool.self) - .fetchOne(db)) - .defaulting(to: false) == false - ) - } - static func canSendReadReceipt( _ db: Database, threadId: String, @@ -431,6 +417,38 @@ public extension SessionThread { ).sqlExpression } + func isMessageRequest(_ db: Database, includeNonVisible: Bool = false) -> Bool { + return SessionThread.isMessageRequest( + id: id, + variant: variant, + currentUserPublicKey: getUserHexEncodedPublicKey(db), + shouldBeVisible: shouldBeVisible, + contactIsApproved: (try? Contact + .filter(id: id) + .select(.isApproved) + .asRequest(of: Bool.self) + .fetchOne(db)) + .defaulting(to: false), + includeNonVisible: includeNonVisible + ) + } + + static func isMessageRequest( + id: String, + variant: SessionThread.Variant?, + currentUserPublicKey: String, + shouldBeVisible: Bool?, + contactIsApproved: Bool?, + includeNonVisible: Bool = false + ) -> Bool { + return ( + (includeNonVisible || shouldBeVisible == true) && + variant == .contact && + id != currentUserPublicKey && // Note to self + ((contactIsApproved ?? false) == false) + ) + } + func isNoteToSelf(_ db: Database? = nil) -> Bool { return ( variant == .contact && diff --git a/SessionMessagingKit/Utilities/ProfileManager.swift b/SessionMessagingKit/Utilities/ProfileManager.swift index 3089f59f1..5ffc3d937 100644 --- a/SessionMessagingKit/Utilities/ProfileManager.swift +++ b/SessionMessagingKit/Utilities/ProfileManager.swift @@ -265,12 +265,15 @@ public struct ProfileManager { return } + // Update the cache first (in case the DBWrite thread is blocked, this way other threads + // can retrieve from the cache and avoid triggering a download) + profileAvatarCache.mutate { $0[fileName] = decryptedData } + // Store the updated 'profilePictureFileName' Storage.shared.write { db in _ = try? Profile .filter(id: profile.id) .updateAll(db, Profile.Columns.profilePictureFileName.set(to: fileName)) - profileAvatarCache.mutate { $0[fileName] = decryptedData } } } ) diff --git a/SessionMessagingKitTests/Open Groups/OpenGroupManagerSpec.swift b/SessionMessagingKitTests/Open Groups/OpenGroupManagerSpec.swift index 4adf3f940..eb93e936c 100644 --- a/SessionMessagingKitTests/Open Groups/OpenGroupManagerSpec.swift +++ b/SessionMessagingKitTests/Open Groups/OpenGroupManagerSpec.swift @@ -438,14 +438,6 @@ class OpenGroupManagerSpec: QuickSpec { mockOGMCache.when { $0.isPolling }.thenReturn(true) mockOGMCache.when { $0.pollers }.thenReturn(["testserver": OpenGroupAPI.Poller(for: "testserver")]) - - mockUserDefaults - .when { (defaults: inout any UserDefaultsType) -> Any? in - defaults.object(forKey: SNUserDefaults.Date.lastOpen.rawValue) - } - .thenReturn(Date(timeIntervalSince1970: 1234567890)) - - openGroupManager.startPolling(using: dependencies) } it("removes all pollers") { diff --git a/SessionTests/Conversations/Settings/ThreadDisappearingMessagesViewModelSpec.swift b/SessionTests/Conversations/Settings/ThreadDisappearingMessagesViewModelSpec.swift index 8c51c3c49..89d8453f7 100644 --- a/SessionTests/Conversations/Settings/ThreadDisappearingMessagesViewModelSpec.swift +++ b/SessionTests/Conversations/Settings/ThreadDisappearingMessagesViewModelSpec.swift @@ -246,16 +246,9 @@ class ThreadDisappearingMessagesSettingsViewModelSpec: QuickSpec { try DisappearingMessagesConfiguration.fetchOne(db, id: "TestId") } - expect(updatedConfig?.isEnabled) - .toEventually( - beTrue(), - timeout: .milliseconds(100) - ) + expect(updatedConfig?.isEnabled).to(beTrue()) expect(updatedConfig?.durationSeconds) - .toEventually( - equal(DisappearingMessagesConfiguration.validDurationsSeconds.last ?? -1), - timeout: .milliseconds(100) - ) + .to(equal(DisappearingMessagesConfiguration.validDurationsSeconds.last ?? -1)) } } } diff --git a/SessionUtilitiesKit/Database/Types/PagedDatabaseObserver.swift b/SessionUtilitiesKit/Database/Types/PagedDatabaseObserver.swift index 26920940d..a2640835c 100644 --- a/SessionUtilitiesKit/Database/Types/PagedDatabaseObserver.swift +++ b/SessionUtilitiesKit/Database/Types/PagedDatabaseObserver.swift @@ -10,6 +10,12 @@ import DifferenceKit /// /// **Note:** We **MUST** have accurate `filterSQL` and `orderSQL` values otherwise the indexing won't work public class PagedDatabaseObserver: TransactionObserver where ObservedTable: TableRecord & ColumnExpressible & Identifiable, T: FetchableRecordWithRowId & Identifiable { + private let commitProcessingQueue: DispatchQueue = DispatchQueue( + label: "PagedDatabaseObserver.commitProcessingQueue", + qos: .userInitiated, + attributes: [] // Must be serial in order to avoid updates getting processed in the wrong order + ) + // MARK: - Variables private let pagedTableName: String @@ -145,74 +151,58 @@ public class PagedDatabaseObserver: TransactionObserver where changesInCommit.mutate { $0.insert(trackedChange) } } - // Note: We will process all updates which come through this method even if - // 'onChange' is null because if the UI stops observing and then starts again - // later we don't want to have missed any changes which happened while the UI - // wasn't subscribed (and doing a full re-query seems painful...) + /// We will process all updates which come through this method even if 'onChange' is null because if the UI stops observing and then starts + /// again later we don't want to have missed any changes which happened while the UI wasn't subscribed (and doing a full re-query seems painful...) + /// + /// **Note:** This function is generally called within the DBWrite thread but we don't actually need write access to process the commit, in order + /// to avoid blocking the DBWrite thread we dispatch to a serial `commitProcessingQueue` to process the incoming changes (in the past not doing + /// so was resulting in hanging when there was a lot of activity happening) public func databaseDidCommit(_ db: Database) { + // Since we can't be sure the behaviours of 'databaseDidChange' and 'databaseDidCommit' won't change in + // the future we extract and clear the values in 'changesInCommit' since it's 'Atomic' so will different + // threads modifying the data resulting in us missing a change var committedChanges: Set = [] + self.changesInCommit.mutate { cachedChanges in committedChanges = cachedChanges cachedChanges.removeAll() } - // Note: This method will be called regardless of whether there were actually changes - // in the areas we are observing so we want to early-out if there aren't any relevant - // updated rows + commitProcessingQueue.async { [weak self] in + self?.processDatabaseCommit(committedChanges: committedChanges) + } + } + + private func processDatabaseCommit(committedChanges: Set) { + // Do nothing when there are no changes guard !committedChanges.isEmpty else { return } + typealias AssociatedDataInfo = [(hasChanges: Bool, data: ErasedAssociatedRecord)] + typealias UpdatedData = (cache: DataCache, pageInfo: PagedData.PageInfo, hasChanges: Bool, associatedData: AssociatedDataInfo) + + // Store the instance variables locally to avoid unwrapping + let dataCache: DataCache = self.dataCache.wrappedValue + let pageInfo: PagedData.PageInfo = self.pageInfo.wrappedValue let joinSQL: SQL? = self.joinSQL let orderSQL: SQL = self.orderSQL let filterSQL: SQL = self.filterSQL let associatedRecords: [ErasedAssociatedRecord] = self.associatedRecords - - let updateDataAndCallbackIfNeeded: (DataCache, PagedData.PageInfo, Bool) -> () = { [weak self] updatedDataCache, updatedPageInfo, cacheHasChanges in - let associatedDataInfo: [(hasChanges: Bool, data: ErasedAssociatedRecord)] = associatedRecords - .map { associatedRecord in - let hasChanges: Bool = associatedRecord.tryUpdateForDatabaseCommit( - db, - changes: committedChanges, - joinSQL: joinSQL, - orderSQL: orderSQL, - filterSQL: filterSQL, - pageInfo: updatedPageInfo - ) - - return (hasChanges, associatedRecord) - } - - // Check if we need to trigger a change callback - guard cacheHasChanges || associatedDataInfo.contains(where: { hasChanges, _ in hasChanges }) else { - return - } - - // If the associated data changed then update the updatedCachedData with the - // updated associated data - var finalUpdatedDataCache: DataCache = updatedDataCache - - associatedDataInfo.forEach { hasChanges, associatedData in - guard cacheHasChanges || hasChanges else { return } + let getAssociatedDataInfo: (Database, PagedData.PageInfo) -> AssociatedDataInfo = { db, updatedPageInfo in + associatedRecords.map { associatedRecord in + let hasChanges: Bool = associatedRecord.tryUpdateForDatabaseCommit( + db, + changes: committedChanges, + joinSQL: joinSQL, + orderSQL: orderSQL, + filterSQL: filterSQL, + pageInfo: updatedPageInfo + ) - finalUpdatedDataCache = associatedData.updateAssociatedData(to: finalUpdatedDataCache) + return (hasChanges, associatedRecord) } - - // Update the cache, pageInfo and the change callback - self?.dataCache.mutate { $0 = finalUpdatedDataCache } - self?.pageInfo.mutate { $0 = updatedPageInfo } - - - // Make sure the updates run on the main thread - guard Thread.isMainThread else { - DispatchQueue.main.async { [weak self] in - self?.onChangeUnsorted(finalUpdatedDataCache.values, updatedPageInfo) - } - return - } - - self?.onChangeUnsorted(finalUpdatedDataCache.values, updatedPageInfo) } - // Determing if there were any direct or related data changes + // Determine if there were any direct or related data changes let directChanges: Set = committedChanges .filter { $0.tableName == pagedTableName } let relatedChanges: [String: [PagedData.TrackedChange]] = committedChanges @@ -227,215 +217,248 @@ public class PagedDatabaseObserver: TransactionObserver where .filter { $0.tableName != pagedTableName } .filter { $0.kind == .delete } - guard !directChanges.isEmpty || !relatedChanges.isEmpty || !relatedDeletions.isEmpty else { - updateDataAndCallbackIfNeeded(self.dataCache.wrappedValue, self.pageInfo.wrappedValue, false) - return - } - - var updatedPageInfo: PagedData.PageInfo = self.pageInfo.wrappedValue - var updatedDataCache: DataCache = self.dataCache.wrappedValue - let deletionChanges: [Int64] = directChanges - .filter { $0.kind == .delete } - .map { $0.rowId } - let oldDataCount: Int = dataCache.wrappedValue.count - - // First remove any items which have been deleted - if !deletionChanges.isEmpty { - updatedDataCache = updatedDataCache.deleting(rowIds: deletionChanges) - - // Make sure there were actually changes - if updatedDataCache.count != oldDataCount { - let dataSizeDiff: Int = (updatedDataCache.count - oldDataCount) + // Process and retrieve the updated data + let updatedData: UpdatedData = Storage.shared + .read { db -> UpdatedData in + // If there aren't any direct or related changes then early-out + guard !directChanges.isEmpty || !relatedChanges.isEmpty || !relatedDeletions.isEmpty else { + return (dataCache, pageInfo, false, getAssociatedDataInfo(db, pageInfo)) + } - updatedPageInfo = PagedData.PageInfo( - pageSize: updatedPageInfo.pageSize, - pageOffset: updatedPageInfo.pageOffset, - currentCount: (updatedPageInfo.currentCount + dataSizeDiff), - totalCount: (updatedPageInfo.totalCount + dataSizeDiff) - ) - } - } - - // If there are no inserted/updated rows then trigger the update callback and stop here - let changesToQuery: [PagedData.TrackedChange] = directChanges - .filter { $0.kind != .delete } - - guard !changesToQuery.isEmpty || !relatedChanges.isEmpty || !relatedDeletions.isEmpty else { - updateDataAndCallbackIfNeeded(updatedDataCache, updatedPageInfo, !deletionChanges.isEmpty) - return - } - - // First we need to get the rowIds for the paged data connected to any of the related changes - let pagedRowIdsForRelatedChanges: Set = { - guard !relatedChanges.isEmpty else { return [] } - - return relatedChanges - .reduce(into: []) { result, next in - guard - let observedChange: PagedData.ObservedChanges = observedTableChangeTypes[next.key], - let joinToPagedType: SQL = observedChange.joinToPagedType - else { return } + // Store a mutable copies of the dataCache and pageInfo for updating + var updatedDataCache: DataCache = dataCache + var updatedPageInfo: PagedData.PageInfo = pageInfo + let deletionChanges: [Int64] = directChanges + .filter { $0.kind == .delete } + .map { $0.rowId } + let oldDataCount: Int = dataCache.count + + // First remove any items which have been deleted + if !deletionChanges.isEmpty { + updatedDataCache = updatedDataCache.deleting(rowIds: deletionChanges) - let pagedRowIds: [Int64] = PagedData.pagedRowIdsForRelatedRowIds( - db, - tableName: next.key, - pagedTableName: pagedTableName, - relatedRowIds: Array(next.value.map { $0.rowId }.asSet()), - joinToPagedType: joinToPagedType - ) + // Make sure there were actually changes + if updatedDataCache.count != oldDataCount { + let dataSizeDiff: Int = (updatedDataCache.count - oldDataCount) + + updatedPageInfo = PagedData.PageInfo( + pageSize: updatedPageInfo.pageSize, + pageOffset: updatedPageInfo.pageOffset, + currentCount: (updatedPageInfo.currentCount + dataSizeDiff), + totalCount: (updatedPageInfo.totalCount + dataSizeDiff) + ) + } + } + + // If there are no inserted/updated rows then trigger then early-out + let changesToQuery: [PagedData.TrackedChange] = directChanges + .filter { $0.kind != .delete } + + guard !changesToQuery.isEmpty || !relatedChanges.isEmpty || !relatedDeletions.isEmpty else { + let associatedData: AssociatedDataInfo = getAssociatedDataInfo(db, updatedPageInfo) + return (updatedDataCache, updatedPageInfo, !deletionChanges.isEmpty, associatedData) + } + + // Next we need to determine if any related changes were associated to the pagedData we are + // observing, if they aren't (and there were no other direct changes) we can early-out + let pagedRowIdsForRelatedChanges: Set = { + guard !relatedChanges.isEmpty else { return [] } - result.append(contentsOf: pagedRowIds) + return relatedChanges + .reduce(into: []) { result, next in + guard + let observedChange: PagedData.ObservedChanges = observedTableChangeTypes[next.key], + let joinToPagedType: SQL = observedChange.joinToPagedType + else { return } + + let pagedRowIds: [Int64] = PagedData.pagedRowIdsForRelatedRowIds( + db, + tableName: next.key, + pagedTableName: pagedTableName, + relatedRowIds: Array(next.value.map { $0.rowId }.asSet()), + joinToPagedType: joinToPagedType + ) + + result.append(contentsOf: pagedRowIds) + } + .asSet() + }() + + guard !changesToQuery.isEmpty || !pagedRowIdsForRelatedChanges.isEmpty || !relatedDeletions.isEmpty else { + let associatedData: AssociatedDataInfo = getAssociatedDataInfo(db, updatedPageInfo) + return (updatedDataCache, updatedPageInfo, !deletionChanges.isEmpty, associatedData) } - .asSet() - }() - - guard !changesToQuery.isEmpty || !pagedRowIdsForRelatedChanges.isEmpty || !relatedDeletions.isEmpty else { - updateDataAndCallbackIfNeeded(updatedDataCache, updatedPageInfo, !deletionChanges.isEmpty) - return - } - - // Fetch the indexes of the rowIds so we can determine whether they should be added to the screen - let directRowIds: Set = changesToQuery.map { $0.rowId }.asSet() - let pagedRowIdsForRelatedDeletions: Set = relatedDeletions - .compactMap { $0.pagedRowIdsForRelatedDeletion } - .flatMap { $0 } - .asSet() - let itemIndexes: [PagedData.RowIndexInfo] = PagedData.indexes( - db, - rowIds: Array(directRowIds), - tableName: pagedTableName, - requiredJoinSQL: joinSQL, - orderSQL: orderSQL, - filterSQL: filterSQL - ) - let relatedChangeIndexes: [PagedData.RowIndexInfo] = PagedData.indexes( - db, - rowIds: Array(pagedRowIdsForRelatedChanges), - tableName: pagedTableName, - requiredJoinSQL: joinSQL, - orderSQL: orderSQL, - filterSQL: filterSQL - ) - let relatedDeletionIndexes: [PagedData.RowIndexInfo] = PagedData.indexes( - db, - rowIds: Array(pagedRowIdsForRelatedDeletions), - tableName: pagedTableName, - requiredJoinSQL: joinSQL, - orderSQL: orderSQL, - filterSQL: filterSQL - ) - - // Determine if the indexes for the row ids should be displayed on the screen and remove any - // which shouldn't - values less than 'currentCount' or if there is at least one value less than - // 'currentCount' and the indexes are sequential (ie. more than the current loaded content was - // added at once) - func determineValidChanges(for indexInfo: [PagedData.RowIndexInfo]) -> [Int64] { - let indexes: [Int64] = Array(indexInfo - .map { $0.rowIndex } - .sorted() - .asSet()) - let indexesAreSequential: Bool = (indexes.map { $0 - 1 }.dropFirst() == indexes.dropLast()) - let hasOneValidIndex: Bool = indexInfo.contains(where: { info -> Bool in - info.rowIndex >= updatedPageInfo.pageOffset && ( - info.rowIndex < updatedPageInfo.currentCount || ( - updatedPageInfo.currentCount < updatedPageInfo.pageSize && - info.rowIndex <= (updatedPageInfo.pageOffset + updatedPageInfo.pageSize) - ) + + // Fetch the indexes of the rowIds so we can determine whether they should be added to the screen + let directRowIds: Set = changesToQuery.map { $0.rowId }.asSet() + let pagedRowIdsForRelatedDeletions: Set = relatedDeletions + .compactMap { $0.pagedRowIdsForRelatedDeletion } + .flatMap { $0 } + .asSet() + let itemIndexes: [PagedData.RowIndexInfo] = PagedData.indexes( + db, + rowIds: Array(directRowIds), + tableName: pagedTableName, + requiredJoinSQL: joinSQL, + orderSQL: orderSQL, + filterSQL: filterSQL ) - }) - - return (indexesAreSequential && hasOneValidIndex ? - indexInfo.map { $0.rowId } : - indexInfo - .filter { info -> Bool in + let relatedChangeIndexes: [PagedData.RowIndexInfo] = PagedData.indexes( + db, + rowIds: Array(pagedRowIdsForRelatedChanges), + tableName: pagedTableName, + requiredJoinSQL: joinSQL, + orderSQL: orderSQL, + filterSQL: filterSQL + ) + let relatedDeletionIndexes: [PagedData.RowIndexInfo] = PagedData.indexes( + db, + rowIds: Array(pagedRowIdsForRelatedDeletions), + tableName: pagedTableName, + requiredJoinSQL: joinSQL, + orderSQL: orderSQL, + filterSQL: filterSQL + ) + + // Determine if the indexes for the row ids should be displayed on the screen and remove any + // which shouldn't - values less than 'currentCount' or if there is at least one value less than + // 'currentCount' and the indexes are sequential (ie. more than the current loaded content was + // added at once) + func determineValidChanges(for indexInfo: [PagedData.RowIndexInfo]) -> [Int64] { + let indexes: [Int64] = Array(indexInfo + .map { $0.rowIndex } + .sorted() + .asSet()) + let indexesAreSequential: Bool = (indexes.map { $0 - 1 }.dropFirst() == indexes.dropLast()) + let hasOneValidIndex: Bool = indexInfo.contains(where: { info -> Bool in info.rowIndex >= updatedPageInfo.pageOffset && ( info.rowIndex < updatedPageInfo.currentCount || ( updatedPageInfo.currentCount < updatedPageInfo.pageSize && info.rowIndex <= (updatedPageInfo.pageOffset + updatedPageInfo.pageSize) ) ) + }) + + return (indexesAreSequential && hasOneValidIndex ? + indexInfo.map { $0.rowId } : + indexInfo + .filter { info -> Bool in + info.rowIndex >= updatedPageInfo.pageOffset && ( + info.rowIndex < updatedPageInfo.currentCount || ( + updatedPageInfo.currentCount < updatedPageInfo.pageSize && + info.rowIndex <= (updatedPageInfo.pageOffset + updatedPageInfo.pageSize) + ) + ) + } + .map { info -> Int64 in info.rowId } + ) + } + let validChangeRowIds: [Int64] = determineValidChanges(for: itemIndexes) + let validRelatedChangeRowIds: [Int64] = determineValidChanges(for: relatedChangeIndexes) + let validRelatedDeletionRowIds: [Int64] = determineValidChanges(for: relatedDeletionIndexes) + let countBefore: Int = itemIndexes.filter { $0.rowIndex < updatedPageInfo.pageOffset }.count + + // If the number of indexes doesn't match the number of rowIds then it means something changed + // resulting in an item being filtered out + func performRemovalsIfNeeded(for rowIds: Set, indexes: [PagedData.RowIndexInfo]) { + let uniqueIndexes: Set = indexes.map { $0.rowId }.asSet() + + // If they have the same count then nothin was filtered out so do nothing + guard rowIds.count != uniqueIndexes.count else { return } + + // Otherwise something was probably removed so try to remove it from the cache + let rowIdsRemoved: Set = rowIds.subtracting(uniqueIndexes) + let preDeletionCount: Int = updatedDataCache.count + updatedDataCache = updatedDataCache.deleting(rowIds: Array(rowIdsRemoved)) + + // Lastly make sure there were actually changes before updating the page info + guard updatedDataCache.count != preDeletionCount else { return } + + let dataSizeDiff: Int = (updatedDataCache.count - preDeletionCount) + + updatedPageInfo = PagedData.PageInfo( + pageSize: updatedPageInfo.pageSize, + pageOffset: updatedPageInfo.pageOffset, + currentCount: (updatedPageInfo.currentCount + dataSizeDiff), + totalCount: (updatedPageInfo.totalCount + dataSizeDiff) + ) + } + + // Actually perform any required removals + performRemovalsIfNeeded(for: directRowIds, indexes: itemIndexes) + performRemovalsIfNeeded(for: pagedRowIdsForRelatedChanges, indexes: relatedChangeIndexes) + performRemovalsIfNeeded(for: pagedRowIdsForRelatedDeletions, indexes: relatedDeletionIndexes) + + // Update the offset and totalCount even if the rows are outside of the current page (need to + // in order to ensure the 'load more' sections are accurate) + updatedPageInfo = PagedData.PageInfo( + pageSize: updatedPageInfo.pageSize, + pageOffset: (updatedPageInfo.pageOffset + countBefore), + currentCount: updatedPageInfo.currentCount, + totalCount: ( + updatedPageInfo.totalCount + + changesToQuery + .filter { $0.kind == .insert } + .filter { validChangeRowIds.contains($0.rowId) } + .count + ) + ) + + // If there are no valid row ids then early-out (at this point the pageInfo would have changed + // so we want to flat 'hasChanges' as true) + guard !validChangeRowIds.isEmpty || !validRelatedChangeRowIds.isEmpty || !validRelatedDeletionRowIds.isEmpty else { + let associatedData: AssociatedDataInfo = getAssociatedDataInfo(db, updatedPageInfo) + return (updatedDataCache, updatedPageInfo, true, associatedData) + } + + // Fetch the inserted/updated rows + let targetRowIds: [Int64] = Array((validChangeRowIds + validRelatedChangeRowIds + validRelatedDeletionRowIds).asSet()) + let updatedItems: [T] = { + do { return try dataQuery(targetRowIds).fetchAll(db) } + catch { + SNLog("[PagedDatabaseObserver] Error fetching data during change: \(error)") + return [] } - .map { info -> Int64 in info.rowId } - ) + }() + + updatedDataCache = updatedDataCache.upserting(items: updatedItems) + + // Update the currentCount for the upserted data + let dataSizeDiff: Int = (updatedDataCache.count - oldDataCount) + updatedPageInfo = PagedData.PageInfo( + pageSize: updatedPageInfo.pageSize, + pageOffset: updatedPageInfo.pageOffset, + currentCount: (updatedPageInfo.currentCount + dataSizeDiff), + totalCount: updatedPageInfo.totalCount + ) + + // Return the final updated data + let associatedData: AssociatedDataInfo = getAssociatedDataInfo(db, updatedPageInfo) + return (updatedDataCache, updatedPageInfo, true, associatedData) + } + .defaulting(to: (cache: dataCache, pageInfo: pageInfo, hasChanges: false, associatedData: [])) + + // Now that we have all of the changes, check if there were actually any changes + guard updatedData.hasChanges || updatedData.associatedData.contains(where: { hasChanges, _ in hasChanges }) else { + return } - let validChangeRowIds: [Int64] = determineValidChanges(for: itemIndexes) - let validRelatedChangeRowIds: [Int64] = determineValidChanges(for: relatedChangeIndexes) - let validRelatedDeletionRowIds: [Int64] = determineValidChanges(for: relatedDeletionIndexes) - let countBefore: Int = itemIndexes.filter { $0.rowIndex < updatedPageInfo.pageOffset }.count - // If the number of indexes doesn't match the number of rowIds then it means something changed - // resulting in an item being filtered out - func performRemovalsIfNeeded(for rowIds: Set, indexes: [PagedData.RowIndexInfo]) { - let uniqueIndexes: Set = indexes.map { $0.rowId }.asSet() - - // If they have the same count then nothin was filtered out so do nothing - guard rowIds.count != uniqueIndexes.count else { return } - - // Otherwise something was probably removed so try to remove it from the cache - let rowIdsRemoved: Set = rowIds.subtracting(uniqueIndexes) - let preDeletionCount: Int = updatedDataCache.count - updatedDataCache = updatedDataCache.deleting(rowIds: Array(rowIdsRemoved)) + // If the associated data changed then update the updatedCachedData with the updated associated data + var finalUpdatedDataCache: DataCache = updatedData.cache - // Lastly make sure there were actually changes before updating the page info - guard updatedDataCache.count != preDeletionCount else { return } - - let dataSizeDiff: Int = (updatedDataCache.count - preDeletionCount) + updatedData.associatedData.forEach { hasChanges, associatedData in + guard updatedData.hasChanges || hasChanges else { return } - updatedPageInfo = PagedData.PageInfo( - pageSize: updatedPageInfo.pageSize, - pageOffset: updatedPageInfo.pageOffset, - currentCount: (updatedPageInfo.currentCount + dataSizeDiff), - totalCount: (updatedPageInfo.totalCount + dataSizeDiff) - ) + finalUpdatedDataCache = associatedData.updateAssociatedData(to: finalUpdatedDataCache) } - - // Actually perform any required removals - performRemovalsIfNeeded(for: directRowIds, indexes: itemIndexes) - performRemovalsIfNeeded(for: pagedRowIdsForRelatedChanges, indexes: relatedChangeIndexes) - performRemovalsIfNeeded(for: pagedRowIdsForRelatedDeletions, indexes: relatedDeletionIndexes) - - // Update the offset and totalCount even if the rows are outside of the current page (need to - // in order to ensure the 'load more' sections are accurate) - updatedPageInfo = PagedData.PageInfo( - pageSize: updatedPageInfo.pageSize, - pageOffset: (updatedPageInfo.pageOffset + countBefore), - currentCount: updatedPageInfo.currentCount, - totalCount: ( - updatedPageInfo.totalCount + - changesToQuery - .filter { $0.kind == .insert } - .filter { validChangeRowIds.contains($0.rowId) } - .count - ) - ) - // If there are no valid row ids then stop here (trigger updates though since the page info - // has changes) - guard !validChangeRowIds.isEmpty || !validRelatedChangeRowIds.isEmpty || !validRelatedDeletionRowIds.isEmpty else { - updateDataAndCallbackIfNeeded(updatedDataCache, updatedPageInfo, true) - return - } - - // Fetch the inserted/updated rows - let targetRowIds: [Int64] = Array((validChangeRowIds + validRelatedChangeRowIds + validRelatedDeletionRowIds).asSet()) - let updatedItems: [T] = (try? dataQuery(targetRowIds) - .fetchAll(db)) - .defaulting(to: []) + // Update the cache, pageInfo and the change callback + self.dataCache.mutate { $0 = finalUpdatedDataCache } + self.pageInfo.mutate { $0 = updatedData.pageInfo } - // Process the upserted data - updatedDataCache = updatedDataCache.upserting(items: updatedItems) - - // Update the currentCount for the upserted data - let dataSizeDiff: Int = (updatedDataCache.count - oldDataCount) - - updatedPageInfo = PagedData.PageInfo( - pageSize: updatedPageInfo.pageSize, - pageOffset: updatedPageInfo.pageOffset, - currentCount: (updatedPageInfo.currentCount + dataSizeDiff), - totalCount: updatedPageInfo.totalCount - ) - - updateDataAndCallbackIfNeeded(updatedDataCache, updatedPageInfo, true) + // Trigger the unsorted change callback (the actual UI update triggering should eventually be run on + // the main thread via the `PagedData.processAndTriggerUpdates` function) + self.onChangeUnsorted(finalUpdatedDataCache.values, updatedData.pageInfo) } public func databaseDidRollback(_ db: Database) {} diff --git a/_SharedTestUtilities/SynchronousStorage.swift b/_SharedTestUtilities/SynchronousStorage.swift index c7feaf137..0e9e87655 100644 --- a/_SharedTestUtilities/SynchronousStorage.swift +++ b/_SharedTestUtilities/SynchronousStorage.swift @@ -19,9 +19,12 @@ class SynchronousStorage: Storage { } override func writePublisher( + fileName: String = #file, + functionName: String = #function, + lineNumber: Int = #line, updates: @escaping (Database) throws -> T ) -> AnyPublisher { - guard let result: T = super.write(updates: updates) else { + guard let result: T = super.write(fileName: fileName, functionName: functionName, lineNumber: lineNumber, updates: updates) else { return Fail(error: StorageError.generic) .eraseToAnyPublisher() }