make get expiration a job to fix race condtion

pull/941/head
ryanzhao 12 months ago
parent 6738af200f
commit df7b47da64

@ -126,6 +126,7 @@
7B7037432834B81F000DCF35 /* ReactionContainerView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B7037422834B81F000DCF35 /* ReactionContainerView.swift */; };
7B7037452834BCC0000DCF35 /* ReactionView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B7037442834BCC0000DCF35 /* ReactionView.swift */; };
7B71A98F2925E2A600E54854 /* SessionFooterView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B71A98E2925E2A600E54854 /* SessionFooterView.swift */; };
7B7AD41F2A5512CA00469FB1 /* GetExpirationJob.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B7AD41E2A5512CA00469FB1 /* GetExpirationJob.swift */; };
7B7CB18E270D066F0079FF93 /* IncomingCallBanner.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B7CB18D270D066F0079FF93 /* IncomingCallBanner.swift */; };
7B7CB190270FB2150079FF93 /* MiniCallView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B7CB18F270FB2150079FF93 /* MiniCallView.swift */; };
7B7CB192271508AD0079FF93 /* CallRingTonePlayer.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B7CB191271508AD0079FF93 /* CallRingTonePlayer.swift */; };
@ -1248,6 +1249,7 @@
7B7037422834B81F000DCF35 /* ReactionContainerView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ReactionContainerView.swift; sourceTree = "<group>"; };
7B7037442834BCC0000DCF35 /* ReactionView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ReactionView.swift; sourceTree = "<group>"; };
7B71A98E2925E2A600E54854 /* SessionFooterView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SessionFooterView.swift; sourceTree = "<group>"; };
7B7AD41E2A5512CA00469FB1 /* GetExpirationJob.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = GetExpirationJob.swift; sourceTree = "<group>"; };
7B7CB18D270D066F0079FF93 /* IncomingCallBanner.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = IncomingCallBanner.swift; sourceTree = "<group>"; };
7B7CB18F270FB2150079FF93 /* MiniCallView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MiniCallView.swift; sourceTree = "<group>"; };
7B7CB191271508AD0079FF93 /* CallRingTonePlayer.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CallRingTonePlayer.swift; sourceTree = "<group>"; };
@ -4333,6 +4335,7 @@
7B521E0929BFF84400C3C36A /* GroupLeavingJob.swift */,
FD2B4AFE2946C93200AB4848 /* ConfigurationSyncJob.swift */,
7B7E5B512A4D024C00A8208E /* ExpirationUpdateJob.swift */,
7B7AD41E2A5512CA00469FB1 /* GetExpirationJob.swift */,
);
path = Types;
sourceTree = "<group>";
@ -5997,6 +6000,7 @@
FDF0B73C27FFD3D6004C14C5 /* LinkPreview.swift in Sources */,
FD09797527FAB64300936362 /* ProfileManager.swift in Sources */,
FD245C57285065F100B966DD /* Poller.swift in Sources */,
7B7AD41F2A5512CA00469FB1 /* GetExpirationJob.swift in Sources */,
FDA8EAFE280E8B78002B68E5 /* FailedMessageSendsJob.swift in Sources */,
FD245C6A2850666F00B966DD /* FileServerAPI.swift in Sources */,
FDC4386927B4E6B800C60D73 /* String+Utlities.swift in Sources */,

