diff --git a/SessionMessagingKit/Jobs/Types/DisappearingMessagesJob.swift b/SessionMessagingKit/Jobs/Types/DisappearingMessagesJob.swift index 53b39a8bc..63be3d275 100644 --- a/SessionMessagingKit/Jobs/Types/DisappearingMessagesJob.swift +++ b/SessionMessagingKit/Jobs/Types/DisappearingMessagesJob.swift @@ -99,7 +99,7 @@ public extension DisappearingMessagesJob { ) if !interactionIdsByExpiresInSeconds.isEmpty { - JobRunner.add( + JobRunner.upsert( db, job: Job( variant: .syncExpires, diff --git a/SessionMessagingKit/Jobs/Types/SyncExpiriesJob.swift b/SessionMessagingKit/Jobs/Types/SyncExpiriesJob.swift index c2a1bb7fa..d9a00ffd3 100644 --- a/SessionMessagingKit/Jobs/Types/SyncExpiriesJob.swift +++ b/SessionMessagingKit/Jobs/Types/SyncExpiriesJob.swift @@ -45,47 +45,41 @@ public enum SyncExpiriesJob: JobExecutor { let expirationTimestamp: Int64 = Int64(ceil(details.startedAtMs + expiresInSeconds * 1000)) let userPublicKey: String = getUserHexEncodedPublicKey() + // Send SyncExpiriesMessage + let syncTarget: String = interactions[0].authorId + let syncExpiries: [SyncedExpiriesMessage.SyncedExpiry] = serverHashes.map { serverHash in + return SyncedExpiriesMessage.SyncedExpiry( + serverHash: serverHash, + expirationTimestamp: expirationTimestamp) + } + + let syncExpiriesMessage = SyncedExpiriesMessage( + conversationExpiries: [syncTarget: syncExpiries] + ) + + Storage.shared.writeAsync { db in + MessageSender + .send( + db, + message: syncExpiriesMessage, + threadId: details.threadId, + interactionId: nil, + to: .contact(publicKey: userPublicKey) + ) + } + // Update the ttls SnodeAPI.updateExpiry( publicKey: userPublicKey, updatedExpiryMs: expirationTimestamp, serverHashes: serverHashes - ) - .done(on: queue) { _ in - // Send SyncExpiriesMessage - let syncTarget: String = interactions[0].authorId - let syncExpiries: [SyncedExpiriesMessage.SyncedExpiry] = serverHashes.map { serverHash in - return SyncedExpiriesMessage.SyncedExpiry( - serverHash: serverHash, - expirationTimestamp: expirationTimestamp) - } - - let syncExpiriesMessage = SyncedExpiriesMessage( - conversationExpiries: [syncTarget: syncExpiries] - ) - - Storage.shared.write { db in - MessageSender - .send( - db, - message: syncExpiriesMessage, - threadId: details.threadId, - interactionId: nil, - to: .contact(publicKey: userPublicKey) - ) - } - success(job, false) - } - .catch(on: queue) { error in - failure(job, error, true) - } - .retainUntilComplete() + ).retainUntilComplete() } - guard interactionIdsWithNoServerHashByExpiresInSeconds.isEmpty else { return } + guard !interactionIdsWithNoServerHashByExpiresInSeconds.isEmpty else { return } Storage.shared.writeAsync { db in - JobRunner.add( + JobRunner.upsert( db, job: Job( variant: .syncExpires, diff --git a/SessionUtilitiesKit/JobRunner/JobRunner.swift b/SessionUtilitiesKit/JobRunner/JobRunner.swift index 065ad3ba1..135b63fa1 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunner.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunner.swift @@ -89,6 +89,13 @@ public final class JobRunner { jobVariants.remove(.attachmentDownload) ].compactMap { $0 } ) + let syncExpiriesQueue: JobQueue = JobQueue( + type: .syncExpiries, + qos: .default, + jobVariants: [ + jobVariants.remove(.syncExpires) + ].compactMap { $0 } + ) let generalQueue: JobQueue = JobQueue( type: .general(number: 0), qos: .utility, @@ -99,6 +106,7 @@ public final class JobRunner { messageSendQueue, messageReceiveQueue, attachmentDownloadQueue, + syncExpiriesQueue, generalQueue ].reduce(into: [:]) { prev, next in next.jobVariants.forEach { variant in @@ -397,6 +405,7 @@ private final class JobQueue { case messageSend case messageReceive case attachmentDownload + case syncExpiries var name: String { switch self { @@ -405,6 +414,7 @@ private final class JobQueue { case .messageSend: return "MessageSend" case .messageReceive: return "MessageReceive" case .attachmentDownload: return "AttachmentDownload" + case .syncExpiries: return "SyncExpiries" } } }