diff --git a/SessionMessagingKit/Jobs/AttachmentDownloadJob.swift b/SessionMessagingKit/Jobs/AttachmentDownloadJob.swift index 6c8dab50b..85e251553 100644 --- a/SessionMessagingKit/Jobs/AttachmentDownloadJob.swift +++ b/SessionMessagingKit/Jobs/AttachmentDownloadJob.swift @@ -59,6 +59,7 @@ public final class AttachmentDownloadJob : NSObject, Job, NSCoding { // NSObject // MARK: Running public func execute() { + JobQueue.currentlyExecutingJobs.insert(id!) guard !isDeferred else { return } if TSAttachment.fetch(uniqueId: attachmentID) is TSAttachmentStream { // FIXME: It's not clear * how * this happens, but apparently we can get to this point diff --git a/SessionMessagingKit/Jobs/AttachmentUploadJob.swift b/SessionMessagingKit/Jobs/AttachmentUploadJob.swift index fc0f1be77..584824982 100644 --- a/SessionMessagingKit/Jobs/AttachmentUploadJob.swift +++ b/SessionMessagingKit/Jobs/AttachmentUploadJob.swift @@ -60,6 +60,7 @@ public final class AttachmentUploadJob : NSObject, Job, NSCoding { // NSObject/N // MARK: Running public func execute() { + JobQueue.currentlyExecutingJobs.insert(id!) guard let stream = TSAttachment.fetch(uniqueId: attachmentID) as? TSAttachmentStream else { return handleFailure(error: Error.noAttachment) } diff --git a/SessionMessagingKit/Jobs/JobQueue.swift b/SessionMessagingKit/Jobs/JobQueue.swift index a09bc5cfb..289b8fc70 100644 --- a/SessionMessagingKit/Jobs/JobQueue.swift +++ b/SessionMessagingKit/Jobs/JobQueue.swift @@ -4,6 +4,8 @@ import SessionUtilitiesKit public final class JobQueue : NSObject, JobDelegate { private static var jobIDs: [UInt64:UInt64] = [:] + + internal static var currentlyExecutingJobs: Set<String> = [] @objc public static let shared = JobQueue() @@ -34,6 +36,9 @@ public final class JobQueue : NSObject, JobDelegate { allJobTypes.forEach { type in let allPendingJobs = SNMessagingKitConfiguration.shared.storage.getAllPendingJobs(of: type) allPendingJobs.sorted(by: { $0.id! < $1.id! }).forEach { job in // Retry the oldest jobs first + guard !JobQueue.currentlyExecutingJobs.contains(job.id!) else { + return SNLog("Not resuming already executing job.") + } SNLog("Resuming pending job of type: \(type).") job.delegate = self job.execute() @@ -42,6 +47,7 @@ public final class JobQueue : NSObject, JobDelegate { } public func handleJobSucceeded(_ job: Job) { + JobQueue.currentlyExecutingJobs.remove(job.id!) SNMessagingKitConfiguration.shared.storage.write(with: { transaction in SNMessagingKitConfiguration.shared.storage.markJobAsSucceeded(job, using: transaction) }, completion: { @@ -50,6 +56,7 @@ public final class JobQueue : NSObject, JobDelegate { } public func handleJobFailed(_ job: Job, with error: Error) { + JobQueue.currentlyExecutingJobs.remove(job.id!) job.failureCount += 1 let storage = SNMessagingKitConfiguration.shared.storage guard !storage.isJobCanceled(job) else { return SNLog("\(type(of: job)) canceled.") } @@ -71,6 +78,7 @@ public final class JobQueue : NSObject, JobDelegate { } public func handleJobFailedPermanently(_ job: Job, with error: Error) { + JobQueue.currentlyExecutingJobs.remove(job.id!) job.failureCount += 1 let storage = SNMessagingKitConfiguration.shared.storage storage.write(with: { transaction in diff --git a/SessionMessagingKit/Jobs/MessageReceiveJob.swift b/SessionMessagingKit/Jobs/MessageReceiveJob.swift index 6cfe0dbfe..48e919ab8 100644 --- a/SessionMessagingKit/Jobs/MessageReceiveJob.swift +++ b/SessionMessagingKit/Jobs/MessageReceiveJob.swift @@ -54,6 +54,7 @@ public final class MessageReceiveJob : NSObject, Job, NSCoding { // NSObject/NSC } public func execute() -> Promise<Void> { + JobQueue.currentlyExecutingJobs.insert(id!) let (promise, seal) = Promise<Void>.pending() SNMessagingKitConfiguration.shared.storage.write(with: { transaction in // Intentionally capture self do { diff --git a/SessionMessagingKit/Jobs/MessageSendJob.swift b/SessionMessagingKit/Jobs/MessageSendJob.swift index 6fc0ae4b4..c894e6460 100644 --- a/SessionMessagingKit/Jobs/MessageSendJob.swift +++ b/SessionMessagingKit/Jobs/MessageSendJob.swift @@ -69,6 +69,7 @@ public final class MessageSendJob : NSObject, Job, NSCoding { // NSObject/NSCodi // MARK: Running public func execute() { + JobQueue.currentlyExecutingJobs.insert(id!) let storage = SNMessagingKitConfiguration.shared.storage if let message = message as? VisibleMessage { guard TSOutgoingMessage.find(withTimestamp: message.sentTimestamp!) != nil else { return } // The message has been deleted diff --git a/SessionMessagingKit/Jobs/NotifyPNServerJob.swift b/SessionMessagingKit/Jobs/NotifyPNServerJob.swift index 2c506dc43..c243fa30a 100644 --- a/SessionMessagingKit/Jobs/NotifyPNServerJob.swift +++ b/SessionMessagingKit/Jobs/NotifyPNServerJob.swift @@ -38,6 +38,7 @@ public final class NotifyPNServerJob : NSObject, Job, NSCoding { // NSObject/NSC } public func execute() -> Promise<Void> { + JobQueue.currentlyExecutingJobs.insert(id!) let server = PushNotificationAPI.server let parameters = [ "data" : message.data.description, "send_to" : message.recipient ] let url = URL(string: "\(server)/notify")!