From df7b47da643a0a8926e3db2bad91964c14701c90 Mon Sep 17 00:00:00 2001 From: ryanzhao Date: Wed, 5 Jul 2023 17:17:15 +1000 Subject: [PATCH] make get expiration a job to fix race condtion --- Session.xcodeproj/project.pbxproj | 4 + .../Jobs/Types/DisappearingMessagesJob.swift | 32 ++-- .../Jobs/Types/GetExpirationJob.swift | 140 ++++++++++++++++++ SessionUtilitiesKit/Database/Models/Job.swift | 4 + 4 files changed, 171 insertions(+), 9 deletions(-) create mode 100644 SessionMessagingKit/Jobs/Types/GetExpirationJob.swift diff --git a/Session.xcodeproj/project.pbxproj b/Session.xcodeproj/project.pbxproj index 18fa9e777..eff615d2f 100644 --- a/Session.xcodeproj/project.pbxproj +++ b/Session.xcodeproj/project.pbxproj @@ -126,6 +126,7 @@ 7B7037432834B81F000DCF35 /* ReactionContainerView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B7037422834B81F000DCF35 /* ReactionContainerView.swift */; }; 7B7037452834BCC0000DCF35 /* ReactionView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B7037442834BCC0000DCF35 /* ReactionView.swift */; }; 7B71A98F2925E2A600E54854 /* SessionFooterView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B71A98E2925E2A600E54854 /* SessionFooterView.swift */; }; + 7B7AD41F2A5512CA00469FB1 /* GetExpirationJob.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B7AD41E2A5512CA00469FB1 /* GetExpirationJob.swift */; }; 7B7CB18E270D066F0079FF93 /* IncomingCallBanner.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B7CB18D270D066F0079FF93 /* IncomingCallBanner.swift */; }; 7B7CB190270FB2150079FF93 /* MiniCallView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B7CB18F270FB2150079FF93 /* MiniCallView.swift */; }; 7B7CB192271508AD0079FF93 /* CallRingTonePlayer.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B7CB191271508AD0079FF93 /* CallRingTonePlayer.swift */; }; @@ -1248,6 +1249,7 @@ 7B7037422834B81F000DCF35 /* ReactionContainerView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ReactionContainerView.swift; sourceTree = ""; }; 7B7037442834BCC0000DCF35 /* ReactionView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ReactionView.swift; sourceTree = ""; }; 7B71A98E2925E2A600E54854 /* SessionFooterView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SessionFooterView.swift; sourceTree = ""; }; + 7B7AD41E2A5512CA00469FB1 /* GetExpirationJob.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = GetExpirationJob.swift; sourceTree = ""; }; 7B7CB18D270D066F0079FF93 /* IncomingCallBanner.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = IncomingCallBanner.swift; sourceTree = ""; }; 7B7CB18F270FB2150079FF93 /* MiniCallView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MiniCallView.swift; sourceTree = ""; }; 7B7CB191271508AD0079FF93 /* CallRingTonePlayer.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CallRingTonePlayer.swift; sourceTree = ""; }; @@ -4333,6 +4335,7 @@ 7B521E0929BFF84400C3C36A /* GroupLeavingJob.swift */, FD2B4AFE2946C93200AB4848 /* ConfigurationSyncJob.swift */, 7B7E5B512A4D024C00A8208E /* ExpirationUpdateJob.swift */, + 7B7AD41E2A5512CA00469FB1 /* GetExpirationJob.swift */, ); path = Types; sourceTree = ""; @@ -5997,6 +6000,7 @@ FDF0B73C27FFD3D6004C14C5 /* LinkPreview.swift in Sources */, FD09797527FAB64300936362 /* ProfileManager.swift in Sources */, FD245C57285065F100B966DD /* Poller.swift in Sources */, + 7B7AD41F2A5512CA00469FB1 /* GetExpirationJob.swift in Sources */, FDA8EAFE280E8B78002B68E5 /* FailedMessageSendsJob.swift in Sources */, FD245C6A2850666F00B966DD /* FileServerAPI.swift in Sources */, FDC4386927B4E6B800C60D73 /* String+Utlities.swift in Sources */, diff --git a/SessionMessagingKit/Jobs/Types/DisappearingMessagesJob.swift b/SessionMessagingKit/Jobs/Types/DisappearingMessagesJob.swift index c649b670d..82097c379 100644 --- a/SessionMessagingKit/Jobs/Types/DisappearingMessagesJob.swift +++ b/SessionMessagingKit/Jobs/Types/DisappearingMessagesJob.swift @@ -83,7 +83,7 @@ public extension DisappearingMessagesJob { let expiresInSeconds: TimeInterval let serverHash: String } - let interactionIds: [Int64] = (try? Interaction + let expirationInfo: [String: TimeInterval] = (try? Interaction .filter( Interaction.Columns.threadId == threadId && Interaction.Columns.timestampMs <= lastReadTimestampMs && @@ -91,17 +91,31 @@ public extension DisappearingMessagesJob { Interaction.Columns.expiresStartedAtMs == nil ) .select( - Interaction.Columns.id + Interaction.Columns.expiresInSeconds, + Interaction.Columns.serverHash ) - .asRequest(of: Int64.self) + .asRequest(of: ExpirationInfo.self) .fetchAll(db)) .defaulting(to: []) - - guard !interactionIds.isEmpty else { return } - - let startedAtMs: Double = Double(SnodeAPI.currentOffsetTimestampMs()) - - updateNextRunIfNeeded(db, interactionIds: interactionIds, startedAtMs: startedAtMs, threadId: threadId) + .grouped(by: \.serverHash) + .compactMapValues{ $0.first?.expiresInSeconds } + + guard (expirationInfo.count > 0) else { return } + + let startedAtTimestampMs: Double = Double(SnodeAPI.currentOffsetTimestampMs()) + + JobRunner.add( + db, + job: Job( + variant: .getExpiration, + behaviour: .runOnce, + threadId: threadId, + details: GetExpirationJob.Details( + expirationInfo: expirationInfo, + startedAtTimestampMs: startedAtTimestampMs + ) + ) + ) } @discardableResult static func updateNextRunIfNeeded(_ db: Database, interactionIds: [Int64], startedAtMs: Double, threadId: String) -> Job? { diff --git a/SessionMessagingKit/Jobs/Types/GetExpirationJob.swift b/SessionMessagingKit/Jobs/Types/GetExpirationJob.swift new file mode 100644 index 000000000..c0335d6c8 --- /dev/null +++ b/SessionMessagingKit/Jobs/Types/GetExpirationJob.swift @@ -0,0 +1,140 @@ +// Copyright © 2023 Rangeproof Pty Ltd. All rights reserved. + +import Foundation +import Combine +import GRDB +import SessionUtilitiesKit +import SessionSnodeKit + +public enum GetExpirationJob: JobExecutor { + public static var maxFailureCount: Int = -1 + public static var requiresThreadId: Bool = true + public static var requiresInteractionId: Bool = false + private static let minRunFrequency: TimeInterval = 1 + + public static func run( + _ job: SessionUtilitiesKit.Job, + queue: DispatchQueue, + success: @escaping (SessionUtilitiesKit.Job, Bool) -> (), + failure: @escaping (SessionUtilitiesKit.Job, Error?, Bool) -> (), + deferred: @escaping (SessionUtilitiesKit.Job) -> () + ) { + guard + let detailsData: Data = job.details, + let details: Details = try? JSONDecoder().decode(Details.self, from: detailsData) + else { + SNLog("[GetExpirationJob] Failing due to missing details") + failure(job, JobRunnerError.missingRequiredDetails, true) + return + } + + var expirationInfo: [String: TimeInterval] = details.expirationInfo + + let userPublicKey: String = getUserHexEncodedPublicKey() + SnodeAPI.getSwarm(for: userPublicKey) + .tryFlatMap { swarm -> AnyPublisher<(ResponseInfoType, GetExpiriesResponse), Error> in + guard let snode = swarm.randomElement() else { throw SnodeAPIError.generic } + return SnodeAPI.getExpiries( + from: snode, + associatedWith: userPublicKey, + of: expirationInfo.map { $0.key } + ) + } + .subscribe(on: queue) + .receive(on: queue) + .map { (_, response) -> GetExpiriesResponse in + return response + } + .sinkUntilComplete( + receiveCompletion: { result in + switch result { + case .finished: + success(job, false) + case .failure(let error): + failure(job, error, true) + } + }, + receiveValue: { response in + Storage.shared.writeAsync { db in + try response.expiries.forEach { hash, expireAtMs in + guard let expiresInSeconds: TimeInterval = expirationInfo[hash] else { return } + let expiresStartedAtMs: TimeInterval = TimeInterval(expireAtMs - UInt64(expiresInSeconds * 1000)) + + _ = try Interaction + .filter(Interaction.Columns.serverHash == hash) + .updateAll( + db, + Interaction.Columns.expiresStartedAtMs.set(to: expiresStartedAtMs) + ) + + guard let index = expirationInfo.index(forKey: hash) else { return } + expirationInfo.remove(at: index) + } + + try expirationInfo.forEach { key, _ in + _ = try Interaction + .filter(Interaction.Columns.serverHash == key) + .filter(Interaction.Columns.expiresStartedAtMs == nil) + .updateAll( + db, + Interaction.Columns.expiresStartedAtMs.set(to: details.startedAtTimestampMs) + ) + } + + if !expirationInfo.isEmpty { + let updatedJob: Job? = Storage.shared.write { db in + try job + .with(nextRunTimestamp: Date().timeIntervalSince1970 + minRunFrequency) + .saved(db) + } + + deferred(updatedJob ?? job) + } + } + } + ) + } +} + +// MARK: - GetExpirationJob.Details + +extension GetExpirationJob { + public struct Details: Codable { + private enum CodingKeys: String, CodingKey { + case expirationInfo + case startedAtTimestampMs + } + + public let expirationInfo: [String: TimeInterval] + public let startedAtTimestampMs: Double + + // MARK: - Initialization + + public init( + expirationInfo: [String: TimeInterval], + startedAtTimestampMs: Double + ) { + self.expirationInfo = expirationInfo + self.startedAtTimestampMs = startedAtTimestampMs + } + + // MARK: - Codable + + public init(from decoder: Decoder) throws { + let container: KeyedDecodingContainer = try decoder.container(keyedBy: CodingKeys.self) + + self = Details( + expirationInfo: try container.decode([String: TimeInterval].self, forKey: .expirationInfo), + startedAtTimestampMs: try container.decode(Double.self, forKey: .startedAtTimestampMs) + ) + } + + public func encode(to encoder: Encoder) throws { + var container: KeyedEncodingContainer = encoder.container(keyedBy: CodingKeys.self) + + try container.encode(expirationInfo, forKey: .expirationInfo) + try container.encode(startedAtTimestampMs, forKey: .startedAtTimestampMs) + } + } +} + diff --git a/SessionUtilitiesKit/Database/Models/Job.swift b/SessionUtilitiesKit/Database/Models/Job.swift index 94cbd307e..b8a41ed55 100644 --- a/SessionUtilitiesKit/Database/Models/Job.swift +++ b/SessionUtilitiesKit/Database/Models/Job.swift @@ -120,6 +120,10 @@ public struct Job: Codable, Equatable, Hashable, Identifiable, FetchableRecord, /// This is a job that runs once whenever disappearing after read messages are read and needed to update the /// expiration on the network case expirationUpdate + + /// This is a job that runs once whenever a message is marked as read because of syncing from user config and + /// needs to get expiration from network + case getExpiration } public enum Behaviour: Int, Codable, DatabaseValueConvertible, CaseIterable {