WIP: race condition handling

pull/941/head
Ryan Zhao 12 months ago
parent ce02aa3f5a
commit 1f20403a26

@ -12,6 +12,7 @@ public enum ConfigurationSyncJob: JobExecutor {
public static let requiresThreadId: Bool = true
public static let requiresInteractionId: Bool = false
private static let maxRunFrequency: TimeInterval = 3
private static let waitTimeForExpirationUpdate: TimeInterval = 1
public static func run(
_ job: Job,
@ -50,6 +51,27 @@ public enum ConfigurationSyncJob: JobExecutor {
return deferred(updatedJob ?? job)
}
// We want to update lastReadTimestamp after the disappearing messages are updated to the network, so on
// linked devices the expiration time can be the same and avoid race condition
guard
JobRunner
.infoForCurrentlyRunningJobs(of: .expirationUpdate)
.filter({ _, info in
info.threadId == job.threadId // Exclude expiration update jobs for different threads
})
.isEmpty
else {
// Defer the job to run 'maxRunFrequency'
let updatedJob: Job? = Storage.shared.write { db in
try job
.with(nextRunTimestamp: Date().timeIntervalSince1970 + waitTimeForExpirationUpdate)
.saved(db)
}
SNLog("[ConfigurationSyncJob] For \(job.threadId ?? "UnknownId") deferred due to expiration update jobs running.")
return deferred(updatedJob ?? job)
}
// If we don't have a userKeyPair yet then there is no need to sync the configuration
// as the user doesn't exist yet (this will get triggered on the first launch of a
// fresh install due to the migrations getting run)

@ -83,8 +83,7 @@ public extension DisappearingMessagesJob {
let expiresInSeconds: TimeInterval
let serverHash: String
}
var expirationInfo: [String: TimeInterval] = (try? Interaction
let interactionIds: [Int64] = (try? Interaction
.filter(
Interaction.Columns.threadId == threadId &&
Interaction.Columns.timestampMs <= lastReadTimestampMs &&
@ -92,66 +91,17 @@ public extension DisappearingMessagesJob {
Interaction.Columns.expiresStartedAtMs == nil
)
.select(
Interaction.Columns.expiresInSeconds,
Interaction.Columns.serverHash
Interaction.Columns.id
)
.asRequest(of: ExpirationInfo.self)
.asRequest(of: Int64.self)
.fetchAll(db))
.defaulting(to: [])
.grouped(by: \.serverHash)
.compactMapValues{ $0.first?.expiresInSeconds }
// If there were no message hashes then none of the messages sent before lastReadTimestampMs are expiring messages
guard (expirationInfo.count > 0) else { return }
let timestampMs: Int64 = SnodeAPI.currentOffsetTimestampMs()
let userPublicKey: String = getUserHexEncodedPublicKey(db)
SnodeAPI.getSwarm(for: userPublicKey)
.tryFlatMap { swarm -> AnyPublisher<Void, Error> in
guard let snode = swarm.randomElement() else { throw SnodeAPIError.generic }
return SnodeAPI.getExpiries(
from: snode,
associatedWith: userPublicKey,
of: expirationInfo.map { $0.key }
)
.map { (_, response) in
Storage.shared.writeAsync { db in
try response.expiries.forEach { hash, expireAtMs in
guard let expiresInSeconds: TimeInterval = expirationInfo[hash] else { return }
let expiresStartedAtMs: TimeInterval = TimeInterval(expireAtMs - UInt64(expiresInSeconds * 1000))
_ = try Interaction
.filter(Interaction.Columns.serverHash == hash)
.updateAll(
db,
Interaction.Columns.expiresStartedAtMs.set(to: expiresStartedAtMs)
)
guard let index = expirationInfo.index(forKey: hash) else { return }
expirationInfo.remove(at: index)
}
try expirationInfo.forEach { key, _ in
_ = try Interaction
.filter(Interaction.Columns.serverHash == key)
.updateAll(
db,
Interaction.Columns.expiresStartedAtMs.set(to: timestampMs)
)
}
JobRunner.upsert(
db,
job: updateNextRunIfNeeded(db)
)
}
}
.mapError { error in
return error
}
.eraseToAnyPublisher()
}
.sinkUntilComplete()
guard !interactionIds.isEmpty else { return }
let startedAtMs: Double = Double(SnodeAPI.currentOffsetTimestampMs())
updateNextRunIfNeeded(db, interactionIds: interactionIds, startedAtMs: startedAtMs, threadId: threadId)
}
@discardableResult static func updateNextRunIfNeeded(_ db: Database, interactionIds: [Int64], startedAtMs: Double, threadId: String) -> Job? {
@ -194,10 +144,12 @@ public extension DisappearingMessagesJob {
interactionExpirationInfosByExpiresInSeconds.forEach { expiresInSeconds, expirationInfos in
let expirationTimestampMs: Int64 = Int64(startedAtMs + expiresInSeconds * 1000)
JobRunner.upsert(
JobRunner.add(
db,
job: Job(
variant: .expirationUpdate,
behaviour: .runOnce,
threadId: threadId,
details: ExpirationUpdateJob.Details(
serverHashes: expirationInfos.map { $0.serverHash },
expirationTimestampMs: expirationTimestampMs

@ -8,7 +8,7 @@ import SessionSnodeKit
public enum ExpirationUpdateJob: JobExecutor {
public static var maxFailureCount: Int = -1
public static var requiresThreadId: Bool = false
public static var requiresThreadId: Bool = true
public static var requiresInteractionId: Bool = false
public static func run(

@ -67,6 +67,14 @@ public final class JobRunner {
private static let queues: Atomic<[Job.Variant: JobQueue]> = {
var jobVariants: Set<Job.Variant> = Job.Variant.allCases.asSet()
let expirationUpdateQueue: JobQueue = JobQueue(
type: .expirationUpdate,
executionType: .concurrent, // Allow as many jobs to run at once as supported by the device
qos: .default,
jobVariants: [
jobVariants.remove(.expirationUpdate)
].compactMap { $0 }
)
let messageSendQueue: JobQueue = JobQueue(
type: .messageSend,
executionType: .concurrent, // Allow as many jobs to run at once as supported by the device
@ -77,8 +85,7 @@ public final class JobRunner {
jobVariants.remove(.notifyPushServer),
jobVariants.remove(.sendReadReceipts),
jobVariants.remove(.groupLeaving),
jobVariants.remove(.configurationSync),
jobVariants.remove(.expirationUpdate)
jobVariants.remove(.configurationSync)
].compactMap { $0 }
)
let messageReceiveQueue: JobQueue = JobQueue(
@ -109,6 +116,7 @@ public final class JobRunner {
)
return Atomic([
expirationUpdateQueue,
messageSendQueue,
messageReceiveQueue,
attachmentDownloadQueue,
@ -471,6 +479,7 @@ public final class JobQueue {
case messageSend
case messageReceive
case attachmentDownload
case expirationUpdate
var name: String {
switch self {
@ -479,6 +488,7 @@ public final class JobQueue {
case .messageSend: return "MessageSend"
case .messageReceive: return "MessageReceive"
case .attachmentDownload: return "AttachmentDownload"
case .expirationUpdate: return "ExpirationUpdate"
}
}
}

Loading…
Cancel
Save