From 09a6a422d042709a569f667d29d48e80a5c35eae Mon Sep 17 00:00:00 2001 From: ryanzhao Date: Wed, 30 Nov 2022 10:13:43 +1100 Subject: [PATCH 1/2] fix an issue where jobs' ids won't be set correctly and the jobs will be run more than once --- SessionUtilitiesKit/Database/Models/Job.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/SessionUtilitiesKit/Database/Models/Job.swift b/SessionUtilitiesKit/Database/Models/Job.swift index 471df30c1..9af678cee 100644 --- a/SessionUtilitiesKit/Database/Models/Job.swift +++ b/SessionUtilitiesKit/Database/Models/Job.swift @@ -294,8 +294,8 @@ public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePer // MARK: - Custom Database Interaction - public mutating func didInsert(with rowID: Int64, for column: String?) { - self.id = rowID + public mutating func didInsert(_ inserted: InsertionSuccess) { + self.id = inserted.rowID } } From b834dddbe4267d143af243b92496bac4679106c5 Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Wed, 30 Nov 2022 11:57:13 +1100 Subject: [PATCH 2/2] Fixed another minor bug, added logging and defensive coding Fixed a bug where you couldn't set animated avatars Added some defensive coding to prevent the JobRunner queues from adding or starting jobs that don't have id's set Tweaked the code to be a little more explicit when inserting jobs before other jobs --- Session/Settings/SettingsViewModel.swift | 5 +- .../Jobs/Types/AttachmentUploadJob.swift | 11 +++-- .../Jobs/Types/MessageSendJob.swift | 7 ++- .../Models/SnodeReceivedMessageInfo.swift | 4 +- SessionUtilitiesKit/Database/Models/Job.swift | 4 +- SessionUtilitiesKit/JobRunner/JobRunner.swift | 48 ++++++++++++++++--- .../JobRunner/JobRunnerError.swift | 1 + 7 files changed, 58 insertions(+), 22 deletions(-) diff --git a/Session/Settings/SettingsViewModel.swift b/Session/Settings/SettingsViewModel.swift index 8ee5ba0dd..41db4c023 100644 --- a/Session/Settings/SettingsViewModel.swift +++ b/Session/Settings/SettingsViewModel.swift @@ -561,9 +561,8 @@ class ImagePickerHandler: NSObject, UIImagePickerControllerDelegate & UINavigati // Check if the user selected an animated image (if so then don't crop, just // set the avatar directly guard - let type: Any = try? imageUrl.resourceValues(forKeys: [.typeIdentifierKey]) - .allValues - .first, + let resourceValues: URLResourceValues = (try? imageUrl.resourceValues(forKeys: [.typeIdentifierKey])), + let type: Any = resourceValues.allValues.first?.value, let typeString: String = type as? String, MIMETypeUtil.supportedAnimatedImageUTITypes().contains(typeString) else { diff --git a/SessionMessagingKit/Jobs/Types/AttachmentUploadJob.swift b/SessionMessagingKit/Jobs/Types/AttachmentUploadJob.swift index 4692d48b1..88e72891f 100644 --- a/SessionMessagingKit/Jobs/Types/AttachmentUploadJob.swift +++ b/SessionMessagingKit/Jobs/Types/AttachmentUploadJob.swift @@ -20,6 +20,7 @@ public enum AttachmentUploadJob: JobExecutor { ) { guard let threadId: String = job.threadId, + let interactionId: Int64 = job.interactionId, let detailsData: Data = job.details, let details: Details = try? JSONDecoder().decode(Details.self, from: detailsData), let (attachment, openGroup): (Attachment, OpenGroup?) = Storage.shared.read({ db in @@ -36,11 +37,9 @@ public enum AttachmentUploadJob: JobExecutor { // If the original interaction no longer exists then don't bother uploading the attachment (ie. the // message was deleted before it even got sent) - if let interactionId: Int64 = job.interactionId { - guard Storage.shared.read({ db in try Interaction.exists(db, id: interactionId) }) == true else { - failure(job, StorageError.objectNotFound, true) - return - } + guard Storage.shared.read({ db in try Interaction.exists(db, id: interactionId) }) == true else { + failure(job, StorageError.objectNotFound, true) + return } // If the attachment is still pending download the hold off on running this job @@ -55,6 +54,8 @@ public enum AttachmentUploadJob: JobExecutor { attachment.upload( queue: queue, using: { db, data in + SNLog("[AttachmentUpload] Started for message \(interactionId) (\(attachment.byteCount) bytes)") + if let openGroup: OpenGroup = openGroup { return OpenGroupAPI .uploadFile( diff --git a/SessionMessagingKit/Jobs/Types/MessageSendJob.swift b/SessionMessagingKit/Jobs/Types/MessageSendJob.swift index 8e9aa3732..e8d3e0265 100644 --- a/SessionMessagingKit/Jobs/Types/MessageSendJob.swift +++ b/SessionMessagingKit/Jobs/Types/MessageSendJob.swift @@ -96,7 +96,7 @@ public enum MessageSendJob: JobExecutor { ) ) } - .compactMap { stateInfo in + .compactMap { stateInfo -> (jobId: Int64, job: Job)? in JobRunner .insert( db, @@ -111,10 +111,9 @@ public enum MessageSendJob: JobExecutor { ) ), before: job - )? - .id + ) } - .forEach { otherJobId in + .forEach { otherJobId, _ in // Create the dependency between the jobs try JobDependencies( jobId: jobId, diff --git a/SessionSnodeKit/Database/Models/SnodeReceivedMessageInfo.swift b/SessionSnodeKit/Database/Models/SnodeReceivedMessageInfo.swift index b000f6ba6..7addb56e5 100644 --- a/SessionSnodeKit/Database/Models/SnodeReceivedMessageInfo.swift +++ b/SessionSnodeKit/Database/Models/SnodeReceivedMessageInfo.swift @@ -44,8 +44,8 @@ public struct SnodeReceivedMessageInfo: Codable, FetchableRecord, MutablePersist // MARK: - Custom Database Interaction - public mutating func didInsert(with rowID: Int64, for column: String?) { - self.id = rowID + public mutating func didInsert(_ inserted: InsertionSuccess) { + self.id = inserted.rowID } } diff --git a/SessionUtilitiesKit/Database/Models/Job.swift b/SessionUtilitiesKit/Database/Models/Job.swift index 9af678cee..037d83fc3 100644 --- a/SessionUtilitiesKit/Database/Models/Job.swift +++ b/SessionUtilitiesKit/Database/Models/Job.swift @@ -304,8 +304,8 @@ public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePer extension Job { internal static func filterPendingJobs( variants: [Variant], - excludeFutureJobs: Bool = true, - includeJobsWithDependencies: Bool = false + excludeFutureJobs: Bool, + includeJobsWithDependencies: Bool ) -> QueryInterfaceRequest { var query: QueryInterfaceRequest = Job .filter( diff --git a/SessionUtilitiesKit/JobRunner/JobRunner.swift b/SessionUtilitiesKit/JobRunner/JobRunner.swift index 607c89b29..065ad3ba1 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunner.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunner.swift @@ -131,6 +131,10 @@ public final class JobRunner { SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job") return } + guard !canStartJob || updatedJob.id != nil else { + SNLog("[JobRunner] Not starting \(job.map { "\($0.variant)" } ?? "unknown") job due to missing id") + return + } queues.mutate { $0[updatedJob.variant]?.add(updatedJob, canStartJob: canStartJob) } @@ -150,6 +154,10 @@ public final class JobRunner { /// is in the future then the job won't be started public static func upsert(_ db: Database, job: Job?, canStartJob: Bool = true) { guard let job: Job = job else { return } // Ignore null jobs + guard job.id != nil else { + add(db, job: job, canStartJob: canStartJob) + return + } queues.wrappedValue[job.variant]?.upsert(job, canStartJob: canStartJob) @@ -159,7 +167,7 @@ public final class JobRunner { } } - @discardableResult public static func insert(_ db: Database, job: Job?, before otherJob: Job) -> Job? { + @discardableResult public static func insert(_ db: Database, job: Job?, before otherJob: Job) -> (Int64, Job)? { switch job?.behaviour { case .recurringOnActive, .recurringOnLaunch, .runOnceNextLaunch: SNLog("[JobRunner] Attempted to insert \(job.map { "\($0.variant)" } ?? "unknown") job before the current one even though it's behaviour is \(job.map { "\($0.behaviour)" } ?? "unknown")") @@ -173,6 +181,10 @@ public final class JobRunner { SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job") return nil } + guard let jobId: Int64 = updatedJob.id else { + SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job due to missing id") + return nil + } queues.wrappedValue[updatedJob.variant]?.insert(updatedJob, before: otherJob) @@ -181,7 +193,7 @@ public final class JobRunner { queues.wrappedValue[updatedJob.variant]?.start() } - return updatedJob + return (jobId, updatedJob) } public static func appDidFinishLaunching() { @@ -499,6 +511,10 @@ private final class JobQueue { job.behaviour != .runOnceNextLaunch, job.nextRunTimestamp <= Date().timeIntervalSince1970 else { return } + guard job.id != nil else { + SNLog("[JobRunner] Prevented attempt to add \(job.variant) job without id to queue") + return + } queue.mutate { $0.append(job) } } @@ -510,7 +526,7 @@ private final class JobQueue { /// is in the future then the job won't be started fileprivate func upsert(_ job: Job, canStartJob: Bool = true) { guard let jobId: Int64 = job.id else { - add(job, canStartJob: canStartJob) + SNLog("[JobRunner] Prevented attempt to upsert \(job.variant) job without id to queue") return } @@ -535,6 +551,11 @@ private final class JobQueue { } fileprivate func insert(_ job: Job, before otherJob: Job) { + guard job.id != nil else { + SNLog("[JobRunner] Prevented attempt to insert \(job.variant) job without id to queue") + return + } + // Insert the job before the current job (re-adding the current job to // the start of the queue if it's not in there) - this will mean the new // job will run and then the otherJob will run (or run again) once it's @@ -634,7 +655,12 @@ private final class JobQueue { let jobIdsAlreadyRunning: Set = jobsCurrentlyRunning.wrappedValue let jobsAlreadyInQueue: Set = queue.wrappedValue.compactMap { $0.id }.asSet() let jobsToRun: [Job] = Storage.shared.read { db in - try Job.filterPendingJobs(variants: jobVariants) + try Job + .filterPendingJobs( + variants: jobVariants, + excludeFutureJobs: true, + includeJobsWithDependencies: false + ) .filter(!jobIdsAlreadyRunning.contains(Job.Columns.id)) // Exclude jobs already running .filter(!jobsAlreadyInQueue.contains(Job.Columns.id)) // Exclude jobs already in the queue .fetchAll(db) @@ -709,6 +735,11 @@ private final class JobQueue { handleJobFailed(nextJob, error: JobRunnerError.requiredInteractionIdMissing, permanentFailure: true) return } + guard nextJob.id != nil else { + SNLog("[JobRunner] \(queueContext) Unable to run \(nextJob.variant) job due to missing id") + handleJobFailed(nextJob, error: JobRunnerError.jobIdMissing, permanentFailure: false) + return + } // If the 'nextRunTimestamp' for the job is in the future then don't run it yet guard nextJob.nextRunTimestamp <= Date().timeIntervalSince1970 else { @@ -787,7 +818,7 @@ private final class JobQueue { numJobsRunning = jobsCurrentlyRunning.count } detailsForCurrentlyRunningJobs.mutate { $0 = $0.setting(nextJob.id, nextJob.details) } - SNLog("[JobRunner] \(queueContext) started job (\(executionType == .concurrent ? "\(numJobsRunning) currently running, " : "")\(numJobsRemaining) remaining)") + SNLog("[JobRunner] \(queueContext) started \(nextJob.variant) job (\(executionType == .concurrent ? "\(numJobsRunning) currently running, " : "")\(numJobsRemaining) remaining)") jobExecutor.run( nextJob, @@ -809,7 +840,12 @@ private final class JobQueue { private func scheduleNextSoonestJob() { let jobIdsAlreadyRunning: Set = jobsCurrentlyRunning.wrappedValue let nextJobTimestamp: TimeInterval? = Storage.shared.read { db in - try Job.filterPendingJobs(variants: jobVariants, excludeFutureJobs: false) + try Job + .filterPendingJobs( + variants: jobVariants, + excludeFutureJobs: false, + includeJobsWithDependencies: false + ) .select(.nextRunTimestamp) .filter(!jobIdsAlreadyRunning.contains(Job.Columns.id)) // Exclude jobs already running .asRequest(of: TimeInterval.self) diff --git a/SessionUtilitiesKit/JobRunner/JobRunnerError.swift b/SessionUtilitiesKit/JobRunner/JobRunnerError.swift index 8d015095d..0932187ed 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunnerError.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunnerError.swift @@ -6,6 +6,7 @@ public enum JobRunnerError: Error { case generic case executorMissing + case jobIdMissing case requiredThreadIdMissing case requiredInteractionIdMissing