make update message expiration a seperate job from disappearing messages job

pull/941/head
Ryan Zhao 1 year ago
parent 5063feeb6a
commit 754279e6ab

@ -129,6 +129,7 @@
7B7CB18E270D066F0079FF93 /* IncomingCallBanner.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B7CB18D270D066F0079FF93 /* IncomingCallBanner.swift */; };
7B7CB190270FB2150079FF93 /* MiniCallView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B7CB18F270FB2150079FF93 /* MiniCallView.swift */; };
7B7CB192271508AD0079FF93 /* CallRingTonePlayer.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B7CB191271508AD0079FF93 /* CallRingTonePlayer.swift */; };
7B7E5B522A4D024C00A8208E /* ExpirationUpdateJob.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B7E5B512A4D024C00A8208E /* ExpirationUpdateJob.swift */; };
7B81682328A4C1210069F315 /* UpdateTypes.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B81682228A4C1210069F315 /* UpdateTypes.swift */; };
7B81682828B310D50069F315 /* _007_HomeQueryOptimisationIndexes.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B81682728B310D50069F315 /* _007_HomeQueryOptimisationIndexes.swift */; };
7B81682A28B6F1420069F315 /* ReactionResponse.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B81682928B6F1420069F315 /* ReactionResponse.swift */; };
@ -1260,6 +1261,7 @@
7B7CB18D270D066F0079FF93 /* IncomingCallBanner.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = IncomingCallBanner.swift; sourceTree = "<group>"; };
7B7CB18F270FB2150079FF93 /* MiniCallView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MiniCallView.swift; sourceTree = "<group>"; };
7B7CB191271508AD0079FF93 /* CallRingTonePlayer.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CallRingTonePlayer.swift; sourceTree = "<group>"; };
7B7E5B512A4D024C00A8208E /* ExpirationUpdateJob.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ExpirationUpdateJob.swift; sourceTree = "<group>"; };
7B81682228A4C1210069F315 /* UpdateTypes.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = UpdateTypes.swift; sourceTree = "<group>"; };
7B81682728B310D50069F315 /* _007_HomeQueryOptimisationIndexes.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = _007_HomeQueryOptimisationIndexes.swift; sourceTree = "<group>"; };
7B81682928B6F1420069F315 /* ReactionResponse.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ReactionResponse.swift; sourceTree = "<group>"; };
@ -4360,6 +4362,7 @@
C352A35A2557824E00338F3E /* AttachmentUploadJob.swift */,
7B521E0929BFF84400C3C36A /* GroupLeavingJob.swift */,
FD2B4AFE2946C93200AB4848 /* ConfigurationSyncJob.swift */,
7B7E5B512A4D024C00A8208E /* ExpirationUpdateJob.swift */,
);
path = Types;
sourceTree = "<group>";
@ -5815,6 +5818,7 @@
FD716E6A2850327900C96BF4 /* EndCallMode.swift in Sources */,
FDF0B75C2807F41D004C14C5 /* MessageSender+Convenience.swift in Sources */,
7B81682A28B6F1420069F315 /* ReactionResponse.swift in Sources */,
7B7E5B522A4D024C00A8208E /* ExpirationUpdateJob.swift in Sources */,
FD09799727FFA84A00936362 /* RecipientState.swift in Sources */,
FDA8EB00280E8D58002B68E5 /* FailedAttachmentDownloadsJob.swift in Sources */,
FD09798927FD1C5A00936362 /* OpenGroup.swift in Sources */,

@ -60,5 +60,6 @@ public enum SNMessagingKit { // Just to make the external API nice
JobRunner.add(executor: AttachmentDownloadJob.self, for: .attachmentDownload)
JobRunner.add(executor: ConfigurationSyncJob.self, for: .configurationSync)
JobRunner.add(executor: ConfigMessageReceiveJob.self, for: .configMessageReceive)
JobRunner.add(executor: ExpirationUpdateJob.self, for: .expirationUpdate)
}
}

