diff --git a/Session/Settings/SettingsViewModel.swift b/Session/Settings/SettingsViewModel.swift index 8ee5ba0dd..41db4c023 100644 --- a/Session/Settings/SettingsViewModel.swift +++ b/Session/Settings/SettingsViewModel.swift @@ -561,9 +561,8 @@ class ImagePickerHandler: NSObject, UIImagePickerControllerDelegate & UINavigati // Check if the user selected an animated image (if so then don't crop, just // set the avatar directly guard - let type: Any = try? imageUrl.resourceValues(forKeys: [.typeIdentifierKey]) - .allValues - .first, + let resourceValues: URLResourceValues = (try? imageUrl.resourceValues(forKeys: [.typeIdentifierKey])), + let type: Any = resourceValues.allValues.first?.value, let typeString: String = type as? String, MIMETypeUtil.supportedAnimatedImageUTITypes().contains(typeString) else { diff --git a/SessionMessagingKit/Jobs/Types/AttachmentUploadJob.swift b/SessionMessagingKit/Jobs/Types/AttachmentUploadJob.swift index 4692d48b1..88e72891f 100644 --- a/SessionMessagingKit/Jobs/Types/AttachmentUploadJob.swift +++ b/SessionMessagingKit/Jobs/Types/AttachmentUploadJob.swift @@ -20,6 +20,7 @@ public enum AttachmentUploadJob: JobExecutor { ) { guard let threadId: String = job.threadId, + let interactionId: Int64 = job.interactionId, let detailsData: Data = job.details, let details: Details = try? JSONDecoder().decode(Details.self, from: detailsData), let (attachment, openGroup): (Attachment, OpenGroup?) = Storage.shared.read({ db in @@ -36,11 +37,9 @@ public enum AttachmentUploadJob: JobExecutor { // If the original interaction no longer exists then don't bother uploading the attachment (ie. the // message was deleted before it even got sent) - if let interactionId: Int64 = job.interactionId { - guard Storage.shared.read({ db in try Interaction.exists(db, id: interactionId) }) == true else { - failure(job, StorageError.objectNotFound, true) - return - } + guard Storage.shared.read({ db in try Interaction.exists(db, id: interactionId) }) == true else { + failure(job, StorageError.objectNotFound, true) + return } // If the attachment is still pending download the hold off on running this job @@ -55,6 +54,8 @@ public enum AttachmentUploadJob: JobExecutor { attachment.upload( queue: queue, using: { db, data in + SNLog("[AttachmentUpload] Started for message \(interactionId) (\(attachment.byteCount) bytes)") + if let openGroup: OpenGroup = openGroup { return OpenGroupAPI .uploadFile( diff --git a/SessionMessagingKit/Jobs/Types/MessageSendJob.swift b/SessionMessagingKit/Jobs/Types/MessageSendJob.swift index 8e9aa3732..e8d3e0265 100644 --- a/SessionMessagingKit/Jobs/Types/MessageSendJob.swift +++ b/SessionMessagingKit/Jobs/Types/MessageSendJob.swift @@ -96,7 +96,7 @@ public enum MessageSendJob: JobExecutor { ) ) } - .compactMap { stateInfo in + .compactMap { stateInfo -> (jobId: Int64, job: Job)? in JobRunner .insert( db, @@ -111,10 +111,9 @@ public enum MessageSendJob: JobExecutor { ) ), before: job - )? - .id + ) } - .forEach { otherJobId in + .forEach { otherJobId, _ in // Create the dependency between the jobs try JobDependencies( jobId: jobId, diff --git a/SessionSnodeKit/Database/Models/SnodeReceivedMessageInfo.swift b/SessionSnodeKit/Database/Models/SnodeReceivedMessageInfo.swift index b000f6ba6..7addb56e5 100644 --- a/SessionSnodeKit/Database/Models/SnodeReceivedMessageInfo.swift +++ b/SessionSnodeKit/Database/Models/SnodeReceivedMessageInfo.swift @@ -44,8 +44,8 @@ public struct SnodeReceivedMessageInfo: Codable, FetchableRecord, MutablePersist // MARK: - Custom Database Interaction - public mutating func didInsert(with rowID: Int64, for column: String?) { - self.id = rowID + public mutating func didInsert(_ inserted: InsertionSuccess) { + self.id = inserted.rowID } } diff --git a/SessionUtilitiesKit/Database/Models/Job.swift b/SessionUtilitiesKit/Database/Models/Job.swift index 9af678cee..037d83fc3 100644 --- a/SessionUtilitiesKit/Database/Models/Job.swift +++ b/SessionUtilitiesKit/Database/Models/Job.swift @@ -304,8 +304,8 @@ public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePer extension Job { internal static func filterPendingJobs( variants: [Variant], - excludeFutureJobs: Bool = true, - includeJobsWithDependencies: Bool = false + excludeFutureJobs: Bool, + includeJobsWithDependencies: Bool ) -> QueryInterfaceRequest { var query: QueryInterfaceRequest = Job .filter( diff --git a/SessionUtilitiesKit/JobRunner/JobRunner.swift b/SessionUtilitiesKit/JobRunner/JobRunner.swift index 607c89b29..065ad3ba1 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunner.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunner.swift @@ -131,6 +131,10 @@ public final class JobRunner { SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job") return } + guard !canStartJob || updatedJob.id != nil else { + SNLog("[JobRunner] Not starting \(job.map { "\($0.variant)" } ?? "unknown") job due to missing id") + return + } queues.mutate { $0[updatedJob.variant]?.add(updatedJob, canStartJob: canStartJob) } @@ -150,6 +154,10 @@ public final class JobRunner { /// is in the future then the job won't be started public static func upsert(_ db: Database, job: Job?, canStartJob: Bool = true) { guard let job: Job = job else { return } // Ignore null jobs + guard job.id != nil else { + add(db, job: job, canStartJob: canStartJob) + return + } queues.wrappedValue[job.variant]?.upsert(job, canStartJob: canStartJob) @@ -159,7 +167,7 @@ public final class JobRunner { } } - @discardableResult public static func insert(_ db: Database, job: Job?, before otherJob: Job) -> Job? { + @discardableResult public static func insert(_ db: Database, job: Job?, before otherJob: Job) -> (Int64, Job)? { switch job?.behaviour { case .recurringOnActive, .recurringOnLaunch, .runOnceNextLaunch: SNLog("[JobRunner] Attempted to insert \(job.map { "\($0.variant)" } ?? "unknown") job before the current one even though it's behaviour is \(job.map { "\($0.behaviour)" } ?? "unknown")") @@ -173,6 +181,10 @@ public final class JobRunner { SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job") return nil } + guard let jobId: Int64 = updatedJob.id else { + SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job due to missing id") + return nil + } queues.wrappedValue[updatedJob.variant]?.insert(updatedJob, before: otherJob) @@ -181,7 +193,7 @@ public final class JobRunner { queues.wrappedValue[updatedJob.variant]?.start() } - return updatedJob + return (jobId, updatedJob) } public static func appDidFinishLaunching() { @@ -499,6 +511,10 @@ private final class JobQueue { job.behaviour != .runOnceNextLaunch, job.nextRunTimestamp <= Date().timeIntervalSince1970 else { return } + guard job.id != nil else { + SNLog("[JobRunner] Prevented attempt to add \(job.variant) job without id to queue") + return + } queue.mutate { $0.append(job) } } @@ -510,7 +526,7 @@ private final class JobQueue { /// is in the future then the job won't be started fileprivate func upsert(_ job: Job, canStartJob: Bool = true) { guard let jobId: Int64 = job.id else { - add(job, canStartJob: canStartJob) + SNLog("[JobRunner] Prevented attempt to upsert \(job.variant) job without id to queue") return } @@ -535,6 +551,11 @@ private final class JobQueue { } fileprivate func insert(_ job: Job, before otherJob: Job) { + guard job.id != nil else { + SNLog("[JobRunner] Prevented attempt to insert \(job.variant) job without id to queue") + return + } + // Insert the job before the current job (re-adding the current job to // the start of the queue if it's not in there) - this will mean the new // job will run and then the otherJob will run (or run again) once it's @@ -634,7 +655,12 @@ private final class JobQueue { let jobIdsAlreadyRunning: Set = jobsCurrentlyRunning.wrappedValue let jobsAlreadyInQueue: Set = queue.wrappedValue.compactMap { $0.id }.asSet() let jobsToRun: [Job] = Storage.shared.read { db in - try Job.filterPendingJobs(variants: jobVariants) + try Job + .filterPendingJobs( + variants: jobVariants, + excludeFutureJobs: true, + includeJobsWithDependencies: false + ) .filter(!jobIdsAlreadyRunning.contains(Job.Columns.id)) // Exclude jobs already running .filter(!jobsAlreadyInQueue.contains(Job.Columns.id)) // Exclude jobs already in the queue .fetchAll(db) @@ -709,6 +735,11 @@ private final class JobQueue { handleJobFailed(nextJob, error: JobRunnerError.requiredInteractionIdMissing, permanentFailure: true) return } + guard nextJob.id != nil else { + SNLog("[JobRunner] \(queueContext) Unable to run \(nextJob.variant) job due to missing id") + handleJobFailed(nextJob, error: JobRunnerError.jobIdMissing, permanentFailure: false) + return + } // If the 'nextRunTimestamp' for the job is in the future then don't run it yet guard nextJob.nextRunTimestamp <= Date().timeIntervalSince1970 else { @@ -787,7 +818,7 @@ private final class JobQueue { numJobsRunning = jobsCurrentlyRunning.count } detailsForCurrentlyRunningJobs.mutate { $0 = $0.setting(nextJob.id, nextJob.details) } - SNLog("[JobRunner] \(queueContext) started job (\(executionType == .concurrent ? "\(numJobsRunning) currently running, " : "")\(numJobsRemaining) remaining)") + SNLog("[JobRunner] \(queueContext) started \(nextJob.variant) job (\(executionType == .concurrent ? "\(numJobsRunning) currently running, " : "")\(numJobsRemaining) remaining)") jobExecutor.run( nextJob, @@ -809,7 +840,12 @@ private final class JobQueue { private func scheduleNextSoonestJob() { let jobIdsAlreadyRunning: Set = jobsCurrentlyRunning.wrappedValue let nextJobTimestamp: TimeInterval? = Storage.shared.read { db in - try Job.filterPendingJobs(variants: jobVariants, excludeFutureJobs: false) + try Job + .filterPendingJobs( + variants: jobVariants, + excludeFutureJobs: false, + includeJobsWithDependencies: false + ) .select(.nextRunTimestamp) .filter(!jobIdsAlreadyRunning.contains(Job.Columns.id)) // Exclude jobs already running .asRequest(of: TimeInterval.self) diff --git a/SessionUtilitiesKit/JobRunner/JobRunnerError.swift b/SessionUtilitiesKit/JobRunner/JobRunnerError.swift index 8d015095d..0932187ed 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunnerError.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunnerError.swift @@ -6,6 +6,7 @@ public enum JobRunnerError: Error { case generic case executorMissing + case jobIdMissing case requiredThreadIdMissing case requiredInteractionIdMissing