refactor: make sync expiries to a job & only save sync message server hash for outgoing messages

pull/941/head
Ryan Zhao 2 years ago
parent 4c5bfe258d
commit f736e4709b

@ -143,6 +143,7 @@
7BA68909272A27BE00EFC32F /* SessionCall.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7BA68908272A27BE00EFC32F /* SessionCall.swift */; };
7BA6890D27325CCC00EFC32F /* SessionCallManager+CXCallController.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7BA6890C27325CCC00EFC32F /* SessionCallManager+CXCallController.swift */; };
7BA6890F27325CE300EFC32F /* SessionCallManager+CXProvider.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7BA6890E27325CE300EFC32F /* SessionCallManager+CXProvider.swift */; };
7BA7F0EF2942C33600CF6B05 /* SyncExpiriesJob.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7BA7F0EE2942C33600CF6B05 /* SyncExpiriesJob.swift */; };
7BAA7B6628D2DE4700AE1489 /* _009_OpenGroupPermission.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7BAA7B6528D2DE4700AE1489 /* _009_OpenGroupPermission.swift */; };
7BAADFCC27B0EF23007BCF92 /* CallVideoView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7BAADFCB27B0EF23007BCF92 /* CallVideoView.swift */; };
7BAADFCE27B215FE007BCF92 /* UIView+Draggable.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7BAADFCD27B215FE007BCF92 /* UIView+Draggable.swift */; };
@ -1217,6 +1218,7 @@
7BA68908272A27BE00EFC32F /* SessionCall.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SessionCall.swift; sourceTree = "<group>"; };
7BA6890C27325CCC00EFC32F /* SessionCallManager+CXCallController.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "SessionCallManager+CXCallController.swift"; sourceTree = "<group>"; };
7BA6890E27325CE300EFC32F /* SessionCallManager+CXProvider.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "SessionCallManager+CXProvider.swift"; sourceTree = "<group>"; };
7BA7F0EE2942C33600CF6B05 /* SyncExpiriesJob.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SyncExpiriesJob.swift; sourceTree = "<group>"; };
7BAA7B6528D2DE4700AE1489 /* _009_OpenGroupPermission.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = _009_OpenGroupPermission.swift; sourceTree = "<group>"; };
7BAADFCB27B0EF23007BCF92 /* CallVideoView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CallVideoView.swift; sourceTree = "<group>"; };
7BAADFCD27B215FE007BCF92 /* UIView+Draggable.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "UIView+Draggable.swift"; sourceTree = "<group>"; };
@ -4151,6 +4153,7 @@
FDF0B74E28079E5E004C14C5 /* SendReadReceiptsJob.swift */,
C352A348255781F400338F3E /* AttachmentDownloadJob.swift */,
C352A35A2557824E00338F3E /* AttachmentUploadJob.swift */,
7BA7F0EE2942C33600CF6B05 /* SyncExpiriesJob.swift */,
);
path = Types;
sourceTree = "<group>";
@ -5475,6 +5478,7 @@
FD09797F27FCFBFF00936362 /* OWSAES256Key+Utilities.swift in Sources */,
FDB4BBC72838B91E00B7C95D /* LinkPreviewError.swift in Sources */,
FD09798327FD1A1500936362 /* ClosedGroup.swift in Sources */,
7BA7F0EF2942C33600CF6B05 /* SyncExpiriesJob.swift in Sources */,
B8B320B7258C30D70020074B /* HTMLMetadata.swift in Sources */,
FD09798727FD1B7800936362 /* GroupMember.swift in Sources */,
FDB4BBC92839BEF000B7C95D /* ProfileManagerError.swift in Sources */,

@ -45,5 +45,6 @@ public enum SNMessagingKit { // Just to make the external API nice
JobRunner.add(executor: SendReadReceiptsJob.self, for: .sendReadReceipts)
JobRunner.add(executor: AttachmentDownloadJob.self, for: .attachmentDownload)
JobRunner.add(executor: AttachmentUploadJob.self, for: .attachmentUpload)
JobRunner.add(executor: SyncExpiriesJob.self, for: .syncExpires)
}
}

@ -489,7 +489,8 @@ public extension Interaction {
job: DisappearingMessagesJob.updateNextRunIfNeeded(
db,
interactionIds: interactionIds,
startedAtMs: (Date().timeIntervalSince1970 * 1000)
startedAtMs: (Date().timeIntervalSince1970 * 1000),
threadId: threadId
)
)

