diff --git a/Podfile.lock b/Podfile.lock index 0239f9c29..9cd0a2706 100644 --- a/Podfile.lock +++ b/Podfile.lock @@ -242,6 +242,6 @@ SPEC CHECKSUMS: YYImage: f1ddd15ac032a58b78bbed1e012b50302d318331 ZXingObjC: fdbb269f25dd2032da343e06f10224d62f537bdb -PODFILE CHECKSUM: 2bf7639359fecebe56e9757d88f4eb48864652d2 +PODFILE CHECKSUM: 97324ae5888b01db2f2adc4dcc239e2e7d6867f7 COCOAPODS: 1.11.3 diff --git a/Session.xcodeproj/project.pbxproj b/Session.xcodeproj/project.pbxproj index e8773d8c6..e593f16fd 100644 --- a/Session.xcodeproj/project.pbxproj +++ b/Session.xcodeproj/project.pbxproj @@ -761,6 +761,10 @@ FD9004142818AD0B00ABAAF6 /* _002_SetupStandardJobs.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD9004132818AD0B00ABAAF6 /* _002_SetupStandardJobs.swift */; }; FD9004152818B46300ABAAF6 /* JobRunner.swift in Sources */ = {isa = PBXBuildFile; fileRef = FDF0B7432804EF1B004C14C5 /* JobRunner.swift */; }; FD9004162818B46700ABAAF6 /* JobRunnerError.swift in Sources */ = {isa = PBXBuildFile; fileRef = FDE77F68280F9EDA002CFC5D /* JobRunnerError.swift */; }; + FD96F3A529DBC3DC00401309 /* MessageSendJobSpec.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD96F3A429DBC3DC00401309 /* MessageSendJobSpec.swift */; }; + FD96F3A729DBD43D00401309 /* MockJobRunner.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD96F3A629DBD43D00401309 /* MockJobRunner.swift */; }; + FD96F3A829DBD4AD00401309 /* MockJobRunner.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD96F3A629DBD43D00401309 /* MockJobRunner.swift */; }; + FD96F3A929DBD4AD00401309 /* MockJobRunner.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD96F3A629DBD43D00401309 /* MockJobRunner.swift */; }; FDA8EAFE280E8B78002B68E5 /* FailedMessageSendsJob.swift in Sources */ = {isa = PBXBuildFile; fileRef = FDA8EAFD280E8B78002B68E5 /* FailedMessageSendsJob.swift */; }; FDA8EB00280E8D58002B68E5 /* FailedAttachmentDownloadsJob.swift in Sources */ = {isa = PBXBuildFile; fileRef = FDA8EAFF280E8D58002B68E5 /* FailedAttachmentDownloadsJob.swift */; }; FDA8EB10280F8238002B68E5 /* Codable+Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = FDA8EB0F280F8238002B68E5 /* Codable+Utilities.swift */; }; @@ -1841,6 +1845,8 @@ FD87DD0328B8727D00AF0F98 /* Configuration.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Configuration.swift; sourceTree = ""; }; FD90040E2818AB6D00ABAAF6 /* GetSnodePoolJob.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = GetSnodePoolJob.swift; sourceTree = ""; }; FD9004132818AD0B00ABAAF6 /* _002_SetupStandardJobs.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = _002_SetupStandardJobs.swift; sourceTree = ""; }; + FD96F3A429DBC3DC00401309 /* MessageSendJobSpec.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MessageSendJobSpec.swift; sourceTree = ""; }; + FD96F3A629DBD43D00401309 /* MockJobRunner.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MockJobRunner.swift; sourceTree = ""; }; FDA8EAFD280E8B78002B68E5 /* FailedMessageSendsJob.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FailedMessageSendsJob.swift; sourceTree = ""; }; FDA8EAFF280E8D58002B68E5 /* FailedAttachmentDownloadsJob.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FailedAttachmentDownloadsJob.swift; sourceTree = ""; }; FDA8EB0F280F8238002B68E5 /* Codable+Utilities.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Codable+Utilities.swift"; sourceTree = ""; }; @@ -3928,6 +3934,7 @@ children = ( FDC290A527D860CE005DAE71 /* Mock.swift */, FDFD645C27F273F300808CA1 /* MockGeneralCache.swift */, + FD96F3A629DBD43D00401309 /* MockJobRunner.swift */, FD83B9BD27CF2243005E1583 /* TestConstants.swift */, FDC290A727D9B46D005DAE71 /* NimbleExtensions.swift */, FD078E4727E02561000769AF /* CommonMockedExtensions.swift */, @@ -3970,6 +3977,22 @@ path = JobRunner; sourceTree = ""; }; + FD96F3A229DBC3BA00401309 /* Jobs */ = { + isa = PBXGroup; + children = ( + FD96F3A329DBC3D000401309 /* Types */, + ); + path = Jobs; + sourceTree = ""; + }; + FD96F3A329DBC3D000401309 /* Types */ = { + isa = PBXGroup; + children = ( + FD96F3A429DBC3DC00401309 /* MessageSendJobSpec.swift */, + ); + path = Types; + sourceTree = ""; + }; FDC2909227D710A9005DAE71 /* Types */ = { isa = PBXGroup; children = ( @@ -4059,6 +4082,7 @@ FDC4389B27BA01E300C60D73 /* _TestUtilities */, FD3C905D27E410DB00CD579F /* Common Networking */, FD3C906527E416A200CD579F /* Contacts */, + FD96F3A229DBC3BA00401309 /* Jobs */, FDC4389827BA001800C60D73 /* Open Groups */, FD3C906B27E43C2400CD579F /* Sending & Receiving */, FD3C906827E417B100CD579F /* Utilities */, @@ -5794,6 +5818,7 @@ FD71161528D00D6700B47552 /* ThreadDisappearingMessagesViewModelSpec.swift in Sources */, FD23EA5E28ED00FD0058676E /* NimbleExtensions.swift in Sources */, FD23EA5F28ED00FF0058676E /* CommonMockedExtensions.swift in Sources */, + FD96F3A829DBD4AD00401309 /* MockJobRunner.swift in Sources */, FD23EA5D28ED00FA0058676E /* TestConstants.swift in Sources */, FD71161A28D00E1100B47552 /* NotificationContentViewModelSpec.swift in Sources */, FD23EA5C28ED00F80058676E /* Mock.swift in Sources */, @@ -5807,6 +5832,7 @@ buildActionMask = 2147483647; files = ( FD2AAAF228ED57B500A49611 /* SynchronousStorage.swift in Sources */, + FD96F3A929DBD4AD00401309 /* MockJobRunner.swift in Sources */, FD078E4927E02576000769AF /* CommonMockedExtensions.swift in Sources */, FD83B9BF27CF2294005E1583 /* TestConstants.swift in Sources */, FD83B9BB27CF20AF005E1583 /* SessionIdSpec.swift in Sources */, @@ -5827,9 +5853,11 @@ FD3C905C27E3FBEF00CD579F /* BatchRequestInfoSpec.swift in Sources */, FD859EFA27C2F5C500510D0C /* MockGenericHash.swift in Sources */, FDC2909427D710B4005DAE71 /* SOGSEndpointSpec.swift in Sources */, + FD96F3A529DBC3DC00401309 /* MessageSendJobSpec.swift in Sources */, FDC290B327DFF9F5005DAE71 /* TestOnionRequestAPI.swift in Sources */, FDC2909127D709CA005DAE71 /* SOGSMessageSpec.swift in Sources */, FD3C906A27E417CE00CD579F /* SodiumUtilitiesSpec.swift in Sources */, + FD96F3A729DBD43D00401309 /* MockJobRunner.swift in Sources */, FD3C907127E445E500CD579F /* MessageReceiverDecryptionSpec.swift in Sources */, FDC2909627D71252005DAE71 /* SOGSErrorSpec.swift in Sources */, FDC2908727D7047F005DAE71 /* RoomSpec.swift in Sources */, diff --git a/Session/Conversations/ConversationVC+Interaction.swift b/Session/Conversations/ConversationVC+Interaction.swift index 037e39647..8f533f286 100644 --- a/Session/Conversations/ConversationVC+Interaction.swift +++ b/Session/Conversations/ConversationVC+Interaction.swift @@ -2015,7 +2015,8 @@ extension ConversationVC: try MessageSender.send( db, message: DataExtractionNotification( - kind: .mediaSaved(timestamp: UInt64(cellViewModel.timestampMs)) + kind: .mediaSaved(timestamp: UInt64(cellViewModel.timestampMs)), + sentTimestamp: UInt64(SnodeAPI.currentOffsetTimestampMs()) ), interactionId: nil, in: thread @@ -2269,7 +2270,8 @@ extension ConversationVC: try MessageSender.send( db, message: DataExtractionNotification( - kind: .screenshot + kind: .screenshot, + sentTimestamp: UInt64(SnodeAPI.currentOffsetTimestampMs()) ), interactionId: nil, in: thread diff --git a/Session/Media Viewing & Editing/MediaPageViewController.swift b/Session/Media Viewing & Editing/MediaPageViewController.swift index 49d9374e5..0ffc3cd42 100644 --- a/Session/Media Viewing & Editing/MediaPageViewController.swift +++ b/Session/Media Viewing & Editing/MediaPageViewController.swift @@ -540,7 +540,8 @@ class MediaPageViewController: UIPageViewController, UIPageViewControllerDataSou message: DataExtractionNotification( kind: .mediaSaved( timestamp: UInt64(currentViewController.galleryItem.interactionTimestampMs) - ) + ), + sentTimestamp: UInt64(SnodeAPI.currentOffsetTimestampMs()) ), interactionId: nil, // Show no interaction for the current user in: thread diff --git a/Session/Meta/AppDelegate.swift b/Session/Meta/AppDelegate.swift index d91851597..dc7d58d64 100644 --- a/Session/Meta/AppDelegate.swift +++ b/Session/Meta/AppDelegate.swift @@ -254,7 +254,7 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD private func completePostMigrationSetup(needsConfigSync: Bool) { Configuration.performMainSetup() - JobRunner.add(executor: SyncPushTokensJob.self, for: .syncPushTokens) + JobRunner.setExecutor(SyncPushTokensJob.self, for: .syncPushTokens) /// Setup the UI /// diff --git a/SessionMessagingKit/Configuration.swift b/SessionMessagingKit/Configuration.swift index b50ac3b94..2edd387b3 100644 --- a/SessionMessagingKit/Configuration.swift +++ b/SessionMessagingKit/Configuration.swift @@ -34,17 +34,17 @@ public enum SNMessagingKit { // Just to make the external API nice public static func configure() { // Configure the job executors - JobRunner.add(executor: DisappearingMessagesJob.self, for: .disappearingMessages) - JobRunner.add(executor: FailedMessageSendsJob.self, for: .failedMessageSends) - JobRunner.add(executor: FailedAttachmentDownloadsJob.self, for: .failedAttachmentDownloads) - JobRunner.add(executor: UpdateProfilePictureJob.self, for: .updateProfilePicture) - JobRunner.add(executor: RetrieveDefaultOpenGroupRoomsJob.self, for: .retrieveDefaultOpenGroupRooms) - JobRunner.add(executor: GarbageCollectionJob.self, for: .garbageCollection) - JobRunner.add(executor: MessageSendJob.self, for: .messageSend) - JobRunner.add(executor: MessageReceiveJob.self, for: .messageReceive) - JobRunner.add(executor: NotifyPushServerJob.self, for: .notifyPushServer) - JobRunner.add(executor: SendReadReceiptsJob.self, for: .sendReadReceipts) - JobRunner.add(executor: AttachmentDownloadJob.self, for: .attachmentDownload) - JobRunner.add(executor: AttachmentUploadJob.self, for: .attachmentUpload) + JobRunner.setExecutor(DisappearingMessagesJob.self, for: .disappearingMessages) + JobRunner.setExecutor(FailedMessageSendsJob.self, for: .failedMessageSends) + JobRunner.setExecutor(FailedAttachmentDownloadsJob.self, for: .failedAttachmentDownloads) + JobRunner.setExecutor(UpdateProfilePictureJob.self, for: .updateProfilePicture) + JobRunner.setExecutor(RetrieveDefaultOpenGroupRoomsJob.self, for: .retrieveDefaultOpenGroupRooms) + JobRunner.setExecutor(GarbageCollectionJob.self, for: .garbageCollection) + JobRunner.setExecutor(MessageSendJob.self, for: .messageSend) + JobRunner.setExecutor(MessageReceiveJob.self, for: .messageReceive) + JobRunner.setExecutor(NotifyPushServerJob.self, for: .notifyPushServer) + JobRunner.setExecutor(SendReadReceiptsJob.self, for: .sendReadReceipts) + JobRunner.setExecutor(AttachmentDownloadJob.self, for: .attachmentDownload) + JobRunner.setExecutor(AttachmentUploadJob.self, for: .attachmentUpload) } } diff --git a/SessionMessagingKit/Jobs/Types/AttachmentDownloadJob.swift b/SessionMessagingKit/Jobs/Types/AttachmentDownloadJob.swift index 14934a4e7..4eeb8e27c 100644 --- a/SessionMessagingKit/Jobs/Types/AttachmentDownloadJob.swift +++ b/SessionMessagingKit/Jobs/Types/AttachmentDownloadJob.swift @@ -42,8 +42,8 @@ public enum AttachmentDownloadJob: JobExecutor { // the same attachment multiple times at the same time (it also adds a "clean up" mechanism // if an attachment ends up stuck in a "downloading" state incorrectly guard attachment.state != .downloading else { - let otherCurrentJobAttachmentIds: Set = JobRunner - .detailsForCurrentlyRunningJobs(of: .attachmentDownload) + let otherCurrentJobAttachmentIds: Set = dependencies.jobRunner + .detailsFor(state: .running, variant: .attachmentDownload) .filter { key, _ in key != job.id } .values .compactMap { data -> String? in diff --git a/SessionMessagingKit/Jobs/Types/MessageSendJob.swift b/SessionMessagingKit/Jobs/Types/MessageSendJob.swift index 050dc1bfa..ef9ae90d4 100644 --- a/SessionMessagingKit/Jobs/Types/MessageSendJob.swift +++ b/SessionMessagingKit/Jobs/Types/MessageSendJob.swift @@ -24,7 +24,7 @@ public enum MessageSendJob: JobExecutor { let detailsData: Data = job.details, let details: Details = try? JSONDecoder().decode(Details.self, from: detailsData) else { - failure(job, JobRunnerError.missingRequiredDetails, false, dependencies) + failure(job, JobRunnerError.missingRequiredDetails, true, dependencies) return } @@ -37,7 +37,7 @@ public enum MessageSendJob: JobExecutor { let jobId: Int64 = job.id, let interactionId: Int64 = job.interactionId else { - failure(job, JobRunnerError.missingRequiredDetails, false, dependencies) + failure(job, JobRunnerError.missingRequiredDetails, true, dependencies) return } @@ -89,16 +89,16 @@ public enum MessageSendJob: JobExecutor { } .filter { stateInfo in // Don't add a new job if there is one already in the queue - !JobRunner.hasPendingOrRunningJob( - with: .attachmentUpload, - details: AttachmentUploadJob.Details( + !dependencies.jobRunner.hasJob( + of: .attachmentUpload, + with: AttachmentUploadJob.Details( messageSendJobId: jobId, attachmentId: stateInfo.attachmentId ) ) } .compactMap { stateInfo -> (jobId: Int64, job: Job)? in - JobRunner + dependencies.jobRunner .insert( db, job: Job( @@ -131,8 +131,8 @@ public enum MessageSendJob: JobExecutor { let hasPendingUploads: Bool = allAttachmentStateInfo.contains(where: { $0.state != .uploaded }) return ( - (isMissingFileIds && !hasPendingUploads), - hasPendingUploads, + (isMissingFileIds && !hasPendingUploads), // shouldFail + hasPendingUploads, // shouldDefer fileIds ) } diff --git a/SessionMessagingKit/Messages/Control Messages/DataExtractionNotification.swift b/SessionMessagingKit/Messages/Control Messages/DataExtractionNotification.swift index d54549df1..627d58ea8 100644 --- a/SessionMessagingKit/Messages/Control Messages/DataExtractionNotification.swift +++ b/SessionMessagingKit/Messages/Control Messages/DataExtractionNotification.swift @@ -27,8 +27,13 @@ public final class DataExtractionNotification: ControlMessage { // MARK: - Initialization - public init(kind: Kind) { - super.init() + public init( + kind: Kind, + sentTimestamp: UInt64? = nil + ) { + super.init( + sentTimestamp: sentTimestamp + ) self.kind = kind } diff --git a/SessionMessagingKitTests/Jobs/Types/MessageSendJobSpec.swift b/SessionMessagingKitTests/Jobs/Types/MessageSendJobSpec.swift new file mode 100644 index 000000000..f828c4394 --- /dev/null +++ b/SessionMessagingKitTests/Jobs/Types/MessageSendJobSpec.swift @@ -0,0 +1,415 @@ +// Copyright © 2023 Rangeproof Pty Ltd. All rights reserved. + +import Foundation +import GRDB + +import Quick +import Nimble + +@testable import SessionMessagingKit +@testable import SessionUtilitiesKit + +class MessageSendJobSpec: QuickSpec { + // MARK: - Spec + + override func spec() { + var job: Job! + var interaction: Interaction! + var attachment1: Attachment! + var interactionAttachment1: InteractionAttachment! + var mockStorage: Storage! + var mockJobRunner: MockJobRunner! + var dependencies: Dependencies! + + // MARK: - JobRunner + + describe("a MessageSendJob") { + beforeEach { + mockStorage = Storage( + customWriter: try! DatabaseQueue(), + customMigrations: [ + SNUtilitiesKit.migrations(), + SNMessagingKit.migrations() + ] + ) + mockJobRunner = MockJobRunner() + dependencies = Dependencies( + storage: mockStorage, + jobRunner: mockJobRunner, + date: Date(timeIntervalSince1970: 1234567890) + ) + attachment1 = Attachment( + id: "200", + variant: .standard, + state: .failedDownload, + contentType: "text/plain", + byteCount: 200 + ) + + mockStorage.write { db in + try SessionThread.fetchOrCreate(db, id: "Test1", variant: .contact) + } + + mockJobRunner + .when { + $0.hasJob( + of: any(), + inState: .running, + with: AttachmentUploadJob.Details( + messageSendJobId: 1, + attachmentId: attachment1.id + ) + ) + } + .thenReturn(false) + mockJobRunner + .when { $0.insert(any(), job: any(), before: any(), dependencies: dependencies) } + .then { args in + let db: Database = args[0] as! Database + var job: Job = args[1] as! Job + job.id = 1000 + + try! job.insert(db) + } + .thenReturn((1000, Job(variant: .messageSend))) + } + + afterEach { + job = nil + mockStorage = nil + dependencies = nil + } + + it("fails when not given any details") { + job = Job(variant: .messageSend) + + var error: Error? = nil + var permanentFailure: Bool = false + + MessageSendJob.run( + job, + queue: .main, + success: { _, _, _ in }, + failure: { _, runError, runPermanentFailure, _ in + error = runError + permanentFailure = runPermanentFailure + }, + deferred: { _, _ in }, + dependencies: dependencies + ) + + expect(error).to(matchError(JobRunnerError.missingRequiredDetails)) + expect(permanentFailure).to(beTrue()) + } + + it("fails when not given incorrect details") { + job = Job( + variant: .messageSend, + details: MessageReceiveJob.Details(messages: [], calledFromBackgroundPoller: false) + ) + + var error: Error? = nil + var permanentFailure: Bool = false + + MessageSendJob.run( + job, + queue: .main, + success: { _, _, _ in }, + failure: { _, runError, runPermanentFailure, _ in + error = runError + permanentFailure = runPermanentFailure + }, + deferred: { _, _ in }, + dependencies: dependencies + ) + + expect(error).to(matchError(JobRunnerError.missingRequiredDetails)) + expect(permanentFailure).to(beTrue()) + } + + context("of VisibleMessage") { + beforeEach { + interaction = Interaction( + id: 100, + serverHash: nil, + messageUuid: nil, + threadId: "Test1", + authorId: "Test", + variant: .standardOutgoing, + body: "Test", + timestampMs: 1234567890, + receivedAtTimestampMs: 1234567900, + wasRead: false, + hasMention: false, + expiresInSeconds: nil, + expiresStartedAtMs: nil, + linkPreviewUrl: nil, + openGroupServerMessageId: nil, + openGroupWhisperMods: false, + openGroupWhisperTo: nil + ) + job = Job( + variant: .messageSend, + interactionId: interaction.id!, + details: MessageSendJob.Details( + destination: .contact(publicKey: "Test"), + message: VisibleMessage( + text: "Test" + ) + ) + ) + + mockStorage.write { db in + try interaction.insert(db) + try job.insert(db) + } + } + + it("fails when there is no job id") { + job = Job( + variant: .messageSend, + interactionId: interaction.id!, + details: MessageSendJob.Details( + destination: .contact(publicKey: "Test"), + message: VisibleMessage( + text: "Test" + ) + ) + ) + + var error: Error? = nil + var permanentFailure: Bool = false + + MessageSendJob.run( + job, + queue: .main, + success: { _, _, _ in }, + failure: { _, runError, runPermanentFailure, _ in + error = runError + permanentFailure = runPermanentFailure + }, + deferred: { _, _ in }, + dependencies: dependencies + ) + + expect(error).to(matchError(JobRunnerError.missingRequiredDetails)) + expect(permanentFailure).to(beTrue()) + } + + it("fails when there is no interaction id") { + job = Job( + variant: .messageSend, + details: MessageSendJob.Details( + destination: .contact(publicKey: "Test"), + message: VisibleMessage( + text: "Test" + ) + ) + ) + + var error: Error? = nil + var permanentFailure: Bool = false + + MessageSendJob.run( + job, + queue: .main, + success: { _, _, _ in }, + failure: { _, runError, runPermanentFailure, _ in + error = runError + permanentFailure = runPermanentFailure + }, + deferred: { _, _ in }, + dependencies: dependencies + ) + + expect(error).to(matchError(JobRunnerError.missingRequiredDetails)) + expect(permanentFailure).to(beTrue()) + } + + it("fails when there is no interaction for the provided interaction id") { + job = Job( + variant: .messageSend, + interactionId: 12345, + details: MessageSendJob.Details( + destination: .contact(publicKey: "Test"), + message: VisibleMessage( + text: "Test" + ) + ) + ) + mockStorage.write { db in try job.insert(db) } + + var error: Error? = nil + var permanentFailure: Bool = false + + MessageSendJob.run( + job, + queue: .main, + success: { _, _, _ in }, + failure: { _, runError, runPermanentFailure, _ in + error = runError + permanentFailure = runPermanentFailure + }, + deferred: { _, _ in }, + dependencies: dependencies + ) + + expect(error).to(matchError(StorageError.objectNotFound)) + expect(permanentFailure).to(beTrue()) + } + context("with an attachment") { + beforeEach { + interactionAttachment1 = InteractionAttachment( + albumIndex: 0, + interactionId: interaction.id!, + attachmentId: attachment1.id + ) + + mockStorage.write { db in + try attachment1.insert(db) + try interactionAttachment1.insert(db) + } + } + + it("it fails when trying to send with an attachment which previously failed to download") { + mockStorage.write { db in + try attachment1.with(state: .failedDownload).save(db) + } + + var error: Error? = nil + var permanentFailure: Bool = false + + MessageSendJob.run( + job, + queue: .main, + success: { _, _, _ in }, + failure: { _, runError, runPermanentFailure, _ in + error = runError + permanentFailure = runPermanentFailure + }, + deferred: { _, _ in }, + dependencies: dependencies + ) + + expect(error).to(matchError(AttachmentError.notUploaded)) + expect(permanentFailure).to(beTrue()) + } + + it("it fails when trying to send with an attachment that has an invalid downloadUrl") { + mockStorage.write { db in + try attachment1 + .with( + state: .uploaded, + downloadUrl: nil + ) + .save(db) + } + + var error: Error? = nil + var permanentFailure: Bool = false + + MessageSendJob.run( + job, + queue: .main, + success: { _, _, _ in }, + failure: { _, runError, runPermanentFailure, _ in + error = runError + permanentFailure = runPermanentFailure + }, + deferred: { _, _ in }, + dependencies: dependencies + ) + + expect(error).to(matchError(AttachmentError.notUploaded)) + expect(permanentFailure).to(beTrue()) + } + + context("with a pending upload") { + beforeEach { + mockStorage.write { db in + try attachment1.with(state: .uploading).save(db) + } + } + + it("it defers when trying to send with an attachment which is still pending upload") { + var didDefer: Bool = false + + mockStorage.write { db in + try attachment1.with(state: .uploading).save(db) + } + + MessageSendJob.run( + job, + queue: .main, + success: { _, _, _ in }, + failure: { _, _, _, _ in }, + deferred: { _, _ in didDefer = true }, + dependencies: dependencies + ) + + expect(didDefer).to(beTrue()) + } + + it("inserts an attachment upload job before the message send job") { + mockJobRunner + .when { + $0.hasJob( + of: any(), + inState: .running, + with: AttachmentUploadJob.Details( + messageSendJobId: 1, + attachmentId: "200" + ) + ) + } + .thenReturn(false) + + MessageSendJob.run( + job, + queue: .main, + success: { _, _, _ in }, + failure: { _, _, _, _ in }, + deferred: { _, _ in }, + dependencies: dependencies + ) + + expect(mockJobRunner) + .to(call(.exactly(times: 1), matchingParameters: true) { + $0.insert( + any(), + job: Job( + variant: .attachmentUpload, + behaviour: .runOnce, + shouldBlock: false, + shouldSkipLaunchBecomeActive: false, + interactionId: 100, + details: AttachmentUploadJob.Details( + messageSendJobId: 1, + attachmentId: "200" + ) + ), + before: job, + dependencies: dependencies + ) + }) + } + + it("creates a dependency between the new job and the existing one") { + MessageSendJob.run( + job, + queue: .main, + success: { _, _, _ in }, + failure: { _, _, _, _ in }, + deferred: { _, _ in }, + dependencies: dependencies + ) + + expect(mockStorage.read { db in try JobDependencies.fetchOne(db) }) + .to(equal(JobDependencies(jobId: 9, dependantId: 1000))) + } + } + } + } + } + } +} diff --git a/SessionMessagingKitTests/Open Groups/OpenGroupManagerSpec.swift b/SessionMessagingKitTests/Open Groups/OpenGroupManagerSpec.swift index 03618a3b5..a8b3743e0 100644 --- a/SessionMessagingKitTests/Open Groups/OpenGroupManagerSpec.swift +++ b/SessionMessagingKitTests/Open Groups/OpenGroupManagerSpec.swift @@ -3568,11 +3568,11 @@ class OpenGroupManagerSpec: QuickSpec { it("adds the image retrieval promise to the cache") { class TestNeverReturningApi: OnionRequestAPIType { - static func sendOnionRequest(_ request: URLRequest, to server: String, using version: OnionRequestAPIVersion, with x25519PublicKey: String) -> Promise<(OnionRequestResponseInfoType, Data?)> { + static func sendOnionRequest(_ request: URLRequest, to server: String, using version: OnionRequestAPIVersion, with x25519PublicKey: String, timeout: TimeInterval) -> Promise<(OnionRequestResponseInfoType, Data?)> { return Promise<(OnionRequestResponseInfoType, Data?)>.pending().promise } - static func sendOnionRequest(to snode: Snode, invoking method: SnodeAPIEndpoint, with parameters: JSON, associatedWith publicKey: String?) -> Promise { + static func sendOnionRequest(to snode: Snode, invoking method: SnodeAPIEndpoint, with parameters: JSON, associatedWith publicKey: String?, timeout: TimeInterval) -> Promise { return Promise.value(Data()) } } diff --git a/SessionMessagingKitTests/_TestUtilities/TestOnionRequestAPI.swift b/SessionMessagingKitTests/_TestUtilities/TestOnionRequestAPI.swift index 67b7dde86..3da2fbdd4 100644 --- a/SessionMessagingKitTests/_TestUtilities/TestOnionRequestAPI.swift +++ b/SessionMessagingKitTests/_TestUtilities/TestOnionRequestAPI.swift @@ -34,7 +34,7 @@ class TestOnionRequestAPI: OnionRequestAPIType { class var mockResponse: Data? { return nil } - static func sendOnionRequest(_ request: URLRequest, to server: String, using version: OnionRequestAPIVersion, with x25519PublicKey: String) -> Promise<(OnionRequestResponseInfoType, Data?)> { + static func sendOnionRequest(_ request: URLRequest, to server: String, using version: OnionRequestAPIVersion, with x25519PublicKey: String, timeout: TimeInterval) -> Promise<(OnionRequestResponseInfoType, Data?)> { let responseInfo: ResponseInfo = ResponseInfo( requestData: RequestData( urlString: request.url?.absoluteString, @@ -54,7 +54,7 @@ class TestOnionRequestAPI: OnionRequestAPIType { return Promise.value((responseInfo, mockResponse)) } - static func sendOnionRequest(to snode: Snode, invoking method: SnodeAPIEndpoint, with parameters: JSON, associatedWith publicKey: String?) -> Promise { + static func sendOnionRequest(to snode: Snode, invoking method: SnodeAPIEndpoint, with parameters: JSON, associatedWith publicKey: String?, timeout: TimeInterval) -> Promise { return Promise.value(mockResponse!) } } diff --git a/SessionSnodeKit/Configuration.swift b/SessionSnodeKit/Configuration.swift index 259ac87ab..5035bfa78 100644 --- a/SessionSnodeKit/Configuration.swift +++ b/SessionSnodeKit/Configuration.swift @@ -24,6 +24,6 @@ public enum SNSnodeKit { // Just to make the external API nice public static func configure() { // Configure the job executors - JobRunner.add(executor: GetSnodePoolJob.self, for: .getSnodePool) + JobRunner.setExecutor(GetSnodePoolJob.self, for: .getSnodePool) } } diff --git a/SessionUtilitiesKit/Database/Models/Job.swift b/SessionUtilitiesKit/Database/Models/Job.swift index 037d83fc3..b2f7dc8fd 100644 --- a/SessionUtilitiesKit/Database/Models/Job.swift +++ b/SessionUtilitiesKit/Database/Models/Job.swift @@ -3,7 +3,7 @@ import Foundation import GRDB -public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePersistableRecord, TableRecord, ColumnExpressible { +public struct Job: Codable, Hashable, Equatable, Identifiable, FetchableRecord, MutablePersistableRecord, TableRecord, ColumnExpressible { public static var databaseTableName: String { "job" } internal static let dependencyForeignKey = ForeignKey([Columns.id], to: [JobDependencies.Columns.dependantId]) public static let dependantJobDependency = hasMany( @@ -184,7 +184,7 @@ public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePer // MARK: - Initialization - fileprivate init( + internal init( id: Int64?, failureCount: UInt, variant: Variant, diff --git a/SessionUtilitiesKit/Database/Models/JobDependencies.swift b/SessionUtilitiesKit/Database/Models/JobDependencies.swift index 9cda7ceb1..3613d6912 100644 --- a/SessionUtilitiesKit/Database/Models/JobDependencies.swift +++ b/SessionUtilitiesKit/Database/Models/JobDependencies.swift @@ -3,7 +3,7 @@ import Foundation import GRDB -public struct JobDependencies: Codable, FetchableRecord, PersistableRecord, TableRecord, ColumnExpressible { +public struct JobDependencies: Codable, Hashable, Equatable, FetchableRecord, PersistableRecord, TableRecord, ColumnExpressible { public static var databaseTableName: String { "jobDependencies" } internal static let jobForeignKey = ForeignKey([Columns.jobId], to: [Job.Columns.id]) internal static let dependantForeignKey = ForeignKey([Columns.dependantId], to: [Job.Columns.id]) diff --git a/SessionUtilitiesKit/General/Array+Utilities.swift b/SessionUtilitiesKit/General/Array+Utilities.swift index cf350e86d..77445b64b 100644 --- a/SessionUtilitiesKit/General/Array+Utilities.swift +++ b/SessionUtilitiesKit/General/Array+Utilities.swift @@ -45,6 +45,14 @@ public extension Array { return updatedArray } + func inserting(contentsOf other: [Element]?, at index: Int) -> [Element] { + guard let other: [Element] = other else { return self } + + var updatedArray: [Element] = self + updatedArray.insert(contentsOf: other, at: 0) + return updatedArray + } + func grouped(by keyForValue: (Element) throws -> Key) -> [Key: [Element]] { return ((try? Dictionary(grouping: self, by: keyForValue)) ?? [:]) } diff --git a/SessionUtilitiesKit/General/Dependencies.swift b/SessionUtilitiesKit/General/Dependencies.swift index e9c576639..61e2f510d 100644 --- a/SessionUtilitiesKit/General/Dependencies.swift +++ b/SessionUtilitiesKit/General/Dependencies.swift @@ -16,6 +16,12 @@ open class Dependencies { set { _storage.mutate { $0 = newValue } } } + public var _jobRunner: Atomic + public var jobRunner: JobRunnerType { + get { Dependencies.getValueSettingIfNull(&_jobRunner) { JobRunner.instance } } + set { _jobRunner.mutate { $0 = newValue } } + } + public var _scheduler: Atomic public var scheduler: ValueObservationScheduler { get { Dependencies.getValueSettingIfNull(&_scheduler) { Storage.defaultPublisherScheduler } } @@ -39,12 +45,14 @@ open class Dependencies { public init( generalCache: Atomic? = nil, storage: Storage? = nil, + jobRunner: JobRunnerType? = nil, scheduler: ValueObservationScheduler? = nil, standardUserDefaults: UserDefaultsType? = nil, date: Date? = nil ) { _generalCache = Atomic(generalCache) _storage = Atomic(storage) + _jobRunner = Atomic(jobRunner) _scheduler = Atomic(scheduler) _standardUserDefaults = Atomic(standardUserDefaults) _date = Atomic(date) diff --git a/SessionUtilitiesKit/General/Set+Utilities.swift b/SessionUtilitiesKit/General/Set+Utilities.swift index 5fb2d416b..f6f45d27a 100644 --- a/SessionUtilitiesKit/General/Set+Utilities.swift +++ b/SessionUtilitiesKit/General/Set+Utilities.swift @@ -3,6 +3,12 @@ import Foundation public extension Set { + mutating func insert(contentsOf value: Set?) { + guard let value: Set = value else { return } + + value.forEach { self.insert($0) } + } + func inserting(_ value: Element?) -> Set { guard let value: Element = value else { return self } diff --git a/SessionUtilitiesKit/JobRunner/JobRunner.swift b/SessionUtilitiesKit/JobRunner/JobRunner.swift index fafb086d2..484a36aa7 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunner.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunner.swift @@ -3,6 +3,62 @@ import Foundation import GRDB +public protocol JobRunnerType { + // MARK: - Configuration + + func setExecutor(_ executor: JobExecutor.Type, for variant: Job.Variant) + func canStart(queue: JobQueue) -> Bool + + // MARK: - State Management + + func isCurrentlyRunning(_ job: Job?) -> Bool + func hasJob(of variant: Job.Variant, inState state: JobRunner.JobState, with jobDetails: T) -> Bool + func detailsFor(jobs: [Job]?, state: JobRunner.JobState, variant: Job.Variant?) -> [Int64: Data?] + + func appDidFinishLaunching(dependencies: Dependencies) + func appDidBecomeActive(dependencies: Dependencies) + func startNonBlockingQueues(dependencies: Dependencies) + func stopAndClearPendingJobs(exceptForVariant: Job.Variant?, onComplete: (() -> ())?) + + // MARK: - Job Scheduling + + func add(_ db: Database, job: Job?, canStartJob: Bool, dependencies: Dependencies) + func upsert(_ db: Database, job: Job?, canStartJob: Bool, dependencies: Dependencies) + @discardableResult func insert(_ db: Database, job: Job?, before otherJob: Job, dependencies: Dependencies) -> (Int64, Job)? +} + +public extension JobRunnerType { + func stopAndClearPendingJobs(exceptForVariant: Job.Variant? = nil, onComplete: (() -> ())? = nil) { + stopAndClearPendingJobs(exceptForVariant: exceptForVariant, onComplete: onComplete) + } + + func hasJob(of variant: Job.Variant, inState state: JobRunner.JobState = .any, with jobDetails: T) -> Bool { + return hasJob(of: variant, inState: state, with: jobDetails) + } + + func details() -> [Int64: Data?] { return detailsFor(jobs: nil, state: .any, variant: nil) } + + func detailsFor(jobs: [Job]) -> [Int64: Data?] { + return detailsFor(jobs: jobs, state: .any, variant: nil) + } + + func detailsFor(jobs: [Job], state: JobRunner.JobState) -> [Int64: Data?] { + return detailsFor(jobs: jobs, state: state, variant: nil) + } + + func detailsFor(state: JobRunner.JobState) -> [Int64: Data?] { + return detailsFor(jobs: nil, state: state, variant: nil) + } + + func detailsFor(state: JobRunner.JobState, variant: Job.Variant) -> [Int64: Data?] { + return detailsFor(jobs: nil, state: state, variant: variant) + } + + func detailsFor(variant: Job.Variant) -> [Int64: Data?] { + return detailsFor(jobs: nil, state: .any, variant: variant) + } +} + public protocol JobExecutor { /// The maximum number of times the job can fail before it fails permanently /// @@ -35,7 +91,20 @@ public protocol JobExecutor { ) } -public final class JobRunner { +public final class JobRunner: JobRunnerType { + public struct JobState: OptionSet, Hashable { + public let rawValue: UInt8 + + public init(rawValue: UInt8) { + self.rawValue = rawValue + } + + public static let pending: JobState = JobState(rawValue: 1 << 0) + public static let running: JobState = JobState(rawValue: 1 << 1) + + public static let any: JobState = [ .pending, .running ] + } + public enum JobResult { case succeeded case failed @@ -45,19 +114,33 @@ public final class JobRunner { // MARK: - Variables + private let allowToExecuteJobs: Bool private let blockingQueue: Atomic private let queues: Atomic<[Job.Variant: JobQueue]> + internal var appReadyToStartQueues: Atomic = Atomic(false) internal var perSessionJobsCompleted: Atomic> = Atomic([]) internal var hasCompletedInitialBecomeActive: Atomic = Atomic(false) internal var shutdownBackgroundTask: Atomic = Atomic(nil) - internal var canStartQueues: Atomic = Atomic(false) // MARK: - Initialization - init(dependencies: Dependencies = Dependencies()) { - var jobVariants: Set = Job.Variant.allCases.asSet() - + init( + isTestingJobRunner: Bool = false, + variantsToExclude: [Job.Variant] = [], + dependencies: Dependencies = Dependencies() + ) { + var jobVariants: Set = Job.Variant.allCases + .filter { !variantsToExclude.contains($0) } + .asSet() + + self.allowToExecuteJobs = ( + isTestingJobRunner || ( + HasAppContext() && + CurrentAppContext().isMainApp && + !CurrentAppContext().isRunningTests + ) + ) self.blockingQueue = Atomic( JobQueue( type: .blocking, @@ -66,7 +149,7 @@ public final class JobRunner { onQueueDrained: { // Once all blocking jobs have been completed we want to start running // the remaining job queues - JobRunner.startNonBlockingQueues(dependencies: dependencies) + dependencies.jobRunner.startNonBlockingQueues(dependencies: dependencies) } ) ) @@ -127,15 +210,92 @@ public final class JobRunner { // MARK: - Configuration - internal func add(executor: JobExecutor.Type, for variant: Job.Variant) { - queues.wrappedValue[variant]?.addExecutor(executor, for: variant) + public func setExecutor(_ executor: JobExecutor.Type, for variant: Job.Variant) { + blockingQueue.wrappedValue?.setExecutor(executor, for: variant) // The blocking queue can run any job + queues.wrappedValue[variant]?.setExecutor(executor, for: variant) } - // MARK: - Execution + public func canStart(queue: JobQueue) -> Bool { + return ( + allowToExecuteJobs && + appReadyToStartQueues.wrappedValue + ) + } + + // MARK: - State Management + + public func isCurrentlyRunning(_ job: Job?) -> Bool { + guard let job: Job = job else { return false } + + return !detailsFor(jobs: [job], state: .running).isEmpty + } + + public func hasJob(of variant: Job.Variant, inState state: JobRunner.JobState, with jobDetails: T) -> Bool { + guard let detailsData: Data = try? JSONEncoder().encode(jobDetails) else { return false } + + return detailsFor(state: state, variant: variant).values.contains(detailsData) + } + + public func detailsFor( + jobs: [Job]?, + state: JobRunner.JobState, + variant: Job.Variant? + ) -> [Int64: Data?] { + var result: [(Int64, Data?)] = [] + let targetKeys: [JobQueue.JobKey] = (jobs?.compactMap { JobQueue.JobKey($0) } ?? []) + let targetVariants: [Job.Variant] = (variant.map { [$0] } ?? jobs?.map { $0.variant }) + .defaulting(to: []) + + // Insert the state of any pending jobs + if state.contains(.pending) { + func detailsFor(queue: JobQueue?, variants: [Job.Variant]) -> [(Int64, Data?)] { + return (queue?.pendingJobsQueue.wrappedValue + .filter { variants.isEmpty || variants.contains($0.variant) } + .compactMap { job -> (Int64, Data?)? in + guard let jobKey: JobQueue.JobKey = JobQueue.JobKey(job) else { return nil } + guard !targetKeys.isEmpty else { return (jobKey.id, job.details) } + + return (targetKeys.contains(jobKey) ? (jobKey.id, job.details) : nil) + }) + .defaulting(to: []) + } + + result.append(contentsOf: detailsFor(queue: blockingQueue.wrappedValue, variants: targetVariants)) + queues.wrappedValue + .filter { key, _ -> Bool in targetVariants.isEmpty || targetVariants.contains(key) } + .values + .forEach { queue in result.append(contentsOf: detailsFor(queue: queue, variants: targetVariants)) } + } + + // Insert the state of any running jobs + if state.contains(.running) { + func detailsFor(queue: JobQueue?, variants: [Job.Variant]) -> [(Int64, Data?)] { + return (queue?.detailsForCurrentlyRunningJobs.wrappedValue + .filter { variants.isEmpty || variants.contains($0.key.variant) } + .compactMap { jobKey, details -> (Int64, Data?)? in + guard !targetKeys.isEmpty else { return (jobKey.id, details) } + + return (targetKeys.contains(jobKey) ? (jobKey.id, details) : nil) + }) + .defaulting(to: []) + } + + result.append(contentsOf: detailsFor(queue: blockingQueue.wrappedValue, variants: targetVariants)) + queues.wrappedValue + .filter { key, _ -> Bool in targetVariants.isEmpty || targetVariants.contains(key) } + .values + .forEach { queue in result.append(contentsOf: detailsFor(queue: queue, variants: targetVariants)) } + } + + return result + .reduce(into: [:]) { result, next in + result[next.0] = next.1 + } + } - internal func appDidFinishLaunching(dependencies: Dependencies) { + public func appDidFinishLaunching(dependencies: Dependencies) { // Flag that the JobRunner can start it's queues - canStartQueues.mutate { $0 = true } + appReadyToStartQueues.mutate { $0 = true } // Note: 'appDidBecomeActive' will run on first launch anyway so we can // leave those jobs out and can wait until then to start the JobRunner @@ -185,9 +345,9 @@ public final class JobRunner { } } - internal func appDidBecomeActive(dependencies: Dependencies) { + public func appDidBecomeActive(dependencies: Dependencies) { // Flag that the JobRunner can start it's queues - canStartQueues.mutate { $0 = true } + appReadyToStartQueues.mutate { $0 = true } // If we have a running "sutdownBackgroundTask" then we want to cancel it as otherwise it // can result in the database being suspended and us being unable to interact with it at all @@ -232,7 +392,73 @@ public final class JobRunner { self.hasCompletedInitialBecomeActive.mutate { $0 = true } } - internal func add( + public func startNonBlockingQueues(dependencies: Dependencies) { + queues.wrappedValue.forEach { _, queue in + queue.start(dependencies: dependencies) + } + } + + public func stopAndClearPendingJobs( + exceptForVariant: Job.Variant?, + onComplete: (() -> ())? + ) { + // Inform the JobRunner that it can't start any queues (this is to prevent queues from + // rescheduling themselves while in the background, when the app restarts or becomes active + // the JobRunenr will update this flag) + appReadyToStartQueues.mutate { $0 = false } + + // Stop all queues except for the one containing the `exceptForVariant` + queues.wrappedValue + .values + .filter { queue -> Bool in + guard let exceptForVariant: Job.Variant = exceptForVariant else { return true } + + return !queue.jobVariants.contains(exceptForVariant) + } + .forEach { $0.stopAndClearPendingJobs() } + + // Ensure the queue is actually running (if not the trigger the callback immediately) + guard + let exceptForVariant: Job.Variant = exceptForVariant, + let queue: JobQueue = queues.wrappedValue[exceptForVariant], + queue.isRunning.wrappedValue == true + else { + onComplete?() + return + } + + let oldQueueDrained: (() -> ())? = queue.onQueueDrained + + // Create a backgroundTask to give the queue the chance to properly be drained + shutdownBackgroundTask.mutate { + $0 = OWSBackgroundTask(labelStr: #function) { [weak queue] state in + // If the background task didn't succeed then trigger the onComplete (and hope we have + // enough time to complete it's logic) + guard state != .cancelled else { + queue?.onQueueDrained = oldQueueDrained + return + } + guard state != .success else { return } + + onComplete?() + queue?.onQueueDrained = oldQueueDrained + queue?.stopAndClearPendingJobs() + } + } + + // Add a callback to be triggered once the queue is drained + queue.onQueueDrained = { [weak self, weak queue] in + oldQueueDrained?() + queue?.onQueueDrained = oldQueueDrained + onComplete?() + + self?.shutdownBackgroundTask.mutate { $0 = nil } + } + } + + // MARK: - Execution + + public func add( _ db: Database, job: Job?, canStartJob: Bool, @@ -262,7 +488,7 @@ public final class JobRunner { } } - internal func upsert( + public func upsert( _ db: Database, job: Job?, canStartJob: Bool, @@ -285,7 +511,7 @@ public final class JobRunner { } } - @discardableResult internal func insert( + @discardableResult public func insert( _ db: Database, job: Job?, before otherJob: Job, @@ -315,75 +541,6 @@ public final class JobRunner { return (jobId, updatedJob) } - internal func stopAndClearPendingJobs( - exceptForVariant: Job.Variant? = nil, - onComplete: (() -> ())? = nil - ) { - // Inform the JobRunner that it can't start any queues (this is to prevent queues from - // rescheduling themselves while in the background, when the app restarts or becomes active - // the JobRunenr will update this flag) - canStartQueues.mutate { $0 = false } - - // Stop all queues except for the one containing the `exceptForVariant` - queues.wrappedValue - .values - .filter { queue -> Bool in - guard let exceptForVariant: Job.Variant = exceptForVariant else { return true } - - return !queue.jobVariants.contains(exceptForVariant) - } - .forEach { $0.stopAndClearPendingJobs() } - - // Ensure the queue is actually running (if not the trigger the callback immediately) - guard - let exceptForVariant: Job.Variant = exceptForVariant, - let queue: JobQueue = queues.wrappedValue[exceptForVariant], - queue.isRunning.wrappedValue == true - else { - onComplete?() - return - } - - let oldQueueDrained: (() -> ())? = queue.onQueueDrained - - // Create a backgroundTask to give the queue the chance to properly be drained - shutdownBackgroundTask.mutate { - $0 = OWSBackgroundTask(labelStr: #function) { [weak queue] state in - // If the background task didn't succeed then trigger the onComplete (and hope we have - // enough time to complete it's logic) - guard state != .cancelled else { - queue?.onQueueDrained = oldQueueDrained - return - } - guard state != .success else { return } - - onComplete?() - queue?.onQueueDrained = oldQueueDrained - queue?.stopAndClearPendingJobs() - } - } - - // Add a callback to be triggered once the queue is drained - queue.onQueueDrained = { [weak self, weak queue] in - oldQueueDrained?() - queue?.onQueueDrained = oldQueueDrained - onComplete?() - - self?.shutdownBackgroundTask.mutate { $0 = nil } - } - } - - internal func isCurrentlyRunning(_ job: Job?) -> Bool { - guard let job: Job = job, let jobId: Int64 = job.id else { return false } - - return (queues.wrappedValue[job.variant]?.isCurrentlyRunning(jobId) == true) - } - - internal func detailsForCurrentlyRunningJobs(of variant: Job.Variant) -> [Int64: Data?] { - return (queues.wrappedValue[variant]?.detailsForAllCurrentlyRunningJobs()) - .defaulting(to: [:]) - } - internal func afterCurrentlyRunningJob(_ job: Job?, callback: @escaping (JobResult) -> ()) { guard let job: Job = job, let jobId: Int64 = job.id, let queue: JobQueue = queues.wrappedValue[job.variant] else { callback(.notFound) @@ -393,13 +550,6 @@ public final class JobRunner { queue.afterCurrentlyRunningJob(jobId, callback: callback) } - internal func hasPendingOrRunningJob(with variant: Job.Variant, details: T) -> Bool { - guard let targetQueue: JobQueue = queues.wrappedValue[variant] else { return false } - guard let detailsData: Data = try? JSONEncoder().encode(details) else { return false } - - return targetQueue.hasPendingOrRunningJob(with: detailsData) - } - internal func removePendingJob(_ job: Job?) { guard let job: Job = job, let jobId: Int64 = job.id else { return } @@ -421,100 +571,9 @@ public final class JobRunner { } } -// MARK: - JobRunner Singleton - -public extension JobRunner { - private static let instance: JobRunner = JobRunner() - - // MARK: - Static Access - - static func add(executor: JobExecutor.Type, for variant: Job.Variant) { - instance.add(executor: executor, for: variant) - } - - static func appDidFinishLaunching(dependencies: Dependencies = Dependencies()) { - instance.appDidFinishLaunching(dependencies: dependencies) - } - - static func appDidBecomeActive(dependencies: Dependencies = Dependencies()) { - instance.appDidBecomeActive(dependencies: dependencies) - } - - /// Add a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start - /// the JobRunner - /// - /// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp` - /// is in the future then the job won't be started - static func add( - _ db: Database, - job: Job?, - canStartJob: Bool = true, - dependencies: Dependencies = Dependencies() - ) { instance.add(db, job: job, canStartJob: canStartJob, dependencies: dependencies) } - - /// Upsert a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start - /// the JobRunner - /// - /// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp` - /// is in the future then the job won't be started - static func upsert( - _ db: Database, - job: Job?, - canStartJob: Bool = true, - dependencies: Dependencies = Dependencies() - ) { instance.upsert(db, job: job, canStartJob: canStartJob, dependencies: dependencies) } - - @discardableResult static func insert( - _ db: Database, - job: Job?, - before otherJob: Job, - dependencies: Dependencies = Dependencies() - ) -> (Int64, Job)? { instance.insert(db, job: job, before: otherJob, dependencies: dependencies) } - - /// Calling this will clear the JobRunner queues and stop it from running new jobs, any currently executing jobs will continue to run - /// though (this means if we suspend the database it's likely that any currently running jobs will fail to complete and fail to record their - /// failure - they _should_ be picked up again the next time the app is launched) - static func stopAndClearPendingJobs( - exceptForVariant: Job.Variant? = nil, - onComplete: (() -> ())? = nil - ) { instance.stopAndClearPendingJobs(exceptForVariant: exceptForVariant, onComplete: onComplete) } - - static func isCurrentlyRunning(_ job: Job?) -> Bool { - return instance.isCurrentlyRunning(job) - } - - static func detailsForCurrentlyRunningJobs(of variant: Job.Variant) -> [Int64: Data?] { - return instance.detailsForCurrentlyRunningJobs(of: variant) - } - - static func afterCurrentlyRunningJob(_ job: Job?, callback: @escaping (JobResult) -> ()) { - instance.afterCurrentlyRunningJob(job, callback: callback) - } - - static func hasPendingOrRunningJob(with variant: Job.Variant, details: T) -> Bool { - return instance.hasPendingOrRunningJob(with: variant, details: details) - } - - static func removePendingJob(_ job: Job?) { - instance.removePendingJob(job) - } - - // MARK: - Internal Static Access - - fileprivate static func canStart(queue: JobQueue) -> Bool { - return instance.canStartQueues.wrappedValue - } - - fileprivate static func startNonBlockingQueues(dependencies: Dependencies) { - instance.queues.wrappedValue.forEach { _, queue in - queue.start(dependencies: dependencies) - } - } -} - // MARK: - JobQueue -private final class JobQueue { +public final class JobQueue { fileprivate enum QueueType: Hashable { case blocking case general(number: Int) @@ -577,6 +636,18 @@ private final class JobQueue { } } + fileprivate struct JobKey: Equatable, Hashable { + fileprivate let id: Int64 + fileprivate let variant: Job.Variant + + fileprivate init?(_ job: Job?) { + guard let id: Int64 = job?.id, let variant: Job.Variant = job?.variant else { return nil } + + self.id = id + self.variant = variant + } + } + private static let deferralLoopThreshold: Int = 3 private let type: QueueType @@ -606,17 +677,17 @@ private final class JobQueue { private var executorMap: Atomic<[Job.Variant: JobExecutor.Type]> = Atomic([:]) private var nextTrigger: Atomic = Atomic(nil) fileprivate var isRunning: Atomic = Atomic(false) - private var queue: Atomic<[Job]> = Atomic([]) - private var jobsCurrentlyRunning: Atomic> = Atomic([]) + fileprivate var pendingJobsQueue: Atomic<[Job]> = Atomic([]) + fileprivate var jobsCurrentlyRunning: Atomic> = Atomic([]) + fileprivate var detailsForCurrentlyRunningJobs: Atomic<[JobKey: Data?]> = Atomic([:]) private var jobCallbacks: Atomic<[Int64: [(JobRunner.JobResult) -> ()]]> = Atomic([:]) - private var detailsForCurrentlyRunningJobs: Atomic<[Int64: Data?]> = Atomic([:]) private var deferLoopTracker: Atomic<[Int64: (count: Int, times: [TimeInterval])]> = Atomic([:]) - fileprivate var hasPendingJobs: Bool { !queue.wrappedValue.isEmpty } + fileprivate var hasPendingJobs: Bool { !pendingJobsQueue.wrappedValue.isEmpty } // MARK: - Initialization - init( + fileprivate init( type: QueueType, executionType: ExecutionType = .serial, qos: DispatchQoS, @@ -633,7 +704,7 @@ private final class JobQueue { // MARK: - Configuration - fileprivate func addExecutor(_ executor: JobExecutor.Type, for variant: Job.Variant) { + fileprivate func setExecutor(_ executor: JobExecutor.Type, for variant: Job.Variant) { executorMap.mutate { $0[variant] = executor } } @@ -651,7 +722,7 @@ private final class JobQueue { return } - queue.mutate { $0.append(job) } + pendingJobsQueue.mutate { $0.append(job) } } /// Upsert a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start @@ -665,21 +736,21 @@ private final class JobQueue { return } - // Lock the queue while checking the index and inserting to ensure we don't run into + // Lock the pendingJobsQueue while checking the index and inserting to ensure we don't run into // any multi-threading shenanigans // - // Note: currently running jobs are removed from the queue so we don't need to check + // Note: currently running jobs are removed from the pendingJobsQueue so we don't need to check // the 'jobsCurrentlyRunning' set var didUpdateExistingJob: Bool = false - queue.mutate { queue in + pendingJobsQueue.mutate { queue in if let jobIndex: Array.Index = queue.firstIndex(where: { $0.id == jobId }) { queue[jobIndex] = job didUpdateExistingJob = true } } - // If we didn't update an existing job then we need to add it to the queue + // If we didn't update an existing job then we need to add it to the pendingJobsQueue guard !didUpdateExistingJob else { return } add(job, canStartJob: canStartJob, dependencies: dependencies) @@ -692,10 +763,10 @@ private final class JobQueue { } // Insert the job before the current job (re-adding the current job to - // the start of the queue if it's not in there) - this will mean the new + // the start of the pendingJobsQueue if it's not in there) - this will mean the new // job will run and then the otherJob will run (or run again) once it's // done - queue.mutate { + pendingJobsQueue.mutate { guard let otherJobIndex: Int = $0.firstIndex(of: otherJob) else { $0.insert(contentsOf: [job, otherJob], at: 0) return @@ -710,7 +781,7 @@ private final class JobQueue { canStart: Bool, dependencies: Dependencies ) { - queue.mutate { $0.append(contentsOf: jobs) } + pendingJobsQueue.mutate { $0.append(contentsOf: jobs) } // Start the job runner if needed if canStart && !isRunning.wrappedValue { @@ -723,8 +794,8 @@ private final class JobQueue { canStart: Bool, dependencies: Dependencies ) { - queue.mutate { queue in - // Avoid re-adding jobs to the queue that are already in it (this can + pendingJobsQueue.mutate { queue in + // Avoid re-adding jobs to the pendingJobsQueue that are already in it (this can // happen if the user sends the app to the background before the 'onActive' // jobs and then brings it back to the foreground) let jobsNotAlreadyInQueue: [Job] = jobs @@ -739,16 +810,8 @@ private final class JobQueue { } } - fileprivate func isCurrentlyRunning(_ jobId: Int64) -> Bool { - return jobsCurrentlyRunning.wrappedValue.contains(jobId) - } - - fileprivate func detailsForAllCurrentlyRunningJobs() -> [Int64: Data?] { - return detailsForCurrentlyRunningJobs.wrappedValue - } - fileprivate func afterCurrentlyRunningJob(_ jobId: Int64, callback: @escaping (JobRunner.JobResult) -> ()) { - guard isCurrentlyRunning(jobId) else { + guard jobsCurrentlyRunning.wrappedValue.contains(jobId) else { callback(.notFound) return } @@ -758,14 +821,8 @@ private final class JobQueue { } } - fileprivate func hasPendingOrRunningJob(with detailsData: Data?) -> Bool { - let pendingJobs: [Job] = queue.wrappedValue - - return pendingJobs.contains { job in job.details == detailsData } - } - fileprivate func removePendingJob(_ jobId: Int64) { - queue.mutate { queue in + pendingJobsQueue.mutate { queue in queue = queue.filter { $0.id != jobId } } } @@ -773,17 +830,12 @@ private final class JobQueue { // MARK: - Job Running fileprivate func start( - force: Bool = false, + forceWhenAlreadyRunning: Bool = false, dependencies: Dependencies ) { - // We only want the JobRunner to run in the main app - guard - HasAppContext() && - CurrentAppContext().isMainApp && - !CurrentAppContext().isRunningTests && - JobRunner.canStart(queue: self) - else { return } - guard force || !isRunning.wrappedValue else { return } + // Only start if the JobRunner is allowed to start the queue + guard dependencies.jobRunner.canStart(queue: self) else { return } + guard forceWhenAlreadyRunning || !isRunning.wrappedValue else { return } // The JobRunner runs synchronously we need to ensure this doesn't start // on the main thread (if it is on the main thread then swap to a different thread) @@ -804,7 +856,7 @@ private final class JobQueue { // Get any pending jobs let jobIdsAlreadyRunning: Set = jobsCurrentlyRunning.wrappedValue - let jobsAlreadyInQueue: Set = queue.wrappedValue.compactMap { $0.id }.asSet() + let jobsAlreadyInQueue: Set = pendingJobsQueue.wrappedValue.compactMap { $0.id }.asSet() let jobsToRun: [Job] = dependencies.storage.read { db in try Job .filterPendingJobs( @@ -821,7 +873,7 @@ private final class JobQueue { // Determine the number of jobs to run var jobCount: Int = 0 - queue.mutate { queue in + pendingJobsQueue.mutate { queue in queue.append(contentsOf: jobsToRun) jobCount = queue.count } @@ -836,7 +888,7 @@ private final class JobQueue { return } - // Run the first job in the queue + // Run the first job in the pendingJobsQueue if !wasAlreadyRunning { SNLog("[JobRunner] Starting \(queueContext) with (\(jobCount) job\(jobCount != 1 ? "s" : ""))") } @@ -845,7 +897,7 @@ private final class JobQueue { fileprivate func stopAndClearPendingJobs() { isRunning.mutate { $0 = false } - queue.mutate { $0 = [] } + pendingJobsQueue.mutate { $0 = [] } deferLoopTracker.mutate { $0 = [:] } } @@ -860,7 +912,7 @@ private final class JobQueue { } return } - guard let (nextJob, numJobsRemaining): (Job, Int) = queue.mutate({ queue in queue.popFirst().map { ($0, queue.count) } }) else { + guard let (nextJob, numJobsRemaining): (Job, Int) = pendingJobsQueue.mutate({ queue in queue.popFirst().map { ($0, queue.count) } }) else { // If it's a serial queue, or there are no more jobs running then update the 'isRunning' flag if executionType != .concurrent || jobsCurrentlyRunning.wrappedValue.isEmpty { isRunning.mutate { $0 = false } @@ -913,19 +965,21 @@ private final class JobQueue { } // If the 'nextRunTimestamp' for the job is in the future then don't run it yet - guard nextJob.nextRunTimestamp <= Date().timeIntervalSince1970 else { + guard nextJob.nextRunTimestamp <= dependencies.date.timeIntervalSince1970 else { handleJobDeferred(nextJob, dependencies: dependencies) return } // Check if the next job has any dependencies - let dependencyInfo: (expectedCount: Int, jobs: [Job]) = dependencies.storage.read { db in - let numExpectedDependencies: Int = try JobDependencies + let dependencyInfo: (expectedCount: Int, jobs: Set) = dependencies.storage.read { db in + let expectedDependencies: Set = try JobDependencies .filter(JobDependencies.Columns.jobId == nextJob.id) - .fetchCount(db) - let jobDependencies: [Job] = try nextJob.dependencies.fetchAll(db) + .fetchSet(db) + let jobDependencies: Set = try Job + .filter(ids: expectedDependencies.compactMap { $0.dependantId }) + .fetchSet(db) - return (numExpectedDependencies, jobDependencies) + return (expectedDependencies.count, jobDependencies) } .defaulting(to: (0, [])) @@ -942,39 +996,15 @@ private final class JobQueue { guard dependencyInfo.jobs.isEmpty else { SNLog("[JobRunner] \(queueContext) found job with \(dependencyInfo.jobs.count) dependencies, running those first") - let jobDependencyIds: [Int64] = dependencyInfo.jobs - .compactMap { $0.id } - let jobIdsNotInQueue: Set = jobDependencyIds - .asSet() - .subtracting(queue.wrappedValue.compactMap { $0.id }) - - // If there are dependencies which aren't in the queue we should just append them - guard !jobIdsNotInQueue.isEmpty else { - queue.mutate { queue in - queue.append( - contentsOf: dependencyInfo.jobs - .filter { jobIdsNotInQueue.contains($0.id ?? -1) } - ) - queue.append(nextJob) - } - handleJobDeferred(nextJob, dependencies: dependencies) - return - } - - // Otherwise re-add the current job after it's dependencies (if this isn't a concurrent - // queue - don't want to immediately try to start the job again only for it to end up back - // in here) - if executionType != .concurrent { - queue.mutate { queue in - guard let lastDependencyIndex: Int = queue.lastIndex(where: { jobDependencyIds.contains($0.id ?? -1) }) else { - queue.append(nextJob) - return - } - - queue.insert(nextJob, at: lastDependencyIndex + 1) - } + /// Remove all jobs this one is dependant on from the queue and re-insert them at the start of the queue + /// + /// **Note:** We don't add the current job back the the queue because it should only be re-added if it's dependencies + /// are successfully completed + pendingJobsQueue.mutate { queue in + queue = queue + .filter { !dependencyInfo.jobs.contains($0) } + .inserting(contentsOf: Array(dependencyInfo.jobs), at: 0) } - handleJobDeferred(nextJob, dependencies: dependencies) return } @@ -993,10 +1023,10 @@ private final class JobQueue { jobsCurrentlyRunning = jobsCurrentlyRunning.inserting(nextJob.id) numJobsRunning = jobsCurrentlyRunning.count } - detailsForCurrentlyRunningJobs.mutate { $0 = $0.setting(nextJob.id, nextJob.details) } + detailsForCurrentlyRunningJobs.mutate { $0 = $0.setting(JobKey(nextJob), nextJob.details) } SNLog("[JobRunner] \(queueContext) started \(nextJob.variant) job (\(executionType == .concurrent ? "\(numJobsRunning) currently running, " : "")\(numJobsRemaining) remaining)") - /// As it turns out Combine doesn't plat too nicely with concurrent Dispatch Queues, in Combine events are dispatched asynchronously to + /// As it turns out Combine doesn't play too nicely with concurrent Dispatch Queues, in Combine events are dispatched asynchronously to /// the queue which means an odd situation can occasionally occur where the `finished` event can actually run before the `output` /// event - this can result in unexpected behaviours (for more information see https://github.com/groue/GRDB.swift/issues/1334) /// @@ -1049,7 +1079,7 @@ private final class JobQueue { // If there are no remaining jobs or the JobRunner isn't allowed to start any queues then trigger // the 'onQueueDrained' callback and stop - guard let nextJobTimestamp: TimeInterval = nextJobTimestamp, JobRunner.canStart(queue: self) else { + guard let nextJobTimestamp: TimeInterval = nextJobTimestamp, dependencies.jobRunner.canStart(queue: self) else { if executionType != .concurrent || jobsCurrentlyRunning.wrappedValue.isEmpty { self.onQueueDrained?() } @@ -1073,7 +1103,7 @@ private final class JobQueue { // queue (for concurrent queues we want to force them to load in pending jobs and add // them to the queue regardless of whether the queue is already running) internalQueue.async { [weak self] in - self?.start(force: (self?.executionType == .concurrent), dependencies: dependencies) + self?.start(forceWhenAlreadyRunning: (self?.executionType == .concurrent), dependencies: dependencies) } return } @@ -1097,11 +1127,17 @@ private final class JobQueue { shouldStop: Bool, dependencies: Dependencies ) { + /// Retrieve the dependant jobs first (the `JobDependecies` table has cascading deletion when the original `Job` is + /// removed so we need to retrieve these records before that happens) + let dependantJobs: [Job] = dependencies.storage + .read { db in try job.dependantJobs.fetchAll(db) } + .defaulting(to: []) + switch job.behaviour { case .runOnce, .runOnceNextLaunch: dependencies.storage.write { db in - // First remove any JobDependencies requiring this job to be completed (if - // we don't then the dependant jobs will automatically be deleted) + /// Since this job has been completed we can update the dependencies so other job that were dependant + /// on this one can be run _ = try JobDependencies .filter(JobDependencies.Columns.dependantId == job.id) .deleteAll(db) @@ -1111,8 +1147,8 @@ private final class JobQueue { case .recurring where shouldStop == true: dependencies.storage.write { db in - // First remove any JobDependencies requiring this job to be completed (if - // we don't then the dependant jobs will automatically be deleted) + /// Since this job has been completed we can update the dependencies so other job that were dependant + /// on this one can be run _ = try JobDependencies .filter(JobDependencies.Columns.dependantId == job.id) .deleteAll(db) @@ -1120,9 +1156,8 @@ private final class JobQueue { _ = try job.delete(db) } - // For `recurring` jobs which have already run, they should automatically run again - // but we want at least 1 second to pass before doing so - the job itself should - // really update it's own 'nextRunTimestamp' (this is just a safety net) + /// For `recurring` jobs which have already run, they should automatically run again but we want at least 1 second + /// to pass before doing so - the job itself should really update it's own `nextRunTimestamp` (this is just a safety net) case .recurring where job.nextRunTimestamp <= Date().timeIntervalSince1970: guard let jobId: Int64 = job.id else { break } @@ -1136,9 +1171,8 @@ private final class JobQueue { ) } - // For `recurringOnLaunch/Active` jobs which have already run but failed once, we need to - // clear their `failureCount` and `nextRunTimestamp` to prevent them from endlessly running - // over and over again + /// For `recurringOnLaunch/Active` jobs which have already run but failed once, we need to clear their + /// `failureCount` and `nextRunTimestamp` to prevent them from endlessly running over and over again case .recurringOnLaunch, .recurringOnActive: guard let jobId: Int64 = job.id, @@ -1159,26 +1193,17 @@ private final class JobQueue { default: break } - // For concurrent queues retrieve any 'dependant' jobs and re-add them here (if they have other - // dependencies they will be removed again when they try to execute) - if executionType == .concurrent { - let dependantJobs: [Job] = dependencies.storage - .read { db in try job.dependantJobs.fetchAll(db) } - .defaulting(to: []) - let dependantJobIds: [Int64] = dependantJobs - .compactMap { $0.id } - let jobIdsNotInQueue: Set = dependantJobIds - .asSet() - .subtracting(queue.wrappedValue.compactMap { $0.id }) - - // If there are dependant jobs which aren't in the queue we should just append them - if !jobIdsNotInQueue.isEmpty { - queue.mutate { queue in - queue.append( - contentsOf: dependantJobs - .filter { jobIdsNotInQueue.contains($0.id ?? -1) } - ) - } + /// Now that the job has been completed we want to insert any jobs that were dependant on it to the start of the queue (the + /// most likely case is that we want an entire job chain to be completed at the same time rather than being blocked by other + /// unrelated jobs) + /// + /// **Note:** If any of these `dependantJobs` have other dependencies then when they attempt to start they will be + /// removed from the queue, replaced by their dependencies + if !dependantJobs.isEmpty { + pendingJobsQueue.mutate { queue in + queue = queue + .filter { !dependantJobs.contains($0) } + .inserting(contentsOf: dependantJobs, at: 0) } } @@ -1197,7 +1222,7 @@ private final class JobQueue { permanentFailure: Bool, dependencies: Dependencies ) { - guard Storage.shared.read({ db in try Job.exists(db, id: job.id ?? -1) }) == true else { + guard dependencies.storage.read({ db in try Job.exists(db, id: job.id ?? -1) }) == true else { SNLog("[JobRunner] \(queueContext) \(job.variant) job canceled") performCleanUp(for: job, result: .failed) @@ -1229,7 +1254,7 @@ private final class JobQueue { // Only add it back to the queue if it wasn't a deferral loop if !wasPossibleDeferralLoop { - queue.mutate { $0.insert(job, at: 0) } + pendingJobsQueue.mutate { $0.insert(job, at: 0) } } internalQueue.async { [weak self] in @@ -1242,20 +1267,30 @@ private final class JobQueue { let maxFailureCount: Int = (executorMap.wrappedValue[job.variant]?.maxFailureCount ?? 0) let nextRunTimestamp: TimeInterval = (Date().timeIntervalSince1970 + JobRunner.getRetryInterval(for: job)) - Storage.shared.write { db in + dependencies.storage.write { db in + /// Remove any dependant jobs from the queue (shouldn't be in there but filter the queue just in case so we don't try + /// to run a deleted job or get stuck in a loop of trying to run dependencies indefinitely) + let dependantJobIds: [Int64] = try job.dependantJobs + .select(.id) + .asRequest(of: Int64.self) + .fetchAll(db) + + if !dependantJobIds.isEmpty { + pendingJobsQueue.mutate { queue in + queue = queue.filter { !dependantJobIds.contains($0.id ?? -1) } + } + } + + /// Delete/update the failed jobs and any dependencies + let updatedFailureCount: UInt = (job.failureCount + 1) guard !permanentFailure && ( maxFailureCount < 0 || - job.failureCount + 1 < maxFailureCount + updatedFailureCount <= maxFailureCount ) else { SNLog("[JobRunner] \(queueContext) \(job.variant) failed permanently\(maxFailureCount >= 0 ? "; too many retries" : "")") - let dependantJobIds: [Int64] = try job.dependantJobs - .select(.id) - .asRequest(of: Int64.self) - .fetchAll(db) - // If the job permanently failed or we have performed all of our retry attempts // then delete the job and all of it's dependant jobs (it'll probably never succeed) _ = try job.dependantJobs @@ -1263,48 +1298,28 @@ private final class JobQueue { _ = try job.delete(db) - // Remove the dependant jobs from the queue (so we don't try to run a deleted job) - if !dependantJobIds.isEmpty { - queue.mutate { queue in - queue = queue.filter { !dependantJobIds.contains($0.id ?? -1) } - } - } - performCleanUp(for: job, result: .failed) return } - SNLog("[JobRunner] \(queueContext) \(job.variant) job failed; scheduling retry (failure count is \(job.failureCount + 1))") + SNLog("[JobRunner] \(queueContext) \(job.variant) job failed; scheduling retry (failure count is \(updatedFailureCount))") _ = try job .with( - failureCount: (job.failureCount + 1), + failureCount: updatedFailureCount, nextRunTimestamp: nextRunTimestamp ) .saved(db) // Update the failureCount and nextRunTimestamp on dependant jobs as well (update the - // 'nextRunTimestamp' value to be 1ms later so when the queue gets regenerated it'll + // 'nextRunTimestamp' value to be 1ms later so when the queue gets regenerated they'll // come after the dependency) try job.dependantJobs .updateAll( db, - Job.Columns.failureCount.set(to: (job.failureCount + 1)), + Job.Columns.failureCount.set(to: updatedFailureCount), Job.Columns.nextRunTimestamp.set(to: (nextRunTimestamp + (1 / 1000))) ) - - let dependantJobIds: [Int64] = try job.dependantJobs - .select(.id) - .asRequest(of: Int64.self) - .fetchAll(db) - - // Remove the dependant jobs from the queue (so we don't get stuck in a loop of trying - // to run dependecies indefinitely) - if !dependantJobIds.isEmpty { - queue.mutate { queue in - queue = queue.filter { !dependantJobIds.contains($0.id ?? -1) } - } - } } performCleanUp(for: job, result: .failed) @@ -1315,7 +1330,7 @@ private final class JobQueue { /// This function is called when a job neither succeeds or fails (this should only occur if the job has specific logic that makes it dependant /// on other jobs, and it should automatically manage those dependencies) - private func handleJobDeferred( + public func handleJobDeferred( _ job: Job, dependencies: Dependencies ) { @@ -1375,7 +1390,7 @@ private final class JobQueue { // The job is removed from the queue before it runs so all we need to to is remove it // from the 'currentlyRunning' set jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) } - detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) } + detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: JobKey(job)) } guard shouldTriggerCallbacks else { return } @@ -1391,3 +1406,74 @@ private final class JobQueue { } } } + +// MARK: - JobRunner Singleton +// FIXME: Remove this once the jobRunner is dependency injected everywhere correctly +public extension JobRunner { + internal static let instance: JobRunner = JobRunner() + + // MARK: - Static Access + + static func setExecutor(_ executor: JobExecutor.Type, for variant: Job.Variant) { + instance.setExecutor(executor, for: variant) + } + + static func appDidFinishLaunching(dependencies: Dependencies = Dependencies()) { + instance.appDidFinishLaunching(dependencies: dependencies) + } + + static func appDidBecomeActive(dependencies: Dependencies = Dependencies()) { + instance.appDidBecomeActive(dependencies: dependencies) + } + + /// Add a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start + /// the JobRunner + /// + /// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp` + /// is in the future then the job won't be started + static func add( + _ db: Database, + job: Job?, + canStartJob: Bool = true, + dependencies: Dependencies = Dependencies() + ) { instance.add(db, job: job, canStartJob: canStartJob, dependencies: dependencies) } + + /// Upsert a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start + /// the JobRunner + /// + /// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp` + /// is in the future then the job won't be started + static func upsert( + _ db: Database, + job: Job?, + canStartJob: Bool = true, + dependencies: Dependencies = Dependencies() + ) { instance.upsert(db, job: job, canStartJob: canStartJob, dependencies: dependencies) } + + @discardableResult static func insert( + _ db: Database, + job: Job?, + before otherJob: Job, + dependencies: Dependencies = Dependencies() + ) -> (Int64, Job)? { instance.insert(db, job: job, before: otherJob, dependencies: dependencies) } + + /// Calling this will clear the JobRunner queues and stop it from running new jobs, any currently executing jobs will continue to run + /// though (this means if we suspend the database it's likely that any currently running jobs will fail to complete and fail to record their + /// failure - they _should_ be picked up again the next time the app is launched) + static func stopAndClearPendingJobs( + exceptForVariant: Job.Variant? = nil, + onComplete: (() -> ())? = nil + ) { instance.stopAndClearPendingJobs(exceptForVariant: exceptForVariant, onComplete: onComplete) } + + static func isCurrentlyRunning(_ job: Job?) -> Bool { + return instance.isCurrentlyRunning(job) + } + + static func afterCurrentlyRunningJob(_ job: Job?, callback: @escaping (JobResult) -> ()) { + instance.afterCurrentlyRunningJob(job, callback: callback) + } + + static func removePendingJob(_ job: Job?) { + instance.removePendingJob(job) + } +} diff --git a/SessionUtilitiesKitTests/JobRunner/JobRunnerSpec.swift b/SessionUtilitiesKitTests/JobRunner/JobRunnerSpec.swift index b6dfcc2d7..8c4db0211 100644 --- a/SessionUtilitiesKitTests/JobRunner/JobRunnerSpec.swift +++ b/SessionUtilitiesKitTests/JobRunner/JobRunnerSpec.swift @@ -9,7 +9,7 @@ import Nimble @testable import SessionUtilitiesKit class JobRunnerSpec: QuickSpec { - public enum TestSuccessfulJob: JobExecutor { + enum TestSuccessfulJob: JobExecutor { static let maxFailureCount: Int = 0 static let requiresThreadId: Bool = false static let requiresInteractionId: Bool = false @@ -22,14 +22,93 @@ class JobRunnerSpec: QuickSpec { deferred: @escaping (Job, Dependencies) -> (), dependencies: Dependencies ) { - success(job, true, dependencies) + guard dependencies.date.timeIntervalSinceNow > 0 else { return success(job, true, dependencies) } + + queue.asyncAfter(deadline: .now() + .milliseconds(Int(dependencies.date.timeIntervalSinceNow * 1000))) { + success(job, true, dependencies) + } + } + } + + enum TestFailedJob: JobExecutor { + static let maxFailureCount: Int = 1 + static let requiresThreadId: Bool = false + static let requiresInteractionId: Bool = false + + static func run( + _ job: Job, + queue: DispatchQueue, + success: @escaping (Job, Bool, Dependencies) -> (), + failure: @escaping (Job, Error?, Bool, Dependencies) -> (), + deferred: @escaping (Job, Dependencies) -> (), + dependencies: Dependencies + ) { + guard dependencies.date.timeIntervalSinceNow > 0 else { return failure(job, nil, false, dependencies) } + + queue.asyncAfter(deadline: .now() + .milliseconds(Int(dependencies.date.timeIntervalSinceNow * 1000))) { + failure(job, nil, false, dependencies) + } + } + } + + enum TestPermanentFailureJob: JobExecutor { + static let maxFailureCount: Int = 1 + static let requiresThreadId: Bool = false + static let requiresInteractionId: Bool = false + + static func run( + _ job: Job, + queue: DispatchQueue, + success: @escaping (Job, Bool, Dependencies) -> (), + failure: @escaping (Job, Error?, Bool, Dependencies) -> (), + deferred: @escaping (Job, Dependencies) -> (), + dependencies: Dependencies + ) { + guard dependencies.date.timeIntervalSinceNow > 0 else { return failure(job, nil, true, dependencies) } + + queue.asyncAfter(deadline: .now() + .milliseconds(Int(dependencies.date.timeIntervalSinceNow * 1000))) { + failure(job, nil, true, dependencies) + } + } + } + + enum TestDeferredJob: JobExecutor { + static let maxFailureCount: Int = 0 + static let requiresThreadId: Bool = false + static let requiresInteractionId: Bool = false + + static func run( + _ job: Job, + queue: DispatchQueue, + success: @escaping (Job, Bool, Dependencies) -> (), + failure: @escaping (Job, Error?, Bool, Dependencies) -> (), + deferred: @escaping (Job, Dependencies) -> (), + dependencies: Dependencies + ) { + guard dependencies.date.timeIntervalSinceNow > 0 else { return deferred(job, dependencies) } + + queue.asyncAfter(deadline: .now() + .milliseconds(Int(dependencies.date.timeIntervalSinceNow * 1000))) { + deferred(job, dependencies) + } } } + struct TestDetails: Codable { + public let intValue: Int64 + public let stringValue: String + } + + struct InvalidDetails: Codable { + func encode(to encoder: Encoder) throws { throw HTTP.Error.parsingFailed } + } + // MARK: - Spec override func spec() { - var jobRunner: JobRunner! + var jobRunner: JobRunnerType! + var job1: Job! + var job2: Job! + var jobDetails: TestDetails! var mockStorage: Storage! var dependencies: Dependencies! @@ -48,20 +127,1149 @@ class JobRunnerSpec: QuickSpec { date: Date(timeIntervalSince1970: 1234567890) ) - jobRunner = JobRunner() + // Migrations add jobs which we don't want so delete them + mockStorage.write { db in try Job.deleteAll(db) } + + job1 = Job( + id: 100, + failureCount: 0, + variant: .messageSend, + behaviour: .runOnce, + shouldBlock: false, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: nil + ) + jobDetails = TestDetails( + intValue: 100, + stringValue: "200" + ) + job2 = Job( + id: 101, + failureCount: 0, + variant: .attachmentUpload, + behaviour: .runOnce, + shouldBlock: false, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: try! JSONEncoder().encode(jobDetails) + ) + + jobRunner = JobRunner(isTestingJobRunner: true, dependencies: dependencies) + + // Need to assign this to ensure it's used by nested dependencies + dependencies.jobRunner = jobRunner } afterEach { + jobRunner.stopAndClearPendingJobs() jobRunner = nil mockStorage = nil dependencies = nil } + // MARK: -- when configuring context("when configuring") { it("adds an executor correctly") { - // TODO: Test this - jobRunner.add(executor: TestSuccessfulJob.self, for: .messageSend) + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + // First check that it fails to start + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + + mockStorage.write { db in + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + } + + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(10) + ) + + jobRunner.setExecutor(TestSuccessfulJob.self, for: .messageSend) + + // Then check that it succeeded to start + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + + mockStorage.write { db in + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + } + + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(10) + ) + } + } + + // MARK: -- when managing state + + context("when managing state") { + + // MARK: ---- by checking if a job is currently running + + context("by checking if a job is currently running") { + beforeEach { + jobRunner.setExecutor(TestSuccessfulJob.self, for: .messageSend) + } + + it("returns false when not given a job") { + expect(jobRunner.isCurrentlyRunning(nil)).to(beFalse()) + } + + it("returns false when given a job that has not been persisted") { + job1 = Job(variant: .messageSend) + + expect(jobRunner.isCurrentlyRunning(job1)).to(beFalse()) + } + + it("returns false when given a job that is not running") { + expect(jobRunner.isCurrentlyRunning(job1)).to(beFalse()) + } + + it("returns true when given a non blocking job that is running") { + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + + mockStorage.write { db in + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + } + + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beTrue(), + timeout: .milliseconds(10) + ) + } + + it("returns true when given a blocking job that is running") { + job2 = Job( + id: 101, + failureCount: 0, + variant: .messageSend, + behaviour: .runOnceNextLaunch, + shouldBlock: true, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: nil + ) + + mockStorage.write { db in + try job2.insert(db) + + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + expect(jobRunner.isCurrentlyRunning(job2)) + .toEventually( + beTrue(), + timeout: .milliseconds(10) + ) + } + } + + // MARK: ---- by getting the details for jobs + + context("by getting the details for jobs") { + beforeEach { + jobRunner.setExecutor(TestSuccessfulJob.self, for: .messageSend) + jobRunner.setExecutor(TestSuccessfulJob.self, for: .attachmentUpload) + jobRunner.setExecutor(TestSuccessfulJob.self, for: .attachmentDownload) + } + + it("returns an empty dictionary when there are no jobs") { + expect(jobRunner.details()).to(equal([:])) + } + + it("returns an empty dictionary when there are no jobs matching the filters") { + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + + mockStorage.write { db in + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + expect(jobRunner.detailsFor(state: .running, variant: .messageSend)) + .toEventually( + equal([:]), + timeout: .milliseconds(10) + ) + } + + it("can filter to specific jobs") { + mockStorage.write { db in + jobRunner.upsert( + db, + job: job2, + /// The `canStartJob` value needs to be `true` for the job to be added to the queue but as + /// long as `appDidFinishLaunching` hasn't been called it won't actually start running and + /// as a result we can test the "pending" state + canStartJob: true, + dependencies: dependencies + ) + } + + // Wait for there to be data and the validate the filtering works + expect(jobRunner.details()) + .toEventuallyNot( + beEmpty(), + timeout: .milliseconds(10) + ) + expect(jobRunner.detailsFor(jobs: [job1])).to(equal([:])) + expect(jobRunner.detailsFor(jobs: [job2])).to(equal([101: job2.details])) + } + + it("can filter to running jobs") { + job1 = Job( + id: 100, + failureCount: 0, + variant: .attachmentDownload, + behaviour: .runOnce, + shouldBlock: false, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: try! JSONEncoder().encode(jobDetails) + ) + job2 = Job( + id: 101, + failureCount: 0, + variant: .attachmentDownload, + behaviour: .runOnce, + shouldBlock: false, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: nil + ) + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + mockStorage.write { db in + try job1.insert(db) + try job2.insert(db) + + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + // Wait for there to be data and the validate the filtering works + expect(jobRunner.detailsFor(state: .running)) + .toEventually( + equal([100: try! JSONEncoder().encode(jobDetails)]), + timeout: .milliseconds(10) + ) + expect(Array(jobRunner.details().keys).sorted()).to(equal([100, 101])) + } + + it("can filter to pending jobs") { + job1 = Job( + id: 100, + failureCount: 0, + variant: .attachmentDownload, + behaviour: .runOnce, + shouldBlock: false, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: nil + ) + job2 = Job( + id: 101, + failureCount: 0, + variant: .attachmentDownload, + behaviour: .runOnce, + shouldBlock: false, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: try! JSONEncoder().encode(jobDetails) + ) + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + mockStorage.write { db in + try job1.insert(db) + try job2.insert(db) + + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + // Wait for there to be data and the validate the filtering works + expect(jobRunner.detailsFor(state: .pending)) + .toEventually( + equal([101: try! JSONEncoder().encode(jobDetails)]), + timeout: .milliseconds(10) + ) + expect(Array(jobRunner.details().keys).sorted()).to(equal([100, 101])) + } + + it("can filter to specific variants") { + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + mockStorage.write { db in + try job1.insert(db) + try job2.insert(db) + + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + // Wait for there to be data and the validate the filtering works + expect(jobRunner.detailsFor(variant: .attachmentUpload)) + .toEventually( + equal([101: try! JSONEncoder().encode(jobDetails)]), + timeout: .milliseconds(10) + ) + expect(Array(jobRunner.details().keys).sorted()).to(equal([100, 101])) + } + + it("includes non blocking jobs") { + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + + mockStorage.write { db in + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + expect(jobRunner.detailsFor(state: .running, variant: .attachmentUpload)) + .toEventually( + equal([101: try! JSONEncoder().encode(jobDetails)]), + timeout: .milliseconds(10) + ) + } + + it("includes blocking jobs") { + job2 = Job( + id: 101, + failureCount: 0, + variant: .attachmentUpload, + behaviour: .runOnceNextLaunch, + shouldBlock: true, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: try! JSONEncoder().encode(jobDetails) + ) + + mockStorage.write { db in + try job2.insert(db) + + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + expect(jobRunner.detailsFor(state: .running, variant: .attachmentUpload)) + .toEventually( + equal([101: try! JSONEncoder().encode(jobDetails)]), + timeout: .milliseconds(10) + ) + } + } + + // MARK: ---- by checking for an existing job + + context("by checking for an existing job") { + beforeEach { + jobRunner.setExecutor(TestSuccessfulJob.self, for: .attachmentUpload) + } + + it("returns false for a queue that doesn't exist") { + jobRunner = JobRunner( + isTestingJobRunner: true, + variantsToExclude: [.attachmentUpload], + dependencies: dependencies + ) + + expect(jobRunner.hasJob(of: .attachmentUpload, with: jobDetails)) + .to(beFalse()) + } + + it("returns false when the provided details fail to decode") { + expect(jobRunner.hasJob(of: .attachmentUpload, with: InvalidDetails())) + .to(beFalse()) + } + + it("returns false when there is not a pending or running job") { + expect(jobRunner.hasJob(of: .attachmentUpload, with: jobDetails)) + .to(beFalse()) + } + + it("returns true when there is a pending job") { + mockStorage.write { db in + jobRunner.upsert( + db, + job: job2, + /// The `canStartJob` value needs to be `true` for the job to be added to the queue but as + /// long as `appDidFinishLaunching` hasn't been called it won't actually start running and + /// as a result we can test the "pending" state + canStartJob: true, + dependencies: dependencies + ) + } + + expect(Array(jobRunner.detailsFor(state: .pending, variant: .attachmentUpload).keys)) + .toEventually( + equal([101]), + timeout: .milliseconds(10) + ) + expect(jobRunner.hasJob(of: .attachmentUpload, with: jobDetails)) + .to(beTrue()) + } + + it("returns true when there is a running job") { + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + + mockStorage.write { db in + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + expect(Array(jobRunner.detailsFor(state: .running, variant: .attachmentUpload).keys)) + .toEventually( + equal([101]), + timeout: .milliseconds(10) + ) + expect(jobRunner.hasJob(of: .attachmentUpload, with: jobDetails)) + .to(beTrue()) + } + + it("returns true when there is a blocking job") { + job2 = Job( + id: 101, + failureCount: 0, + variant: .attachmentUpload, + behaviour: .runOnceNextLaunch, + shouldBlock: true, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: try! JSONEncoder().encode(jobDetails) + ) + + mockStorage.write { db in + try job2.insert(db) + + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + expect(Array(jobRunner.detailsFor(state: .running, variant: .attachmentUpload).keys)) + .toEventually( + equal([101]), + timeout: .milliseconds(10) + ) + expect(jobRunner.hasJob(of: .attachmentUpload, with: jobDetails)) + .to(beTrue()) + } + + it("returns true when there is a non blocking job") { + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + mockStorage.write { db in + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + expect(Array(jobRunner.detailsFor(state: .running, variant: .attachmentUpload).keys)) + .toEventually( + equal([101]), + timeout: .milliseconds(10) + ) + expect(jobRunner.hasJob(of: .attachmentUpload, with: jobDetails)) + .to(beTrue()) + } + } + + // MARK: ---- by being notified of app launch + + context("by being notified of app launch") { + beforeEach { + jobRunner.setExecutor(TestSuccessfulJob.self, for: .messageSend) + } + + it("does not start a job before getting the app launch call") { + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + + mockStorage.write { db in + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + } + + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(10) + ) + } + + it("does nothing if there are no app launch jobs") { + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + + mockStorage.write { db in + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + } + + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(10) + ) + } + + it("starts the job queues after completing blocking app launch jobs") { + job2 = Job( + id: 101, + failureCount: 0, + variant: .messageSend, + behaviour: .runOnceNextLaunch, + shouldBlock: true, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: nil + ) + + mockStorage.write { db in + try job2.insert(db) + + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + // Not currently running + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(10) + ) + expect(jobRunner.isCurrentlyRunning(job2)).to(beFalse()) + + // Make sure it starts + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + // Blocking job running but blocked job not + expect(jobRunner.isCurrentlyRunning(job2)) + .toEventually( + beTrue(), + timeout: .milliseconds(10) + ) + expect(jobRunner.isCurrentlyRunning(job1)).to(beFalse()) + + // Blocked job eventually starts + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beTrue(), + timeout: .milliseconds(20) + ) + } + + it("starts the job queues alongside non blocking app launch jobs") { + job2 = Job( + id: 101, + failureCount: 0, + variant: .messageSend, + behaviour: .runOnceNextLaunch, + shouldBlock: false, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: nil + ) + + mockStorage.write { db in + try job2.insert(db) + + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + // Not currently running + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(10) + ) + + // Make sure it starts + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beTrue(), + timeout: .milliseconds(10) + ) + expect(jobRunner.isCurrentlyRunning(job2)) + .toEventually( + beTrue(), + timeout: .milliseconds(10) + ) + } + } + + // MARK: ---- by being notified of app becoming active + + context("by being notified of app becoming active") { + beforeEach { + jobRunner.setExecutor(TestSuccessfulJob.self, for: .messageSend) + } + + it("does not start a job before getting the app active call") { + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + + mockStorage.write { db in + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + } + + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(10) + ) + } + + it("does not start the job queues if there are no app active jobs and blocking jobs are running") { + job2 = Job( + id: 101, + failureCount: 0, + variant: .messageSend, + behaviour: .runOnceNextLaunch, + shouldBlock: true, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: nil + ) + + mockStorage.write { db in + try job2.insert(db) + + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + // Not currently running + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(10) + ) + + // Start the blocking job + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + // Make sure the other queues don't start + dependencies.date = Date().addingTimeInterval(30 / 1000) // Complete job after delay + jobRunner.appDidBecomeActive(dependencies: dependencies) + + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(10) + ) + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(20) + ) + } + + it("does not start the job queues if there are app active jobs and blocking jobs are running") { + job1 = Job( + id: 100, + failureCount: 0, + variant: .messageSend, + behaviour: .recurringOnActive, + shouldBlock: false, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: nil + ) + job2 = Job( + id: 101, + failureCount: 0, + variant: .messageSend, + behaviour: .runOnceNextLaunch, + shouldBlock: true, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: nil + ) + + mockStorage.write { db in + try job1.insert(db) + try job2.insert(db) + + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + // Not currently running + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(10) + ) + + // Start the blocking queue + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + // Make sure the other queues don't start + dependencies.date = Date().addingTimeInterval(30 / 1000) // Complete job after delay + jobRunner.appDidBecomeActive(dependencies: dependencies) + + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(10) + ) + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(20) + ) + } + + it("starts the job queues if there are no app active jobs") { + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + + mockStorage.write { db in + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + } + + jobRunner.appDidBecomeActive(dependencies: dependencies) + + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beTrue(), + timeout: .milliseconds(10) + ) + } + + it("starts the job queues if there are app active jobs") { + job1 = Job( + id: 100, + failureCount: 0, + variant: .messageSend, + behaviour: .recurringOnActive, + shouldBlock: false, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: nil + ) + job2 = Job( + id: 101, + failureCount: 0, + variant: .messageSend, + behaviour: .runOnce, + shouldBlock: false, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: nil + ) + + mockStorage.write { db in + try job1.insert(db) + try job2.insert(db) + + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + // Not currently running + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(10) + ) + + // Make sure the queues are started + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + jobRunner.appDidBecomeActive(dependencies: dependencies) + + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beTrue(), + timeout: .milliseconds(10) + ) + expect(jobRunner.isCurrentlyRunning(job2)).to(beTrue()) + } + } + } + + // MARK: -- when running jobs + + context("when running jobs") { + beforeEach { + jobRunner.setExecutor(TestSuccessfulJob.self, for: .messageSend) + jobRunner.setExecutor(TestSuccessfulJob.self, for: .attachmentUpload) + jobRunner.appDidFinishLaunching(dependencies: dependencies) } + + // MARK: ---- with dependencies + + context("with dependencies") { + it("starts dependencies first") { + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + + mockStorage.write { db in + try job1.insert(db) + try job2.insert(db) + try JobDependencies(jobId: job1.id!, dependantId: job2.id!).insert(db) + + jobRunner.upsert(db, job: job1, canStartJob: true, dependencies: dependencies) + } + + // Make sure the dependency is run + expect(Array(jobRunner.detailsFor(state: .running, variant: .attachmentUpload).keys)) + .toEventually( + equal([101]), + timeout: .milliseconds(10) + ) + } + + it("removes the initial job from the queue") { + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + + mockStorage.write { db in + try job1.insert(db) + try job2.insert(db) + try JobDependencies(jobId: job1.id!, dependantId: job2.id!).insert(db) + + jobRunner.upsert(db, job: job1, canStartJob: true, dependencies: dependencies) + } + + // Make sure the initial job is removed from the queue + expect(Array(jobRunner.detailsFor(state: .running, variant: .attachmentUpload).keys)) + .toEventually( + equal([101]), + timeout: .milliseconds(10) + ) + expect(jobRunner.detailsFor(state: .running, variant: .messageSend).keys).toNot(contain(100)) + } + + it("starts the initial job when the dependencies succeed") { + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + + mockStorage.write { db in + try job1.insert(db) + try job2.insert(db) + try JobDependencies(jobId: job1.id!, dependantId: job2.id!).insert(db) + + jobRunner.upsert(db, job: job1, canStartJob: true, dependencies: dependencies) + } + + // Make sure the dependency is run + expect(Array(jobRunner.detailsFor(state: .running, variant: .attachmentUpload).keys)) + .toEventually( + equal([101]), + timeout: .milliseconds(10) + ) + expect(jobRunner.detailsFor(state: .running, variant: .messageSend).keys).toNot(contain(100)) + + // Make sure the initial job starts + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + expect(Array(jobRunner.detailsFor(state: .running, variant: .messageSend).keys)) + .toEventually( + equal([100]), + timeout: .milliseconds(20) + ) + } + + it("does not start the initial job if the dependencies fail") { + jobRunner.setExecutor(TestFailedJob.self, for: .attachmentUpload) + + dependencies.date = Date().addingTimeInterval(20 / 1000) // Fail job after delay + + mockStorage.write { db in + try job1.insert(db) + try job2.insert(db) + try JobDependencies(jobId: job1.id!, dependantId: job2.id!).insert(db) + + jobRunner.upsert(db, job: job1, canStartJob: true, dependencies: dependencies) + } + + // Make sure the dependency is run + expect(Array(jobRunner.detailsFor(state: .running, variant: .attachmentUpload).keys)) + .toEventually( + equal([101]), + timeout: .milliseconds(10) + ) + expect(jobRunner.detailsFor(state: .running, variant: .messageSend).keys).toNot(contain(100)) + + // Make sure there are no running jobs + expect(Array(jobRunner.detailsFor(state: .running).keys)) + .toEventually( + beEmpty(), + timeout: .milliseconds(20) + ) + } + + it("does not delete the initial job if the dependencies fail") { + jobRunner.setExecutor(TestFailedJob.self, for: .attachmentUpload) + + dependencies.date = Date().addingTimeInterval(20 / 1000) // Fail job after delay + + mockStorage.write { db in + try job1.insert(db) + try job2.insert(db) + try JobDependencies(jobId: job1.id!, dependantId: job2.id!).insert(db) + + jobRunner.upsert(db, job: job1, canStartJob: true, dependencies: dependencies) + } + + // Make sure the dependency is run + expect(Array(jobRunner.detailsFor(state: .running, variant: .attachmentUpload).keys)) + .toEventually( + equal([101]), + timeout: .milliseconds(10) + ) + expect(jobRunner.detailsFor(state: .running, variant: .messageSend).keys).toNot(contain(100)) + + // Make sure there are no running jobs + dependencies.date = Date().addingTimeInterval(20 / 1000) // Delay subsequent runs + expect(Array(jobRunner.detailsFor(state: .running, variant: .attachmentUpload).keys)) + .toEventually( + beEmpty(), + timeout: .milliseconds(20) + ) + + // Stop the queues so it doesn't run out of retry attempts + jobRunner.stopAndClearPendingJobs(exceptForVariant: nil, onComplete: nil) + + // Make sure the jobs still exist + expect(mockStorage.read { db in try Job.fetchCount(db) }).to(equal(2)) + } + + it("deletes the initial job if the dependencies permanently fail") { + jobRunner.setExecutor(TestPermanentFailureJob.self, for: .attachmentUpload) + + dependencies.date = Date().addingTimeInterval(20 / 1000) // Fail job after delay + + mockStorage.write { db in + try job1.insert(db) + try job2.insert(db) + try JobDependencies(jobId: job1.id!, dependantId: job2.id!).insert(db) + + jobRunner.upsert(db, job: job1, canStartJob: true, dependencies: dependencies) + } + + // Make sure the dependency is run + expect(Array(jobRunner.detailsFor(state: .running, variant: .attachmentUpload).keys)) + .toEventually( + equal([101]), + timeout: .milliseconds(10) + ) + expect(jobRunner.detailsFor(state: .running, variant: .messageSend).keys).toNot(contain(100)) + + // Make sure there are no running jobs + expect(Array(jobRunner.detailsFor(state: .running, variant: .attachmentUpload).keys)) + .toEventually( + beEmpty(), + timeout: .milliseconds(20) + ) + + // Make sure the jobs were deleted + expect(mockStorage.read { db in try Job.fetchCount(db) }).to(equal(0)) + } + } + } } } diff --git a/_SharedTestUtilities/CommonMockedExtensions.swift b/_SharedTestUtilities/CommonMockedExtensions.swift index 052931525..b70e06327 100644 --- a/_SharedTestUtilities/CommonMockedExtensions.swift +++ b/_SharedTestUtilities/CommonMockedExtensions.swift @@ -1,8 +1,10 @@ // Copyright © 2022 Rangeproof Pty Ltd. All rights reserved. import Foundation +import GRDB import Sodium import Curve25519Kit +import SessionUtilitiesKit extension Box.KeyPair: Mocked { static var mockValue: Box.KeyPair = Box.KeyPair( @@ -19,3 +21,19 @@ extension ECKeyPair: Mocked { ) } } + +extension Database: Mocked { + static var mockValue: Database { + var result: Database! + try! DatabaseQueue().read { result = $0 } + return result! + } +} + +extension Job: Mocked { + static var mockValue: Job = Job(variant: .messageSend) +} + +extension Job.Variant: Mocked { + static var mockValue: Job.Variant = .messageSend +} diff --git a/_SharedTestUtilities/MockJobRunner.swift b/_SharedTestUtilities/MockJobRunner.swift new file mode 100644 index 000000000..494643ed2 --- /dev/null +++ b/_SharedTestUtilities/MockJobRunner.swift @@ -0,0 +1,56 @@ +// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved. + +import Foundation +import GRDB +import SessionUtilitiesKit + +@testable import SessionMessagingKit + +class MockJobRunner: Mock, JobRunnerType { + // MARK: - Configuration + + func setExecutor(_ executor: JobExecutor.Type, for variant: Job.Variant) { + accept(args: [executor, variant]) + } + + func canStart(queue: JobQueue) -> Bool { + return accept(args: [queue]) as! Bool + } + + // MARK: - State Management + + func isCurrentlyRunning(_ job: Job?) -> Bool { + return accept(args: [job]) as! Bool + } + + func hasJob(of variant: Job.Variant, inState state: JobRunner.JobState, with jobDetails: T) -> Bool { + return accept(args: [variant, state, jobDetails]) as! Bool + } + + func detailsFor(jobs: [Job]?, state: JobRunner.JobState, variant: Job.Variant?) -> [Int64: Data?] { + return accept(args: [jobs, state, variant]) as! [Int64: Data?] + } + + func appDidFinishLaunching(dependencies: Dependencies) {} + func appDidBecomeActive(dependencies: Dependencies) {} + func startNonBlockingQueues(dependencies: Dependencies) {} + + func stopAndClearPendingJobs(exceptForVariant: Job.Variant?, onComplete: (() -> ())?) { + accept(args: [exceptForVariant, onComplete]) + onComplete?() + } + + // MARK: - Job Scheduling + + func add(_ db: Database, job: Job?, canStartJob: Bool, dependencies: Dependencies) { + accept(args: [db, job, canStartJob]) + } + + func upsert(_ db: Database, job: Job?, canStartJob: Bool, dependencies: Dependencies) { + accept(args: [db, job, canStartJob]) + } + + func insert(_ db: Database, job: Job?, before otherJob: Job, dependencies: Dependencies) -> (Int64, Job)? { + return accept(args: [db, job, otherJob]) as? (Int64, Job) + } +}