@ -117,14 +117,14 @@ public extension DisappearingMessagesJob {
.map { (_, response) in
Storage.shared.writeAsync { db in
try response.expiries.forEach { hash, expireAtMs in
guard let expireInSeconds: TimeInterval = expirationInfo[hash] else { return }
let startAtMs: TimeInterval = TimeInterval(expireAtMs - UInt64(expireInSeconds * 1000))
guard let expiresInSeconds: TimeInterval = expirationInfo[hash] else { return }
let expiresStartedAtMs: TimeInterval = TimeInterval(expireAtMs - UInt64(expiresInSeconds * 1000))
_ = try Interaction
.filter(Interaction.Columns.serverHash == hash)
.updateAll(
db,
Interaction.Columns.expiresStartedAtMs.set(to: startAtMs)
Interaction.Columns.expiresStartedAtMs.set(to: expiresStartedAtMs)
)
guard let index = expirationInfo.index(forKey: hash) else { return }
@ -192,49 +192,19 @@ public extension DisappearingMessagesJob {
// If there were no changes then none of the provided `interactionIds` are expiring messages
guard (changeCount ?? 0) > 0 else { return nil }
let userPublicKey: String = getUserHexEncodedPublicKey(db)
let updateExpiryPublishers: [AnyPublisher<[String: UpdateExpiryResponseResult], Error>] = interactionExpirationInfosByExpiresInSeconds
.map { expiresInSeconds, expirationInfos -> AnyPublisher<[String: UpdateExpiryResponseResult], Error> in
let expirationTimestampMs: Int64 = Int64(startedAtMs + expiresInSeconds * 1000)
return SnodeAPI
.updateExpiry(
publicKey: userPublicKey,
interactionExpirationInfosByExpiresInSeconds.forEach { expiresInSeconds, expirationInfos in
let expirationTimestampMs: Int64 = Int64(startedAtMs + expiresInSeconds * 1000)
JobRunner.add(
db,
job: Job(
variant: .expirationUpdate,
details: ExpirationUpdateJob.Details(
serverHashes: expirationInfos.map { $0.serverHash },
updatedExpiryMs: expirationTimestampMs,
shortenOnly: true
expirationTimestampMs: expirationTimestampMs
)
}
Publishers
.MergeMany(updateExpiryPublishers)
.collect()
.sinkUntilComplete(
receiveValue: { allResults in
guard
let results: [UpdateExpiryResponseResult] = allResults
.compactMap({ result in result.first(where: { _, value in !value.didError })?.value })
.nullIfEmpty(),
let unchangedMessages: [UInt64: [String]] = results
.reduce([:], { result, next in result.updated(with: next.unchanged) })
.groupedByValue()
.nullIfEmpty()
else { return }
Storage.shared.writeAsync { db in
try unchangedMessages.forEach { updatedExpiry, hashes in
let expiresInSeconds: TimeInterval = ((TimeInterval(updatedExpiry) - startedAtMs) / 1000)
_ = try Interaction
.filter(hashes.contains(Interaction.Columns.serverHash))
.updateAll(
db,
Interaction.Columns.expiresInSeconds.set(to: expiresInSeconds)
)
}
}
}
)
)
}
return updateNextRunIfNeeded(db)
}

@ -0,0 +1,136 @@
// Copyright © 2023 Rangeproof Pty Ltd. All rights reserved.
import Foundation
import Combine
import GRDB
import SessionUtilitiesKit
import SessionSnodeKit
public enum ExpirationUpdateJob: JobExecutor {
public static var maxFailureCount: Int = -1
public static var requiresThreadId: Bool = false
public static var requiresInteractionId: Bool = false
public static func run(
_ job: SessionUtilitiesKit.Job,
queue: DispatchQueue,
success: @escaping (SessionUtilitiesKit.Job, Bool) -> (),
failure: @escaping (SessionUtilitiesKit.Job, Error?, Bool) -> (),
deferred: @escaping (SessionUtilitiesKit.Job) -> ()
) {
guard
let detailsData: Data = job.details,
let details: Details = try? JSONDecoder().decode(Details.self, from: detailsData)
else {
SNLog("[ExpirationUpdateJob] Failing due to missing details")
failure(job, JobRunnerError.missingRequiredDetails, true)
return
}
let userPublicKey: String = getUserHexEncodedPublicKey()
SnodeAPI
.updateExpiry(
publicKey: userPublicKey,
serverHashes: details.serverHashes,
updatedExpiryMs: details.expirationTimestampMs,
shortenOnly: true
)
.subscribe(on: queue)
.receive(on: queue)
.map { response -> [UInt64: [String]] in
guard
let results: [UpdateExpiryResponseResult] = response
.compactMap({ _, value in value.didError ? nil : value })
.nullIfEmpty(),
let unchangedMessages: [UInt64: [String]] = results
.reduce([:], { result, next in result.updated(with: next.unchanged) })
.groupedByValue()
.nullIfEmpty()
else {
return [:]
}
return unchangedMessages
}
.sinkUntilComplete(
receiveCompletion: { result in
switch result {
case .finished:
success(job, false)
case .failure(let error):
failure(job, error, true)
}
},
receiveValue: { unchangedMessages in
guard !unchangedMessages.isEmpty else { return }
Storage.shared.writeAsync { db in
try unchangedMessages.forEach { updatedExpiry, hashes in
try hashes.forEach { hash in
guard
let expiresInSeconds: TimeInterval = try? Interaction
.filter(Interaction.Columns.serverHash == hash)
.select(Interaction.Columns.expiresInSeconds)
.asRequest(of: TimeInterval.self)
.fetchOne(db)
else {
return
}
let expiresStartedAtMs: TimeInterval = TimeInterval(updatedExpiry - UInt64(expiresInSeconds * 1000))
_ = try Interaction
.filter(Interaction.Columns.serverHash == hash)
.updateAll(
db,
Interaction.Columns.expiresStartedAtMs.set(to: expiresStartedAtMs)
)
}
}
}
}
)
}
}
// MARK: - ExpirationUpdateJob.Details
extension ExpirationUpdateJob {
public struct Details: Codable {
private enum CodingKeys: String, CodingKey {
case serverHashes
case expirationTimestampMs
}
public let serverHashes: [String]
public let expirationTimestampMs: Int64
// MARK: - Initialization
public init(
serverHashes: [String],
expirationTimestampMs: Int64
) {
self.serverHashes = serverHashes
self.expirationTimestampMs = expirationTimestampMs
}
// MARK: - Codable
public init(from decoder: Decoder) throws {
let container: KeyedDecodingContainer<CodingKeys> = try decoder.container(keyedBy: CodingKeys.self)
self = Details(
serverHashes: try container.decode([String].self, forKey: .serverHashes),
expirationTimestampMs: try container.decode(Int64.self, forKey: .expirationTimestampMs)
)
}
public func encode(to encoder: Encoder) throws {
var container: KeyedEncodingContainer<CodingKeys> = encoder.container(keyedBy: CodingKeys.self)
try container.encode(serverHashes, forKey: .serverHashes)
try container.encode(expirationTimestampMs, forKey: .expirationTimestampMs)
}
}
}

@ -116,6 +116,10 @@ public struct Job: Codable, Equatable, Hashable, Identifiable, FetchableRecord,
/// config state with the changes; this job will generally be scheduled along since a `messageReceive` job
/// and will block the standard message receive job
case configMessageReceive
/// This is a job that runs once whenever disappearing after read messages are read and needed to update the
/// expiration on the network
case expirationUpdate
}
public enum Behaviour: Int, Codable, DatabaseValueConvertible, CaseIterable {

@ -77,7 +77,8 @@ public final class JobRunner {
jobVariants.remove(.notifyPushServer),
jobVariants.remove(.sendReadReceipts),
jobVariants.remove(.groupLeaving),
jobVariants.remove(.configurationSync)
jobVariants.remove(.configurationSync),
jobVariants.remove(.expirationUpdate)
].compactMap { $0 }
)
let messageReceiveQueue: JobQueue = JobQueue(

Loading…
Cancel
Save