diff --git a/SessionMessagingKit/Jobs/Types/ConfigurationSyncJob.swift b/SessionMessagingKit/Jobs/Types/ConfigurationSyncJob.swift index 476590c3d..c5bea1d85 100644 --- a/SessionMessagingKit/Jobs/Types/ConfigurationSyncJob.swift +++ b/SessionMessagingKit/Jobs/Types/ConfigurationSyncJob.swift @@ -12,6 +12,7 @@ public enum ConfigurationSyncJob: JobExecutor { public static let requiresThreadId: Bool = true public static let requiresInteractionId: Bool = false private static let maxRunFrequency: TimeInterval = 3 + private static let waitTimeForExpirationUpdate: TimeInterval = 1 public static func run( _ job: Job, @@ -50,6 +51,27 @@ public enum ConfigurationSyncJob: JobExecutor { return deferred(updatedJob ?? job) } + // We want to update lastReadTimestamp after the disappearing messages are updated to the network, so on + // linked devices the expiration time can be the same and avoid race condition + guard + JobRunner + .infoForCurrentlyRunningJobs(of: .expirationUpdate) + .filter({ _, info in + info.threadId == job.threadId // Exclude expiration update jobs for different threads + }) + .isEmpty + else { + // Defer the job to run 'maxRunFrequency' + let updatedJob: Job? = Storage.shared.write { db in + try job + .with(nextRunTimestamp: Date().timeIntervalSince1970 + waitTimeForExpirationUpdate) + .saved(db) + } + + SNLog("[ConfigurationSyncJob] For \(job.threadId ?? "UnknownId") deferred due to expiration update jobs running.") + return deferred(updatedJob ?? job) + } + // If we don't have a userKeyPair yet then there is no need to sync the configuration // as the user doesn't exist yet (this will get triggered on the first launch of a // fresh install due to the migrations getting run) diff --git a/SessionMessagingKit/Jobs/Types/DisappearingMessagesJob.swift b/SessionMessagingKit/Jobs/Types/DisappearingMessagesJob.swift index fa77ea935..c649b670d 100644 --- a/SessionMessagingKit/Jobs/Types/DisappearingMessagesJob.swift +++ b/SessionMessagingKit/Jobs/Types/DisappearingMessagesJob.swift @@ -83,8 +83,7 @@ public extension DisappearingMessagesJob { let expiresInSeconds: TimeInterval let serverHash: String } - - var expirationInfo: [String: TimeInterval] = (try? Interaction + let interactionIds: [Int64] = (try? Interaction .filter( Interaction.Columns.threadId == threadId && Interaction.Columns.timestampMs <= lastReadTimestampMs && @@ -92,66 +91,17 @@ public extension DisappearingMessagesJob { Interaction.Columns.expiresStartedAtMs == nil ) .select( - Interaction.Columns.expiresInSeconds, - Interaction.Columns.serverHash + Interaction.Columns.id ) - .asRequest(of: ExpirationInfo.self) + .asRequest(of: Int64.self) .fetchAll(db)) .defaulting(to: []) - .grouped(by: \.serverHash) - .compactMapValues{ $0.first?.expiresInSeconds } - - // If there were no message hashes then none of the messages sent before lastReadTimestampMs are expiring messages - guard (expirationInfo.count > 0) else { return } - let timestampMs: Int64 = SnodeAPI.currentOffsetTimestampMs() - - let userPublicKey: String = getUserHexEncodedPublicKey(db) - SnodeAPI.getSwarm(for: userPublicKey) - .tryFlatMap { swarm -> AnyPublisher in - guard let snode = swarm.randomElement() else { throw SnodeAPIError.generic } - return SnodeAPI.getExpiries( - from: snode, - associatedWith: userPublicKey, - of: expirationInfo.map { $0.key } - ) - .map { (_, response) in - Storage.shared.writeAsync { db in - try response.expiries.forEach { hash, expireAtMs in - guard let expiresInSeconds: TimeInterval = expirationInfo[hash] else { return } - let expiresStartedAtMs: TimeInterval = TimeInterval(expireAtMs - UInt64(expiresInSeconds * 1000)) - - _ = try Interaction - .filter(Interaction.Columns.serverHash == hash) - .updateAll( - db, - Interaction.Columns.expiresStartedAtMs.set(to: expiresStartedAtMs) - ) - - guard let index = expirationInfo.index(forKey: hash) else { return } - expirationInfo.remove(at: index) - } - - try expirationInfo.forEach { key, _ in - _ = try Interaction - .filter(Interaction.Columns.serverHash == key) - .updateAll( - db, - Interaction.Columns.expiresStartedAtMs.set(to: timestampMs) - ) - } - - JobRunner.upsert( - db, - job: updateNextRunIfNeeded(db) - ) - } - } - .mapError { error in - return error - } - .eraseToAnyPublisher() - } - .sinkUntilComplete() + + guard !interactionIds.isEmpty else { return } + + let startedAtMs: Double = Double(SnodeAPI.currentOffsetTimestampMs()) + + updateNextRunIfNeeded(db, interactionIds: interactionIds, startedAtMs: startedAtMs, threadId: threadId) } @discardableResult static func updateNextRunIfNeeded(_ db: Database, interactionIds: [Int64], startedAtMs: Double, threadId: String) -> Job? { @@ -194,10 +144,12 @@ public extension DisappearingMessagesJob { interactionExpirationInfosByExpiresInSeconds.forEach { expiresInSeconds, expirationInfos in let expirationTimestampMs: Int64 = Int64(startedAtMs + expiresInSeconds * 1000) - JobRunner.upsert( + JobRunner.add( db, job: Job( variant: .expirationUpdate, + behaviour: .runOnce, + threadId: threadId, details: ExpirationUpdateJob.Details( serverHashes: expirationInfos.map { $0.serverHash }, expirationTimestampMs: expirationTimestampMs diff --git a/SessionMessagingKit/Jobs/Types/ExpirationUpdateJob.swift b/SessionMessagingKit/Jobs/Types/ExpirationUpdateJob.swift index 24c9628a6..871fdc837 100644 --- a/SessionMessagingKit/Jobs/Types/ExpirationUpdateJob.swift +++ b/SessionMessagingKit/Jobs/Types/ExpirationUpdateJob.swift @@ -8,7 +8,7 @@ import SessionSnodeKit public enum ExpirationUpdateJob: JobExecutor { public static var maxFailureCount: Int = -1 - public static var requiresThreadId: Bool = false + public static var requiresThreadId: Bool = true public static var requiresInteractionId: Bool = false public static func run( diff --git a/SessionUtilitiesKit/JobRunner/JobRunner.swift b/SessionUtilitiesKit/JobRunner/JobRunner.swift index af935d9a8..ac62a022e 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunner.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunner.swift @@ -67,6 +67,14 @@ public final class JobRunner { private static let queues: Atomic<[Job.Variant: JobQueue]> = { var jobVariants: Set = Job.Variant.allCases.asSet() + let expirationUpdateQueue: JobQueue = JobQueue( + type: .expirationUpdate, + executionType: .concurrent, // Allow as many jobs to run at once as supported by the device + qos: .default, + jobVariants: [ + jobVariants.remove(.expirationUpdate) + ].compactMap { $0 } + ) let messageSendQueue: JobQueue = JobQueue( type: .messageSend, executionType: .concurrent, // Allow as many jobs to run at once as supported by the device @@ -77,8 +85,7 @@ public final class JobRunner { jobVariants.remove(.notifyPushServer), jobVariants.remove(.sendReadReceipts), jobVariants.remove(.groupLeaving), - jobVariants.remove(.configurationSync), - jobVariants.remove(.expirationUpdate) + jobVariants.remove(.configurationSync) ].compactMap { $0 } ) let messageReceiveQueue: JobQueue = JobQueue( @@ -109,6 +116,7 @@ public final class JobRunner { ) return Atomic([ + expirationUpdateQueue, messageSendQueue, messageReceiveQueue, attachmentDownloadQueue, @@ -471,6 +479,7 @@ public final class JobQueue { case messageSend case messageReceive case attachmentDownload + case expirationUpdate var name: String { switch self { @@ -479,6 +488,7 @@ public final class JobQueue { case .messageSend: return "MessageSend" case .messageReceive: return "MessageReceive" case .attachmentDownload: return "AttachmentDownload" + case .expirationUpdate: return "ExpirationUpdate" } } }