@ -83,7 +83,7 @@ public extension DisappearingMessagesJob {
let expiresInSeconds: TimeInterval
let serverHash: String
}
let interactionIds: [Int64] = (try? Interaction
let expirationInfo: [String: TimeInterval] = (try? Interaction
.filter(
Interaction.Columns.threadId == threadId &&
Interaction.Columns.timestampMs <= lastReadTimestampMs &&
@ -91,17 +91,31 @@ public extension DisappearingMessagesJob {
Interaction.Columns.expiresStartedAtMs == nil
)
.select(
Interaction.Columns.id
Interaction.Columns.expiresInSeconds,
Interaction.Columns.serverHash
)
.asRequest(of: Int64.self)
.asRequest(of: ExpirationInfo.self)
.fetchAll(db))
.defaulting(to: [])
guard !interactionIds.isEmpty else { return }
let startedAtMs: Double = Double(SnodeAPI.currentOffsetTimestampMs())
updateNextRunIfNeeded(db, interactionIds: interactionIds, startedAtMs: startedAtMs, threadId: threadId)
.grouped(by: \.serverHash)
.compactMapValues{ $0.first?.expiresInSeconds }
guard (expirationInfo.count > 0) else { return }
let startedAtTimestampMs: Double = Double(SnodeAPI.currentOffsetTimestampMs())
JobRunner.add(
db,
job: Job(
variant: .getExpiration,
behaviour: .runOnce,
threadId: threadId,
details: GetExpirationJob.Details(
expirationInfo: expirationInfo,
startedAtTimestampMs: startedAtTimestampMs
)
)
)
}
@discardableResult static func updateNextRunIfNeeded(_ db: Database, interactionIds: [Int64], startedAtMs: Double, threadId: String) -> Job? {

@ -0,0 +1,140 @@
// Copyright © 2023 Rangeproof Pty Ltd. All rights reserved.
import Foundation
import Combine
import GRDB
import SessionUtilitiesKit
import SessionSnodeKit
public enum GetExpirationJob: JobExecutor {
public static var maxFailureCount: Int = -1
public static var requiresThreadId: Bool = true
public static var requiresInteractionId: Bool = false
private static let minRunFrequency: TimeInterval = 1
public static func run(
_ job: SessionUtilitiesKit.Job,
queue: DispatchQueue,
success: @escaping (SessionUtilitiesKit.Job, Bool) -> (),
failure: @escaping (SessionUtilitiesKit.Job, Error?, Bool) -> (),
deferred: @escaping (SessionUtilitiesKit.Job) -> ()
) {
guard
let detailsData: Data = job.details,
let details: Details = try? JSONDecoder().decode(Details.self, from: detailsData)
else {
SNLog("[GetExpirationJob] Failing due to missing details")
failure(job, JobRunnerError.missingRequiredDetails, true)
return
}
var expirationInfo: [String: TimeInterval] = details.expirationInfo
let userPublicKey: String = getUserHexEncodedPublicKey()
SnodeAPI.getSwarm(for: userPublicKey)
.tryFlatMap { swarm -> AnyPublisher<(ResponseInfoType, GetExpiriesResponse), Error> in
guard let snode = swarm.randomElement() else { throw SnodeAPIError.generic }
return SnodeAPI.getExpiries(
from: snode,
associatedWith: userPublicKey,
of: expirationInfo.map { $0.key }
)
}
.subscribe(on: queue)
.receive(on: queue)
.map { (_, response) -> GetExpiriesResponse in
return response
}
.sinkUntilComplete(
receiveCompletion: { result in
switch result {
case .finished:
success(job, false)
case .failure(let error):
failure(job, error, true)
}
},
receiveValue: { response in
Storage.shared.writeAsync { db in
try response.expiries.forEach { hash, expireAtMs in
guard let expiresInSeconds: TimeInterval = expirationInfo[hash] else { return }
let expiresStartedAtMs: TimeInterval = TimeInterval(expireAtMs - UInt64(expiresInSeconds * 1000))
_ = try Interaction
.filter(Interaction.Columns.serverHash == hash)
.updateAll(
db,
Interaction.Columns.expiresStartedAtMs.set(to: expiresStartedAtMs)
)
guard let index = expirationInfo.index(forKey: hash) else { return }
expirationInfo.remove(at: index)
}
try expirationInfo.forEach { key, _ in
_ = try Interaction
.filter(Interaction.Columns.serverHash == key)
.filter(Interaction.Columns.expiresStartedAtMs == nil)
.updateAll(
db,
Interaction.Columns.expiresStartedAtMs.set(to: details.startedAtTimestampMs)
)
}
if !expirationInfo.isEmpty {
let updatedJob: Job? = Storage.shared.write { db in
try job
.with(nextRunTimestamp: Date().timeIntervalSince1970 + minRunFrequency)
.saved(db)
}
deferred(updatedJob ?? job)
}
}
}
)
}
}
// MARK: - GetExpirationJob.Details
extension GetExpirationJob {
public struct Details: Codable {
private enum CodingKeys: String, CodingKey {
case expirationInfo
case startedAtTimestampMs
}
public let expirationInfo: [String: TimeInterval]
public let startedAtTimestampMs: Double
// MARK: - Initialization
public init(
expirationInfo: [String: TimeInterval],
startedAtTimestampMs: Double
) {
self.expirationInfo = expirationInfo
self.startedAtTimestampMs = startedAtTimestampMs
}
// MARK: - Codable
public init(from decoder: Decoder) throws {
let container: KeyedDecodingContainer<CodingKeys> = try decoder.container(keyedBy: CodingKeys.self)
self = Details(
expirationInfo: try container.decode([String: TimeInterval].self, forKey: .expirationInfo),
startedAtTimestampMs: try container.decode(Double.self, forKey: .startedAtTimestampMs)
)
}
public func encode(to encoder: Encoder) throws {
var container: KeyedEncodingContainer<CodingKeys> = encoder.container(keyedBy: CodingKeys.self)
try container.encode(expirationInfo, forKey: .expirationInfo)
try container.encode(startedAtTimestampMs, forKey: .startedAtTimestampMs)
}
}
}

@ -120,6 +120,10 @@ public struct Job: Codable, Equatable, Hashable, Identifiable, FetchableRecord,
/// This is a job that runs once whenever disappearing after read messages are read and needed to update the
/// expiration on the network
case expirationUpdate
/// This is a job that runs once whenever a message is marked as read because of syncing from user config and
/// needs to get expiration from network
case getExpiration
}
public enum Behaviour: Int, Codable, DatabaseValueConvertible, CaseIterable {

Loading…
Cancel
Save