@ -67,7 +67,7 @@ public extension DisappearingMessagesJob {
.saved(db)
}
@discardableResult static func updateNextRunIfNeeded(_ db: Database, interactionIds: [Int64], startedAtMs: Double) -> Job? {
@discardableResult static func updateNextRunIfNeeded(_ db: Database, interactionIds: [Int64], startedAtMs: Double, threadId: String) -> Job? {
let interactionsByExpiresInSeconds: [TimeInterval?: [Interaction]]? = try? Interaction
.filter(interactionIds.contains(Interaction.Columns.id))
.filter(
@ -91,42 +91,25 @@ public extension DisappearingMessagesJob {
guard (changeCount ?? 0) > 0 else { return nil }
if DisappearingMessagesConfiguration.isNewConfigurationEnabled {
interactionsByExpiresInSeconds?.forEach { expiresInSeconds, interactions in
let serverHashes = interactions.compactMap { $0.serverHash }
guard let expiresInSeconds = expiresInSeconds, !serverHashes.isEmpty else { return }
let expirationTimestamp: Int64 = Int64(ceil(startedAtMs + expiresInSeconds * 1000))
let userPublicKey: String = getUserHexEncodedPublicKey(db)
let threadId: String = interactions[0].threadId
// Send SyncExpiriesMessage
let syncTarget: String = interactions[0].authorId
let syncExpiries: [SyncedExpiriesMessage.SyncedExpiry] = serverHashes.map { serverHash in
return SyncedExpiriesMessage.SyncedExpiry(
serverHash: serverHash,
expirationTimestamp: expirationTimestamp)
}
let syncExpiriesMessage = SyncedExpiriesMessage(
conversationExpiries: [syncTarget: syncExpiries]
)
MessageSender
.send(
db,
message: syncExpiriesMessage,
threadId: threadId,
interactionId: nil,
to: .contact(publicKey: userPublicKey)
let interactionIdsByExpiresInSeconds: [TimeInterval: [Int64]] = Dictionary(
uniqueKeysWithValues: interactionsByExpiresInSeconds?.compactMap { expireInSeconds, interactions in
guard let expireInSeconds = expireInSeconds else { return nil }
return (expireInSeconds, interactions.compactMap { $0.id })
} ?? []
)
if !interactionIdsByExpiresInSeconds.isEmpty {
JobRunner.add(
db,
job: Job(
variant: .syncExpires,
details: SyncExpiriesJob.Details(
interactionIdsByExpiresInSeconds: interactionIdsByExpiresInSeconds,
startedAtMs: startedAtMs,
threadId: threadId
)
)
// Update the ttls
SnodeAPI.updateExpiry(
publicKey: userPublicKey,
updatedExpiryMs: expirationTimestamp,
serverHashes: serverHashes
)
.retainUntilComplete()
}
}
@ -146,7 +129,7 @@ public extension DisappearingMessagesJob {
throw StorageError.objectNotFound
}
return updateNextRunIfNeeded(db, interactionIds: [interactionId], startedAtMs: startedAtMs)
return updateNextRunIfNeeded(db, interactionIds: [interactionId], startedAtMs: startedAtMs, threadId: interaction.threadId)
}
catch {
SNLog("Failed to update the expiring messages timer on an interaction")

@ -0,0 +1,159 @@
// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.
import Foundation
import GRDB
import PromiseKit
import SignalCoreKit
import SessionUtilitiesKit
import SessionSnodeKit
public enum SyncExpiriesJob: JobExecutor {
public static let maxFailureCount: Int = 10
public static let requiresThreadId: Bool = false
public static let requiresInteractionId: Bool = false
public static func run(
_ job: Job,
queue: DispatchQueue,
success: @escaping (Job, Bool) -> (),
failure: @escaping (Job, Error?, Bool) -> (),
deferred: @escaping (Job) -> ()
) {
guard
let detailsData: Data = job.details,
let details: Details = try? JSONDecoder().decode(Details.self, from: detailsData)
else {
failure(job, JobRunnerError.missingRequiredDetails, false)
return
}
guard DisappearingMessagesConfiguration.isNewConfigurationEnabled else { return }
var interactionIdsWithNoServerHashByExpiresInSeconds: [TimeInterval: [Int64]] = [:]
details.interactionIdsByExpiresInSeconds.forEach { expiresInSeconds, interactionIds in
guard let interactions = Storage.shared.read({ db in try? Interaction.fetchAll(db, ids: interactionIds) }) else { return }
let interactionIdsWithNoServerHash: [Int64] = interactions.compactMap { $0.serverHash == nil ? $0.id : nil }
if !interactionIdsWithNoServerHash.isEmpty {
interactionIdsWithNoServerHashByExpiresInSeconds[expiresInSeconds] = interactionIdsWithNoServerHash
}
let serverHashes = interactions.compactMap { $0.serverHash }
guard !serverHashes.isEmpty else { return }
let expirationTimestamp: Int64 = Int64(ceil(details.startedAtMs + expiresInSeconds * 1000))
let userPublicKey: String = getUserHexEncodedPublicKey()
// Update the ttls
SnodeAPI.updateExpiry(
publicKey: userPublicKey,
updatedExpiryMs: expirationTimestamp,
serverHashes: serverHashes
)
.done(on: queue) { _ in
// Send SyncExpiriesMessage
let syncTarget: String = interactions[0].authorId
let syncExpiries: [SyncedExpiriesMessage.SyncedExpiry] = serverHashes.map { serverHash in
return SyncedExpiriesMessage.SyncedExpiry(
serverHash: serverHash,
expirationTimestamp: expirationTimestamp)
}
let syncExpiriesMessage = SyncedExpiriesMessage(
conversationExpiries: [syncTarget: syncExpiries]
)
Storage.shared.write { db in
MessageSender
.send(
db,
message: syncExpiriesMessage,
threadId: details.threadId,
interactionId: nil,
to: .contact(publicKey: userPublicKey)
)
}
success(job, false)
}
.catch(on: queue) { error in
failure(job, error, true)
}
.retainUntilComplete()
}
guard interactionIdsWithNoServerHashByExpiresInSeconds.isEmpty else { return }
Storage.shared.writeAsync { db in
JobRunner.add(
db,
job: Job(
variant: .syncExpires,
details: SyncExpiriesJob.Details(
interactionIdsByExpiresInSeconds: interactionIdsWithNoServerHashByExpiresInSeconds,
startedAtMs: details.startedAtMs,
threadId: details.threadId
)
)
)
}
}
}
// MARK: - SyncExpiriesJob.Details
extension SyncExpiriesJob {
public struct Details: Codable {
private enum CodingKeys: String, CodingKey {
case interactionIdsByExpiresInSeconds
case startedAtMs
case threadId
}
public let interactionIdsByExpiresInSeconds: [TimeInterval: [Int64]]
public let startedAtMs: Double
public let threadId: String
// MARK: - Initialization
public init(
interactionIdsByExpiresInSeconds: [TimeInterval: [Int64]],
startedAtMs: Double,
threadId: String
) {
self.interactionIdsByExpiresInSeconds = interactionIdsByExpiresInSeconds
self.startedAtMs = startedAtMs
self.threadId = threadId
}
// MARK: - Codable
public init(from decoder: Decoder) throws {
let container: KeyedDecodingContainer<CodingKeys> = try decoder.container(keyedBy: CodingKeys.self)
self = Details(
interactionIdsByExpiresInSeconds: try container.decode(
[TimeInterval: [Int64]].self,
forKey: .interactionIdsByExpiresInSeconds
),
startedAtMs: try container.decode(
Double.self,
forKey: .startedAtMs
),
threadId: try container.decode(
String.self,
forKey: .threadId
)
)
}
public func encode(to encoder: Encoder) throws {
var container: KeyedEncodingContainer<CodingKeys> = encoder.container(keyedBy: CodingKeys.self)
try container.encode(interactionIdsByExpiresInSeconds, forKey: .interactionIdsByExpiresInSeconds)
try container.encode(startedAtMs, forKey: .startedAtMs)
try container.encode(threadId, forKey: .threadId)
}
}
}

@ -591,21 +591,21 @@ public final class MessageSender {
// Get the visible message if possible
if let interaction: Interaction = interaction {
// When the sync message is successfully sent, the hash value of this TSOutgoingMessage
// will be replaced by the hash value of the sync message. Since the hash value of the
// real message has no use when we delete a message. It is OK to let it be.
try interaction.with(
serverHash: message.serverHash,
// Track the open group server message ID and update server timestamp (use server
// timestamp for open group messages otherwise the quote messages may not be able
// to be found by the timestamp on other devices
timestampMs: (message.openGroupServerMessageId == nil ?
nil :
serverTimestampMs.map { Int64($0) }
),
openGroupServerMessageId: message.openGroupServerMessageId.map { Int64($0) }
).update(db)
// Only store the server hash of a sync message if the message is self send valid
if (message.isSelfSendValid && isSyncMessage || !message.isSelfSendValid) {
try interaction.with(
serverHash: message.serverHash,
// Track the open group server message ID and update server timestamp (use server
// timestamp for open group messages otherwise the quote messages may not be able
// to be found by the timestamp on other devices
timestampMs: (message.openGroupServerMessageId == nil ?
nil :
serverTimestampMs.map { Int64($0) }
),
openGroupServerMessageId: message.openGroupServerMessageId.map { Int64($0) }
).update(db)
}
// Mark the message as sent
try interaction.recipientStates

@ -102,6 +102,10 @@ public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePer
/// This is a job that runs once whenever an attachment is downloaded to attempt to decode and properly
/// download the attachment
case attachmentDownload
/// This is a job that runs once whenever some disappearing messages is read and started the timer to inform
/// linked devices and the network for the change of the messages's ttl
case syncExpires
}
public enum Behaviour: Int, Codable, DatabaseValueConvertible, CaseIterable {

Loading…
Cancel
Save