From ffdc59b7044ac7893f5cd28fac4b0d26db0620d9 Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Wed, 5 Apr 2023 16:49:38 +1000 Subject: [PATCH] Fixed a few issues with the JobRunner Updated the JobRunner to support dependency injection Updated the DataExtractionNotification to take a 'sentTimestamp' when created to reduce the chance for duplicates being sent Fixed an issue where checking current and pending jobs wasn't including blocking jobs Fixed an issue where the 'hasPendingOrRunningJob' check didn't actually include running jobs Fixed some odd behaviours with job dependencies Fixed an incorrect failure count check --- Podfile.lock | 2 +- Session.xcodeproj/project.pbxproj | 28 + .../ConversationVC+Interaction.swift | 6 +- .../MediaPageViewController.swift | 3 +- Session/Meta/AppDelegate.swift | 2 +- SessionMessagingKit/Configuration.swift | 24 +- .../Jobs/Types/AttachmentDownloadJob.swift | 4 +- .../Jobs/Types/MessageSendJob.swift | 16 +- .../DataExtractionNotification.swift | 9 +- .../Jobs/Types/MessageSendJobSpec.swift | 415 ++++++ .../Open Groups/OpenGroupManagerSpec.swift | 4 +- .../_TestUtilities/TestOnionRequestAPI.swift | 4 +- SessionSnodeKit/Configuration.swift | 2 +- SessionUtilitiesKit/Database/Models/Job.swift | 4 +- .../Database/Models/JobDependencies.swift | 2 +- .../General/Array+Utilities.swift | 8 + .../General/Dependencies.swift | 8 + .../General/Set+Utilities.swift | 6 + SessionUtilitiesKit/JobRunner/JobRunner.swift | 760 +++++----- .../JobRunner/JobRunnerSpec.swift | 1220 ++++++++++++++++- .../CommonMockedExtensions.swift | 18 + _SharedTestUtilities/MockJobRunner.swift | 56 + 22 files changed, 2221 insertions(+), 380 deletions(-) create mode 100644 SessionMessagingKitTests/Jobs/Types/MessageSendJobSpec.swift create mode 100644 _SharedTestUtilities/MockJobRunner.swift diff --git a/Podfile.lock b/Podfile.lock index 0239f9c29..9cd0a2706 100644 --- a/Podfile.lock +++ b/Podfile.lock @@ -242,6 +242,6 @@ SPEC CHECKSUMS: YYImage: f1ddd15ac032a58b78bbed1e012b50302d318331 ZXingObjC: fdbb269f25dd2032da343e06f10224d62f537bdb -PODFILE CHECKSUM: 2bf7639359fecebe56e9757d88f4eb48864652d2 +PODFILE CHECKSUM: 97324ae5888b01db2f2adc4dcc239e2e7d6867f7 COCOAPODS: 1.11.3 diff --git a/Session.xcodeproj/project.pbxproj b/Session.xcodeproj/project.pbxproj index e8773d8c6..e593f16fd 100644 --- a/Session.xcodeproj/project.pbxproj +++ b/Session.xcodeproj/project.pbxproj @@ -761,6 +761,10 @@ FD9004142818AD0B00ABAAF6 /* _002_SetupStandardJobs.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD9004132818AD0B00ABAAF6 /* _002_SetupStandardJobs.swift */; }; FD9004152818B46300ABAAF6 /* JobRunner.swift in Sources */ = {isa = PBXBuildFile; fileRef = FDF0B7432804EF1B004C14C5 /* JobRunner.swift */; }; FD9004162818B46700ABAAF6 /* JobRunnerError.swift in Sources */ = {isa = PBXBuildFile; fileRef = FDE77F68280F9EDA002CFC5D /* JobRunnerError.swift */; }; + FD96F3A529DBC3DC00401309 /* MessageSendJobSpec.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD96F3A429DBC3DC00401309 /* MessageSendJobSpec.swift */; }; + FD96F3A729DBD43D00401309 /* MockJobRunner.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD96F3A629DBD43D00401309 /* MockJobRunner.swift */; }; + FD96F3A829DBD4AD00401309 /* MockJobRunner.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD96F3A629DBD43D00401309 /* MockJobRunner.swift */; }; + FD96F3A929DBD4AD00401309 /* MockJobRunner.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD96F3A629DBD43D00401309 /* MockJobRunner.swift */; }; FDA8EAFE280E8B78002B68E5 /* FailedMessageSendsJob.swift in Sources */ = {isa = PBXBuildFile; fileRef = FDA8EAFD280E8B78002B68E5 /* FailedMessageSendsJob.swift */; }; FDA8EB00280E8D58002B68E5 /* FailedAttachmentDownloadsJob.swift in Sources */ = {isa = PBXBuildFile; fileRef = FDA8EAFF280E8D58002B68E5 /* FailedAttachmentDownloadsJob.swift */; }; FDA8EB10280F8238002B68E5 /* Codable+Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = FDA8EB0F280F8238002B68E5 /* Codable+Utilities.swift */; }; @@ -1841,6 +1845,8 @@ FD87DD0328B8727D00AF0F98 /* Configuration.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Configuration.swift; sourceTree = ""; }; FD90040E2818AB6D00ABAAF6 /* GetSnodePoolJob.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = GetSnodePoolJob.swift; sourceTree = ""; }; FD9004132818AD0B00ABAAF6 /* _002_SetupStandardJobs.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = _002_SetupStandardJobs.swift; sourceTree = ""; }; + FD96F3A429DBC3DC00401309 /* MessageSendJobSpec.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MessageSendJobSpec.swift; sourceTree = ""; }; + FD96F3A629DBD43D00401309 /* MockJobRunner.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MockJobRunner.swift; sourceTree = ""; }; FDA8EAFD280E8B78002B68E5 /* FailedMessageSendsJob.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FailedMessageSendsJob.swift; sourceTree = ""; }; FDA8EAFF280E8D58002B68E5 /* FailedAttachmentDownloadsJob.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FailedAttachmentDownloadsJob.swift; sourceTree = ""; }; FDA8EB0F280F8238002B68E5 /* Codable+Utilities.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Codable+Utilities.swift"; sourceTree = ""; }; @@ -3928,6 +3934,7 @@ children = ( FDC290A527D860CE005DAE71 /* Mock.swift */, FDFD645C27F273F300808CA1 /* MockGeneralCache.swift */, + FD96F3A629DBD43D00401309 /* MockJobRunner.swift */, FD83B9BD27CF2243005E1583 /* TestConstants.swift */, FDC290A727D9B46D005DAE71 /* NimbleExtensions.swift */, FD078E4727E02561000769AF /* CommonMockedExtensions.swift */, @@ -3970,6 +3977,22 @@ path = JobRunner; sourceTree = ""; }; + FD96F3A229DBC3BA00401309 /* Jobs */ = { + isa = PBXGroup; + children = ( + FD96F3A329DBC3D000401309 /* Types */, + ); + path = Jobs; + sourceTree = ""; + }; + FD96F3A329DBC3D000401309 /* Types */ = { + isa = PBXGroup; + children = ( + FD96F3A429DBC3DC00401309 /* MessageSendJobSpec.swift */, + ); + path = Types; + sourceTree = ""; + }; FDC2909227D710A9005DAE71 /* Types */ = { isa = PBXGroup; children = ( @@ -4059,6 +4082,7 @@ FDC4389B27BA01E300C60D73 /* _TestUtilities */, FD3C905D27E410DB00CD579F /* Common Networking */, FD3C906527E416A200CD579F /* Contacts */, + FD96F3A229DBC3BA00401309 /* Jobs */, FDC4389827BA001800C60D73 /* Open Groups */, FD3C906B27E43C2400CD579F /* Sending & Receiving */, FD3C906827E417B100CD579F /* Utilities */, @@ -5794,6 +5818,7 @@ FD71161528D00D6700B47552 /* ThreadDisappearingMessagesViewModelSpec.swift in Sources */, FD23EA5E28ED00FD0058676E /* NimbleExtensions.swift in Sources */, FD23EA5F28ED00FF0058676E /* CommonMockedExtensions.swift in Sources */, + FD96F3A829DBD4AD00401309 /* MockJobRunner.swift in Sources */, FD23EA5D28ED00FA0058676E /* TestConstants.swift in Sources */, FD71161A28D00E1100B47552 /* NotificationContentViewModelSpec.swift in Sources */, FD23EA5C28ED00F80058676E /* Mock.swift in Sources */, @@ -5807,6 +5832,7 @@ buildActionMask = 2147483647; files = ( FD2AAAF228ED57B500A49611 /* SynchronousStorage.swift in Sources */, + FD96F3A929DBD4AD00401309 /* MockJobRunner.swift in Sources */, FD078E4927E02576000769AF /* CommonMockedExtensions.swift in Sources */, FD83B9BF27CF2294005E1583 /* TestConstants.swift in Sources */, FD83B9BB27CF20AF005E1583 /* SessionIdSpec.swift in Sources */, @@ -5827,9 +5853,11 @@ FD3C905C27E3FBEF00CD579F /* BatchRequestInfoSpec.swift in Sources */, FD859EFA27C2F5C500510D0C /* MockGenericHash.swift in Sources */, FDC2909427D710B4005DAE71 /* SOGSEndpointSpec.swift in Sources */, + FD96F3A529DBC3DC00401309 /* MessageSendJobSpec.swift in Sources */, FDC290B327DFF9F5005DAE71 /* TestOnionRequestAPI.swift in Sources */, FDC2909127D709CA005DAE71 /* SOGSMessageSpec.swift in Sources */, FD3C906A27E417CE00CD579F /* SodiumUtilitiesSpec.swift in Sources */, + FD96F3A729DBD43D00401309 /* MockJobRunner.swift in Sources */, FD3C907127E445E500CD579F /* MessageReceiverDecryptionSpec.swift in Sources */, FDC2909627D71252005DAE71 /* SOGSErrorSpec.swift in Sources */, FDC2908727D7047F005DAE71 /* RoomSpec.swift in Sources */, diff --git a/Session/Conversations/ConversationVC+Interaction.swift b/Session/Conversations/ConversationVC+Interaction.swift index 037e39647..8f533f286 100644 --- a/Session/Conversations/ConversationVC+Interaction.swift +++ b/Session/Conversations/ConversationVC+Interaction.swift @@ -2015,7 +2015,8 @@ extension ConversationVC: try MessageSender.send( db, message: DataExtractionNotification( - kind: .mediaSaved(timestamp: UInt64(cellViewModel.timestampMs)) + kind: .mediaSaved(timestamp: UInt64(cellViewModel.timestampMs)), + sentTimestamp: UInt64(SnodeAPI.currentOffsetTimestampMs()) ), interactionId: nil, in: thread @@ -2269,7 +2270,8 @@ extension ConversationVC: try MessageSender.send( db, message: DataExtractionNotification( - kind: .screenshot + kind: .screenshot, + sentTimestamp: UInt64(SnodeAPI.currentOffsetTimestampMs()) ), interactionId: nil, in: thread diff --git a/Session/Media Viewing & Editing/MediaPageViewController.swift b/Session/Media Viewing & Editing/MediaPageViewController.swift index 49d9374e5..0ffc3cd42 100644 --- a/Session/Media Viewing & Editing/MediaPageViewController.swift +++ b/Session/Media Viewing & Editing/MediaPageViewController.swift @@ -540,7 +540,8 @@ class MediaPageViewController: UIPageViewController, UIPageViewControllerDataSou message: DataExtractionNotification( kind: .mediaSaved( timestamp: UInt64(currentViewController.galleryItem.interactionTimestampMs) - ) + ), + sentTimestamp: UInt64(SnodeAPI.currentOffsetTimestampMs()) ), interactionId: nil, // Show no interaction for the current user in: thread diff --git a/Session/Meta/AppDelegate.swift b/Session/Meta/AppDelegate.swift index d91851597..dc7d58d64 100644 --- a/Session/Meta/AppDelegate.swift +++ b/Session/Meta/AppDelegate.swift @@ -254,7 +254,7 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD private func completePostMigrationSetup(needsConfigSync: Bool) { Configuration.performMainSetup() - JobRunner.add(executor: SyncPushTokensJob.self, for: .syncPushTokens) + JobRunner.setExecutor(SyncPushTokensJob.self, for: .syncPushTokens) /// Setup the UI /// diff --git a/SessionMessagingKit/Configuration.swift b/SessionMessagingKit/Configuration.swift index b50ac3b94..2edd387b3 100644 --- a/SessionMessagingKit/Configuration.swift +++ b/SessionMessagingKit/Configuration.swift @@ -34,17 +34,17 @@ public enum SNMessagingKit { // Just to make the external API nice public static func configure() { // Configure the job executors - JobRunner.add(executor: DisappearingMessagesJob.self, for: .disappearingMessages) - JobRunner.add(executor: FailedMessageSendsJob.self, for: .failedMessageSends) - JobRunner.add(executor: FailedAttachmentDownloadsJob.self, for: .failedAttachmentDownloads) - JobRunner.add(executor: UpdateProfilePictureJob.self, for: .updateProfilePicture) - JobRunner.add(executor: RetrieveDefaultOpenGroupRoomsJob.self, for: .retrieveDefaultOpenGroupRooms) - JobRunner.add(executor: GarbageCollectionJob.self, for: .garbageCollection) - JobRunner.add(executor: MessageSendJob.self, for: .messageSend) - JobRunner.add(executor: MessageReceiveJob.self, for: .messageReceive) - JobRunner.add(executor: NotifyPushServerJob.self, for: .notifyPushServer) - JobRunner.add(executor: SendReadReceiptsJob.self, for: .sendReadReceipts) - JobRunner.add(executor: AttachmentDownloadJob.self, for: .attachmentDownload) - JobRunner.add(executor: AttachmentUploadJob.self, for: .attachmentUpload) + JobRunner.setExecutor(DisappearingMessagesJob.self, for: .disappearingMessages) + JobRunner.setExecutor(FailedMessageSendsJob.self, for: .failedMessageSends) + JobRunner.setExecutor(FailedAttachmentDownloadsJob.self, for: .failedAttachmentDownloads) + JobRunner.setExecutor(UpdateProfilePictureJob.self, for: .updateProfilePicture) + JobRunner.setExecutor(RetrieveDefaultOpenGroupRoomsJob.self, for: .retrieveDefaultOpenGroupRooms) + JobRunner.setExecutor(GarbageCollectionJob.self, for: .garbageCollection) + JobRunner.setExecutor(MessageSendJob.self, for: .messageSend) + JobRunner.setExecutor(MessageReceiveJob.self, for: .messageReceive) + JobRunner.setExecutor(NotifyPushServerJob.self, for: .notifyPushServer) + JobRunner.setExecutor(SendReadReceiptsJob.self, for: .sendReadReceipts) + JobRunner.setExecutor(AttachmentDownloadJob.self, for: .attachmentDownload) + JobRunner.setExecutor(AttachmentUploadJob.self, for: .attachmentUpload) } } diff --git a/SessionMessagingKit/Jobs/Types/AttachmentDownloadJob.swift b/SessionMessagingKit/Jobs/Types/AttachmentDownloadJob.swift index 14934a4e7..4eeb8e27c 100644 --- a/SessionMessagingKit/Jobs/Types/AttachmentDownloadJob.swift +++ b/SessionMessagingKit/Jobs/Types/AttachmentDownloadJob.swift @@ -42,8 +42,8 @@ public enum AttachmentDownloadJob: JobExecutor { // the same attachment multiple times at the same time (it also adds a "clean up" mechanism // if an attachment ends up stuck in a "downloading" state incorrectly guard attachment.state != .downloading else { - let otherCurrentJobAttachmentIds: Set = JobRunner - .detailsForCurrentlyRunningJobs(of: .attachmentDownload) + let otherCurrentJobAttachmentIds: Set = dependencies.jobRunner + .detailsFor(state: .running, variant: .attachmentDownload) .filter { key, _ in key != job.id } .values .compactMap { data -> String? in diff --git a/SessionMessagingKit/Jobs/Types/MessageSendJob.swift b/SessionMessagingKit/Jobs/Types/MessageSendJob.swift index 050dc1bfa..ef9ae90d4 100644 --- a/SessionMessagingKit/Jobs/Types/MessageSendJob.swift +++ b/SessionMessagingKit/Jobs/Types/MessageSendJob.swift @@ -24,7 +24,7 @@ public enum MessageSendJob: JobExecutor { let detailsData: Data = job.details, let details: Details = try? JSONDecoder().decode(Details.self, from: detailsData) else { - failure(job, JobRunnerError.missingRequiredDetails, false, dependencies) + failure(job, JobRunnerError.missingRequiredDetails, true, dependencies) return } @@ -37,7 +37,7 @@ public enum MessageSendJob: JobExecutor { let jobId: Int64 = job.id, let interactionId: Int64 = job.interactionId else { - failure(job, JobRunnerError.missingRequiredDetails, false, dependencies) + failure(job, JobRunnerError.missingRequiredDetails, true, dependencies) return } @@ -89,16 +89,16 @@ public enum MessageSendJob: JobExecutor { } .filter { stateInfo in // Don't add a new job if there is one already in the queue - !JobRunner.hasPendingOrRunningJob( - with: .attachmentUpload, - details: AttachmentUploadJob.Details( + !dependencies.jobRunner.hasJob( + of: .attachmentUpload, + with: AttachmentUploadJob.Details( messageSendJobId: jobId, attachmentId: stateInfo.attachmentId ) ) } .compactMap { stateInfo -> (jobId: Int64, job: Job)? in - JobRunner + dependencies.jobRunner .insert( db, job: Job( @@ -131,8 +131,8 @@ public enum MessageSendJob: JobExecutor { let hasPendingUploads: Bool = allAttachmentStateInfo.contains(where: { $0.state != .uploaded }) return ( - (isMissingFileIds && !hasPendingUploads), - hasPendingUploads, + (isMissingFileIds && !hasPendingUploads), // shouldFail + hasPendingUploads, // shouldDefer fileIds ) } diff --git a/SessionMessagingKit/Messages/Control Messages/DataExtractionNotification.swift b/SessionMessagingKit/Messages/Control Messages/DataExtractionNotification.swift index d54549df1..627d58ea8 100644 --- a/SessionMessagingKit/Messages/Control Messages/DataExtractionNotification.swift +++ b/SessionMessagingKit/Messages/Control Messages/DataExtractionNotification.swift @@ -27,8 +27,13 @@ public final class DataExtractionNotification: ControlMessage { // MARK: - Initialization - public init(kind: Kind) { - super.init() + public init( + kind: Kind, + sentTimestamp: UInt64? = nil + ) { + super.init( + sentTimestamp: sentTimestamp + ) self.kind = kind } diff --git a/SessionMessagingKitTests/Jobs/Types/MessageSendJobSpec.swift b/SessionMessagingKitTests/Jobs/Types/MessageSendJobSpec.swift new file mode 100644 index 000000000..f828c4394 --- /dev/null +++ b/SessionMessagingKitTests/Jobs/Types/MessageSendJobSpec.swift @@ -0,0 +1,415 @@ +// Copyright © 2023 Rangeproof Pty Ltd. All rights reserved. + +import Foundation +import GRDB + +import Quick +import Nimble + +@testable import SessionMessagingKit +@testable import SessionUtilitiesKit + +class MessageSendJobSpec: QuickSpec { + // MARK: - Spec + + override func spec() { + var job: Job! + var interaction: Interaction! + var attachment1: Attachment! + var interactionAttachment1: InteractionAttachment! + var mockStorage: Storage! + var mockJobRunner: MockJobRunner! + var dependencies: Dependencies! + + // MARK: - JobRunner + + describe("a MessageSendJob") { + beforeEach { + mockStorage = Storage( + customWriter: try! DatabaseQueue(), + customMigrations: [ + SNUtilitiesKit.migrations(), + SNMessagingKit.migrations() + ] + ) + mockJobRunner = MockJobRunner() + dependencies = Dependencies( + storage: mockStorage, + jobRunner: mockJobRunner, + date: Date(timeIntervalSince1970: 1234567890) + ) + attachment1 = Attachment( + id: "200", + variant: .standard, + state: .failedDownload, + contentType: "text/plain", + byteCount: 200 + ) + + mockStorage.write { db in + try SessionThread.fetchOrCreate(db, id: "Test1", variant: .contact) + } + + mockJobRunner + .when { + $0.hasJob( + of: any(), + inState: .running, + with: AttachmentUploadJob.Details( + messageSendJobId: 1, + attachmentId: attachment1.id + ) + ) + } + .thenReturn(false) + mockJobRunner + .when { $0.insert(any(), job: any(), before: any(), dependencies: dependencies) } + .then { args in + let db: Database = args[0] as! Database + var job: Job = args[1] as! Job + job.id = 1000 + + try! job.insert(db) + } + .thenReturn((1000, Job(variant: .messageSend))) + } + + afterEach { + job = nil + mockStorage = nil + dependencies = nil + } + + it("fails when not given any details") { + job = Job(variant: .messageSend) + + var error: Error? = nil + var permanentFailure: Bool = false + + MessageSendJob.run( + job, + queue: .main, + success: { _, _, _ in }, + failure: { _, runError, runPermanentFailure, _ in + error = runError + permanentFailure = runPermanentFailure + }, + deferred: { _, _ in }, + dependencies: dependencies + ) + + expect(error).to(matchError(JobRunnerError.missingRequiredDetails)) + expect(permanentFailure).to(beTrue()) + } + + it("fails when not given incorrect details") { + job = Job( + variant: .messageSend, + details: MessageReceiveJob.Details(messages: [], calledFromBackgroundPoller: false) + ) + + var error: Error? = nil + var permanentFailure: Bool = false + + MessageSendJob.run( + job, + queue: .main, + success: { _, _, _ in }, + failure: { _, runError, runPermanentFailure, _ in + error = runError + permanentFailure = runPermanentFailure + }, + deferred: { _, _ in }, + dependencies: dependencies + ) + + expect(error).to(matchError(JobRunnerError.missingRequiredDetails)) + expect(permanentFailure).to(beTrue()) + } + + context("of VisibleMessage") { + beforeEach { + interaction = Interaction( + id: 100, + serverHash: nil, + messageUuid: nil, + threadId: "Test1", + authorId: "Test", + variant: .standardOutgoing, + body: "Test", + timestampMs: 1234567890, + receivedAtTimestampMs: 1234567900, + wasRead: false, + hasMention: false, + expiresInSeconds: nil, + expiresStartedAtMs: nil, + linkPreviewUrl: nil, + openGroupServerMessageId: nil, + openGroupWhisperMods: false, + openGroupWhisperTo: nil + ) + job = Job( + variant: .messageSend, + interactionId: interaction.id!, + details: MessageSendJob.Details( + destination: .contact(publicKey: "Test"), + message: VisibleMessage( + text: "Test" + ) + ) + ) + + mockStorage.write { db in + try interaction.insert(db) + try job.insert(db) + } + } + + it("fails when there is no job id") { + job = Job( + variant: .messageSend, + interactionId: interaction.id!, + details: MessageSendJob.Details( + destination: .contact(publicKey: "Test"), + message: VisibleMessage( + text: "Test" + ) + ) + ) + + var error: Error? = nil + var permanentFailure: Bool = false + + MessageSendJob.run( + job, + queue: .main, + success: { _, _, _ in }, + failure: { _, runError, runPermanentFailure, _ in + error = runError + permanentFailure = runPermanentFailure + }, + deferred: { _, _ in }, + dependencies: dependencies + ) + + expect(error).to(matchError(JobRunnerError.missingRequiredDetails)) + expect(permanentFailure).to(beTrue()) + } + + it("fails when there is no interaction id") { + job = Job( + variant: .messageSend, + details: MessageSendJob.Details( + destination: .contact(publicKey: "Test"), + message: VisibleMessage( + text: "Test" + ) + ) + ) + + var error: Error? = nil + var permanentFailure: Bool = false + + MessageSendJob.run( + job, + queue: .main, + success: { _, _, _ in }, + failure: { _, runError, runPermanentFailure, _ in + error = runError + permanentFailure = runPermanentFailure + }, + deferred: { _, _ in }, + dependencies: dependencies + ) + + expect(error).to(matchError(JobRunnerError.missingRequiredDetails)) + expect(permanentFailure).to(beTrue()) + } + + it("fails when there is no interaction for the provided interaction id") { + job = Job( + variant: .messageSend, + interactionId: 12345, + details: MessageSendJob.Details( + destination: .contact(publicKey: "Test"), + message: VisibleMessage( + text: "Test" + ) + ) + ) + mockStorage.write { db in try job.insert(db) } + + var error: Error? = nil + var permanentFailure: Bool = false + + MessageSendJob.run( + job, + queue: .main, + success: { _, _, _ in }, + failure: { _, runError, runPermanentFailure, _ in + error = runError + permanentFailure = runPermanentFailure + }, + deferred: { _, _ in }, + dependencies: dependencies + ) + + expect(error).to(matchError(StorageError.objectNotFound)) + expect(permanentFailure).to(beTrue()) + } + context("with an attachment") { + beforeEach { + interactionAttachment1 = InteractionAttachment( + albumIndex: 0, + interactionId: interaction.id!, + attachmentId: attachment1.id + ) + + mockStorage.write { db in + try attachment1.insert(db) + try interactionAttachment1.insert(db) + } + } + + it("it fails when trying to send with an attachment which previously failed to download") { + mockStorage.write { db in + try attachment1.with(state: .failedDownload).save(db) + } + + var error: Error? = nil + var permanentFailure: Bool = false + + MessageSendJob.run( + job, + queue: .main, + success: { _, _, _ in }, + failure: { _, runError, runPermanentFailure, _ in + error = runError + permanentFailure = runPermanentFailure + }, + deferred: { _, _ in }, + dependencies: dependencies + ) + + expect(error).to(matchError(AttachmentError.notUploaded)) + expect(permanentFailure).to(beTrue()) + } + + it("it fails when trying to send with an attachment that has an invalid downloadUrl") { + mockStorage.write { db in + try attachment1 + .with( + state: .uploaded, + downloadUrl: nil + ) + .save(db) + } + + var error: Error? = nil + var permanentFailure: Bool = false + + MessageSendJob.run( + job, + queue: .main, + success: { _, _, _ in }, + failure: { _, runError, runPermanentFailure, _ in + error = runError + permanentFailure = runPermanentFailure + }, + deferred: { _, _ in }, + dependencies: dependencies + ) + + expect(error).to(matchError(AttachmentError.notUploaded)) + expect(permanentFailure).to(beTrue()) + } + + context("with a pending upload") { + beforeEach { + mockStorage.write { db in + try attachment1.with(state: .uploading).save(db) + } + } + + it("it defers when trying to send with an attachment which is still pending upload") { + var didDefer: Bool = false + + mockStorage.write { db in + try attachment1.with(state: .uploading).save(db) + } + + MessageSendJob.run( + job, + queue: .main, + success: { _, _, _ in }, + failure: { _, _, _, _ in }, + deferred: { _, _ in didDefer = true }, + dependencies: dependencies + ) + + expect(didDefer).to(beTrue()) + } + + it("inserts an attachment upload job before the message send job") { + mockJobRunner + .when { + $0.hasJob( + of: any(), + inState: .running, + with: AttachmentUploadJob.Details( + messageSendJobId: 1, + attachmentId: "200" + ) + ) + } + .thenReturn(false) + + MessageSendJob.run( + job, + queue: .main, + success: { _, _, _ in }, + failure: { _, _, _, _ in }, + deferred: { _, _ in }, + dependencies: dependencies + ) + + expect(mockJobRunner) + .to(call(.exactly(times: 1), matchingParameters: true) { + $0.insert( + any(), + job: Job( + variant: .attachmentUpload, + behaviour: .runOnce, + shouldBlock: false, + shouldSkipLaunchBecomeActive: false, + interactionId: 100, + details: AttachmentUploadJob.Details( + messageSendJobId: 1, + attachmentId: "200" + ) + ), + before: job, + dependencies: dependencies + ) + }) + } + + it("creates a dependency between the new job and the existing one") { + MessageSendJob.run( + job, + queue: .main, + success: { _, _, _ in }, + failure: { _, _, _, _ in }, + deferred: { _, _ in }, + dependencies: dependencies + ) + + expect(mockStorage.read { db in try JobDependencies.fetchOne(db) }) + .to(equal(JobDependencies(jobId: 9, dependantId: 1000))) + } + } + } + } + } + } +} diff --git a/SessionMessagingKitTests/Open Groups/OpenGroupManagerSpec.swift b/SessionMessagingKitTests/Open Groups/OpenGroupManagerSpec.swift index 03618a3b5..a8b3743e0 100644 --- a/SessionMessagingKitTests/Open Groups/OpenGroupManagerSpec.swift +++ b/SessionMessagingKitTests/Open Groups/OpenGroupManagerSpec.swift @@ -3568,11 +3568,11 @@ class OpenGroupManagerSpec: QuickSpec { it("adds the image retrieval promise to the cache") { class TestNeverReturningApi: OnionRequestAPIType { - static func sendOnionRequest(_ request: URLRequest, to server: String, using version: OnionRequestAPIVersion, with x25519PublicKey: String) -> Promise<(OnionRequestResponseInfoType, Data?)> { + static func sendOnionRequest(_ request: URLRequest, to server: String, using version: OnionRequestAPIVersion, with x25519PublicKey: String, timeout: TimeInterval) -> Promise<(OnionRequestResponseInfoType, Data?)> { return Promise<(OnionRequestResponseInfoType, Data?)>.pending().promise } - static func sendOnionRequest(to snode: Snode, invoking method: SnodeAPIEndpoint, with parameters: JSON, associatedWith publicKey: String?) -> Promise { + static func sendOnionRequest(to snode: Snode, invoking method: SnodeAPIEndpoint, with parameters: JSON, associatedWith publicKey: String?, timeout: TimeInterval) -> Promise { return Promise.value(Data()) } } diff --git a/SessionMessagingKitTests/_TestUtilities/TestOnionRequestAPI.swift b/SessionMessagingKitTests/_TestUtilities/TestOnionRequestAPI.swift index 67b7dde86..3da2fbdd4 100644 --- a/SessionMessagingKitTests/_TestUtilities/TestOnionRequestAPI.swift +++ b/SessionMessagingKitTests/_TestUtilities/TestOnionRequestAPI.swift @@ -34,7 +34,7 @@ class TestOnionRequestAPI: OnionRequestAPIType { class var mockResponse: Data? { return nil } - static func sendOnionRequest(_ request: URLRequest, to server: String, using version: OnionRequestAPIVersion, with x25519PublicKey: String) -> Promise<(OnionRequestResponseInfoType, Data?)> { + static func sendOnionRequest(_ request: URLRequest, to server: String, using version: OnionRequestAPIVersion, with x25519PublicKey: String, timeout: TimeInterval) -> Promise<(OnionRequestResponseInfoType, Data?)> { let responseInfo: ResponseInfo = ResponseInfo( requestData: RequestData( urlString: request.url?.absoluteString, @@ -54,7 +54,7 @@ class TestOnionRequestAPI: OnionRequestAPIType { return Promise.value((responseInfo, mockResponse)) } - static func sendOnionRequest(to snode: Snode, invoking method: SnodeAPIEndpoint, with parameters: JSON, associatedWith publicKey: String?) -> Promise { + static func sendOnionRequest(to snode: Snode, invoking method: SnodeAPIEndpoint, with parameters: JSON, associatedWith publicKey: String?, timeout: TimeInterval) -> Promise { return Promise.value(mockResponse!) } } diff --git a/SessionSnodeKit/Configuration.swift b/SessionSnodeKit/Configuration.swift index 259ac87ab..5035bfa78 100644 --- a/SessionSnodeKit/Configuration.swift +++ b/SessionSnodeKit/Configuration.swift @@ -24,6 +24,6 @@ public enum SNSnodeKit { // Just to make the external API nice public static func configure() { // Configure the job executors - JobRunner.add(executor: GetSnodePoolJob.self, for: .getSnodePool) + JobRunner.setExecutor(GetSnodePoolJob.self, for: .getSnodePool) } } diff --git a/SessionUtilitiesKit/Database/Models/Job.swift b/SessionUtilitiesKit/Database/Models/Job.swift index 037d83fc3..b2f7dc8fd 100644 --- a/SessionUtilitiesKit/Database/Models/Job.swift +++ b/SessionUtilitiesKit/Database/Models/Job.swift @@ -3,7 +3,7 @@ import Foundation import GRDB -public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePersistableRecord, TableRecord, ColumnExpressible { +public struct Job: Codable, Hashable, Equatable, Identifiable, FetchableRecord, MutablePersistableRecord, TableRecord, ColumnExpressible { public static var databaseTableName: String { "job" } internal static let dependencyForeignKey = ForeignKey([Columns.id], to: [JobDependencies.Columns.dependantId]) public static let dependantJobDependency = hasMany( @@ -184,7 +184,7 @@ public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePer // MARK: - Initialization - fileprivate init( + internal init( id: Int64?, failureCount: UInt, variant: Variant, diff --git a/SessionUtilitiesKit/Database/Models/JobDependencies.swift b/SessionUtilitiesKit/Database/Models/JobDependencies.swift index 9cda7ceb1..3613d6912 100644 --- a/SessionUtilitiesKit/Database/Models/JobDependencies.swift +++ b/SessionUtilitiesKit/Database/Models/JobDependencies.swift @@ -3,7 +3,7 @@ import Foundation import GRDB -public struct JobDependencies: Codable, FetchableRecord, PersistableRecord, TableRecord, ColumnExpressible { +public struct JobDependencies: Codable, Hashable, Equatable, FetchableRecord, PersistableRecord, TableRecord, ColumnExpressible { public static var databaseTableName: String { "jobDependencies" } internal static let jobForeignKey = ForeignKey([Columns.jobId], to: [Job.Columns.id]) internal static let dependantForeignKey = ForeignKey([Columns.dependantId], to: [Job.Columns.id]) diff --git a/SessionUtilitiesKit/General/Array+Utilities.swift b/SessionUtilitiesKit/General/Array+Utilities.swift index cf350e86d..77445b64b 100644 --- a/SessionUtilitiesKit/General/Array+Utilities.swift +++ b/SessionUtilitiesKit/General/Array+Utilities.swift @@ -45,6 +45,14 @@ public extension Array { return updatedArray } + func inserting(contentsOf other: [Element]?, at index: Int) -> [Element] { + guard let other: [Element] = other else { return self } + + var updatedArray: [Element] = self + updatedArray.insert(contentsOf: other, at: 0) + return updatedArray + } + func grouped(by keyForValue: (Element) throws -> Key) -> [Key: [Element]] { return ((try? Dictionary(grouping: self, by: keyForValue)) ?? [:]) } diff --git a/SessionUtilitiesKit/General/Dependencies.swift b/SessionUtilitiesKit/General/Dependencies.swift index e9c576639..61e2f510d 100644 --- a/SessionUtilitiesKit/General/Dependencies.swift +++ b/SessionUtilitiesKit/General/Dependencies.swift @@ -16,6 +16,12 @@ open class Dependencies { set { _storage.mutate { $0 = newValue } } } + public var _jobRunner: Atomic + public var jobRunner: JobRunnerType { + get { Dependencies.getValueSettingIfNull(&_jobRunner) { JobRunner.instance } } + set { _jobRunner.mutate { $0 = newValue } } + } + public var _scheduler: Atomic public var scheduler: ValueObservationScheduler { get { Dependencies.getValueSettingIfNull(&_scheduler) { Storage.defaultPublisherScheduler } } @@ -39,12 +45,14 @@ open class Dependencies { public init( generalCache: Atomic? = nil, storage: Storage? = nil, + jobRunner: JobRunnerType? = nil, scheduler: ValueObservationScheduler? = nil, standardUserDefaults: UserDefaultsType? = nil, date: Date? = nil ) { _generalCache = Atomic(generalCache) _storage = Atomic(storage) + _jobRunner = Atomic(jobRunner) _scheduler = Atomic(scheduler) _standardUserDefaults = Atomic(standardUserDefaults) _date = Atomic(date) diff --git a/SessionUtilitiesKit/General/Set+Utilities.swift b/SessionUtilitiesKit/General/Set+Utilities.swift index 5fb2d416b..f6f45d27a 100644 --- a/SessionUtilitiesKit/General/Set+Utilities.swift +++ b/SessionUtilitiesKit/General/Set+Utilities.swift @@ -3,6 +3,12 @@ import Foundation public extension Set { + mutating func insert(contentsOf value: Set?) { + guard let value: Set = value else { return } + + value.forEach { self.insert($0) } + } + func inserting(_ value: Element?) -> Set { guard let value: Element = value else { return self } diff --git a/SessionUtilitiesKit/JobRunner/JobRunner.swift b/SessionUtilitiesKit/JobRunner/JobRunner.swift index fafb086d2..484a36aa7 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunner.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunner.swift @@ -3,6 +3,62 @@ import Foundation import GRDB +public protocol JobRunnerType { + // MARK: - Configuration + + func setExecutor(_ executor: JobExecutor.Type, for variant: Job.Variant) + func canStart(queue: JobQueue) -> Bool + + // MARK: - State Management + + func isCurrentlyRunning(_ job: Job?) -> Bool + func hasJob(of variant: Job.Variant, inState state: JobRunner.JobState, with jobDetails: T) -> Bool + func detailsFor(jobs: [Job]?, state: JobRunner.JobState, variant: Job.Variant?) -> [Int64: Data?] + + func appDidFinishLaunching(dependencies: Dependencies) + func appDidBecomeActive(dependencies: Dependencies) + func startNonBlockingQueues(dependencies: Dependencies) + func stopAndClearPendingJobs(exceptForVariant: Job.Variant?, onComplete: (() -> ())?) + + // MARK: - Job Scheduling + + func add(_ db: Database, job: Job?, canStartJob: Bool, dependencies: Dependencies) + func upsert(_ db: Database, job: Job?, canStartJob: Bool, dependencies: Dependencies) + @discardableResult func insert(_ db: Database, job: Job?, before otherJob: Job, dependencies: Dependencies) -> (Int64, Job)? +} + +public extension JobRunnerType { + func stopAndClearPendingJobs(exceptForVariant: Job.Variant? = nil, onComplete: (() -> ())? = nil) { + stopAndClearPendingJobs(exceptForVariant: exceptForVariant, onComplete: onComplete) + } + + func hasJob(of variant: Job.Variant, inState state: JobRunner.JobState = .any, with jobDetails: T) -> Bool { + return hasJob(of: variant, inState: state, with: jobDetails) + } + + func details() -> [Int64: Data?] { return detailsFor(jobs: nil, state: .any, variant: nil) } + + func detailsFor(jobs: [Job]) -> [Int64: Data?] { + return detailsFor(jobs: jobs, state: .any, variant: nil) + } + + func detailsFor(jobs: [Job], state: JobRunner.JobState) -> [Int64: Data?] { + return detailsFor(jobs: jobs, state: state, variant: nil) + } + + func detailsFor(state: JobRunner.JobState) -> [Int64: Data?] { + return detailsFor(jobs: nil, state: state, variant: nil) + } + + func detailsFor(state: JobRunner.JobState, variant: Job.Variant) -> [Int64: Data?] { + return detailsFor(jobs: nil, state: state, variant: variant) + } + + func detailsFor(variant: Job.Variant) -> [Int64: Data?] { + return detailsFor(jobs: nil, state: .any, variant: variant) + } +} + public protocol JobExecutor { /// The maximum number of times the job can fail before it fails permanently /// @@ -35,7 +91,20 @@ public protocol JobExecutor { ) } -public final class JobRunner { +public final class JobRunner: JobRunnerType { + public struct JobState: OptionSet, Hashable { + public let rawValue: UInt8 + + public init(rawValue: UInt8) { + self.rawValue = rawValue + } + + public static let pending: JobState = JobState(rawValue: 1 << 0) + public static let running: JobState = JobState(rawValue: 1 << 1) + + public static let any: JobState = [ .pending, .running ] + } + public enum JobResult { case succeeded case failed @@ -45,19 +114,33 @@ public final class JobRunner { // MARK: - Variables + private let allowToExecuteJobs: Bool private let blockingQueue: Atomic private let queues: Atomic<[Job.Variant: JobQueue]> + internal var appReadyToStartQueues: Atomic = Atomic(false) internal var perSessionJobsCompleted: Atomic> = Atomic([]) internal var hasCompletedInitialBecomeActive: Atomic = Atomic(false) internal var shutdownBackgroundTask: Atomic = Atomic(nil) - internal var canStartQueues: Atomic = Atomic(false) // MARK: - Initialization - init(dependencies: Dependencies = Dependencies()) { - var jobVariants: Set = Job.Variant.allCases.asSet() - + init( + isTestingJobRunner: Bool = false, + variantsToExclude: [Job.Variant] = [], + dependencies: Dependencies = Dependencies() + ) { + var jobVariants: Set = Job.Variant.allCases + .filter { !variantsToExclude.contains($0) } + .asSet() + + self.allowToExecuteJobs = ( + isTestingJobRunner || ( + HasAppContext() && + CurrentAppContext().isMainApp && + !CurrentAppContext().isRunningTests + ) + ) self.blockingQueue = Atomic( JobQueue( type: .blocking, @@ -66,7 +149,7 @@ public final class JobRunner { onQueueDrained: { // Once all blocking jobs have been completed we want to start running // the remaining job queues - JobRunner.startNonBlockingQueues(dependencies: dependencies) + dependencies.jobRunner.startNonBlockingQueues(dependencies: dependencies) } ) ) @@ -127,15 +210,92 @@ public final class JobRunner { // MARK: - Configuration - internal func add(executor: JobExecutor.Type, for variant: Job.Variant) { - queues.wrappedValue[variant]?.addExecutor(executor, for: variant) + public func setExecutor(_ executor: JobExecutor.Type, for variant: Job.Variant) { + blockingQueue.wrappedValue?.setExecutor(executor, for: variant) // The blocking queue can run any job + queues.wrappedValue[variant]?.setExecutor(executor, for: variant) } - // MARK: - Execution + public func canStart(queue: JobQueue) -> Bool { + return ( + allowToExecuteJobs && + appReadyToStartQueues.wrappedValue + ) + } + + // MARK: - State Management + + public func isCurrentlyRunning(_ job: Job?) -> Bool { + guard let job: Job = job else { return false } + + return !detailsFor(jobs: [job], state: .running).isEmpty + } + + public func hasJob(of variant: Job.Variant, inState state: JobRunner.JobState, with jobDetails: T) -> Bool { + guard let detailsData: Data = try? JSONEncoder().encode(jobDetails) else { return false } + + return detailsFor(state: state, variant: variant).values.contains(detailsData) + } + + public func detailsFor( + jobs: [Job]?, + state: JobRunner.JobState, + variant: Job.Variant? + ) -> [Int64: Data?] { + var result: [(Int64, Data?)] = [] + let targetKeys: [JobQueue.JobKey] = (jobs?.compactMap { JobQueue.JobKey($0) } ?? []) + let targetVariants: [Job.Variant] = (variant.map { [$0] } ?? jobs?.map { $0.variant }) + .defaulting(to: []) + + // Insert the state of any pending jobs + if state.contains(.pending) { + func detailsFor(queue: JobQueue?, variants: [Job.Variant]) -> [(Int64, Data?)] { + return (queue?.pendingJobsQueue.wrappedValue + .filter { variants.isEmpty || variants.contains($0.variant) } + .compactMap { job -> (Int64, Data?)? in + guard let jobKey: JobQueue.JobKey = JobQueue.JobKey(job) else { return nil } + guard !targetKeys.isEmpty else { return (jobKey.id, job.details) } + + return (targetKeys.contains(jobKey) ? (jobKey.id, job.details) : nil) + }) + .defaulting(to: []) + } + + result.append(contentsOf: detailsFor(queue: blockingQueue.wrappedValue, variants: targetVariants)) + queues.wrappedValue + .filter { key, _ -> Bool in targetVariants.isEmpty || targetVariants.contains(key) } + .values + .forEach { queue in result.append(contentsOf: detailsFor(queue: queue, variants: targetVariants)) } + } + + // Insert the state of any running jobs + if state.contains(.running) { + func detailsFor(queue: JobQueue?, variants: [Job.Variant]) -> [(Int64, Data?)] { + return (queue?.detailsForCurrentlyRunningJobs.wrappedValue + .filter { variants.isEmpty || variants.contains($0.key.variant) } + .compactMap { jobKey, details -> (Int64, Data?)? in + guard !targetKeys.isEmpty else { return (jobKey.id, details) } + + return (targetKeys.contains(jobKey) ? (jobKey.id, details) : nil) + }) + .defaulting(to: []) + } + + result.append(contentsOf: detailsFor(queue: blockingQueue.wrappedValue, variants: targetVariants)) + queues.wrappedValue + .filter { key, _ -> Bool in targetVariants.isEmpty || targetVariants.contains(key) } + .values + .forEach { queue in result.append(contentsOf: detailsFor(queue: queue, variants: targetVariants)) } + } + + return result + .reduce(into: [:]) { result, next in + result[next.0] = next.1 + } + } - internal func appDidFinishLaunching(dependencies: Dependencies) { + public func appDidFinishLaunching(dependencies: Dependencies) { // Flag that the JobRunner can start it's queues - canStartQueues.mutate { $0 = true } + appReadyToStartQueues.mutate { $0 = true } // Note: 'appDidBecomeActive' will run on first launch anyway so we can // leave those jobs out and can wait until then to start the JobRunner @@ -185,9 +345,9 @@ public final class JobRunner { } } - internal func appDidBecomeActive(dependencies: Dependencies) { + public func appDidBecomeActive(dependencies: Dependencies) { // Flag that the JobRunner can start it's queues - canStartQueues.mutate { $0 = true } + appReadyToStartQueues.mutate { $0 = true } // If we have a running "sutdownBackgroundTask" then we want to cancel it as otherwise it // can result in the database being suspended and us being unable to interact with it at all @@ -232,7 +392,73 @@ public final class JobRunner { self.hasCompletedInitialBecomeActive.mutate { $0 = true } } - internal func add( + public func startNonBlockingQueues(dependencies: Dependencies) { + queues.wrappedValue.forEach { _, queue in + queue.start(dependencies: dependencies) + } + } + + public func stopAndClearPendingJobs( + exceptForVariant: Job.Variant?, + onComplete: (() -> ())? + ) { + // Inform the JobRunner that it can't start any queues (this is to prevent queues from + // rescheduling themselves while in the background, when the app restarts or becomes active + // the JobRunenr will update this flag) + appReadyToStartQueues.mutate { $0 = false } + + // Stop all queues except for the one containing the `exceptForVariant` + queues.wrappedValue + .values + .filter { queue -> Bool in + guard let exceptForVariant: Job.Variant = exceptForVariant else { return true } + + return !queue.jobVariants.contains(exceptForVariant) + } + .forEach { $0.stopAndClearPendingJobs() } + + // Ensure the queue is actually running (if not the trigger the callback immediately) + guard + let exceptForVariant: Job.Variant = exceptForVariant, + let queue: JobQueue = queues.wrappedValue[exceptForVariant], + queue.isRunning.wrappedValue == true + else { + onComplete?() + return + } + + let oldQueueDrained: (() -> ())? = queue.onQueueDrained + + // Create a backgroundTask to give the queue the chance to properly be drained + shutdownBackgroundTask.mutate { + $0 = OWSBackgroundTask(labelStr: #function) { [weak queue] state in + // If the background task didn't succeed then trigger the onComplete (and hope we have + // enough time to complete it's logic) + guard state != .cancelled else { + queue?.onQueueDrained = oldQueueDrained + return + } + guard state != .success else { return } + + onComplete?() + queue?.onQueueDrained = oldQueueDrained + queue?.stopAndClearPendingJobs() + } + } + + // Add a callback to be triggered once the queue is drained + queue.onQueueDrained = { [weak self, weak queue] in + oldQueueDrained?() + queue?.onQueueDrained = oldQueueDrained + onComplete?() + + self?.shutdownBackgroundTask.mutate { $0 = nil } + } + } + + // MARK: - Execution + + public func add( _ db: Database, job: Job?, canStartJob: Bool, @@ -262,7 +488,7 @@ public final class JobRunner { } } - internal func upsert( + public func upsert( _ db: Database, job: Job?, canStartJob: Bool, @@ -285,7 +511,7 @@ public final class JobRunner { } } - @discardableResult internal func insert( + @discardableResult public func insert( _ db: Database, job: Job?, before otherJob: Job, @@ -315,75 +541,6 @@ public final class JobRunner { return (jobId, updatedJob) } - internal func stopAndClearPendingJobs( - exceptForVariant: Job.Variant? = nil, - onComplete: (() -> ())? = nil - ) { - // Inform the JobRunner that it can't start any queues (this is to prevent queues from - // rescheduling themselves while in the background, when the app restarts or becomes active - // the JobRunenr will update this flag) - canStartQueues.mutate { $0 = false } - - // Stop all queues except for the one containing the `exceptForVariant` - queues.wrappedValue - .values - .filter { queue -> Bool in - guard let exceptForVariant: Job.Variant = exceptForVariant else { return true } - - return !queue.jobVariants.contains(exceptForVariant) - } - .forEach { $0.stopAndClearPendingJobs() } - - // Ensure the queue is actually running (if not the trigger the callback immediately) - guard - let exceptForVariant: Job.Variant = exceptForVariant, - let queue: JobQueue = queues.wrappedValue[exceptForVariant], - queue.isRunning.wrappedValue == true - else { - onComplete?() - return - } - - let oldQueueDrained: (() -> ())? = queue.onQueueDrained - - // Create a backgroundTask to give the queue the chance to properly be drained - shutdownBackgroundTask.mutate { - $0 = OWSBackgroundTask(labelStr: #function) { [weak queue] state in - // If the background task didn't succeed then trigger the onComplete (and hope we have - // enough time to complete it's logic) - guard state != .cancelled else { - queue?.onQueueDrained = oldQueueDrained - return - } - guard state != .success else { return } - - onComplete?() - queue?.onQueueDrained = oldQueueDrained - queue?.stopAndClearPendingJobs() - } - } - - // Add a callback to be triggered once the queue is drained - queue.onQueueDrained = { [weak self, weak queue] in - oldQueueDrained?() - queue?.onQueueDrained = oldQueueDrained - onComplete?() - - self?.shutdownBackgroundTask.mutate { $0 = nil } - } - } - - internal func isCurrentlyRunning(_ job: Job?) -> Bool { - guard let job: Job = job, let jobId: Int64 = job.id else { return false } - - return (queues.wrappedValue[job.variant]?.isCurrentlyRunning(jobId) == true) - } - - internal func detailsForCurrentlyRunningJobs(of variant: Job.Variant) -> [Int64: Data?] { - return (queues.wrappedValue[variant]?.detailsForAllCurrentlyRunningJobs()) - .defaulting(to: [:]) - } - internal func afterCurrentlyRunningJob(_ job: Job?, callback: @escaping (JobResult) -> ()) { guard let job: Job = job, let jobId: Int64 = job.id, let queue: JobQueue = queues.wrappedValue[job.variant] else { callback(.notFound) @@ -393,13 +550,6 @@ public final class JobRunner { queue.afterCurrentlyRunningJob(jobId, callback: callback) } - internal func hasPendingOrRunningJob(with variant: Job.Variant, details: T) -> Bool { - guard let targetQueue: JobQueue = queues.wrappedValue[variant] else { return false } - guard let detailsData: Data = try? JSONEncoder().encode(details) else { return false } - - return targetQueue.hasPendingOrRunningJob(with: detailsData) - } - internal func removePendingJob(_ job: Job?) { guard let job: Job = job, let jobId: Int64 = job.id else { return } @@ -421,100 +571,9 @@ public final class JobRunner { } } -// MARK: - JobRunner Singleton - -public extension JobRunner { - private static let instance: JobRunner = JobRunner() - - // MARK: - Static Access - - static func add(executor: JobExecutor.Type, for variant: Job.Variant) { - instance.add(executor: executor, for: variant) - } - - static func appDidFinishLaunching(dependencies: Dependencies = Dependencies()) { - instance.appDidFinishLaunching(dependencies: dependencies) - } - - static func appDidBecomeActive(dependencies: Dependencies = Dependencies()) { - instance.appDidBecomeActive(dependencies: dependencies) - } - - /// Add a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start - /// the JobRunner - /// - /// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp` - /// is in the future then the job won't be started - static func add( - _ db: Database, - job: Job?, - canStartJob: Bool = true, - dependencies: Dependencies = Dependencies() - ) { instance.add(db, job: job, canStartJob: canStartJob, dependencies: dependencies) } - - /// Upsert a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start - /// the JobRunner - /// - /// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp` - /// is in the future then the job won't be started - static func upsert( - _ db: Database, - job: Job?, - canStartJob: Bool = true, - dependencies: Dependencies = Dependencies() - ) { instance.upsert(db, job: job, canStartJob: canStartJob, dependencies: dependencies) } - - @discardableResult static func insert( - _ db: Database, - job: Job?, - before otherJob: Job, - dependencies: Dependencies = Dependencies() - ) -> (Int64, Job)? { instance.insert(db, job: job, before: otherJob, dependencies: dependencies) } - - /// Calling this will clear the JobRunner queues and stop it from running new jobs, any currently executing jobs will continue to run - /// though (this means if we suspend the database it's likely that any currently running jobs will fail to complete and fail to record their - /// failure - they _should_ be picked up again the next time the app is launched) - static func stopAndClearPendingJobs( - exceptForVariant: Job.Variant? = nil, - onComplete: (() -> ())? = nil - ) { instance.stopAndClearPendingJobs(exceptForVariant: exceptForVariant, onComplete: onComplete) } - - static func isCurrentlyRunning(_ job: Job?) -> Bool { - return instance.isCurrentlyRunning(job) - } - - static func detailsForCurrentlyRunningJobs(of variant: Job.Variant) -> [Int64: Data?] { - return instance.detailsForCurrentlyRunningJobs(of: variant) - } - - static func afterCurrentlyRunningJob(_ job: Job?, callback: @escaping (JobResult) -> ()) { - instance.afterCurrentlyRunningJob(job, callback: callback) - } - - static func hasPendingOrRunningJob(with variant: Job.Variant, details: T) -> Bool { - return instance.hasPendingOrRunningJob(with: variant, details: details) - } - - static func removePendingJob(_ job: Job?) { - instance.removePendingJob(job) - } - - // MARK: - Internal Static Access - - fileprivate static func canStart(queue: JobQueue) -> Bool { - return instance.canStartQueues.wrappedValue - } - - fileprivate static func startNonBlockingQueues(dependencies: Dependencies) { - instance.queues.wrappedValue.forEach { _, queue in - queue.start(dependencies: dependencies) - } - } -} - // MARK: - JobQueue -private final class JobQueue { +public final class JobQueue { fileprivate enum QueueType: Hashable { case blocking case general(number: Int) @@ -577,6 +636,18 @@ private final class JobQueue { } } + fileprivate struct JobKey: Equatable, Hashable { + fileprivate let id: Int64 + fileprivate let variant: Job.Variant + + fileprivate init?(_ job: Job?) { + guard let id: Int64 = job?.id, let variant: Job.Variant = job?.variant else { return nil } + + self.id = id + self.variant = variant + } + } + private static let deferralLoopThreshold: Int = 3 private let type: QueueType @@ -606,17 +677,17 @@ private final class JobQueue { private var executorMap: Atomic<[Job.Variant: JobExecutor.Type]> = Atomic([:]) private var nextTrigger: Atomic = Atomic(nil) fileprivate var isRunning: Atomic = Atomic(false) - private var queue: Atomic<[Job]> = Atomic([]) - private var jobsCurrentlyRunning: Atomic> = Atomic([]) + fileprivate var pendingJobsQueue: Atomic<[Job]> = Atomic([]) + fileprivate var jobsCurrentlyRunning: Atomic> = Atomic([]) + fileprivate var detailsForCurrentlyRunningJobs: Atomic<[JobKey: Data?]> = Atomic([:]) private var jobCallbacks: Atomic<[Int64: [(JobRunner.JobResult) -> ()]]> = Atomic([:]) - private var detailsForCurrentlyRunningJobs: Atomic<[Int64: Data?]> = Atomic([:]) private var deferLoopTracker: Atomic<[Int64: (count: Int, times: [TimeInterval])]> = Atomic([:]) - fileprivate var hasPendingJobs: Bool { !queue.wrappedValue.isEmpty } + fileprivate var hasPendingJobs: Bool { !pendingJobsQueue.wrappedValue.isEmpty } // MARK: - Initialization - init( + fileprivate init( type: QueueType, executionType: ExecutionType = .serial, qos: DispatchQoS, @@ -633,7 +704,7 @@ private final class JobQueue { // MARK: - Configuration - fileprivate func addExecutor(_ executor: JobExecutor.Type, for variant: Job.Variant) { + fileprivate func setExecutor(_ executor: JobExecutor.Type, for variant: Job.Variant) { executorMap.mutate { $0[variant] = executor } } @@ -651,7 +722,7 @@ private final class JobQueue { return } - queue.mutate { $0.append(job) } + pendingJobsQueue.mutate { $0.append(job) } } /// Upsert a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start @@ -665,21 +736,21 @@ private final class JobQueue { return } - // Lock the queue while checking the index and inserting to ensure we don't run into + // Lock the pendingJobsQueue while checking the index and inserting to ensure we don't run into // any multi-threading shenanigans // - // Note: currently running jobs are removed from the queue so we don't need to check + // Note: currently running jobs are removed from the pendingJobsQueue so we don't need to check // the 'jobsCurrentlyRunning' set var didUpdateExistingJob: Bool = false - queue.mutate { queue in + pendingJobsQueue.mutate { queue in if let jobIndex: Array.Index = queue.firstIndex(where: { $0.id == jobId }) { queue[jobIndex] = job didUpdateExistingJob = true } } - // If we didn't update an existing job then we need to add it to the queue + // If we didn't update an existing job then we need to add it to the pendingJobsQueue guard !didUpdateExistingJob else { return } add(job, canStartJob: canStartJob, dependencies: dependencies) @@ -692,10 +763,10 @@ private final class JobQueue { } // Insert the job before the current job (re-adding the current job to - // the start of the queue if it's not in there) - this will mean the new + // the start of the pendingJobsQueue if it's not in there) - this will mean the new // job will run and then the otherJob will run (or run again) once it's // done - queue.mutate { + pendingJobsQueue.mutate { guard let otherJobIndex: Int = $0.firstIndex(of: otherJob) else { $0.insert(contentsOf: [job, otherJob], at: 0) return @@ -710,7 +781,7 @@ private final class JobQueue { canStart: Bool, dependencies: Dependencies ) { - queue.mutate { $0.append(contentsOf: jobs) } + pendingJobsQueue.mutate { $0.append(contentsOf: jobs) } // Start the job runner if needed if canStart && !isRunning.wrappedValue { @@ -723,8 +794,8 @@ private final class JobQueue { canStart: Bool, dependencies: Dependencies ) { - queue.mutate { queue in - // Avoid re-adding jobs to the queue that are already in it (this can + pendingJobsQueue.mutate { queue in + // Avoid re-adding jobs to the pendingJobsQueue that are already in it (this can // happen if the user sends the app to the background before the 'onActive' // jobs and then brings it back to the foreground) let jobsNotAlreadyInQueue: [Job] = jobs @@ -739,16 +810,8 @@ private final class JobQueue { } } - fileprivate func isCurrentlyRunning(_ jobId: Int64) -> Bool { - return jobsCurrentlyRunning.wrappedValue.contains(jobId) - } - - fileprivate func detailsForAllCurrentlyRunningJobs() -> [Int64: Data?] { - return detailsForCurrentlyRunningJobs.wrappedValue - } - fileprivate func afterCurrentlyRunningJob(_ jobId: Int64, callback: @escaping (JobRunner.JobResult) -> ()) { - guard isCurrentlyRunning(jobId) else { + guard jobsCurrentlyRunning.wrappedValue.contains(jobId) else { callback(.notFound) return } @@ -758,14 +821,8 @@ private final class JobQueue { } } - fileprivate func hasPendingOrRunningJob(with detailsData: Data?) -> Bool { - let pendingJobs: [Job] = queue.wrappedValue - - return pendingJobs.contains { job in job.details == detailsData } - } - fileprivate func removePendingJob(_ jobId: Int64) { - queue.mutate { queue in + pendingJobsQueue.mutate { queue in queue = queue.filter { $0.id != jobId } } } @@ -773,17 +830,12 @@ private final class JobQueue { // MARK: - Job Running fileprivate func start( - force: Bool = false, + forceWhenAlreadyRunning: Bool = false, dependencies: Dependencies ) { - // We only want the JobRunner to run in the main app - guard - HasAppContext() && - CurrentAppContext().isMainApp && - !CurrentAppContext().isRunningTests && - JobRunner.canStart(queue: self) - else { return } - guard force || !isRunning.wrappedValue else { return } + // Only start if the JobRunner is allowed to start the queue + guard dependencies.jobRunner.canStart(queue: self) else { return } + guard forceWhenAlreadyRunning || !isRunning.wrappedValue else { return } // The JobRunner runs synchronously we need to ensure this doesn't start // on the main thread (if it is on the main thread then swap to a different thread) @@ -804,7 +856,7 @@ private final class JobQueue { // Get any pending jobs let jobIdsAlreadyRunning: Set = jobsCurrentlyRunning.wrappedValue - let jobsAlreadyInQueue: Set = queue.wrappedValue.compactMap { $0.id }.asSet() + let jobsAlreadyInQueue: Set = pendingJobsQueue.wrappedValue.compactMap { $0.id }.asSet() let jobsToRun: [Job] = dependencies.storage.read { db in try Job .filterPendingJobs( @@ -821,7 +873,7 @@ private final class JobQueue { // Determine the number of jobs to run var jobCount: Int = 0 - queue.mutate { queue in + pendingJobsQueue.mutate { queue in queue.append(contentsOf: jobsToRun) jobCount = queue.count } @@ -836,7 +888,7 @@ private final class JobQueue { return } - // Run the first job in the queue + // Run the first job in the pendingJobsQueue if !wasAlreadyRunning { SNLog("[JobRunner] Starting \(queueContext) with (\(jobCount) job\(jobCount != 1 ? "s" : ""))") } @@ -845,7 +897,7 @@ private final class JobQueue { fileprivate func stopAndClearPendingJobs() { isRunning.mutate { $0 = false } - queue.mutate { $0 = [] } + pendingJobsQueue.mutate { $0 = [] } deferLoopTracker.mutate { $0 = [:] } } @@ -860,7 +912,7 @@ private final class JobQueue { } return } - guard let (nextJob, numJobsRemaining): (Job, Int) = queue.mutate({ queue in queue.popFirst().map { ($0, queue.count) } }) else { + guard let (nextJob, numJobsRemaining): (Job, Int) = pendingJobsQueue.mutate({ queue in queue.popFirst().map { ($0, queue.count) } }) else { // If it's a serial queue, or there are no more jobs running then update the 'isRunning' flag if executionType != .concurrent || jobsCurrentlyRunning.wrappedValue.isEmpty { isRunning.mutate { $0 = false } @@ -913,19 +965,21 @@ private final class JobQueue { } // If the 'nextRunTimestamp' for the job is in the future then don't run it yet - guard nextJob.nextRunTimestamp <= Date().timeIntervalSince1970 else { + guard nextJob.nextRunTimestamp <= dependencies.date.timeIntervalSince1970 else { handleJobDeferred(nextJob, dependencies: dependencies) return } // Check if the next job has any dependencies - let dependencyInfo: (expectedCount: Int, jobs: [Job]) = dependencies.storage.read { db in - let numExpectedDependencies: Int = try JobDependencies + let dependencyInfo: (expectedCount: Int, jobs: Set) = dependencies.storage.read { db in + let expectedDependencies: Set = try JobDependencies .filter(JobDependencies.Columns.jobId == nextJob.id) - .fetchCount(db) - let jobDependencies: [Job] = try nextJob.dependencies.fetchAll(db) + .fetchSet(db) + let jobDependencies: Set = try Job + .filter(ids: expectedDependencies.compactMap { $0.dependantId }) + .fetchSet(db) - return (numExpectedDependencies, jobDependencies) + return (expectedDependencies.count, jobDependencies) } .defaulting(to: (0, [])) @@ -942,39 +996,15 @@ private final class JobQueue { guard dependencyInfo.jobs.isEmpty else { SNLog("[JobRunner] \(queueContext) found job with \(dependencyInfo.jobs.count) dependencies, running those first") - let jobDependencyIds: [Int64] = dependencyInfo.jobs - .compactMap { $0.id } - let jobIdsNotInQueue: Set = jobDependencyIds - .asSet() - .subtracting(queue.wrappedValue.compactMap { $0.id }) - - // If there are dependencies which aren't in the queue we should just append them - guard !jobIdsNotInQueue.isEmpty else { - queue.mutate { queue in - queue.append( - contentsOf: dependencyInfo.jobs - .filter { jobIdsNotInQueue.contains($0.id ?? -1) } - ) - queue.append(nextJob) - } - handleJobDeferred(nextJob, dependencies: dependencies) - return - } - - // Otherwise re-add the current job after it's dependencies (if this isn't a concurrent - // queue - don't want to immediately try to start the job again only for it to end up back - // in here) - if executionType != .concurrent { - queue.mutate { queue in - guard let lastDependencyIndex: Int = queue.lastIndex(where: { jobDependencyIds.contains($0.id ?? -1) }) else { - queue.append(nextJob) - return - } - - queue.insert(nextJob, at: lastDependencyIndex + 1) - } + /// Remove all jobs this one is dependant on from the queue and re-insert them at the start of the queue + /// + /// **Note:** We don't add the current job back the the queue because it should only be re-added if it's dependencies + /// are successfully completed + pendingJobsQueue.mutate { queue in + queue = queue + .filter { !dependencyInfo.jobs.contains($0) } + .inserting(contentsOf: Array(dependencyInfo.jobs), at: 0) } - handleJobDeferred(nextJob, dependencies: dependencies) return } @@ -993,10 +1023,10 @@ private final class JobQueue { jobsCurrentlyRunning = jobsCurrentlyRunning.inserting(nextJob.id) numJobsRunning = jobsCurrentlyRunning.count } - detailsForCurrentlyRunningJobs.mutate { $0 = $0.setting(nextJob.id, nextJob.details) } + detailsForCurrentlyRunningJobs.mutate { $0 = $0.setting(JobKey(nextJob), nextJob.details) } SNLog("[JobRunner] \(queueContext) started \(nextJob.variant) job (\(executionType == .concurrent ? "\(numJobsRunning) currently running, " : "")\(numJobsRemaining) remaining)") - /// As it turns out Combine doesn't plat too nicely with concurrent Dispatch Queues, in Combine events are dispatched asynchronously to + /// As it turns out Combine doesn't play too nicely with concurrent Dispatch Queues, in Combine events are dispatched asynchronously to /// the queue which means an odd situation can occasionally occur where the `finished` event can actually run before the `output` /// event - this can result in unexpected behaviours (for more information see https://github.com/groue/GRDB.swift/issues/1334) /// @@ -1049,7 +1079,7 @@ private final class JobQueue { // If there are no remaining jobs or the JobRunner isn't allowed to start any queues then trigger // the 'onQueueDrained' callback and stop - guard let nextJobTimestamp: TimeInterval = nextJobTimestamp, JobRunner.canStart(queue: self) else { + guard let nextJobTimestamp: TimeInterval = nextJobTimestamp, dependencies.jobRunner.canStart(queue: self) else { if executionType != .concurrent || jobsCurrentlyRunning.wrappedValue.isEmpty { self.onQueueDrained?() } @@ -1073,7 +1103,7 @@ private final class JobQueue { // queue (for concurrent queues we want to force them to load in pending jobs and add // them to the queue regardless of whether the queue is already running) internalQueue.async { [weak self] in - self?.start(force: (self?.executionType == .concurrent), dependencies: dependencies) + self?.start(forceWhenAlreadyRunning: (self?.executionType == .concurrent), dependencies: dependencies) } return } @@ -1097,11 +1127,17 @@ private final class JobQueue { shouldStop: Bool, dependencies: Dependencies ) { + /// Retrieve the dependant jobs first (the `JobDependecies` table has cascading deletion when the original `Job` is + /// removed so we need to retrieve these records before that happens) + let dependantJobs: [Job] = dependencies.storage + .read { db in try job.dependantJobs.fetchAll(db) } + .defaulting(to: []) + switch job.behaviour { case .runOnce, .runOnceNextLaunch: dependencies.storage.write { db in - // First remove any JobDependencies requiring this job to be completed (if - // we don't then the dependant jobs will automatically be deleted) + /// Since this job has been completed we can update the dependencies so other job that were dependant + /// on this one can be run _ = try JobDependencies .filter(JobDependencies.Columns.dependantId == job.id) .deleteAll(db) @@ -1111,8 +1147,8 @@ private final class JobQueue { case .recurring where shouldStop == true: dependencies.storage.write { db in - // First remove any JobDependencies requiring this job to be completed (if - // we don't then the dependant jobs will automatically be deleted) + /// Since this job has been completed we can update the dependencies so other job that were dependant + /// on this one can be run _ = try JobDependencies .filter(JobDependencies.Columns.dependantId == job.id) .deleteAll(db) @@ -1120,9 +1156,8 @@ private final class JobQueue { _ = try job.delete(db) } - // For `recurring` jobs which have already run, they should automatically run again - // but we want at least 1 second to pass before doing so - the job itself should - // really update it's own 'nextRunTimestamp' (this is just a safety net) + /// For `recurring` jobs which have already run, they should automatically run again but we want at least 1 second + /// to pass before doing so - the job itself should really update it's own `nextRunTimestamp` (this is just a safety net) case .recurring where job.nextRunTimestamp <= Date().timeIntervalSince1970: guard let jobId: Int64 = job.id else { break } @@ -1136,9 +1171,8 @@ private final class JobQueue { ) } - // For `recurringOnLaunch/Active` jobs which have already run but failed once, we need to - // clear their `failureCount` and `nextRunTimestamp` to prevent them from endlessly running - // over and over again + /// For `recurringOnLaunch/Active` jobs which have already run but failed once, we need to clear their + /// `failureCount` and `nextRunTimestamp` to prevent them from endlessly running over and over again case .recurringOnLaunch, .recurringOnActive: guard let jobId: Int64 = job.id, @@ -1159,26 +1193,17 @@ private final class JobQueue { default: break } - // For concurrent queues retrieve any 'dependant' jobs and re-add them here (if they have other - // dependencies they will be removed again when they try to execute) - if executionType == .concurrent { - let dependantJobs: [Job] = dependencies.storage - .read { db in try job.dependantJobs.fetchAll(db) } - .defaulting(to: []) - let dependantJobIds: [Int64] = dependantJobs - .compactMap { $0.id } - let jobIdsNotInQueue: Set = dependantJobIds - .asSet() - .subtracting(queue.wrappedValue.compactMap { $0.id }) - - // If there are dependant jobs which aren't in the queue we should just append them - if !jobIdsNotInQueue.isEmpty { - queue.mutate { queue in - queue.append( - contentsOf: dependantJobs - .filter { jobIdsNotInQueue.contains($0.id ?? -1) } - ) - } + /// Now that the job has been completed we want to insert any jobs that were dependant on it to the start of the queue (the + /// most likely case is that we want an entire job chain to be completed at the same time rather than being blocked by other + /// unrelated jobs) + /// + /// **Note:** If any of these `dependantJobs` have other dependencies then when they attempt to start they will be + /// removed from the queue, replaced by their dependencies + if !dependantJobs.isEmpty { + pendingJobsQueue.mutate { queue in + queue = queue + .filter { !dependantJobs.contains($0) } + .inserting(contentsOf: dependantJobs, at: 0) } } @@ -1197,7 +1222,7 @@ private final class JobQueue { permanentFailure: Bool, dependencies: Dependencies ) { - guard Storage.shared.read({ db in try Job.exists(db, id: job.id ?? -1) }) == true else { + guard dependencies.storage.read({ db in try Job.exists(db, id: job.id ?? -1) }) == true else { SNLog("[JobRunner] \(queueContext) \(job.variant) job canceled") performCleanUp(for: job, result: .failed) @@ -1229,7 +1254,7 @@ private final class JobQueue { // Only add it back to the queue if it wasn't a deferral loop if !wasPossibleDeferralLoop { - queue.mutate { $0.insert(job, at: 0) } + pendingJobsQueue.mutate { $0.insert(job, at: 0) } } internalQueue.async { [weak self] in @@ -1242,20 +1267,30 @@ private final class JobQueue { let maxFailureCount: Int = (executorMap.wrappedValue[job.variant]?.maxFailureCount ?? 0) let nextRunTimestamp: TimeInterval = (Date().timeIntervalSince1970 + JobRunner.getRetryInterval(for: job)) - Storage.shared.write { db in + dependencies.storage.write { db in + /// Remove any dependant jobs from the queue (shouldn't be in there but filter the queue just in case so we don't try + /// to run a deleted job or get stuck in a loop of trying to run dependencies indefinitely) + let dependantJobIds: [Int64] = try job.dependantJobs + .select(.id) + .asRequest(of: Int64.self) + .fetchAll(db) + + if !dependantJobIds.isEmpty { + pendingJobsQueue.mutate { queue in + queue = queue.filter { !dependantJobIds.contains($0.id ?? -1) } + } + } + + /// Delete/update the failed jobs and any dependencies + let updatedFailureCount: UInt = (job.failureCount + 1) guard !permanentFailure && ( maxFailureCount < 0 || - job.failureCount + 1 < maxFailureCount + updatedFailureCount <= maxFailureCount ) else { SNLog("[JobRunner] \(queueContext) \(job.variant) failed permanently\(maxFailureCount >= 0 ? "; too many retries" : "")") - let dependantJobIds: [Int64] = try job.dependantJobs - .select(.id) - .asRequest(of: Int64.self) - .fetchAll(db) - // If the job permanently failed or we have performed all of our retry attempts // then delete the job and all of it's dependant jobs (it'll probably never succeed) _ = try job.dependantJobs @@ -1263,48 +1298,28 @@ private final class JobQueue { _ = try job.delete(db) - // Remove the dependant jobs from the queue (so we don't try to run a deleted job) - if !dependantJobIds.isEmpty { - queue.mutate { queue in - queue = queue.filter { !dependantJobIds.contains($0.id ?? -1) } - } - } - performCleanUp(for: job, result: .failed) return } - SNLog("[JobRunner] \(queueContext) \(job.variant) job failed; scheduling retry (failure count is \(job.failureCount + 1))") + SNLog("[JobRunner] \(queueContext) \(job.variant) job failed; scheduling retry (failure count is \(updatedFailureCount))") _ = try job .with( - failureCount: (job.failureCount + 1), + failureCount: updatedFailureCount, nextRunTimestamp: nextRunTimestamp ) .saved(db) // Update the failureCount and nextRunTimestamp on dependant jobs as well (update the - // 'nextRunTimestamp' value to be 1ms later so when the queue gets regenerated it'll + // 'nextRunTimestamp' value to be 1ms later so when the queue gets regenerated they'll // come after the dependency) try job.dependantJobs .updateAll( db, - Job.Columns.failureCount.set(to: (job.failureCount + 1)), + Job.Columns.failureCount.set(to: updatedFailureCount), Job.Columns.nextRunTimestamp.set(to: (nextRunTimestamp + (1 / 1000))) ) - - let dependantJobIds: [Int64] = try job.dependantJobs - .select(.id) - .asRequest(of: Int64.self) - .fetchAll(db) - - // Remove the dependant jobs from the queue (so we don't get stuck in a loop of trying - // to run dependecies indefinitely) - if !dependantJobIds.isEmpty { - queue.mutate { queue in - queue = queue.filter { !dependantJobIds.contains($0.id ?? -1) } - } - } } performCleanUp(for: job, result: .failed) @@ -1315,7 +1330,7 @@ private final class JobQueue { /// This function is called when a job neither succeeds or fails (this should only occur if the job has specific logic that makes it dependant /// on other jobs, and it should automatically manage those dependencies) - private func handleJobDeferred( + public func handleJobDeferred( _ job: Job, dependencies: Dependencies ) { @@ -1375,7 +1390,7 @@ private final class JobQueue { // The job is removed from the queue before it runs so all we need to to is remove it // from the 'currentlyRunning' set jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) } - detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) } + detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: JobKey(job)) } guard shouldTriggerCallbacks else { return } @@ -1391,3 +1406,74 @@ private final class JobQueue { } } } + +// MARK: - JobRunner Singleton +// FIXME: Remove this once the jobRunner is dependency injected everywhere correctly +public extension JobRunner { + internal static let instance: JobRunner = JobRunner() + + // MARK: - Static Access + + static func setExecutor(_ executor: JobExecutor.Type, for variant: Job.Variant) { + instance.setExecutor(executor, for: variant) + } + + static func appDidFinishLaunching(dependencies: Dependencies = Dependencies()) { + instance.appDidFinishLaunching(dependencies: dependencies) + } + + static func appDidBecomeActive(dependencies: Dependencies = Dependencies()) { + instance.appDidBecomeActive(dependencies: dependencies) + } + + /// Add a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start + /// the JobRunner + /// + /// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp` + /// is in the future then the job won't be started + static func add( + _ db: Database, + job: Job?, + canStartJob: Bool = true, + dependencies: Dependencies = Dependencies() + ) { instance.add(db, job: job, canStartJob: canStartJob, dependencies: dependencies) } + + /// Upsert a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start + /// the JobRunner + /// + /// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp` + /// is in the future then the job won't be started + static func upsert( + _ db: Database, + job: Job?, + canStartJob: Bool = true, + dependencies: Dependencies = Dependencies() + ) { instance.upsert(db, job: job, canStartJob: canStartJob, dependencies: dependencies) } + + @discardableResult static func insert( + _ db: Database, + job: Job?, + before otherJob: Job, + dependencies: Dependencies = Dependencies() + ) -> (Int64, Job)? { instance.insert(db, job: job, before: otherJob, dependencies: dependencies) } + + /// Calling this will clear the JobRunner queues and stop it from running new jobs, any currently executing jobs will continue to run + /// though (this means if we suspend the database it's likely that any currently running jobs will fail to complete and fail to record their + /// failure - they _should_ be picked up again the next time the app is launched) + static func stopAndClearPendingJobs( + exceptForVariant: Job.Variant? = nil, + onComplete: (() -> ())? = nil + ) { instance.stopAndClearPendingJobs(exceptForVariant: exceptForVariant, onComplete: onComplete) } + + static func isCurrentlyRunning(_ job: Job?) -> Bool { + return instance.isCurrentlyRunning(job) + } + + static func afterCurrentlyRunningJob(_ job: Job?, callback: @escaping (JobResult) -> ()) { + instance.afterCurrentlyRunningJob(job, callback: callback) + } + + static func removePendingJob(_ job: Job?) { + instance.removePendingJob(job) + } +} diff --git a/SessionUtilitiesKitTests/JobRunner/JobRunnerSpec.swift b/SessionUtilitiesKitTests/JobRunner/JobRunnerSpec.swift index b6dfcc2d7..8c4db0211 100644 --- a/SessionUtilitiesKitTests/JobRunner/JobRunnerSpec.swift +++ b/SessionUtilitiesKitTests/JobRunner/JobRunnerSpec.swift @@ -9,7 +9,7 @@ import Nimble @testable import SessionUtilitiesKit class JobRunnerSpec: QuickSpec { - public enum TestSuccessfulJob: JobExecutor { + enum TestSuccessfulJob: JobExecutor { static let maxFailureCount: Int = 0 static let requiresThreadId: Bool = false static let requiresInteractionId: Bool = false @@ -22,14 +22,93 @@ class JobRunnerSpec: QuickSpec { deferred: @escaping (Job, Dependencies) -> (), dependencies: Dependencies ) { - success(job, true, dependencies) + guard dependencies.date.timeIntervalSinceNow > 0 else { return success(job, true, dependencies) } + + queue.asyncAfter(deadline: .now() + .milliseconds(Int(dependencies.date.timeIntervalSinceNow * 1000))) { + success(job, true, dependencies) + } + } + } + + enum TestFailedJob: JobExecutor { + static let maxFailureCount: Int = 1 + static let requiresThreadId: Bool = false + static let requiresInteractionId: Bool = false + + static func run( + _ job: Job, + queue: DispatchQueue, + success: @escaping (Job, Bool, Dependencies) -> (), + failure: @escaping (Job, Error?, Bool, Dependencies) -> (), + deferred: @escaping (Job, Dependencies) -> (), + dependencies: Dependencies + ) { + guard dependencies.date.timeIntervalSinceNow > 0 else { return failure(job, nil, false, dependencies) } + + queue.asyncAfter(deadline: .now() + .milliseconds(Int(dependencies.date.timeIntervalSinceNow * 1000))) { + failure(job, nil, false, dependencies) + } + } + } + + enum TestPermanentFailureJob: JobExecutor { + static let maxFailureCount: Int = 1 + static let requiresThreadId: Bool = false + static let requiresInteractionId: Bool = false + + static func run( + _ job: Job, + queue: DispatchQueue, + success: @escaping (Job, Bool, Dependencies) -> (), + failure: @escaping (Job, Error?, Bool, Dependencies) -> (), + deferred: @escaping (Job, Dependencies) -> (), + dependencies: Dependencies + ) { + guard dependencies.date.timeIntervalSinceNow > 0 else { return failure(job, nil, true, dependencies) } + + queue.asyncAfter(deadline: .now() + .milliseconds(Int(dependencies.date.timeIntervalSinceNow * 1000))) { + failure(job, nil, true, dependencies) + } + } + } + + enum TestDeferredJob: JobExecutor { + static let maxFailureCount: Int = 0 + static let requiresThreadId: Bool = false + static let requiresInteractionId: Bool = false + + static func run( + _ job: Job, + queue: DispatchQueue, + success: @escaping (Job, Bool, Dependencies) -> (), + failure: @escaping (Job, Error?, Bool, Dependencies) -> (), + deferred: @escaping (Job, Dependencies) -> (), + dependencies: Dependencies + ) { + guard dependencies.date.timeIntervalSinceNow > 0 else { return deferred(job, dependencies) } + + queue.asyncAfter(deadline: .now() + .milliseconds(Int(dependencies.date.timeIntervalSinceNow * 1000))) { + deferred(job, dependencies) + } } } + struct TestDetails: Codable { + public let intValue: Int64 + public let stringValue: String + } + + struct InvalidDetails: Codable { + func encode(to encoder: Encoder) throws { throw HTTP.Error.parsingFailed } + } + // MARK: - Spec override func spec() { - var jobRunner: JobRunner! + var jobRunner: JobRunnerType! + var job1: Job! + var job2: Job! + var jobDetails: TestDetails! var mockStorage: Storage! var dependencies: Dependencies! @@ -48,20 +127,1149 @@ class JobRunnerSpec: QuickSpec { date: Date(timeIntervalSince1970: 1234567890) ) - jobRunner = JobRunner() + // Migrations add jobs which we don't want so delete them + mockStorage.write { db in try Job.deleteAll(db) } + + job1 = Job( + id: 100, + failureCount: 0, + variant: .messageSend, + behaviour: .runOnce, + shouldBlock: false, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: nil + ) + jobDetails = TestDetails( + intValue: 100, + stringValue: "200" + ) + job2 = Job( + id: 101, + failureCount: 0, + variant: .attachmentUpload, + behaviour: .runOnce, + shouldBlock: false, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: try! JSONEncoder().encode(jobDetails) + ) + + jobRunner = JobRunner(isTestingJobRunner: true, dependencies: dependencies) + + // Need to assign this to ensure it's used by nested dependencies + dependencies.jobRunner = jobRunner } afterEach { + jobRunner.stopAndClearPendingJobs() jobRunner = nil mockStorage = nil dependencies = nil } + // MARK: -- when configuring context("when configuring") { it("adds an executor correctly") { - // TODO: Test this - jobRunner.add(executor: TestSuccessfulJob.self, for: .messageSend) + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + // First check that it fails to start + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + + mockStorage.write { db in + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + } + + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(10) + ) + + jobRunner.setExecutor(TestSuccessfulJob.self, for: .messageSend) + + // Then check that it succeeded to start + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + + mockStorage.write { db in + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + } + + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(10) + ) + } + } + + // MARK: -- when managing state + + context("when managing state") { + + // MARK: ---- by checking if a job is currently running + + context("by checking if a job is currently running") { + beforeEach { + jobRunner.setExecutor(TestSuccessfulJob.self, for: .messageSend) + } + + it("returns false when not given a job") { + expect(jobRunner.isCurrentlyRunning(nil)).to(beFalse()) + } + + it("returns false when given a job that has not been persisted") { + job1 = Job(variant: .messageSend) + + expect(jobRunner.isCurrentlyRunning(job1)).to(beFalse()) + } + + it("returns false when given a job that is not running") { + expect(jobRunner.isCurrentlyRunning(job1)).to(beFalse()) + } + + it("returns true when given a non blocking job that is running") { + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + + mockStorage.write { db in + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + } + + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beTrue(), + timeout: .milliseconds(10) + ) + } + + it("returns true when given a blocking job that is running") { + job2 = Job( + id: 101, + failureCount: 0, + variant: .messageSend, + behaviour: .runOnceNextLaunch, + shouldBlock: true, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: nil + ) + + mockStorage.write { db in + try job2.insert(db) + + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + expect(jobRunner.isCurrentlyRunning(job2)) + .toEventually( + beTrue(), + timeout: .milliseconds(10) + ) + } + } + + // MARK: ---- by getting the details for jobs + + context("by getting the details for jobs") { + beforeEach { + jobRunner.setExecutor(TestSuccessfulJob.self, for: .messageSend) + jobRunner.setExecutor(TestSuccessfulJob.self, for: .attachmentUpload) + jobRunner.setExecutor(TestSuccessfulJob.self, for: .attachmentDownload) + } + + it("returns an empty dictionary when there are no jobs") { + expect(jobRunner.details()).to(equal([:])) + } + + it("returns an empty dictionary when there are no jobs matching the filters") { + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + + mockStorage.write { db in + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + expect(jobRunner.detailsFor(state: .running, variant: .messageSend)) + .toEventually( + equal([:]), + timeout: .milliseconds(10) + ) + } + + it("can filter to specific jobs") { + mockStorage.write { db in + jobRunner.upsert( + db, + job: job2, + /// The `canStartJob` value needs to be `true` for the job to be added to the queue but as + /// long as `appDidFinishLaunching` hasn't been called it won't actually start running and + /// as a result we can test the "pending" state + canStartJob: true, + dependencies: dependencies + ) + } + + // Wait for there to be data and the validate the filtering works + expect(jobRunner.details()) + .toEventuallyNot( + beEmpty(), + timeout: .milliseconds(10) + ) + expect(jobRunner.detailsFor(jobs: [job1])).to(equal([:])) + expect(jobRunner.detailsFor(jobs: [job2])).to(equal([101: job2.details])) + } + + it("can filter to running jobs") { + job1 = Job( + id: 100, + failureCount: 0, + variant: .attachmentDownload, + behaviour: .runOnce, + shouldBlock: false, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: try! JSONEncoder().encode(jobDetails) + ) + job2 = Job( + id: 101, + failureCount: 0, + variant: .attachmentDownload, + behaviour: .runOnce, + shouldBlock: false, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: nil + ) + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + mockStorage.write { db in + try job1.insert(db) + try job2.insert(db) + + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + // Wait for there to be data and the validate the filtering works + expect(jobRunner.detailsFor(state: .running)) + .toEventually( + equal([100: try! JSONEncoder().encode(jobDetails)]), + timeout: .milliseconds(10) + ) + expect(Array(jobRunner.details().keys).sorted()).to(equal([100, 101])) + } + + it("can filter to pending jobs") { + job1 = Job( + id: 100, + failureCount: 0, + variant: .attachmentDownload, + behaviour: .runOnce, + shouldBlock: false, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: nil + ) + job2 = Job( + id: 101, + failureCount: 0, + variant: .attachmentDownload, + behaviour: .runOnce, + shouldBlock: false, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: try! JSONEncoder().encode(jobDetails) + ) + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + mockStorage.write { db in + try job1.insert(db) + try job2.insert(db) + + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + // Wait for there to be data and the validate the filtering works + expect(jobRunner.detailsFor(state: .pending)) + .toEventually( + equal([101: try! JSONEncoder().encode(jobDetails)]), + timeout: .milliseconds(10) + ) + expect(Array(jobRunner.details().keys).sorted()).to(equal([100, 101])) + } + + it("can filter to specific variants") { + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + mockStorage.write { db in + try job1.insert(db) + try job2.insert(db) + + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + // Wait for there to be data and the validate the filtering works + expect(jobRunner.detailsFor(variant: .attachmentUpload)) + .toEventually( + equal([101: try! JSONEncoder().encode(jobDetails)]), + timeout: .milliseconds(10) + ) + expect(Array(jobRunner.details().keys).sorted()).to(equal([100, 101])) + } + + it("includes non blocking jobs") { + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + + mockStorage.write { db in + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + expect(jobRunner.detailsFor(state: .running, variant: .attachmentUpload)) + .toEventually( + equal([101: try! JSONEncoder().encode(jobDetails)]), + timeout: .milliseconds(10) + ) + } + + it("includes blocking jobs") { + job2 = Job( + id: 101, + failureCount: 0, + variant: .attachmentUpload, + behaviour: .runOnceNextLaunch, + shouldBlock: true, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: try! JSONEncoder().encode(jobDetails) + ) + + mockStorage.write { db in + try job2.insert(db) + + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + expect(jobRunner.detailsFor(state: .running, variant: .attachmentUpload)) + .toEventually( + equal([101: try! JSONEncoder().encode(jobDetails)]), + timeout: .milliseconds(10) + ) + } + } + + // MARK: ---- by checking for an existing job + + context("by checking for an existing job") { + beforeEach { + jobRunner.setExecutor(TestSuccessfulJob.self, for: .attachmentUpload) + } + + it("returns false for a queue that doesn't exist") { + jobRunner = JobRunner( + isTestingJobRunner: true, + variantsToExclude: [.attachmentUpload], + dependencies: dependencies + ) + + expect(jobRunner.hasJob(of: .attachmentUpload, with: jobDetails)) + .to(beFalse()) + } + + it("returns false when the provided details fail to decode") { + expect(jobRunner.hasJob(of: .attachmentUpload, with: InvalidDetails())) + .to(beFalse()) + } + + it("returns false when there is not a pending or running job") { + expect(jobRunner.hasJob(of: .attachmentUpload, with: jobDetails)) + .to(beFalse()) + } + + it("returns true when there is a pending job") { + mockStorage.write { db in + jobRunner.upsert( + db, + job: job2, + /// The `canStartJob` value needs to be `true` for the job to be added to the queue but as + /// long as `appDidFinishLaunching` hasn't been called it won't actually start running and + /// as a result we can test the "pending" state + canStartJob: true, + dependencies: dependencies + ) + } + + expect(Array(jobRunner.detailsFor(state: .pending, variant: .attachmentUpload).keys)) + .toEventually( + equal([101]), + timeout: .milliseconds(10) + ) + expect(jobRunner.hasJob(of: .attachmentUpload, with: jobDetails)) + .to(beTrue()) + } + + it("returns true when there is a running job") { + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + + mockStorage.write { db in + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + expect(Array(jobRunner.detailsFor(state: .running, variant: .attachmentUpload).keys)) + .toEventually( + equal([101]), + timeout: .milliseconds(10) + ) + expect(jobRunner.hasJob(of: .attachmentUpload, with: jobDetails)) + .to(beTrue()) + } + + it("returns true when there is a blocking job") { + job2 = Job( + id: 101, + failureCount: 0, + variant: .attachmentUpload, + behaviour: .runOnceNextLaunch, + shouldBlock: true, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: try! JSONEncoder().encode(jobDetails) + ) + + mockStorage.write { db in + try job2.insert(db) + + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + expect(Array(jobRunner.detailsFor(state: .running, variant: .attachmentUpload).keys)) + .toEventually( + equal([101]), + timeout: .milliseconds(10) + ) + expect(jobRunner.hasJob(of: .attachmentUpload, with: jobDetails)) + .to(beTrue()) + } + + it("returns true when there is a non blocking job") { + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + mockStorage.write { db in + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + expect(Array(jobRunner.detailsFor(state: .running, variant: .attachmentUpload).keys)) + .toEventually( + equal([101]), + timeout: .milliseconds(10) + ) + expect(jobRunner.hasJob(of: .attachmentUpload, with: jobDetails)) + .to(beTrue()) + } + } + + // MARK: ---- by being notified of app launch + + context("by being notified of app launch") { + beforeEach { + jobRunner.setExecutor(TestSuccessfulJob.self, for: .messageSend) + } + + it("does not start a job before getting the app launch call") { + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + + mockStorage.write { db in + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + } + + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(10) + ) + } + + it("does nothing if there are no app launch jobs") { + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + + mockStorage.write { db in + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + } + + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(10) + ) + } + + it("starts the job queues after completing blocking app launch jobs") { + job2 = Job( + id: 101, + failureCount: 0, + variant: .messageSend, + behaviour: .runOnceNextLaunch, + shouldBlock: true, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: nil + ) + + mockStorage.write { db in + try job2.insert(db) + + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + // Not currently running + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(10) + ) + expect(jobRunner.isCurrentlyRunning(job2)).to(beFalse()) + + // Make sure it starts + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + // Blocking job running but blocked job not + expect(jobRunner.isCurrentlyRunning(job2)) + .toEventually( + beTrue(), + timeout: .milliseconds(10) + ) + expect(jobRunner.isCurrentlyRunning(job1)).to(beFalse()) + + // Blocked job eventually starts + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beTrue(), + timeout: .milliseconds(20) + ) + } + + it("starts the job queues alongside non blocking app launch jobs") { + job2 = Job( + id: 101, + failureCount: 0, + variant: .messageSend, + behaviour: .runOnceNextLaunch, + shouldBlock: false, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: nil + ) + + mockStorage.write { db in + try job2.insert(db) + + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + // Not currently running + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(10) + ) + + // Make sure it starts + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beTrue(), + timeout: .milliseconds(10) + ) + expect(jobRunner.isCurrentlyRunning(job2)) + .toEventually( + beTrue(), + timeout: .milliseconds(10) + ) + } + } + + // MARK: ---- by being notified of app becoming active + + context("by being notified of app becoming active") { + beforeEach { + jobRunner.setExecutor(TestSuccessfulJob.self, for: .messageSend) + } + + it("does not start a job before getting the app active call") { + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + + mockStorage.write { db in + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + } + + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(10) + ) + } + + it("does not start the job queues if there are no app active jobs and blocking jobs are running") { + job2 = Job( + id: 101, + failureCount: 0, + variant: .messageSend, + behaviour: .runOnceNextLaunch, + shouldBlock: true, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: nil + ) + + mockStorage.write { db in + try job2.insert(db) + + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + // Not currently running + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(10) + ) + + // Start the blocking job + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + // Make sure the other queues don't start + dependencies.date = Date().addingTimeInterval(30 / 1000) // Complete job after delay + jobRunner.appDidBecomeActive(dependencies: dependencies) + + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(10) + ) + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(20) + ) + } + + it("does not start the job queues if there are app active jobs and blocking jobs are running") { + job1 = Job( + id: 100, + failureCount: 0, + variant: .messageSend, + behaviour: .recurringOnActive, + shouldBlock: false, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: nil + ) + job2 = Job( + id: 101, + failureCount: 0, + variant: .messageSend, + behaviour: .runOnceNextLaunch, + shouldBlock: true, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: nil + ) + + mockStorage.write { db in + try job1.insert(db) + try job2.insert(db) + + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + // Not currently running + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(10) + ) + + // Start the blocking queue + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + jobRunner.appDidFinishLaunching(dependencies: dependencies) + + // Make sure the other queues don't start + dependencies.date = Date().addingTimeInterval(30 / 1000) // Complete job after delay + jobRunner.appDidBecomeActive(dependencies: dependencies) + + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(10) + ) + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(20) + ) + } + + it("starts the job queues if there are no app active jobs") { + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + + mockStorage.write { db in + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + } + + jobRunner.appDidBecomeActive(dependencies: dependencies) + + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beTrue(), + timeout: .milliseconds(10) + ) + } + + it("starts the job queues if there are app active jobs") { + job1 = Job( + id: 100, + failureCount: 0, + variant: .messageSend, + behaviour: .recurringOnActive, + shouldBlock: false, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: nil + ) + job2 = Job( + id: 101, + failureCount: 0, + variant: .messageSend, + behaviour: .runOnce, + shouldBlock: false, + shouldSkipLaunchBecomeActive: false, + nextRunTimestamp: 0, + threadId: nil, + interactionId: nil, + details: nil + ) + + mockStorage.write { db in + try job1.insert(db) + try job2.insert(db) + + jobRunner.upsert( + db, + job: job1, + canStartJob: true, + dependencies: dependencies + ) + jobRunner.upsert( + db, + job: job2, + canStartJob: true, + dependencies: dependencies + ) + } + + // Not currently running + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beFalse(), + timeout: .milliseconds(10) + ) + + // Make sure the queues are started + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + jobRunner.appDidBecomeActive(dependencies: dependencies) + + expect(jobRunner.isCurrentlyRunning(job1)) + .toEventually( + beTrue(), + timeout: .milliseconds(10) + ) + expect(jobRunner.isCurrentlyRunning(job2)).to(beTrue()) + } + } + } + + // MARK: -- when running jobs + + context("when running jobs") { + beforeEach { + jobRunner.setExecutor(TestSuccessfulJob.self, for: .messageSend) + jobRunner.setExecutor(TestSuccessfulJob.self, for: .attachmentUpload) + jobRunner.appDidFinishLaunching(dependencies: dependencies) } + + // MARK: ---- with dependencies + + context("with dependencies") { + it("starts dependencies first") { + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + + mockStorage.write { db in + try job1.insert(db) + try job2.insert(db) + try JobDependencies(jobId: job1.id!, dependantId: job2.id!).insert(db) + + jobRunner.upsert(db, job: job1, canStartJob: true, dependencies: dependencies) + } + + // Make sure the dependency is run + expect(Array(jobRunner.detailsFor(state: .running, variant: .attachmentUpload).keys)) + .toEventually( + equal([101]), + timeout: .milliseconds(10) + ) + } + + it("removes the initial job from the queue") { + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + + mockStorage.write { db in + try job1.insert(db) + try job2.insert(db) + try JobDependencies(jobId: job1.id!, dependantId: job2.id!).insert(db) + + jobRunner.upsert(db, job: job1, canStartJob: true, dependencies: dependencies) + } + + // Make sure the initial job is removed from the queue + expect(Array(jobRunner.detailsFor(state: .running, variant: .attachmentUpload).keys)) + .toEventually( + equal([101]), + timeout: .milliseconds(10) + ) + expect(jobRunner.detailsFor(state: .running, variant: .messageSend).keys).toNot(contain(100)) + } + + it("starts the initial job when the dependencies succeed") { + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + + mockStorage.write { db in + try job1.insert(db) + try job2.insert(db) + try JobDependencies(jobId: job1.id!, dependantId: job2.id!).insert(db) + + jobRunner.upsert(db, job: job1, canStartJob: true, dependencies: dependencies) + } + + // Make sure the dependency is run + expect(Array(jobRunner.detailsFor(state: .running, variant: .attachmentUpload).keys)) + .toEventually( + equal([101]), + timeout: .milliseconds(10) + ) + expect(jobRunner.detailsFor(state: .running, variant: .messageSend).keys).toNot(contain(100)) + + // Make sure the initial job starts + dependencies.date = Date().addingTimeInterval(20 / 1000) // Complete job after delay + expect(Array(jobRunner.detailsFor(state: .running, variant: .messageSend).keys)) + .toEventually( + equal([100]), + timeout: .milliseconds(20) + ) + } + + it("does not start the initial job if the dependencies fail") { + jobRunner.setExecutor(TestFailedJob.self, for: .attachmentUpload) + + dependencies.date = Date().addingTimeInterval(20 / 1000) // Fail job after delay + + mockStorage.write { db in + try job1.insert(db) + try job2.insert(db) + try JobDependencies(jobId: job1.id!, dependantId: job2.id!).insert(db) + + jobRunner.upsert(db, job: job1, canStartJob: true, dependencies: dependencies) + } + + // Make sure the dependency is run + expect(Array(jobRunner.detailsFor(state: .running, variant: .attachmentUpload).keys)) + .toEventually( + equal([101]), + timeout: .milliseconds(10) + ) + expect(jobRunner.detailsFor(state: .running, variant: .messageSend).keys).toNot(contain(100)) + + // Make sure there are no running jobs + expect(Array(jobRunner.detailsFor(state: .running).keys)) + .toEventually( + beEmpty(), + timeout: .milliseconds(20) + ) + } + + it("does not delete the initial job if the dependencies fail") { + jobRunner.setExecutor(TestFailedJob.self, for: .attachmentUpload) + + dependencies.date = Date().addingTimeInterval(20 / 1000) // Fail job after delay + + mockStorage.write { db in + try job1.insert(db) + try job2.insert(db) + try JobDependencies(jobId: job1.id!, dependantId: job2.id!).insert(db) + + jobRunner.upsert(db, job: job1, canStartJob: true, dependencies: dependencies) + } + + // Make sure the dependency is run + expect(Array(jobRunner.detailsFor(state: .running, variant: .attachmentUpload).keys)) + .toEventually( + equal([101]), + timeout: .milliseconds(10) + ) + expect(jobRunner.detailsFor(state: .running, variant: .messageSend).keys).toNot(contain(100)) + + // Make sure there are no running jobs + dependencies.date = Date().addingTimeInterval(20 / 1000) // Delay subsequent runs + expect(Array(jobRunner.detailsFor(state: .running, variant: .attachmentUpload).keys)) + .toEventually( + beEmpty(), + timeout: .milliseconds(20) + ) + + // Stop the queues so it doesn't run out of retry attempts + jobRunner.stopAndClearPendingJobs(exceptForVariant: nil, onComplete: nil) + + // Make sure the jobs still exist + expect(mockStorage.read { db in try Job.fetchCount(db) }).to(equal(2)) + } + + it("deletes the initial job if the dependencies permanently fail") { + jobRunner.setExecutor(TestPermanentFailureJob.self, for: .attachmentUpload) + + dependencies.date = Date().addingTimeInterval(20 / 1000) // Fail job after delay + + mockStorage.write { db in + try job1.insert(db) + try job2.insert(db) + try JobDependencies(jobId: job1.id!, dependantId: job2.id!).insert(db) + + jobRunner.upsert(db, job: job1, canStartJob: true, dependencies: dependencies) + } + + // Make sure the dependency is run + expect(Array(jobRunner.detailsFor(state: .running, variant: .attachmentUpload).keys)) + .toEventually( + equal([101]), + timeout: .milliseconds(10) + ) + expect(jobRunner.detailsFor(state: .running, variant: .messageSend).keys).toNot(contain(100)) + + // Make sure there are no running jobs + expect(Array(jobRunner.detailsFor(state: .running, variant: .attachmentUpload).keys)) + .toEventually( + beEmpty(), + timeout: .milliseconds(20) + ) + + // Make sure the jobs were deleted + expect(mockStorage.read { db in try Job.fetchCount(db) }).to(equal(0)) + } + } + } } } diff --git a/_SharedTestUtilities/CommonMockedExtensions.swift b/_SharedTestUtilities/CommonMockedExtensions.swift index 052931525..b70e06327 100644 --- a/_SharedTestUtilities/CommonMockedExtensions.swift +++ b/_SharedTestUtilities/CommonMockedExtensions.swift @@ -1,8 +1,10 @@ // Copyright © 2022 Rangeproof Pty Ltd. All rights reserved. import Foundation +import GRDB import Sodium import Curve25519Kit +import SessionUtilitiesKit extension Box.KeyPair: Mocked { static var mockValue: Box.KeyPair = Box.KeyPair( @@ -19,3 +21,19 @@ extension ECKeyPair: Mocked { ) } } + +extension Database: Mocked { + static var mockValue: Database { + var result: Database! + try! DatabaseQueue().read { result = $0 } + return result! + } +} + +extension Job: Mocked { + static var mockValue: Job = Job(variant: .messageSend) +} + +extension Job.Variant: Mocked { + static var mockValue: Job.Variant = .messageSend +} diff --git a/_SharedTestUtilities/MockJobRunner.swift b/_SharedTestUtilities/MockJobRunner.swift new file mode 100644 index 000000000..494643ed2 --- /dev/null +++ b/_SharedTestUtilities/MockJobRunner.swift @@ -0,0 +1,56 @@ +// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved. + +import Foundation +import GRDB +import SessionUtilitiesKit + +@testable import SessionMessagingKit + +class MockJobRunner: Mock, JobRunnerType { + // MARK: - Configuration + + func setExecutor(_ executor: JobExecutor.Type, for variant: Job.Variant) { + accept(args: [executor, variant]) + } + + func canStart(queue: JobQueue) -> Bool { + return accept(args: [queue]) as! Bool + } + + // MARK: - State Management + + func isCurrentlyRunning(_ job: Job?) -> Bool { + return accept(args: [job]) as! Bool + } + + func hasJob(of variant: Job.Variant, inState state: JobRunner.JobState, with jobDetails: T) -> Bool { + return accept(args: [variant, state, jobDetails]) as! Bool + } + + func detailsFor(jobs: [Job]?, state: JobRunner.JobState, variant: Job.Variant?) -> [Int64: Data?] { + return accept(args: [jobs, state, variant]) as! [Int64: Data?] + } + + func appDidFinishLaunching(dependencies: Dependencies) {} + func appDidBecomeActive(dependencies: Dependencies) {} + func startNonBlockingQueues(dependencies: Dependencies) {} + + func stopAndClearPendingJobs(exceptForVariant: Job.Variant?, onComplete: (() -> ())?) { + accept(args: [exceptForVariant, onComplete]) + onComplete?() + } + + // MARK: - Job Scheduling + + func add(_ db: Database, job: Job?, canStartJob: Bool, dependencies: Dependencies) { + accept(args: [db, job, canStartJob]) + } + + func upsert(_ db: Database, job: Job?, canStartJob: Bool, dependencies: Dependencies) { + accept(args: [db, job, canStartJob]) + } + + func insert(_ db: Database, job: Job?, before otherJob: Job, dependencies: Dependencies) -> (Int64, Job)? { + return accept(args: [db, job, otherJob]) as? (Int64, Job) + } +}