fix sync expiries job

pull/941/head
Ryan Zhao 2 years ago
parent f736e4709b
commit 3368f184de

@ -99,7 +99,7 @@ public extension DisappearingMessagesJob {
)
if !interactionIdsByExpiresInSeconds.isEmpty {
JobRunner.add(
JobRunner.upsert(
db,
job: Job(
variant: .syncExpires,

@ -45,47 +45,41 @@ public enum SyncExpiriesJob: JobExecutor {
let expirationTimestamp: Int64 = Int64(ceil(details.startedAtMs + expiresInSeconds * 1000))
let userPublicKey: String = getUserHexEncodedPublicKey()
// Send SyncExpiriesMessage
let syncTarget: String = interactions[0].authorId
let syncExpiries: [SyncedExpiriesMessage.SyncedExpiry] = serverHashes.map { serverHash in
return SyncedExpiriesMessage.SyncedExpiry(
serverHash: serverHash,
expirationTimestamp: expirationTimestamp)
}
let syncExpiriesMessage = SyncedExpiriesMessage(
conversationExpiries: [syncTarget: syncExpiries]
)
Storage.shared.writeAsync { db in
MessageSender
.send(
db,
message: syncExpiriesMessage,
threadId: details.threadId,
interactionId: nil,
to: .contact(publicKey: userPublicKey)
)
}
// Update the ttls
SnodeAPI.updateExpiry(
publicKey: userPublicKey,
updatedExpiryMs: expirationTimestamp,
serverHashes: serverHashes
)
.done(on: queue) { _ in
// Send SyncExpiriesMessage
let syncTarget: String = interactions[0].authorId
let syncExpiries: [SyncedExpiriesMessage.SyncedExpiry] = serverHashes.map { serverHash in
return SyncedExpiriesMessage.SyncedExpiry(
serverHash: serverHash,
expirationTimestamp: expirationTimestamp)
}
let syncExpiriesMessage = SyncedExpiriesMessage(
conversationExpiries: [syncTarget: syncExpiries]
)
Storage.shared.write { db in
MessageSender
.send(
db,
message: syncExpiriesMessage,
threadId: details.threadId,
interactionId: nil,
to: .contact(publicKey: userPublicKey)
)
}
success(job, false)
}
.catch(on: queue) { error in
failure(job, error, true)
}
.retainUntilComplete()
).retainUntilComplete()
}
guard interactionIdsWithNoServerHashByExpiresInSeconds.isEmpty else { return }
guard !interactionIdsWithNoServerHashByExpiresInSeconds.isEmpty else { return }
Storage.shared.writeAsync { db in
JobRunner.add(
JobRunner.upsert(
db,
job: Job(
variant: .syncExpires,

@ -89,6 +89,13 @@ public final class JobRunner {
jobVariants.remove(.attachmentDownload)
].compactMap { $0 }
)
let syncExpiriesQueue: JobQueue = JobQueue(
type: .syncExpiries,
qos: .default,
jobVariants: [
jobVariants.remove(.syncExpires)
].compactMap { $0 }
)
let generalQueue: JobQueue = JobQueue(
type: .general(number: 0),
qos: .utility,
@ -99,6 +106,7 @@ public final class JobRunner {
messageSendQueue,
messageReceiveQueue,
attachmentDownloadQueue,
syncExpiriesQueue,
generalQueue
].reduce(into: [:]) { prev, next in
next.jobVariants.forEach { variant in
@ -397,6 +405,7 @@ private final class JobQueue {
case messageSend
case messageReceive
case attachmentDownload
case syncExpiries
var name: String {
switch self {
@ -405,6 +414,7 @@ private final class JobQueue {
case .messageSend: return "MessageSend"
case .messageReceive: return "MessageReceive"
case .attachmentDownload: return "AttachmentDownload"
case .syncExpiries: return "SyncExpiries"
}
}
}

Loading…
Cancel
Save