Fix job duplication bug

This could cause attachments not to download on the receiving side, and potentially also cause duplicate push notifications to be sent
pull/427/head
nielsandriesse 4 years ago
parent c375dee5e3
commit e24fca7b37

@ -59,6 +59,7 @@ public final class AttachmentDownloadJob : NSObject, Job, NSCoding { // NSObject
// MARK: Running // MARK: Running
public func execute() { public func execute() {
JobQueue.currentlyExecutingJobs.insert(id!)
guard !isDeferred else { return } guard !isDeferred else { return }
if TSAttachment.fetch(uniqueId: attachmentID) is TSAttachmentStream { if TSAttachment.fetch(uniqueId: attachmentID) is TSAttachmentStream {
// FIXME: It's not clear * how * this happens, but apparently we can get to this point // FIXME: It's not clear * how * this happens, but apparently we can get to this point

@ -60,6 +60,7 @@ public final class AttachmentUploadJob : NSObject, Job, NSCoding { // NSObject/N
// MARK: Running // MARK: Running
public func execute() { public func execute() {
JobQueue.currentlyExecutingJobs.insert(id!)
guard let stream = TSAttachment.fetch(uniqueId: attachmentID) as? TSAttachmentStream else { guard let stream = TSAttachment.fetch(uniqueId: attachmentID) as? TSAttachmentStream else {
return handleFailure(error: Error.noAttachment) return handleFailure(error: Error.noAttachment)
} }

@ -5,6 +5,8 @@ public final class JobQueue : NSObject, JobDelegate {
private static var jobIDs: [UInt64:UInt64] = [:] private static var jobIDs: [UInt64:UInt64] = [:]
internal static var currentlyExecutingJobs: Set<String> = []
@objc public static let shared = JobQueue() @objc public static let shared = JobQueue()
@objc public func add(_ job: Job, using transaction: Any) { @objc public func add(_ job: Job, using transaction: Any) {
@ -34,6 +36,9 @@ public final class JobQueue : NSObject, JobDelegate {
allJobTypes.forEach { type in allJobTypes.forEach { type in
let allPendingJobs = SNMessagingKitConfiguration.shared.storage.getAllPendingJobs(of: type) let allPendingJobs = SNMessagingKitConfiguration.shared.storage.getAllPendingJobs(of: type)
allPendingJobs.sorted(by: { $0.id! < $1.id! }).forEach { job in // Retry the oldest jobs first allPendingJobs.sorted(by: { $0.id! < $1.id! }).forEach { job in // Retry the oldest jobs first
guard !JobQueue.currentlyExecutingJobs.contains(job.id!) else {
return SNLog("Not resuming already executing job.")
}
SNLog("Resuming pending job of type: \(type).") SNLog("Resuming pending job of type: \(type).")
job.delegate = self job.delegate = self
job.execute() job.execute()
@ -42,6 +47,7 @@ public final class JobQueue : NSObject, JobDelegate {
} }
public func handleJobSucceeded(_ job: Job) { public func handleJobSucceeded(_ job: Job) {
JobQueue.currentlyExecutingJobs.remove(job.id!)
SNMessagingKitConfiguration.shared.storage.write(with: { transaction in SNMessagingKitConfiguration.shared.storage.write(with: { transaction in
SNMessagingKitConfiguration.shared.storage.markJobAsSucceeded(job, using: transaction) SNMessagingKitConfiguration.shared.storage.markJobAsSucceeded(job, using: transaction)
}, completion: { }, completion: {
@ -50,6 +56,7 @@ public final class JobQueue : NSObject, JobDelegate {
} }
public func handleJobFailed(_ job: Job, with error: Error) { public func handleJobFailed(_ job: Job, with error: Error) {
JobQueue.currentlyExecutingJobs.remove(job.id!)
job.failureCount += 1 job.failureCount += 1
let storage = SNMessagingKitConfiguration.shared.storage let storage = SNMessagingKitConfiguration.shared.storage
guard !storage.isJobCanceled(job) else { return SNLog("\(type(of: job)) canceled.") } guard !storage.isJobCanceled(job) else { return SNLog("\(type(of: job)) canceled.") }
@ -71,6 +78,7 @@ public final class JobQueue : NSObject, JobDelegate {
} }
public func handleJobFailedPermanently(_ job: Job, with error: Error) { public func handleJobFailedPermanently(_ job: Job, with error: Error) {
JobQueue.currentlyExecutingJobs.remove(job.id!)
job.failureCount += 1 job.failureCount += 1
let storage = SNMessagingKitConfiguration.shared.storage let storage = SNMessagingKitConfiguration.shared.storage
storage.write(with: { transaction in storage.write(with: { transaction in

@ -54,6 +54,7 @@ public final class MessageReceiveJob : NSObject, Job, NSCoding { // NSObject/NSC
} }
public func execute() -> Promise<Void> { public func execute() -> Promise<Void> {
JobQueue.currentlyExecutingJobs.insert(id!)
let (promise, seal) = Promise<Void>.pending() let (promise, seal) = Promise<Void>.pending()
SNMessagingKitConfiguration.shared.storage.write(with: { transaction in // Intentionally capture self SNMessagingKitConfiguration.shared.storage.write(with: { transaction in // Intentionally capture self
do { do {

@ -69,6 +69,7 @@ public final class MessageSendJob : NSObject, Job, NSCoding { // NSObject/NSCodi
// MARK: Running // MARK: Running
public func execute() { public func execute() {
JobQueue.currentlyExecutingJobs.insert(id!)
let storage = SNMessagingKitConfiguration.shared.storage let storage = SNMessagingKitConfiguration.shared.storage
if let message = message as? VisibleMessage { if let message = message as? VisibleMessage {
guard TSOutgoingMessage.find(withTimestamp: message.sentTimestamp!) != nil else { return } // The message has been deleted guard TSOutgoingMessage.find(withTimestamp: message.sentTimestamp!) != nil else { return } // The message has been deleted

@ -38,6 +38,7 @@ public final class NotifyPNServerJob : NSObject, Job, NSCoding { // NSObject/NSC
} }
public func execute() -> Promise<Void> { public func execute() -> Promise<Void> {
JobQueue.currentlyExecutingJobs.insert(id!)
let server = PushNotificationAPI.server let server = PushNotificationAPI.server
let parameters = [ "data" : message.data.description, "send_to" : message.recipient ] let parameters = [ "data" : message.data.description, "send_to" : message.recipient ]
let url = URL(string: "\(server)/notify")! let url = URL(string: "\(server)/notify")!

Loading…
Cancel
Save