diff --git a/Session.xcodeproj/project.pbxproj b/Session.xcodeproj/project.pbxproj index e929c5a0f..11e85bdf7 100644 --- a/Session.xcodeproj/project.pbxproj +++ b/Session.xcodeproj/project.pbxproj @@ -6599,7 +6599,7 @@ "CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer"; CODE_SIGN_STYLE = Automatic; COPY_PHASE_STRIP = NO; - CURRENT_PROJECT_VERSION = 417; + CURRENT_PROJECT_VERSION = 418; DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym"; DEVELOPMENT_TEAM = SUQ8J2PCT7; FRAMEWORK_SEARCH_PATHS = "$(inherited)"; @@ -6671,7 +6671,7 @@ "CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer"; CODE_SIGN_STYLE = Automatic; COPY_PHASE_STRIP = NO; - CURRENT_PROJECT_VERSION = 417; + CURRENT_PROJECT_VERSION = 418; DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym"; DEVELOPMENT_TEAM = SUQ8J2PCT7; ENABLE_NS_ASSERTIONS = NO; @@ -6736,7 +6736,7 @@ "CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer"; CODE_SIGN_STYLE = Automatic; COPY_PHASE_STRIP = NO; - CURRENT_PROJECT_VERSION = 417; + CURRENT_PROJECT_VERSION = 418; DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym"; DEVELOPMENT_TEAM = SUQ8J2PCT7; FRAMEWORK_SEARCH_PATHS = "$(inherited)"; @@ -6810,7 +6810,7 @@ "CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer"; CODE_SIGN_STYLE = Automatic; COPY_PHASE_STRIP = NO; - CURRENT_PROJECT_VERSION = 417; + CURRENT_PROJECT_VERSION = 418; DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym"; DEVELOPMENT_TEAM = SUQ8J2PCT7; ENABLE_NS_ASSERTIONS = NO; @@ -7718,7 +7718,7 @@ CODE_SIGN_ENTITLEMENTS = Session/Meta/Signal.entitlements; CODE_SIGN_IDENTITY = "iPhone Developer"; "CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer"; - CURRENT_PROJECT_VERSION = 417; + CURRENT_PROJECT_VERSION = 418; DEVELOPMENT_TEAM = SUQ8J2PCT7; FRAMEWORK_SEARCH_PATHS = ( "$(inherited)", @@ -7789,7 +7789,7 @@ CODE_SIGN_ENTITLEMENTS = Session/Meta/Signal.entitlements; CODE_SIGN_IDENTITY = "iPhone Developer"; "CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer"; - CURRENT_PROJECT_VERSION = 417; + CURRENT_PROJECT_VERSION = 418; DEVELOPMENT_TEAM = SUQ8J2PCT7; FRAMEWORK_SEARCH_PATHS = ( "$(inherited)", diff --git a/Session/Conversations/ConversationViewModel.swift b/Session/Conversations/ConversationViewModel.swift index 1373c0bd5..7c30437dc 100644 --- a/Session/Conversations/ConversationViewModel.swift +++ b/Session/Conversations/ConversationViewModel.swift @@ -213,6 +213,7 @@ public class ConversationViewModel: OWSAudioPlayerDelegate { // MARK: - Interaction Data + private var lastInteractionIdMarkedAsRead: Int64? = nil private var lastInteractionTimestampMsMarkedAsRead: Int64 = 0 public private(set) var unobservedInteractionDataChanges: ([SectionModel], StagedChangeset<[SectionModel]>)? public private(set) var interactionData: [SectionModel] = [] @@ -651,8 +652,8 @@ public class ConversationViewModel: OWSAudioPlayerDelegate { /// Since this method now gets triggered when scrolling we want to try to optimise it and avoid busying the database /// write queue when it isn't needed, in order to do this we: /// - Throttle the updates to 100ms (quick enough that users shouldn't notice, but will help the DB when the user flings the list) - /// - Don't bother marking anything as read if this was called with the same `interactionId` that we previously marked as - /// read (ie. when scrolling and the last message hasn't changed) + /// - Only mark interactions as read if they have newer `timestampMs` or `id` values (ie. were sent later or were more-recent + /// entries in the database), **Note:** Old messages will be marked as read upon insertion so shouldn't be an issue /// /// The `ThreadViewModel.markAsRead` method also tries to avoid marking as read if a conversation is already fully read if markAsReadPublisher == nil { @@ -662,10 +663,11 @@ public class ConversationViewModel: OWSAudioPlayerDelegate { receiveOutput: { [weak self] target, timestampMs in switch target { case .thread: self?.threadData.markAsRead(target: target) - case .threadAndInteractions: + case .threadAndInteractions(let interactionId): guard timestampMs == nil || - (self?.lastInteractionTimestampMsMarkedAsRead ?? 0) < (timestampMs ?? 0) + (self?.lastInteractionTimestampMsMarkedAsRead ?? 0) < (timestampMs ?? 0) || + (self?.lastInteractionIdMarkedAsRead ?? 0) < (interactionId ?? 0) else { self?.threadData.markAsRead(target: .thread) return @@ -677,6 +679,7 @@ public class ConversationViewModel: OWSAudioPlayerDelegate { self?.lastInteractionTimestampMsMarkedAsRead = timestampMs } + self?.lastInteractionIdMarkedAsRead = (interactionId ?? self?.threadData.interactionId) self?.threadData.markAsRead(target: target) } } diff --git a/SessionMessagingKit/Open Groups/OpenGroupManager.swift b/SessionMessagingKit/Open Groups/OpenGroupManager.swift index 211f2ed92..5c351ec76 100644 --- a/SessionMessagingKit/Open Groups/OpenGroupManager.swift +++ b/SessionMessagingKit/Open Groups/OpenGroupManager.swift @@ -521,61 +521,65 @@ public final class OpenGroupManager { } } - db.afterNextTransactionNested { db in - // Start the poller if needed - if dependencies.cache.pollers[server.lowercased()] == nil { - dependencies.mutableCache.mutate { - $0.pollers[server.lowercased()]?.stop() - $0.pollers[server.lowercased()] = OpenGroupAPI.Poller(for: server.lowercased()) + db.afterNextTransactionNested { _ in + // Dispatch async to the workQueue to prevent holding up the DBWrite thread from the + // above transaction + OpenGroupAPI.workQueue.async { + // Start the poller if needed + if dependencies.cache.pollers[server.lowercased()] == nil { + dependencies.mutableCache.mutate { + $0.pollers[server.lowercased()]?.stop() + $0.pollers[server.lowercased()] = OpenGroupAPI.Poller(for: server.lowercased()) + } + + dependencies.cache.pollers[server.lowercased()]?.startIfNeeded(using: dependencies) } - dependencies.cache.pollers[server.lowercased()]?.startIfNeeded(using: dependencies) - } - - /// Start downloading the room image (if we don't have one or it's been updated) - if - let imageId: String = (pollInfo.details?.imageId ?? openGroup.imageId), - ( - openGroup.imageData == nil || - openGroup.imageId != imageId - ) - { - OpenGroupManager - .roomImage( - fileId: imageId, - for: roomToken, - on: server, - existingData: openGroup.imageData, - using: dependencies + /// Start downloading the room image (if we don't have one or it's been updated) + if + let imageId: String = (pollInfo.details?.imageId ?? openGroup.imageId), + ( + openGroup.imageData == nil || + openGroup.imageId != imageId ) - // Note: We need to subscribe and receive on different threads to ensure the - // logic in 'receiveValue' doesn't result in a reentrancy database issue - .subscribe(on: OpenGroupAPI.workQueue) - .receive(on: DispatchQueue.global(qos: .default)) - .sinkUntilComplete( - receiveCompletion: { _ in - if waitForImageToComplete { - completion?() - } - }, - receiveValue: { data in - dependencies.storage.write { db in - _ = try OpenGroup - .filter(id: threadId) - .updateAll(db, OpenGroup.Columns.imageData.set(to: data)) + { + OpenGroupManager + .roomImage( + fileId: imageId, + for: roomToken, + on: server, + existingData: openGroup.imageData, + using: dependencies + ) + // Note: We need to subscribe and receive on different threads to ensure the + // logic in 'receiveValue' doesn't result in a reentrancy database issue + .subscribe(on: OpenGroupAPI.workQueue) + .receive(on: DispatchQueue.global(qos: .default)) + .sinkUntilComplete( + receiveCompletion: { _ in + if waitForImageToComplete { + completion?() + } + }, + receiveValue: { data in + dependencies.storage.write { db in + _ = try OpenGroup + .filter(id: threadId) + .updateAll(db, OpenGroup.Columns.imageData.set(to: data)) + } } - } - ) - } - else if waitForImageToComplete { + ) + } + else if waitForImageToComplete { + completion?() + } + + // If we want to wait for the image to complete then don't call the completion here + guard !waitForImageToComplete else { return } + + // Finish completion?() } - - // If we want to wait for the image to complete then don't call the completion here - guard !waitForImageToComplete else { return } - - // Finish - completion?() } } diff --git a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+VisibleMessages.swift b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+VisibleMessages.swift index 34ce81e73..693f84da7 100644 --- a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+VisibleMessages.swift +++ b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+VisibleMessages.swift @@ -35,7 +35,7 @@ extension MessageReceiver { guard let profilePictureUrl: String = profile.profilePictureUrl, let profileKey: Data = profile.profileKey - else { return .none } + else { return .remove } return .updateTo( url: profilePictureUrl, diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift b/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift index b752b6464..35174c7fa 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift @@ -14,7 +14,7 @@ public final class ClosedGroupPoller: Poller { override var namespaces: [SnodeAPI.Namespace] { ClosedGroupPoller.namespaces } override var maxNodePollCount: UInt { 0 } - private static let minPollInterval: Double = 2 + private static let minPollInterval: Double = 3 private static let maxPollInterval: Double = 30 // MARK: - Initialization @@ -78,30 +78,12 @@ public final class ClosedGroupPoller: Poller { return nextPollInterval } - override func getSnodeForPolling( - for publicKey: String - ) -> AnyPublisher { - return SnodeAPI.getSwarm(for: publicKey) - .tryMap { swarm -> Snode in - guard let snode: Snode = swarm.randomElement() else { - throw OnionRequestAPIError.insufficientSnodes - } - - return snode - } - .eraseToAnyPublisher() - } - override func handlePollError( _ error: Error, for publicKey: String, using dependencies: SMKDependencies = SMKDependencies() - ) { + ) -> Bool { SNLog("Polling failed for closed group with public key: \(publicKey) due to error: \(error).") - - // Try to restart the poller from scratch - Threading.pollerQueue.async { [weak self] in - self?.setUpPolling(for: publicKey, using: dependencies) - } + return true } } diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/CurrentUserPoller.swift b/SessionMessagingKit/Sending & Receiving/Pollers/CurrentUserPoller.swift index b144f9479..02b793160 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/CurrentUserPoller.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/CurrentUserPoller.swift @@ -11,9 +11,6 @@ public final class CurrentUserPoller: Poller { public static var namespaces: [SnodeAPI.Namespace] = [ .default, .configUserProfile, .configContacts, .configConvoInfoVolatile, .configUserGroups ] - - private var targetSnode: Atomic = Atomic(nil) - private var usedSnodes: Atomic> = Atomic([]) // MARK: - Settings @@ -63,53 +60,16 @@ public final class CurrentUserPoller: Poller { return min(maxRetryInterval, nextDelay) } - override func getSnodeForPolling( - for publicKey: String - ) -> AnyPublisher { - if let targetSnode: Snode = self.targetSnode.wrappedValue { - return Just(targetSnode) - .setFailureType(to: Error.self) - .eraseToAnyPublisher() - } - - // Used the cached swarm for the given key and update the list of unusedSnodes - let swarm: Set = (SnodeAPI.swarmCache.wrappedValue[publicKey] ?? []) - let unusedSnodes: Set = swarm.subtracting(usedSnodes.wrappedValue) - - // randomElement() uses the system's default random generator, which is cryptographically secure - if let nextSnode: Snode = unusedSnodes.randomElement() { - self.targetSnode.mutate { $0 = nextSnode } - self.usedSnodes.mutate { $0.insert(nextSnode) } - - return Just(nextSnode) - .setFailureType(to: Error.self) - .eraseToAnyPublisher() - } - - // If we haven't retrieved a target snode at this point then either the cache - // is empty or we have used all of the snodes and need to start from scratch - return SnodeAPI.getSwarm(for: publicKey) - .tryFlatMap { [weak self] _ -> AnyPublisher in - guard let strongSelf = self else { throw SnodeAPIError.generic } - - self?.targetSnode.mutate { $0 = nil } - self?.usedSnodes.mutate { $0.removeAll() } - - return strongSelf.getSnodeForPolling(for: publicKey) - } - .eraseToAnyPublisher() - } - override func handlePollError( _ error: Error, for publicKey: String, using dependencies: SMKDependencies = SMKDependencies() - ) { + ) -> Bool { if UserDefaults.sharedLokiProject?[.isMainAppActive] != true { // Do nothing when an error gets throws right after returning from the background (happens frequently) } else if let targetSnode: Snode = targetSnode.wrappedValue { - SNLog("Polling \(targetSnode) failed; dropping it and switching to next snode.") + SNLog("Main Poller polling \(targetSnode) failed; dropping it and switching to next snode.") self.targetSnode.mutate { $0 = nil } SnodeAPI.dropSnodeFromSwarmIfNeeded(targetSnode, publicKey: publicKey) } @@ -117,9 +77,6 @@ public final class CurrentUserPoller: Poller { SNLog("Polling failed due to having no target service node.") } - // Try to restart the poller from scratch - Threading.pollerQueue.async { [weak self] in - self?.setUpPolling(for: publicKey, using: dependencies) - } + return true } } diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPoller.swift b/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPoller.swift index a4cce3c35..48cb16f64 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPoller.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPoller.swift @@ -57,49 +57,42 @@ extension OpenGroupAPI { ) { guard hasStarted else { return } - dependencies.storage - .readPublisher { [server = server] db in - try OpenGroup - .filter(OpenGroup.Columns.server == server) - .select(min(OpenGroup.Columns.pollFailureCount)) - .asRequest(of: TimeInterval.self) - .fetchOne(db) - } - .tryFlatMap { [weak self] minPollFailureCount -> AnyPublisher<(TimeInterval, TimeInterval), Error> in - guard let strongSelf = self else { throw OpenGroupAPIError.invalidPoll } - - let lastPollStart: TimeInterval = Date().timeIntervalSince1970 - let nextPollInterval: TimeInterval = Poller.getInterval( - for: (minPollFailureCount ?? 0), - minInterval: Poller.minPollInterval, - maxInterval: Poller.maxPollInterval - ) - - // Wait until the last poll completes before polling again ensuring we don't poll any faster than - // the 'nextPollInterval' value - return strongSelf.poll(using: dependencies) - .map { _ in (lastPollStart, nextPollInterval) } - .eraseToAnyPublisher() - } + let server: String = self.server + let lastPollStart: TimeInterval = Date().timeIntervalSince1970 + + poll(using: dependencies) .subscribe(on: dependencies.subscribeQueue) .receive(on: dependencies.receiveQueue) .sinkUntilComplete( - receiveValue: { [weak self] lastPollStart, nextPollInterval in + receiveCompletion: { [weak self] _ in + let minPollFailureCount: Int64 = dependencies.storage + .read { db in + try OpenGroup + .filter(OpenGroup.Columns.server == server) + .select(min(OpenGroup.Columns.pollFailureCount)) + .asRequest(of: Int64.self) + .fetchOne(db) + } + .defaulting(to: 0) + + // Calculate the remaining poll delay let currentTime: TimeInterval = Date().timeIntervalSince1970 + let nextPollInterval: TimeInterval = Poller.getInterval( + for: TimeInterval(minPollFailureCount), + minInterval: Poller.minPollInterval, + maxInterval: Poller.maxPollInterval + ) let remainingInterval: TimeInterval = max(0, nextPollInterval - (currentTime - lastPollStart)) - + + // Schedule the next poll guard remainingInterval > 0 else { return dependencies.subscribeQueue.async { self?.pollRecursively(using: dependencies) } } - - self?.timer = Timer.scheduledTimerOnMainThread(withTimeInterval: remainingInterval, repeats: false) { timer in - timer.invalidate() - - dependencies.subscribeQueue.async { - self?.pollRecursively(using: dependencies) - } + + dependencies.subscribeQueue.asyncAfter(deadline: .now() + .milliseconds(Int(remainingInterval * 1000)), qos: .default) { + self?.pollRecursively(using: dependencies) } } ) @@ -227,7 +220,7 @@ extension OpenGroupAPI { .defaulting(to: 0) var prunedIds: [String] = [] - Storage.shared.writeAsync { db in + dependencies.storage.writeAsync { db in struct Info: Decodable, FetchableRecord { let id: String let shouldBeVisible: Bool diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift b/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift index 480f40a07..60ae24320 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift @@ -8,11 +8,14 @@ import SessionSnodeKit import SessionUtilitiesKit public class Poller { - private var timers: Atomic<[String: Timer]> = Atomic([:]) + private var cancellables: Atomic<[String: AnyCancellable]> = Atomic([:]) internal var isPolling: Atomic<[String: Bool]> = Atomic([:]) internal var pollCount: Atomic<[String: Int]> = Atomic([:]) internal var failureCount: Atomic<[String: Int]> = Atomic([:]) + internal var targetSnode: Atomic = Atomic(nil) + private var usedSnodes: Atomic> = Atomic([]) + // MARK: - Settings /// The namespaces which this poller queries @@ -20,7 +23,7 @@ public class Poller { preconditionFailure("abstract class - override in subclass") } - /// The number of times the poller can poll before swapping to a new snode + /// The number of times the poller can poll a single snode before swapping to a new snode internal var maxNodePollCount: UInt { preconditionFailure("abstract class - override in subclass") } @@ -39,7 +42,7 @@ public class Poller { public func stopPolling(for publicKey: String) { isPolling.mutate { $0[publicKey] = false } - timers.mutate { $0[publicKey]?.invalidate() } + cancellables.mutate { $0[publicKey]?.cancel() } } // MARK: - Abstract Methods @@ -49,17 +52,13 @@ public class Poller { preconditionFailure("abstract class - override in subclass") } + /// Calculate the delay which should occur before the next poll internal func nextPollDelay(for publicKey: String) -> TimeInterval { preconditionFailure("abstract class - override in subclass") } - internal func getSnodeForPolling( - for publicKey: String - ) -> AnyPublisher { - preconditionFailure("abstract class - override in subclass") - } - - internal func handlePollError(_ error: Error, for publicKey: String, using dependencies: SMKDependencies) { + /// Perform and logic which should occur when the poll errors, will stop polling if `false` is returned + internal func handlePollError(_ error: Error, for publicKey: String, using dependencies: SMKDependencies) -> Bool { preconditionFailure("abstract class - override in subclass") } @@ -75,48 +74,65 @@ public class Poller { // and the timer is not created, if we mark the group as is polling // after setUpPolling. So the poller may not work, thus misses messages self?.isPolling.mutate { $0[publicKey] = true } - self?.setUpPolling(for: publicKey) + self?.pollRecursively(for: publicKey) } } - /// We want to initially trigger a poll against the target service node and then run the recursive polling, - /// if an error is thrown during the poll then this should automatically restart the polling - internal func setUpPolling( + internal func getSnodeForPolling( for publicKey: String, - using dependencies: SMKDependencies = SMKDependencies( - subscribeQueue: Threading.pollerQueue, - receiveQueue: Threading.pollerQueue - ) - ) { - guard isPolling.wrappedValue[publicKey] == true else { return } + using dependencies: SMKDependencies = SMKDependencies() + ) -> AnyPublisher { + // If we don't want to poll a snode multiple times then just grab a random one from the swarm + guard maxNodePollCount > 0 else { + return SnodeAPI.getSwarm(for: publicKey, using: dependencies) + .tryMap { swarm -> Snode in + try swarm.randomElement() ?? { throw OnionRequestAPIError.insufficientSnodes }() + } + .eraseToAnyPublisher() + } - let namespaces: [SnodeAPI.Namespace] = self.namespaces + // If we already have a target snode then use that + if let targetSnode: Snode = self.targetSnode.wrappedValue { + return Just(targetSnode) + .setFailureType(to: Error.self) + .eraseToAnyPublisher() + } - getSnodeForPolling(for: publicKey) - .flatMap { snode -> AnyPublisher<[Message], Error> in - Poller.poll( - namespaces: namespaces, - from: snode, - for: publicKey, - poller: self, - using: dependencies - ) - } - .subscribe(on: dependencies.subscribeQueue) - .receive(on: dependencies.receiveQueue) - .sinkUntilComplete( - receiveCompletion: { [weak self] result in - switch result { - case .finished: self?.pollRecursively(for: publicKey, using: dependencies) - case .failure(let error): - guard self?.isPolling.wrappedValue[publicKey] == true else { return } - - self?.handlePollError(error, for: publicKey, using: dependencies) - } + // Select the next unused snode from the swarm (if we've used them all then clear the used list and + // start cycling through them again) + return SnodeAPI.getSwarm(for: publicKey, using: dependencies) + .tryMap { [usedSnodes = self.usedSnodes, targetSnode = self.targetSnode] swarm -> Snode in + let unusedSnodes: Set = swarm.subtracting(usedSnodes.wrappedValue) + + // If we've used all of the SNodes then clear out the used list + if unusedSnodes.isEmpty { + usedSnodes.mutate { $0.removeAll() } } - ) + + // Select the next SNode + let nextSnode: Snode = try swarm.randomElement() ?? { throw OnionRequestAPIError.insufficientSnodes }() + targetSnode.mutate { $0 = nextSnode } + usedSnodes.mutate { $0.insert(nextSnode) } + + return nextSnode + } + .eraseToAnyPublisher() } - + + internal func incrementPollCount(publicKey: String) { + guard maxNodePollCount > 0 else { return } + + let pollCount: Int = (self.pollCount.wrappedValue[publicKey] ?? 0) + self.pollCount.mutate { $0[publicKey] = (pollCount + 1) } + + // Check if we've polled the serice node too many times + guard pollCount > maxNodePollCount else { return } + + // If we have polled this service node more than the maximum allowed then clear out + // the 'targetServiceNode' value + self.targetSnode.mutate { $0 = nil } + } + private func pollRecursively( for publicKey: String, using dependencies: SMKDependencies = SMKDependencies() @@ -124,65 +140,60 @@ public class Poller { guard isPolling.wrappedValue[publicKey] == true else { return } let namespaces: [SnodeAPI.Namespace] = self.namespaces - let nextPollInterval: TimeInterval = nextPollDelay(for: publicKey) + let lastPollStart: TimeInterval = Date().timeIntervalSince1970 + let lastPollInterval: TimeInterval = nextPollDelay(for: publicKey) + let getSnodePublisher: AnyPublisher = getSnodeForPolling(for: publicKey) - timers.mutate { - $0[publicKey] = Timer.scheduledTimerOnMainThread( - withTimeInterval: nextPollInterval, - repeats: false - ) { [weak self] timer in - timer.invalidate() - - self?.getSnodeForPolling(for: publicKey) - .flatMap { snode -> AnyPublisher<[Message], Error> in - Poller.poll( - namespaces: namespaces, - from: snode, - for: publicKey, - poller: self, - using: dependencies + // Store the publisher intp the cancellables dictionary + cancellables.mutate { [weak self] cancellables in + cancellables[publicKey] = getSnodePublisher + .flatMap { snode -> AnyPublisher<[Message], Error> in + Poller.poll( + namespaces: namespaces, + from: snode, + for: publicKey, + poller: self, + using: dependencies + ) + } + .subscribe(on: dependencies.subscribeQueue) + .receive(on: dependencies.receiveQueue) + .sink( + receiveCompletion: { result in + switch result { + case .failure(let error): + // Determine if the error should stop us from polling anymore + guard self?.handlePollError(error, for: publicKey, using: dependencies) == true else { + return + } + + case .finished: break + } + + // Increment the poll count + self?.incrementPollCount(publicKey: publicKey) + + // Calculate the remaining poll delay + let currentTime: TimeInterval = Date().timeIntervalSince1970 + let nextPollInterval: TimeInterval = ( + self?.nextPollDelay(for: publicKey) ?? + lastPollInterval ) - } - .subscribe(on: dependencies.subscribeQueue) - .receive(on: dependencies.receiveQueue) - .sinkUntilComplete( - receiveCompletion: { result in - switch result { - case .failure(let error): self?.handlePollError(error, for: publicKey, using: dependencies) - case .finished: - let maxNodePollCount: UInt = (self?.maxNodePollCount ?? 0) - - // If we have polled this service node more than the - // maximum allowed then throw an error so the parent - // loop can restart the polling - if maxNodePollCount > 0 { - let pollCount: Int = (self?.pollCount.wrappedValue[publicKey] ?? 0) - self?.pollCount.mutate { $0[publicKey] = (pollCount + 1) } - - guard pollCount < maxNodePollCount else { - let newSnodeNextPollInterval: TimeInterval = (self?.nextPollDelay(for: publicKey) ?? nextPollInterval) - - self?.timers.mutate { - $0[publicKey] = Timer.scheduledTimerOnMainThread( - withTimeInterval: newSnodeNextPollInterval, - repeats: false - ) { [weak self] timer in - timer.invalidate() - - self?.pollCount.mutate { $0[publicKey] = 0 } - self?.setUpPolling(for: publicKey, using: dependencies) - } - } - return - } - } - - // Otherwise just loop - self?.pollRecursively(for: publicKey, using: dependencies) + let remainingInterval: TimeInterval = max(0, nextPollInterval - (currentTime - lastPollStart)) + + // Schedule the next poll + guard remainingInterval > 0 else { + return dependencies.subscribeQueue.async { + self?.pollRecursively(for: publicKey, using: dependencies) } } - ) - } + + dependencies.subscribeQueue.asyncAfter(deadline: .now() + .milliseconds(Int(remainingInterval * 1000)), qos: .default) { + self?.pollRecursively(for: publicKey, using: dependencies) + } + }, + receiveValue: { _ in } + ) } } @@ -199,6 +210,7 @@ public class Poller { isBackgroundPollValid: @escaping (() -> Bool) = { true }, poller: Poller? = nil, using dependencies: SMKDependencies = SMKDependencies( + subscribeQueue: Threading.pollerQueue, receiveQueue: Threading.pollerQueue ) ) -> AnyPublisher<[Message], Error> { diff --git a/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+ConvoInfoVolatile.swift b/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+ConvoInfoVolatile.swift index c504a0c35..7e83611db 100644 --- a/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+ConvoInfoVolatile.swift +++ b/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+ConvoInfoVolatile.swift @@ -80,7 +80,8 @@ internal extension SessionUtil { try Interaction .filter( Interaction.Columns.threadId == threadId && - Interaction.Columns.timestampMs <= lastReadTimestampMs + Interaction.Columns.timestampMs <= lastReadTimestampMs && + Interaction.Columns.wasRead == false ) .updateAll( // Handling a config update so don't use `updateAllAndConfig` db, diff --git a/SessionUtilitiesKit/Database/Types/PagedDatabaseObserver.swift b/SessionUtilitiesKit/Database/Types/PagedDatabaseObserver.swift index a2640835c..ab6ae915f 100644 --- a/SessionUtilitiesKit/Database/Types/PagedDatabaseObserver.swift +++ b/SessionUtilitiesKit/Database/Types/PagedDatabaseObserver.swift @@ -158,6 +158,9 @@ public class PagedDatabaseObserver: TransactionObserver where /// to avoid blocking the DBWrite thread we dispatch to a serial `commitProcessingQueue` to process the incoming changes (in the past not doing /// so was resulting in hanging when there was a lot of activity happening) public func databaseDidCommit(_ db: Database) { + // If there were no pending changes in the commit then do nothing + guard !self.changesInCommit.wrappedValue.isEmpty else { return } + // Since we can't be sure the behaviours of 'databaseDidChange' and 'databaseDidCommit' won't change in // the future we extract and clear the values in 'changesInCommit' since it's 'Atomic' so will different // threads modifying the data resulting in us missing a change @@ -174,9 +177,6 @@ public class PagedDatabaseObserver: TransactionObserver where } private func processDatabaseCommit(committedChanges: Set) { - // Do nothing when there are no changes - guard !committedChanges.isEmpty else { return } - typealias AssociatedDataInfo = [(hasChanges: Bool, data: ErasedAssociatedRecord)] typealias UpdatedData = (cache: DataCache, pageInfo: PagedData.PageInfo, hasChanges: Bool, associatedData: AssociatedDataInfo)