Fixed another minor bug, added logging and defensive coding

Fixed a bug where you couldn't set animated avatars
Added some defensive coding to prevent the JobRunner queues from adding or starting jobs that don't have id's set
Tweaked the code to be a little more explicit when inserting jobs before other jobs
pull/745/head
Morgan Pretty 2 years ago
parent 09a6a422d0
commit b834dddbe4

@ -561,9 +561,8 @@ class ImagePickerHandler: NSObject, UIImagePickerControllerDelegate & UINavigati
// Check if the user selected an animated image (if so then don't crop, just // Check if the user selected an animated image (if so then don't crop, just
// set the avatar directly // set the avatar directly
guard guard
let type: Any = try? imageUrl.resourceValues(forKeys: [.typeIdentifierKey]) let resourceValues: URLResourceValues = (try? imageUrl.resourceValues(forKeys: [.typeIdentifierKey])),
.allValues let type: Any = resourceValues.allValues.first?.value,
.first,
let typeString: String = type as? String, let typeString: String = type as? String,
MIMETypeUtil.supportedAnimatedImageUTITypes().contains(typeString) MIMETypeUtil.supportedAnimatedImageUTITypes().contains(typeString)
else { else {

@ -20,6 +20,7 @@ public enum AttachmentUploadJob: JobExecutor {
) { ) {
guard guard
let threadId: String = job.threadId, let threadId: String = job.threadId,
let interactionId: Int64 = job.interactionId,
let detailsData: Data = job.details, let detailsData: Data = job.details,
let details: Details = try? JSONDecoder().decode(Details.self, from: detailsData), let details: Details = try? JSONDecoder().decode(Details.self, from: detailsData),
let (attachment, openGroup): (Attachment, OpenGroup?) = Storage.shared.read({ db in let (attachment, openGroup): (Attachment, OpenGroup?) = Storage.shared.read({ db in
@ -36,11 +37,9 @@ public enum AttachmentUploadJob: JobExecutor {
// If the original interaction no longer exists then don't bother uploading the attachment (ie. the // If the original interaction no longer exists then don't bother uploading the attachment (ie. the
// message was deleted before it even got sent) // message was deleted before it even got sent)
if let interactionId: Int64 = job.interactionId { guard Storage.shared.read({ db in try Interaction.exists(db, id: interactionId) }) == true else {
guard Storage.shared.read({ db in try Interaction.exists(db, id: interactionId) }) == true else { failure(job, StorageError.objectNotFound, true)
failure(job, StorageError.objectNotFound, true) return
return
}
} }
// If the attachment is still pending download the hold off on running this job // If the attachment is still pending download the hold off on running this job
@ -55,6 +54,8 @@ public enum AttachmentUploadJob: JobExecutor {
attachment.upload( attachment.upload(
queue: queue, queue: queue,
using: { db, data in using: { db, data in
SNLog("[AttachmentUpload] Started for message \(interactionId) (\(attachment.byteCount) bytes)")
if let openGroup: OpenGroup = openGroup { if let openGroup: OpenGroup = openGroup {
return OpenGroupAPI return OpenGroupAPI
.uploadFile( .uploadFile(

@ -96,7 +96,7 @@ public enum MessageSendJob: JobExecutor {
) )
) )
} }
.compactMap { stateInfo in .compactMap { stateInfo -> (jobId: Int64, job: Job)? in
JobRunner JobRunner
.insert( .insert(
db, db,
@ -111,10 +111,9 @@ public enum MessageSendJob: JobExecutor {
) )
), ),
before: job before: job
)? )
.id
} }
.forEach { otherJobId in .forEach { otherJobId, _ in
// Create the dependency between the jobs // Create the dependency between the jobs
try JobDependencies( try JobDependencies(
jobId: jobId, jobId: jobId,

@ -44,8 +44,8 @@ public struct SnodeReceivedMessageInfo: Codable, FetchableRecord, MutablePersist
// MARK: - Custom Database Interaction // MARK: - Custom Database Interaction
public mutating func didInsert(with rowID: Int64, for column: String?) { public mutating func didInsert(_ inserted: InsertionSuccess) {
self.id = rowID self.id = inserted.rowID
} }
} }

@ -304,8 +304,8 @@ public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePer
extension Job { extension Job {
internal static func filterPendingJobs( internal static func filterPendingJobs(
variants: [Variant], variants: [Variant],
excludeFutureJobs: Bool = true, excludeFutureJobs: Bool,
includeJobsWithDependencies: Bool = false includeJobsWithDependencies: Bool
) -> QueryInterfaceRequest<Job> { ) -> QueryInterfaceRequest<Job> {
var query: QueryInterfaceRequest<Job> = Job var query: QueryInterfaceRequest<Job> = Job
.filter( .filter(

@ -131,6 +131,10 @@ public final class JobRunner {
SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job") SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job")
return return
} }
guard !canStartJob || updatedJob.id != nil else {
SNLog("[JobRunner] Not starting \(job.map { "\($0.variant)" } ?? "unknown") job due to missing id")
return
}
queues.mutate { $0[updatedJob.variant]?.add(updatedJob, canStartJob: canStartJob) } queues.mutate { $0[updatedJob.variant]?.add(updatedJob, canStartJob: canStartJob) }
@ -150,6 +154,10 @@ public final class JobRunner {
/// is in the future then the job won't be started /// is in the future then the job won't be started
public static func upsert(_ db: Database, job: Job?, canStartJob: Bool = true) { public static func upsert(_ db: Database, job: Job?, canStartJob: Bool = true) {
guard let job: Job = job else { return } // Ignore null jobs guard let job: Job = job else { return } // Ignore null jobs
guard job.id != nil else {
add(db, job: job, canStartJob: canStartJob)
return
}
queues.wrappedValue[job.variant]?.upsert(job, canStartJob: canStartJob) queues.wrappedValue[job.variant]?.upsert(job, canStartJob: canStartJob)
@ -159,7 +167,7 @@ public final class JobRunner {
} }
} }
@discardableResult public static func insert(_ db: Database, job: Job?, before otherJob: Job) -> Job? { @discardableResult public static func insert(_ db: Database, job: Job?, before otherJob: Job) -> (Int64, Job)? {
switch job?.behaviour { switch job?.behaviour {
case .recurringOnActive, .recurringOnLaunch, .runOnceNextLaunch: case .recurringOnActive, .recurringOnLaunch, .runOnceNextLaunch:
SNLog("[JobRunner] Attempted to insert \(job.map { "\($0.variant)" } ?? "unknown") job before the current one even though it's behaviour is \(job.map { "\($0.behaviour)" } ?? "unknown")") SNLog("[JobRunner] Attempted to insert \(job.map { "\($0.variant)" } ?? "unknown") job before the current one even though it's behaviour is \(job.map { "\($0.behaviour)" } ?? "unknown")")
@ -173,6 +181,10 @@ public final class JobRunner {
SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job") SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job")
return nil return nil
} }
guard let jobId: Int64 = updatedJob.id else {
SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job due to missing id")
return nil
}
queues.wrappedValue[updatedJob.variant]?.insert(updatedJob, before: otherJob) queues.wrappedValue[updatedJob.variant]?.insert(updatedJob, before: otherJob)
@ -181,7 +193,7 @@ public final class JobRunner {
queues.wrappedValue[updatedJob.variant]?.start() queues.wrappedValue[updatedJob.variant]?.start()
} }
return updatedJob return (jobId, updatedJob)
} }
public static func appDidFinishLaunching() { public static func appDidFinishLaunching() {
@ -499,6 +511,10 @@ private final class JobQueue {
job.behaviour != .runOnceNextLaunch, job.behaviour != .runOnceNextLaunch,
job.nextRunTimestamp <= Date().timeIntervalSince1970 job.nextRunTimestamp <= Date().timeIntervalSince1970
else { return } else { return }
guard job.id != nil else {
SNLog("[JobRunner] Prevented attempt to add \(job.variant) job without id to queue")
return
}
queue.mutate { $0.append(job) } queue.mutate { $0.append(job) }
} }
@ -510,7 +526,7 @@ private final class JobQueue {
/// is in the future then the job won't be started /// is in the future then the job won't be started
fileprivate func upsert(_ job: Job, canStartJob: Bool = true) { fileprivate func upsert(_ job: Job, canStartJob: Bool = true) {
guard let jobId: Int64 = job.id else { guard let jobId: Int64 = job.id else {
add(job, canStartJob: canStartJob) SNLog("[JobRunner] Prevented attempt to upsert \(job.variant) job without id to queue")
return return
} }
@ -535,6 +551,11 @@ private final class JobQueue {
} }
fileprivate func insert(_ job: Job, before otherJob: Job) { fileprivate func insert(_ job: Job, before otherJob: Job) {
guard job.id != nil else {
SNLog("[JobRunner] Prevented attempt to insert \(job.variant) job without id to queue")
return
}
// Insert the job before the current job (re-adding the current job to // Insert the job before the current job (re-adding the current job to
// the start of the queue if it's not in there) - this will mean the new // the start of the queue if it's not in there) - this will mean the new
// job will run and then the otherJob will run (or run again) once it's // job will run and then the otherJob will run (or run again) once it's
@ -634,7 +655,12 @@ private final class JobQueue {
let jobIdsAlreadyRunning: Set<Int64> = jobsCurrentlyRunning.wrappedValue let jobIdsAlreadyRunning: Set<Int64> = jobsCurrentlyRunning.wrappedValue
let jobsAlreadyInQueue: Set<Int64> = queue.wrappedValue.compactMap { $0.id }.asSet() let jobsAlreadyInQueue: Set<Int64> = queue.wrappedValue.compactMap { $0.id }.asSet()
let jobsToRun: [Job] = Storage.shared.read { db in let jobsToRun: [Job] = Storage.shared.read { db in
try Job.filterPendingJobs(variants: jobVariants) try Job
.filterPendingJobs(
variants: jobVariants,
excludeFutureJobs: true,
includeJobsWithDependencies: false
)
.filter(!jobIdsAlreadyRunning.contains(Job.Columns.id)) // Exclude jobs already running .filter(!jobIdsAlreadyRunning.contains(Job.Columns.id)) // Exclude jobs already running
.filter(!jobsAlreadyInQueue.contains(Job.Columns.id)) // Exclude jobs already in the queue .filter(!jobsAlreadyInQueue.contains(Job.Columns.id)) // Exclude jobs already in the queue
.fetchAll(db) .fetchAll(db)
@ -709,6 +735,11 @@ private final class JobQueue {
handleJobFailed(nextJob, error: JobRunnerError.requiredInteractionIdMissing, permanentFailure: true) handleJobFailed(nextJob, error: JobRunnerError.requiredInteractionIdMissing, permanentFailure: true)
return return
} }
guard nextJob.id != nil else {
SNLog("[JobRunner] \(queueContext) Unable to run \(nextJob.variant) job due to missing id")
handleJobFailed(nextJob, error: JobRunnerError.jobIdMissing, permanentFailure: false)
return
}
// If the 'nextRunTimestamp' for the job is in the future then don't run it yet // If the 'nextRunTimestamp' for the job is in the future then don't run it yet
guard nextJob.nextRunTimestamp <= Date().timeIntervalSince1970 else { guard nextJob.nextRunTimestamp <= Date().timeIntervalSince1970 else {
@ -787,7 +818,7 @@ private final class JobQueue {
numJobsRunning = jobsCurrentlyRunning.count numJobsRunning = jobsCurrentlyRunning.count
} }
detailsForCurrentlyRunningJobs.mutate { $0 = $0.setting(nextJob.id, nextJob.details) } detailsForCurrentlyRunningJobs.mutate { $0 = $0.setting(nextJob.id, nextJob.details) }
SNLog("[JobRunner] \(queueContext) started job (\(executionType == .concurrent ? "\(numJobsRunning) currently running, " : "")\(numJobsRemaining) remaining)") SNLog("[JobRunner] \(queueContext) started \(nextJob.variant) job (\(executionType == .concurrent ? "\(numJobsRunning) currently running, " : "")\(numJobsRemaining) remaining)")
jobExecutor.run( jobExecutor.run(
nextJob, nextJob,
@ -809,7 +840,12 @@ private final class JobQueue {
private func scheduleNextSoonestJob() { private func scheduleNextSoonestJob() {
let jobIdsAlreadyRunning: Set<Int64> = jobsCurrentlyRunning.wrappedValue let jobIdsAlreadyRunning: Set<Int64> = jobsCurrentlyRunning.wrappedValue
let nextJobTimestamp: TimeInterval? = Storage.shared.read { db in let nextJobTimestamp: TimeInterval? = Storage.shared.read { db in
try Job.filterPendingJobs(variants: jobVariants, excludeFutureJobs: false) try Job
.filterPendingJobs(
variants: jobVariants,
excludeFutureJobs: false,
includeJobsWithDependencies: false
)
.select(.nextRunTimestamp) .select(.nextRunTimestamp)
.filter(!jobIdsAlreadyRunning.contains(Job.Columns.id)) // Exclude jobs already running .filter(!jobIdsAlreadyRunning.contains(Job.Columns.id)) // Exclude jobs already running
.asRequest(of: TimeInterval.self) .asRequest(of: TimeInterval.self)

@ -6,6 +6,7 @@ public enum JobRunnerError: Error {
case generic case generic
case executorMissing case executorMissing
case jobIdMissing
case requiredThreadIdMissing case requiredThreadIdMissing
case requiredInteractionIdMissing case requiredInteractionIdMissing

Loading…
Cancel
Save