diff --git a/LibSession-Util b/LibSession-Util index 1c4667ba0..765196710 160000 --- a/LibSession-Util +++ b/LibSession-Util @@ -1 +1 @@ -Subproject commit 1c4667ba0c56c924d4e957743d1324be2c899040 +Subproject commit 7651967104845db16e6a58f70635c01f7f4c2033 diff --git a/Session.xcodeproj/project.pbxproj b/Session.xcodeproj/project.pbxproj index ee75ab000..480012349 100644 --- a/Session.xcodeproj/project.pbxproj +++ b/Session.xcodeproj/project.pbxproj @@ -460,6 +460,7 @@ FCB11D8C1A129A76002F93FB /* CoreMedia.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = FCB11D8B1A129A76002F93FB /* CoreMedia.framework */; }; FD0606BD2BC8BF6F00C3816E /* BuildPathsJob.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD0606BC2BC8BF6F00C3816E /* BuildPathsJob.swift */; }; FD0606BF2BC8C10200C3816E /* _005_AddJobUniqueHash.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD0606BE2BC8C10200C3816E /* _005_AddJobUniqueHash.swift */; }; + FD0606C12BCC9A1500C3816E /* GetSwarmJob.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD0606C02BCC9A1500C3816E /* GetSwarmJob.swift */; }; FD078E4827E02561000769AF /* CommonMockedExtensions.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD078E4727E02561000769AF /* CommonMockedExtensions.swift */; }; FD078E4927E02576000769AF /* CommonMockedExtensions.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD078E4727E02561000769AF /* CommonMockedExtensions.swift */; }; FD078E4D27E17156000769AF /* MockOGMCache.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD078E4C27E17156000769AF /* MockOGMCache.swift */; }; @@ -1688,6 +1689,7 @@ FCB11D8B1A129A76002F93FB /* CoreMedia.framework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.framework; name = CoreMedia.framework; path = System/Library/Frameworks/CoreMedia.framework; sourceTree = SDKROOT; }; FD0606BC2BC8BF6F00C3816E /* BuildPathsJob.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = BuildPathsJob.swift; sourceTree = ""; }; FD0606BE2BC8C10200C3816E /* _005_AddJobUniqueHash.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = _005_AddJobUniqueHash.swift; sourceTree = ""; }; + FD0606C02BCC9A1500C3816E /* GetSwarmJob.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = GetSwarmJob.swift; sourceTree = ""; }; FD078E4727E02561000769AF /* CommonMockedExtensions.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CommonMockedExtensions.swift; sourceTree = ""; }; FD078E4C27E17156000769AF /* MockOGMCache.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MockOGMCache.swift; sourceTree = ""; }; FD0969F82A69FFE700C5C365 /* Mocked.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Mocked.swift; sourceTree = ""; }; @@ -4564,6 +4566,7 @@ isa = PBXGroup; children = ( FDF8488D29405C04007DCAE5 /* GetSnodePoolJob.swift */, + FD0606C02BCC9A1500C3816E /* GetSwarmJob.swift */, FD0606BC2BC8BF6F00C3816E /* BuildPathsJob.swift */, ); path = Jobs; @@ -6020,6 +6023,7 @@ FD17D7B327F51E5B00122BE0 /* SSKSetting.swift in Sources */, FDF848E429405D6E007DCAE5 /* SnodeAPIEndpoint.swift in Sources */, FDF848BE29405C5A007DCAE5 /* GetServiceNodesRequest.swift in Sources */, + FD0606C12BCC9A1500C3816E /* GetSwarmJob.swift in Sources */, FDF848EB29405E4F007DCAE5 /* OnionRequestAPI.swift in Sources */, FD17D7AE27F41C4300122BE0 /* SnodeReceivedMessageInfo.swift in Sources */, ); @@ -8100,7 +8104,7 @@ GCC_WARN_UNUSED_VARIABLE = YES; HEADER_SEARCH_PATHS = ""; IPHONEOS_DEPLOYMENT_TARGET = 13.0; - MARKETING_VERSION = 2.5.1; + MARKETING_VERSION = 2.6.0; ONLY_ACTIVE_ARCH = YES; OTHER_CFLAGS = ( "-fobjc-arc-exceptions", @@ -8173,7 +8177,7 @@ GCC_WARN_UNUSED_VARIABLE = YES; HEADER_SEARCH_PATHS = ""; IPHONEOS_DEPLOYMENT_TARGET = 13.0; - MARKETING_VERSION = 2.5.1; + MARKETING_VERSION = 2.6.0; ONLY_ACTIVE_ARCH = NO; OTHER_CFLAGS = ( "-DNS_BLOCK_ASSERTIONS=1", diff --git a/Session/Conversations/ConversationViewModel.swift b/Session/Conversations/ConversationViewModel.swift index 64e793007..8cd170b70 100644 --- a/Session/Conversations/ConversationViewModel.swift +++ b/Session/Conversations/ConversationViewModel.swift @@ -80,6 +80,7 @@ public class ConversationViewModel: OWSAudioPlayerDelegate { initialUnreadInteractionInfo: Interaction.TimestampInfo?, threadIsBlocked: Bool, currentUserIsClosedGroupMember: Bool?, + currentUserIsClosedGroupAdmin: Bool?, openGroupPermissions: OpenGroup.Permissions?, blinded15Key: String?, blinded25Key: String? @@ -107,13 +108,23 @@ public class ConversationViewModel: OWSAudioPlayerDelegate { .fetchOne(db) .defaulting(to: false) ) - let currentUserIsClosedGroupMember: Bool? = (![.legacyGroup, .group].contains(threadVariant) ? nil : + let currentUserIsClosedGroupAdmin: Bool? = (![.legacyGroup, .group].contains(threadVariant) ? nil : GroupMember .filter(groupMember[.groupId] == threadId) .filter(groupMember[.profileId] == currentUserPublicKey) - .filter(groupMember[.role] == GroupMember.Role.standard) + .filter(groupMember[.role] == GroupMember.Role.admin) .isNotEmpty(db) ) + let currentUserIsClosedGroupMember: Bool? = { + guard [.legacyGroup, .group].contains(threadVariant) else { return nil } + guard currentUserIsClosedGroupAdmin != true else { return true } + + return GroupMember + .filter(groupMember[.groupId] == threadId) + .filter(groupMember[.profileId] == currentUserPublicKey) + .filter(groupMember[.role] == GroupMember.Role.standard) + .isNotEmpty(db) + }() let openGroupPermissions: OpenGroup.Permissions? = (threadVariant != .community ? nil : try OpenGroup .filter(id: threadId) @@ -139,6 +150,7 @@ public class ConversationViewModel: OWSAudioPlayerDelegate { initialUnreadInteractionInfo, threadIsBlocked, currentUserIsClosedGroupMember, + currentUserIsClosedGroupAdmin, openGroupPermissions, blinded15Key, blinded25Key @@ -157,6 +169,7 @@ public class ConversationViewModel: OWSAudioPlayerDelegate { threadIsNoteToSelf: (initialData?.currentUserPublicKey == threadId), threadIsBlocked: initialData?.threadIsBlocked, currentUserIsClosedGroupMember: initialData?.currentUserIsClosedGroupMember, + currentUserIsClosedGroupAdmin: initialData?.currentUserIsClosedGroupAdmin, openGroupPermissions: initialData?.openGroupPermissions ).populatingCurrentUserBlindedKeys( currentUserBlinded15PublicKeyForThisThread: initialData?.blinded15Key, diff --git a/Session/Conversations/Settings/ThreadSettingsViewModel.swift b/Session/Conversations/Settings/ThreadSettingsViewModel.swift index 28d017781..fd24f3e64 100644 --- a/Session/Conversations/Settings/ThreadSettingsViewModel.swift +++ b/Session/Conversations/Settings/ThreadSettingsViewModel.swift @@ -775,6 +775,7 @@ class ThreadSettingsViewModel: SessionTableViewModel, NavigationItemSource, Navi ) dependencies.storage.writeAsync { [dependencies] db in + let currentUserSessionId: String = getUserHexEncodedPublicKey(db, using: dependencies) try selectedUsers.forEach { userId in let thread: SessionThread = try SessionThread .fetchOrCreate(db, id: userId, variant: .contact, shouldBeVisible: nil) @@ -788,7 +789,7 @@ class ThreadSettingsViewModel: SessionTableViewModel, NavigationItemSource, Navi let interaction: Interaction = try Interaction( threadId: thread.id, - authorId: userId, + authorId: currentUserSessionId, variant: .standardOutgoing, timestampMs: SnodeAPI.currentOffsetTimestampMs(), expiresInSeconds: try? DisappearingMessagesConfiguration diff --git a/Session/Meta/AppEnvironment.swift b/Session/Meta/AppEnvironment.swift index d1c36de19..03c56fd07 100644 --- a/Session/Meta/AppEnvironment.swift +++ b/Session/Meta/AppEnvironment.swift @@ -9,6 +9,18 @@ import SignalCoreKit import SessionMessagingKit public class AppEnvironment { + + enum ExtensionType { + case share + case notification + + var name: String { + switch self { + case .share: return "ShareExtension" + case .notification: return "NotificationExtension" + } + } + } private static var _shared: AppEnvironment = AppEnvironment() @@ -67,45 +79,61 @@ public class AppEnvironment { // to a local directory (so they can be exported via XCode) - the below code reads any // logs from the shared directly and attempts to add them to the main app logs to make // debugging user issues in extensions easier - DispatchQueue.global(qos: .background).async { - let extensionDirs: [String] = [ - "\(OWSFileSystem.appSharedDataDirectoryPath())/Logs/NotificationExtension", - "\(OWSFileSystem.appSharedDataDirectoryPath())/Logs/ShareExtension" + DispatchQueue.global(qos: .background).async { [fileLogger] in + let extensionInfo: [(dir: String, type: ExtensionType)] = [ + ("\(OWSFileSystem.appSharedDataDirectoryPath())/Logs/NotificationExtension", .notification), + ("\(OWSFileSystem.appSharedDataDirectoryPath())/Logs/ShareExtension", .share) ] - let extensionLogs: [String] = extensionDirs.flatMap { dir -> [String] in + let extensionLogs: [(path: String, type: ExtensionType)] = extensionInfo.flatMap { dir, type -> [(path: String, type: ExtensionType)] in guard let files: [String] = try? FileManager.default.contentsOfDirectory(atPath: dir) else { return [] } - return files.map { "\(dir)/\($0)" } + return files.map { ("\(dir)/\($0)", type) } } + // Log to ensure the log file exists + OWSLogger.info("") + DDLog.flushLog() - extensionLogs.forEach { logFilePath in - guard let logs: String = try? String(contentsOfFile: logFilePath) else { - try? FileManager.default.removeItem(atPath: logFilePath) - return - } + do { + guard + let currentLogFileInfo: DDLogFileInfo = fileLogger.currentLogFileInfo, + let fileHandle: FileHandle = FileHandle(forWritingAtPath: currentLogFileInfo.filePath) + else { throw StorageError.objectNotFound } - logs.split(separator: "\n").forEach { line in - let lineEmoji: Character? = line - .split(separator: "[") - .first - .map { String($0) }? - .trimmingCharacters(in: .whitespaces) - .last - - switch lineEmoji { - case "💙": OWSLogger.verbose("Extension: \(String(line))") - case "💚": OWSLogger.debug("Extension: \(String(line))") - case "💛": OWSLogger.info("Extension: \(String(line))") - case "🧡": OWSLogger.warn("Extension: \(String(line))") - case "❤️": OWSLogger.error("Extension: \(String(line))") - default: OWSLogger.info("Extension: \(String(line))") - } - } + // Ensure we close the file handle + defer { fileHandle.closeFile() } - // Logs have been added - remove them now - DDLog.flushLog() - try? FileManager.default.removeItem(atPath: logFilePath) + // Move to the end of the file to insert the logs + if #available(iOS 13.4, *) { try fileHandle.seekToEnd() } + else { fileHandle.seekToEndOfFile() } + + try extensionLogs + .grouped(by: \.type) + .forEach { type, value in + guard + let typeNameStartData: Data = "🧩 \(type.name) -- Start\n".data(using: .utf8), + let typeNameEndData: Data = "🧩 \(type.name) -- End\n".data(using: .utf8) + else { throw StorageError.invalidData } + + // Write the type start separator + if #available(iOS 13.4, *) { try fileHandle.write(contentsOf: typeNameStartData) } + else { fileHandle.write(typeNameStartData) } + + // Write the logs + try value.forEach { path, _ in + let logData: Data = try Data(contentsOf: URL(fileURLWithPath: path)) + if #available(iOS 13.4, *) { try fileHandle.write(contentsOf: logData) } + else { fileHandle.write(logData) } + + // Extension logs have been writen to the app logs, remove them now + try? FileManager.default.removeItem(atPath: path) + } + + // Write the type end separator + if #available(iOS 13.4, *) { try fileHandle.write(contentsOf: typeNameEndData) } + else { fileHandle.write(typeNameEndData) } + } } + catch { SNLog("Unable to write extension logs to current log file") } } } } diff --git a/Session/Onboarding/Onboarding.swift b/Session/Onboarding/Onboarding.swift index 59f17b380..7e8a48e8d 100644 --- a/Session/Onboarding/Onboarding.swift +++ b/Session/Onboarding/Onboarding.swift @@ -176,7 +176,11 @@ enum Onboarding { // Only continue if this isn't a new account guard self != .register else { return } - // Fetch the + // Enable single-execution jobs (this allows fetching the swarm for retrieving the + // profile name below without triggering other jobs) + JobRunner.enableNewSingleExecutionJobsOnly() + + // Fetch any existing profile name Onboarding.profileNamePublisher .subscribe(on: DispatchQueue.global(qos: .userInitiated)) .sinkUntilComplete() diff --git a/Session/Shared/SessionTableViewTitleView.swift b/Session/Shared/SessionTableViewTitleView.swift index 189c85da4..50bb128fd 100644 --- a/Session/Shared/SessionTableViewTitleView.swift +++ b/Session/Shared/SessionTableViewTitleView.swift @@ -27,6 +27,7 @@ final class SessionTableViewTitleView: UIView { private lazy var titleLabel: UILabel = { let result: UILabel = UILabel() + result.setContentCompressionResistancePriority(.required, for: .vertical) result.font = .boldSystemFont(ofSize: Values.mediumFontSize) result.themeTextColor = .textPrimary result.lineBreakMode = .byTruncatingTail @@ -63,8 +64,12 @@ final class SessionTableViewTitleView: UIView { addSubview(stackView) - stackView.pin([ UIView.HorizontalEdge.trailing, UIView.VerticalEdge.top, UIView.VerticalEdge.bottom ], to: self) - stackView.pin(.leading, to: .leading, of: self, withInset: 0) + // Note: We are intentionally letting the stackView go out of bounds because the title will clip + // in some cases when the subtitle wraps over 2 lines (this provides the extra space we need) + stackView.pin(.top, to: .top, of: self, withInset: -2) + stackView.pin(.leading, to: .leading, of: self) + stackView.pin(.trailing, to: .trailing, of: self) + stackView.pin(.bottom, to: .bottom, of: self, withInset: 2) } deinit { diff --git a/SessionMessagingKit/Jobs/Types/GetExpirationJob.swift b/SessionMessagingKit/Jobs/Types/GetExpirationJob.swift index 23891d688..212704799 100644 --- a/SessionMessagingKit/Jobs/Types/GetExpirationJob.swift +++ b/SessionMessagingKit/Jobs/Types/GetExpirationJob.swift @@ -43,8 +43,8 @@ public enum GetExpirationJob: JobExecutor { } let userPublicKey: String = getUserHexEncodedPublicKey(using: dependencies) - SnodeAPI - .getSwarm(for: userPublicKey, using: dependencies) + GetSwarmJob + .run(for: userPublicKey, using: dependencies) .tryFlatMapWithRandomSnode(using: dependencies) { snode -> AnyPublisher<(ResponseInfoType, GetExpiriesResponse), Error> in SnodeAPI.getExpiries( from: snode, diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift b/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift index f464894d5..007ef7671 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift @@ -177,7 +177,7 @@ public class Poller { let configHashes: [String] = LibSession.configHashes(for: swarmPublicKey) // Fetch the messages - return SnodeAPI.getSwarm(for: swarmPublicKey, using: dependencies) + return GetSwarmJob.run(for: swarmPublicKey, using: dependencies) .tryFlatMapWithRandomSnode(drainBehaviour: drainBehaviour, using: dependencies) { snode -> AnyPublisher<[SnodeAPI.Namespace: (info: ResponseInfoType, data: (messages: [SnodeReceivedMessage], lastHash: String?)?)], Error> in SnodeAPI.poll( namespaces: namespaces, diff --git a/SessionMessagingKit/Shared Models/SessionThreadViewModel.swift b/SessionMessagingKit/Shared Models/SessionThreadViewModel.swift index 71a7ed98a..adb6ae892 100644 --- a/SessionMessagingKit/Shared Models/SessionThreadViewModel.swift +++ b/SessionMessagingKit/Shared Models/SessionThreadViewModel.swift @@ -363,6 +363,7 @@ public extension SessionThreadViewModel { threadIsBlocked: Bool? = nil, contactProfile: Profile? = nil, currentUserIsClosedGroupMember: Bool? = nil, + currentUserIsClosedGroupAdmin: Bool? = nil, openGroupPermissions: OpenGroup.Permissions? = nil, unreadCount: UInt = 0, hasUnreadMessagesOfAnyKind: Bool = false, @@ -402,7 +403,7 @@ public extension SessionThreadViewModel { self.closedGroupName = nil self.closedGroupUserCount = nil self.currentUserIsClosedGroupMember = currentUserIsClosedGroupMember - self.currentUserIsClosedGroupAdmin = nil + self.currentUserIsClosedGroupAdmin = currentUserIsClosedGroupAdmin self.openGroupName = nil self.openGroupServer = nil self.openGroupRoomToken = nil @@ -997,6 +998,16 @@ public extension SessionThreadViewModel { \(SQL("\(groupMember[.profileId]) = \(userPublicKey)")) ) ) AS \(ViewModel.Columns.currentUserIsClosedGroupMember), + + EXISTS ( + SELECT 1 + FROM \(GroupMember.self) + WHERE ( + \(groupMember[.groupId]) = \(closedGroup[.threadId]) AND + \(SQL("\(groupMember[.role]) = \(GroupMember.Role.admin)")) AND + \(SQL("\(groupMember[.profileId]) = \(userPublicKey)")) + ) + ) AS \(ViewModel.Columns.currentUserIsClosedGroupAdmin), \(openGroup[.name]) AS \(ViewModel.Columns.openGroupName), \(openGroup[.server]) AS \(ViewModel.Columns.openGroupServer), diff --git a/SessionNotificationServiceExtension/NSENotificationPresenter.swift b/SessionNotificationServiceExtension/NSENotificationPresenter.swift index 9dd5d330f..425b1180a 100644 --- a/SessionNotificationServiceExtension/NSENotificationPresenter.swift +++ b/SessionNotificationServiceExtension/NSENotificationPresenter.swift @@ -237,16 +237,16 @@ public class NSENotificationPresenter: NSObject, NotificationsProtocol { private func addNotifcationRequest(identifier: String, notificationContent: UNNotificationContent, trigger: UNNotificationTrigger?) { let request = UNNotificationRequest(identifier: identifier, content: notificationContent, trigger: trigger) - SNLog("Add remote notification request: \(notificationContent.body)") + SNLog("Add remote notification request: \(identifier)") let semaphore = DispatchSemaphore(value: 0) UNUserNotificationCenter.current().add(request) { error in if let error = error { - SNLog("Failed to add notification request due to error:\(error)") + SNLog("Failed to add notification request '\(identifier)' due to error: \(error)") } semaphore.signal() } semaphore.wait() - SNLog("Finish adding remote notification request") + SNLog("Finish adding remote notification request '\(identifier)") } } diff --git a/SessionNotificationServiceExtension/NotificationError.swift b/SessionNotificationServiceExtension/NotificationError.swift index 5d2884509..0ef6e2e8c 100644 --- a/SessionNotificationServiceExtension/NotificationError.swift +++ b/SessionNotificationServiceExtension/NotificationError.swift @@ -1,18 +1,20 @@ // Copyright © 2023 Rangeproof Pty Ltd. All rights reserved. +// +// stringlint:disable import Foundation import SessionMessagingKit -enum NotificationError: LocalizedError { +enum NotificationError: Error, CustomStringConvertible { case processing(PushNotificationAPI.ProcessResult) case messageProcessing case messageHandling(MessageReceiverError) - public var errorDescription: String? { + public var description: String { switch self { - case .processing(let result): return "Failed to process notification (\(result))" - case .messageProcessing: return "Failed to process message" - case .messageHandling(let error): return "Failed to handle message (\(error))" + case .processing(let result): return "Failed to process notification (\(result)) (NotificationError.processing)." + case .messageProcessing: return "Failed to process message (NotificationError.messageProcessing)." + case .messageHandling(let error): return "Failed to handle message (\(error)) (NotificationError.messageHandling)." } } } diff --git a/SessionNotificationServiceExtension/NotificationServiceExtension.swift b/SessionNotificationServiceExtension/NotificationServiceExtension.swift index a0109053e..407bf7d1f 100644 --- a/SessionNotificationServiceExtension/NotificationServiceExtension.swift +++ b/SessionNotificationServiceExtension/NotificationServiceExtension.swift @@ -302,6 +302,8 @@ public final class NotificationServiceExtension: UNNotificationServiceExtension // Note that this does much more than set a flag; it will also run all deferred blocks. Singleton.appReadiness.setAppReady() + + JobRunner.enableNewSingleExecutionJobsOnly() } // MARK: Handle completion diff --git a/SessionNotificationServiceExtension/NotificationServiceExtensionContext.swift b/SessionNotificationServiceExtension/NotificationServiceExtensionContext.swift index 6d17de458..52d52cba3 100644 --- a/SessionNotificationServiceExtension/NotificationServiceExtensionContext.swift +++ b/SessionNotificationServiceExtension/NotificationServiceExtensionContext.swift @@ -1,6 +1,6 @@ -// // Copyright (c) 2020 Open Whisper Systems. All rights reserved. // +// stringlint:disable import Foundation import SignalUtilitiesKit diff --git a/SessionShareExtension/ShareNavController.swift b/SessionShareExtension/ShareNavController.swift index 063d0467f..83e53f6e8 100644 --- a/SessionShareExtension/ShareNavController.swift +++ b/SessionShareExtension/ShareNavController.swift @@ -153,6 +153,7 @@ final class ShareNavController: UINavigationController, ShareViewDelegate { // We don't need to use SyncPushTokensJob in the SAE. // We don't need to use DeviceSleepManager in the SAE. + JobRunner.enableNewSingleExecutionJobsOnly() AppVersion.sharedInstance().saeLaunchDidComplete() showLockScreenOrMainContent() diff --git a/SessionShareExtension/ThreadPickerVC.swift b/SessionShareExtension/ThreadPickerVC.swift index 2e4cb1d7c..963d485c9 100644 --- a/SessionShareExtension/ThreadPickerVC.swift +++ b/SessionShareExtension/ThreadPickerVC.swift @@ -244,8 +244,8 @@ final class ThreadPickerVC: UIViewController, UITableViewDataSource, UITableView } .subscribe(on: DispatchQueue.global(qos: .userInitiated)) .flatMap { _ in - SnodeAPI - .getSwarm( + GetSwarmJob + .run( for: { switch threadVariant { case .contact, .legacyGroup, .group: return threadId diff --git a/SessionSnodeKit/Configuration.swift b/SessionSnodeKit/Configuration.swift index 93d3f4ab3..51d42bef3 100644 --- a/SessionSnodeKit/Configuration.swift +++ b/SessionSnodeKit/Configuration.swift @@ -33,5 +33,6 @@ public enum SNSnodeKit: MigratableTarget { // Just to make the external API nice // Configure the job executors JobRunner.setExecutor(GetSnodePoolJob.self, for: .getSnodePool) JobRunner.setExecutor(BuildPathsJob.self, for: .buildPaths) + JobRunner.setExecutor(GetSwarmJob.self, for: .getSwarm) } } diff --git a/SessionSnodeKit/Jobs/BuildPathsJob.swift b/SessionSnodeKit/Jobs/BuildPathsJob.swift index 956f1c372..3c6e43f94 100644 --- a/SessionSnodeKit/Jobs/BuildPathsJob.swift +++ b/SessionSnodeKit/Jobs/BuildPathsJob.swift @@ -200,15 +200,11 @@ public enum BuildPathsJob: JobExecutor { .defaulting(to: true) let targetJob: Job? = dependencies.storage.write(using: dependencies) { db in - // Fetch an existing job if there is one (if there are multiple it doesn't matter which we select) - if let existingJob: Job = try? Job.filter(Job.Columns.variant == Job.Variant.buildPaths).fetchOne(db) { - return existingJob - } - - return dependencies.jobRunner.add( + return dependencies.jobRunner.upsert( db, job: Job( variant: .buildPaths, + behaviour: .runOnceTransient, shouldBeUnique: true, details: Details(reusablePaths: paths, ed25519SecretKey: ed25519SecretKey) ), diff --git a/SessionSnodeKit/Jobs/GetSwarmJob.swift b/SessionSnodeKit/Jobs/GetSwarmJob.swift new file mode 100644 index 000000000..e92caf841 --- /dev/null +++ b/SessionSnodeKit/Jobs/GetSwarmJob.swift @@ -0,0 +1,133 @@ +// Copyright © 2024 Rangeproof Pty Ltd. All rights reserved. + +import Foundation +import Combine +import GRDB +import SessionUtilitiesKit + +public enum GetSwarmJob: JobExecutor { + public static let maxFailureCount: Int = 0 + public static let requiresThreadId: Bool = false + public static let requiresInteractionId: Bool = false + + /// The minimum number of snodes in a swarm. + private static let minSwarmSnodeCount: Int = 3 + + public static func run( + _ job: Job, + queue: DispatchQueue, + success: @escaping (Job, Bool, Dependencies) -> (), + failure: @escaping (Job, Error?, Bool, Dependencies) -> (), + deferred: @escaping (Job, Dependencies) -> (), + using dependencies: Dependencies + ) { + guard + let detailsData: Data = job.details, + let details: Details = try? JSONDecoder().decode(Details.self, from: detailsData) + else { + SNLog("[GetSwarmJob] Failing due to missing details.") + return failure(job, JobRunnerError.missingRequiredDetails, true, dependencies) + } + + SNLog("[GetSwarmJob] Retrieving swarm for \(details.swarmPublicKey).") + return SnodeAPI + .getSwarm(for: details.swarmPublicKey, using: dependencies) + .subscribe(on: queue, using: dependencies) + .receive(on: queue, using: dependencies) + .sinkUntilComplete( + receiveCompletion: { result in + switch result { + case .finished: break + case .failure(let error): + SNLog("[GetSwarmJob] Failed due to error: \(error)") + failure(job, error, false, dependencies) + } + }, + receiveValue: { (snodes: Set) in + // Store the swarm and update the 'loadedSwarms' state so we don't fetch it again from the + // database the next time it's used + SnodeAPI.setSwarm(to: snodes, for: details.swarmPublicKey) + SnodeAPI.loadedSwarms.mutate { $0.insert(details.swarmPublicKey) } + + SNLog("[GetSwarmJob] Complete.") + success(job, false, dependencies) + } + ) + } + + public static func run( + for swarmPublicKey: String, + using dependencies: Dependencies + ) -> AnyPublisher, Error> { + // Try to load the swarm from the database if we haven't already + if !SnodeAPI.loadedSwarms.wrappedValue.contains(swarmPublicKey) { + let updatedCacheForKey: Set = dependencies.storage + .read { db in try Snode.fetchSet(db, publicKey: swarmPublicKey) } + .defaulting(to: []) + + SnodeAPI.swarmCache.mutate { $0[swarmPublicKey] = updatedCacheForKey } + SnodeAPI.loadedSwarms.mutate { $0.insert(swarmPublicKey) } + } + + // If we already have a cached version of the swarm which is large enough then use that + if let cachedSwarm = SnodeAPI.swarmCache.wrappedValue[swarmPublicKey], cachedSwarm.count >= minSwarmSnodeCount { + return Just(cachedSwarm) + .setFailureType(to: Error.self) + .eraseToAnyPublisher() + } + + // Otherwise trigger the job + return Deferred { + Future, Error> { resolver in + let targetJob: Job? = dependencies.storage.write(using: dependencies) { db in + return dependencies.jobRunner.upsert( + db, + job: Job( + variant: .getSwarm, + behaviour: .runOnceTransient, + shouldBeUnique: true, + details: Details(swarmPublicKey: swarmPublicKey) + ), + canStartJob: true, + using: dependencies + ) + } + + guard let job: Job = targetJob else { + SNLog("[GetSwarmJob] Failed to retrieve existing job or schedule a new one.") + return resolver(Result.failure(JobRunnerError.generic)) + } + + dependencies.jobRunner.afterJob(job) { result in + switch result { + case .succeeded: + guard + let cachedSwarm = SnodeAPI.swarmCache.wrappedValue[swarmPublicKey], + cachedSwarm.count >= minSwarmSnodeCount + else { + SNLog("[GetSwarmJob] Failed to find swarm in cache after job.") + return resolver(Result.failure(JobRunnerError.generic)) + } + + resolver(Result.success(cachedSwarm)) + + case .failed(let error, _): resolver(Result.failure(error ?? JobRunnerError.generic)) + case .deferred, .notFound: resolver(Result.failure(JobRunnerError.generic)) + } + } + } + }.eraseToAnyPublisher() + } +} + +// MARK: - GetSwarmJob.Details + +extension GetSwarmJob { + public struct Details: Codable { + private enum CodingKeys: String, CodingKey { + case swarmPublicKey + } + + fileprivate let swarmPublicKey: String + } +} diff --git a/SessionSnodeKit/Networking/PreparedRequest+OnionRequest.swift b/SessionSnodeKit/Networking/PreparedRequest+OnionRequest.swift index 1b2a6fa01..be8226360 100644 --- a/SessionSnodeKit/Networking/PreparedRequest+OnionRequest.swift +++ b/SessionSnodeKit/Networking/PreparedRequest+OnionRequest.swift @@ -55,8 +55,8 @@ public extension Network.PreparedRequest { case let randomSnode as Network.RandomSnodeTarget: guard let payload: Data = request.httpBody else { throw NetworkError.invalidPreparedRequest } - return SnodeAPI.getSwarm(for: randomSnode.swarmPublicKey, using: dependencies) - .tryFlatMapWithRandomSnode(retry: SnodeAPI.maxRetryCount, using: dependencies) { snode in + return GetSwarmJob.run(for: randomSnode.swarmPublicKey, using: dependencies) + .tryFlatMapWithRandomSnode(retry: randomSnode.retryCount, using: dependencies) { snode in dependencies.network .send( .onionRequest( @@ -72,7 +72,7 @@ public extension Network.PreparedRequest { case let randomSnode as Network.RandomSnodeLatestNetworkTimeTarget: guard request.httpBody != nil else { throw NetworkError.invalidPreparedRequest } - return SnodeAPI.getSwarm(for: randomSnode.swarmPublicKey, using: dependencies) + return GetSwarmJob.run(for: randomSnode.swarmPublicKey, using: dependencies) .tryFlatMapWithRandomSnode(retry: SnodeAPI.maxRetryCount, using: dependencies) { snode in SnodeAPI .getNetworkTime(from: snode, using: dependencies) diff --git a/SessionSnodeKit/Networking/Request+SnodeAPI.swift b/SessionSnodeKit/Networking/Request+SnodeAPI.swift index 7c66d063b..05575650c 100644 --- a/SessionSnodeKit/Networking/Request+SnodeAPI.swift +++ b/SessionSnodeKit/Networking/Request+SnodeAPI.swift @@ -22,6 +22,7 @@ internal extension Network { internal extension Network { struct RandomSnodeTarget: RequestTarget, Equatable { let swarmPublicKey: String + let retryCount: Int var url: URL? { URL(string: "snode:\(swarmPublicKey)") } var urlPathAndParamsString: String { return "" } @@ -53,7 +54,8 @@ public extension Request { snode: Snode, headers: [HTTPHeader: String] = [:], body: T? = nil, - swarmPublicKey: String? + swarmPublicKey: String?, + retryCount: Int ) { self = Request( method: method, @@ -76,13 +78,15 @@ public extension Request { endpoint: Endpoint, swarmPublicKey: String, headers: [HTTPHeader: String] = [:], - body: T? = nil + body: T? = nil, + retryCount: Int ) { self = Request( method: method, endpoint: endpoint, target: Network.RandomSnodeTarget( - swarmPublicKey: swarmPublicKey + swarmPublicKey: swarmPublicKey, + retryCount: retryCount ), headers: headers, body: body @@ -99,7 +103,8 @@ public extension Request { swarmPublicKey: String, headers: [HTTPHeader: String] = [:], requiresLatestNetworkTime: Bool, - body: T? = nil + body: T? = nil, + retryCount: Int ) where T: UpdatableTimestamp { self = Request( method: method, @@ -112,7 +117,8 @@ public extension Request { endpoint: endpoint, swarmPublicKey: swarmPublicKey, headers: headers, - body: body?.with(timestampMs: timestampMs) + body: body?.with(timestampMs: timestampMs), + retryCount: retryCount ).generateUrlRequest(using: dependencies) } ), diff --git a/SessionSnodeKit/Networking/SnodeAPI.swift b/SessionSnodeKit/Networking/SnodeAPI.swift index 072cf91c2..8a58ff521 100644 --- a/SessionSnodeKit/Networking/SnodeAPI.swift +++ b/SessionSnodeKit/Networking/SnodeAPI.swift @@ -23,7 +23,7 @@ public final class SnodeAPI { internal static let sodium: Atomic = Atomic(Sodium()) private static var hasLoadedSnodePool: Atomic = Atomic(false) - private static var loadedSwarms: Atomic> = Atomic([]) + internal static var loadedSwarms: Atomic> = Atomic([]) private static var getSnodePoolPublisher: Atomic, Error>?> = Atomic(nil) /// - Note: Should only be accessed from `Threading.workQueue` to avoid race conditions. @@ -47,7 +47,6 @@ public final class SnodeAPI { // MARK: - Settings internal static let maxRetryCount: Int = 8 - private static let minSwarmSnodeCount: Int = 3 private static let seedNodePool: Set = { guard !Features.useTestnet else { return [ @@ -146,22 +145,9 @@ public final class SnodeAPI { // MARK: - Swarm Interaction - private static func loadSwarmIfNeeded(for publicKey: String) { - guard !loadedSwarms.wrappedValue.contains(publicKey) else { return } - - let updatedCacheForKey: Set = Storage.shared - .read { db in try Snode.fetchSet(db, publicKey: publicKey) } - .defaulting(to: []) - - swarmCache.mutate { $0[publicKey] = updatedCacheForKey } - loadedSwarms.mutate { $0.insert(publicKey) } - } - - internal static func setSwarm(to newValue: Set, for publicKey: String, persist: Bool = true) { + internal static func setSwarm(to newValue: Set, for publicKey: String) { swarmCache.mutate { $0[publicKey] = newValue } - guard persist else { return } - Storage.shared.write { db in try? newValue.save(db, key: publicKey) } @@ -257,41 +243,31 @@ public final class SnodeAPI { } } - public static func getSwarm( + internal static func getSwarm( for swarmPublicKey: String, - using dependencies: Dependencies = Dependencies() + using dependencies: Dependencies ) -> AnyPublisher, Error> { - loadSwarmIfNeeded(for: swarmPublicKey) - - if let cachedSwarm = swarmCache.wrappedValue[swarmPublicKey], cachedSwarm.count >= minSwarmSnodeCount { - return Just(cachedSwarm) - .setFailureType(to: Error.self) - .eraseToAnyPublisher() - } - - SNLog("Getting swarm for: \((swarmPublicKey == getUserHexEncodedPublicKey()) ? "self" : swarmPublicKey).") - + // Note: We do an explicit `getRandomSnode` call here because we want to send the request + // to _any_ random snode rather than a random snode for the given `swarmPublicKey` return getRandomSnode() - .tryFlatMap { snode in + .tryFlatMap { snode -> AnyPublisher, Error> in try SnodeAPI .prepareRequest( request: Request( endpoint: .getSwarm, snode: snode, swarmPublicKey: swarmPublicKey, - body: GetSwarmRequest(pubkey: swarmPublicKey) + body: GetSwarmRequest(pubkey: swarmPublicKey), + retryCount: 4 ), responseType: GetSwarmResponse.self, using: dependencies ) .send(using: dependencies) - .retry(4) .map { _, response in response.snodes } - .handleEvents( - receiveOutput: { snodes in setSwarm(to: snodes, for: swarmPublicKey) } - ) .eraseToAnyPublisher() } + .eraseToAnyPublisher() } // MARK: - Batching & Polling @@ -1415,7 +1391,8 @@ private extension Request { init( endpoint: SnodeAPI.Endpoint, swarmPublicKey: String, - body: B + body: B, + retryCount: Int = SnodeAPI.maxRetryCount ) where T == SnodeRequest, Endpoint == SnodeAPI.Endpoint { self = Request( method: .post, @@ -1424,7 +1401,8 @@ private extension Request { body: SnodeRequest( endpoint: endpoint, body: body - ) + ), + retryCount: retryCount ) } @@ -1432,7 +1410,8 @@ private extension Request { endpoint: SnodeAPI.Endpoint, snode: Snode, swarmPublicKey: String? = nil, - body: B + body: B, + retryCount: Int = SnodeAPI.maxRetryCount ) where T == SnodeRequest, Endpoint == SnodeAPI.Endpoint { self = Request( method: .post, @@ -1442,7 +1421,8 @@ private extension Request { endpoint: endpoint, body: body ), - swarmPublicKey: swarmPublicKey + swarmPublicKey: swarmPublicKey, + retryCount: retryCount ) } @@ -1450,7 +1430,8 @@ private extension Request { endpoint: SnodeAPI.Endpoint, swarmPublicKey: String, requiresLatestNetworkTime: Bool, - body: B + body: B, + retryCount: Int = SnodeAPI.maxRetryCount ) where T == SnodeRequest, Endpoint == SnodeAPI.Endpoint, B: Encodable & UpdatableTimestamp { self = Request( method: .post, @@ -1460,7 +1441,8 @@ private extension Request { body: SnodeRequest( endpoint: endpoint, body: body - ) + ), + retryCount: retryCount ) } } diff --git a/SessionUtilitiesKit/Database/Models/Job.swift b/SessionUtilitiesKit/Database/Models/Job.swift index 7dcdb51a6..208af4b88 100644 --- a/SessionUtilitiesKit/Database/Models/Job.swift +++ b/SessionUtilitiesKit/Database/Models/Job.swift @@ -133,6 +133,10 @@ public struct Job: Codable, Equatable, Hashable, Identifiable, FetchableRecord, /// This job runs whenever we don't have enough onion request paths, it also runs distinctly so there should only /// ever be one at a time case buildPaths + + /// This job runs whenever we don't have the swarm for a public key, it also runs distinctly so there should only + /// ever be one at a time + case getSwarm } public enum Behaviour: Int, Codable, DatabaseValueConvertible, CaseIterable { @@ -153,6 +157,10 @@ public struct Job: Codable, Equatable, Hashable, Identifiable, FetchableRecord, /// This job will run once each whenever the app becomes active (launch and return from background) and /// may run again during the same session if `nextRunTimestamp` gets set case recurringOnActive + + /// This job will run once and, while it does get persisted to the database, upon subsequent launch jobs with + /// this behaivour will not be run and will be cleared from the database + case runOnceTransient } /// The `id` value is auto incremented by the database, if the `Job` hasn't been inserted into diff --git a/SessionUtilitiesKit/JobRunner/JobRunner.swift b/SessionUtilitiesKit/JobRunner/JobRunner.swift index 265b65bd4..aba80078a 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunner.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunner.swift @@ -20,12 +20,13 @@ public protocol JobRunnerType { func appDidFinishLaunching(using dependencies: Dependencies) func appDidBecomeActive(using dependencies: Dependencies) func startNonBlockingQueues(using dependencies: Dependencies) + func enableNewSingleExecutionJobsOnly(using dependencies: Dependencies) func stopAndClearPendingJobs(exceptForVariant: Job.Variant?, using dependencies: Dependencies, onComplete: (() -> ())?) // MARK: - Job Scheduling @discardableResult func add(_ db: Database, job: Job?, dependantJob: Job?, canStartJob: Bool, using dependencies: Dependencies) -> Job? - func upsert(_ db: Database, job: Job?, canStartJob: Bool, using dependencies: Dependencies) + @discardableResult func upsert(_ db: Database, job: Job?, canStartJob: Bool, using dependencies: Dependencies) -> Job? @discardableResult func insert(_ db: Database, job: Job?, before otherJob: Job) -> (Int64, Job)? func enqueueDependenciesIfNeeded(_ jobs: [Job], using dependencies: Dependencies) func afterJob(_ job: Job?, state: JobRunner.JobState, callback: @escaping (JobRunner.JobResult) -> ()) @@ -204,6 +205,7 @@ public final class JobRunner: JobRunnerType { internal var appReadyToStartQueues: Atomic = Atomic(false) internal var appHasBecomeActive: Atomic = Atomic(false) + internal var forceAllowSingleExecutionJobs: Atomic = Atomic(false) internal var perSessionJobsCompleted: Atomic> = Atomic([]) internal var hasCompletedInitialBecomeActive: Atomic = Atomic(false) internal var shutdownBackgroundTask: Atomic = Atomic(nil) @@ -228,7 +230,6 @@ public final class JobRunner: JobRunnerType { self.allowToExecuteJobs = ( isTestingJobRunner || ( Singleton.hasAppContext && - Singleton.appContext.isMainApp && !SNUtilitiesKit.isRunningTests ) ) @@ -256,7 +257,8 @@ public final class JobRunner: JobRunnerType { jobVariants.remove(.sendReadReceipts), jobVariants.remove(.groupLeaving), jobVariants.remove(.configurationSync), - jobVariants.remove(.buildPaths) + jobVariants.remove(.buildPaths), + jobVariants.remove(.getSwarm) ].compactMap { $0 } ), @@ -322,6 +324,7 @@ public final class JobRunner: JobRunnerType { // Now that we've finished setting up the JobRunner, update the queue closures self.blockingQueue.mutate { $0?.canStart = { [weak self] queue -> Bool in (self?.canStart(queue: queue) == true) } + $0?.canStartPendingJobs = { [weak self] queue -> Bool in (self?.canStartPendingJobs(queue: queue) == true) } $0?.onQueueDrained = { [weak self] in // Once all blocking jobs have been completed we want to start running // the remaining job queues @@ -337,6 +340,9 @@ public final class JobRunner: JobRunnerType { self.queues.mutate { $0.values.forEach { queue in queue.canStart = { [weak self] targetQueue -> Bool in (self?.canStart(queue: targetQueue) == true) } + queue.canStartPendingJobs = { [weak self] targetQueue -> Bool in + (self?.canStartPendingJobs(queue: targetQueue) == true) + } } } } @@ -349,6 +355,19 @@ public final class JobRunner: JobRunnerType { } public func canStart(queue: JobQueue?) -> Bool { + return ( + allowToExecuteJobs && ( + forceAllowSingleExecutionJobs.wrappedValue || ( + appReadyToStartQueues.wrappedValue && ( + queue?.type == .blocking || + canStartNonBlockingQueue + ) + ) + ) + ) + } + + public func canStartPendingJobs(queue: JobQueue?) -> Bool { return ( allowToExecuteJobs && appReadyToStartQueues.wrappedValue && ( @@ -448,8 +467,20 @@ public final class JobRunner: JobRunnerType { } public func appDidFinishLaunching(using dependencies: Dependencies) { + // Clear any 'runOnceTransient' entries in the database (they should only ever be run during + // the app session that they were scheduled in) + // + // Note: If we are already in "single-execution mode" then don't do this as there could be running + // jobs (this case occurs during Onboarding when trying to retrieve the existing profile name) + if !forceAllowSingleExecutionJobs.wrappedValue { + dependencies.storage.writeAsync { db in + try Job.filter(Job.Columns.behaviour == Job.Behaviour.runOnceTransient).deleteAll(db) + } + } + // Flag that the JobRunner can start it's queues appReadyToStartQueues.mutate { $0 = true } + forceAllowSingleExecutionJobs.mutate { $0 = false } // Note: 'appDidBecomeActive' will run on first launch anyway so we can // leave those jobs out and can wait until then to start the JobRunner @@ -574,6 +605,27 @@ public final class JobRunner: JobRunnerType { } } + public func enableNewSingleExecutionJobsOnly(using dependencies: Dependencies) { + // If we have already fully started the JobRunner then don't bother doing this (this shouldn't + // currently be possible but might be in the future and swapping this flag while the JobRunner + // is in it's "normal" mode could result in unexpected behaviour) + guard !appReadyToStartQueues.wrappedValue else { return } + + // Clear any 'runOnceTransient' entries in the database (they should only ever be run during + // the app session that they were scheduled in) + dependencies.storage.writeAsync { db in + try Job.filter(Job.Columns.behaviour == Job.Behaviour.runOnceTransient).deleteAll(db) + } + + // This function is called by the app extensions to allow them to run jobs directly without + // triggering any recurring or pending jobs + // + // Note: This will only allow jobs to run if they are directly added to a job queue as if + // `canStartPendingJobs` returns `false` then any persisted jobs **WILL NOT** be fetched and + // added to the queue + forceAllowSingleExecutionJobs.mutate { $0 = true } + } + public func stopAndClearPendingJobs( exceptForVariant: Job.Variant?, using dependencies: Dependencies, @@ -678,28 +730,39 @@ public final class JobRunner: JobRunnerType { job: Job?, canStartJob: Bool, using dependencies: Dependencies - ) { - guard let job: Job = job else { return } // Ignore null jobs + ) -> Job? { + guard let job: Job = job else { return nil } // Ignore null jobs guard job.id != nil else { - add(db, job: job, canStartJob: canStartJob, using: dependencies) - return + // When we upsert a job that should be unique we want to return the existing job (if it exists) + switch job.uniqueHashValue { + case .none: return add(db, job: job, canStartJob: canStartJob, using: dependencies) + case .some: + let existingJob: Job? = try? Job + .filter(Job.Columns.variant == job.variant) + .filter(Job.Columns.uniqueHashValue == job.uniqueHashValue) + .fetchOne(db) + + return (existingJob ?? add(db, job: job, canStartJob: canStartJob, using: dependencies)) + } } - guard let updatedJob: Job = validatedJob(db, job: job, validation: .enqueueOnly) else { return } + guard let updatedJob: Job = validatedJob(db, job: job, validation: .enqueueOnly) else { return nil } // Don't add to the queue if the JobRunner isn't ready (it's been saved to the db so it'll be loaded // once the queue actually get started later) - guard canAddToQueue(updatedJob) else { return } + guard canAddToQueue(updatedJob) else { return updatedJob } let jobQueue: JobQueue? = queues.wrappedValue[updatedJob.variant] jobQueue?.upsert(db, job: updatedJob, canStartJob: canStartJob, using: dependencies) // Don't start the queue if the job can't be started - guard canStartJob else { return } + guard canStartJob else { return updatedJob } // Start the job runner if needed db.afterNextTransactionNestedOnce(dedupeId: "JobRunner-Start: \(jobQueue?.queueContext ?? "N/A")") { _ in jobQueue?.start(using: dependencies) } + + return updatedJob } @discardableResult public func insert( @@ -798,6 +861,10 @@ public final class JobRunner: JobRunnerType { return ( job.behaviour == .runOnceNextLaunch || job.behaviour == .recurringOnLaunch || + ( + job.behaviour == .runOnceTransient && + forceAllowSingleExecutionJobs.wrappedValue + ) || appHasBecomeActive.wrappedValue ) } @@ -949,6 +1016,7 @@ public final class JobQueue: Hashable { private var executorMap: Atomic<[Job.Variant: JobExecutor.Type]> = Atomic([:]) fileprivate var canStart: ((JobQueue?) -> Bool)? + fileprivate var canStartPendingJobs: ((JobQueue?) -> Bool)? fileprivate var onQueueDrained: (() -> ())? fileprivate var hasStartedAtLeastOnce: Atomic = Atomic(false) fileprivate var isRunning: Atomic = Atomic(false) @@ -1253,21 +1321,25 @@ public final class JobQueue: Hashable { hasStartedAtLeastOnce.mutate { $0 = true } // Get any pending jobs - let jobVariants: [Job.Variant] = self.jobVariants + var jobsToRun: [Job] = [] let jobIdsAlreadyRunning: Set = currentlyRunningJobIds.wrappedValue - let jobsAlreadyInQueue: Set = pendingJobsQueue.wrappedValue.compactMap { $0.id }.asSet() - let jobsToRun: [Job] = dependencies.storage.read(using: dependencies) { db in - try Job - .filterPendingJobs( - variants: jobVariants, - excludeFutureJobs: true, - includeJobsWithDependencies: false - ) - .filter(!jobIdsAlreadyRunning.contains(Job.Columns.id)) // Exclude jobs already running - .filter(!jobsAlreadyInQueue.contains(Job.Columns.id)) // Exclude jobs already in the queue - .fetchAll(db) + + if canStartPendingJobs?(self) == true { + let jobVariants: [Job.Variant] = self.jobVariants + let jobsAlreadyInQueue: Set = pendingJobsQueue.wrappedValue.compactMap { $0.id }.asSet() + jobsToRun = dependencies.storage.read(using: dependencies) { db in + try Job + .filterPendingJobs( + variants: jobVariants, + excludeFutureJobs: true, + includeJobsWithDependencies: false + ) + .filter(!jobIdsAlreadyRunning.contains(Job.Columns.id)) // Exclude jobs already running + .filter(!jobsAlreadyInQueue.contains(Job.Columns.id)) // Exclude jobs already in the queue + .fetchAll(db) + } + .defaulting(to: []) } - .defaulting(to: []) // Determine the number of jobs to run var jobCount: Int = 0 @@ -1454,6 +1526,15 @@ public final class JobQueue: Hashable { } private func scheduleNextSoonestJob(using dependencies: Dependencies) { + // If we can't schedule pending jobs then complete the queue + guard canStartPendingJobs?(self) == true else { + if executionType != .concurrent || currentlyRunningJobIds.wrappedValue.isEmpty { + self.onQueueDrained?() + } + return + } + + // Retrieve any pending jobs from the database let jobVariants: [Job.Variant] = self.jobVariants let jobIdsAlreadyRunning: Set = currentlyRunningJobIds.wrappedValue let nextJobTimestamp: TimeInterval? = dependencies.storage.read(using: dependencies) { db in @@ -1835,6 +1916,10 @@ public extension JobRunner { instance.appDidBecomeActive(using: dependencies) } + static func enableNewSingleExecutionJobsOnly(using dependencies: Dependencies = Dependencies()) { + instance.enableNewSingleExecutionJobsOnly(using: dependencies) + } + static func afterBlockingQueue(callback: @escaping () -> ()) { instance.afterBlockingQueue(callback: callback) } @@ -1856,12 +1941,12 @@ public extension JobRunner { /// /// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp` /// is in the future then the job won't be started - static func upsert( + @discardableResult static func upsert( _ db: Database, job: Job?, canStartJob: Bool = true, using dependencies: Dependencies = Dependencies() - ) { instance.upsert(db, job: job, canStartJob: canStartJob, using: dependencies) } + ) -> Job? { return instance.upsert(db, job: job, canStartJob: canStartJob, using: dependencies) } @discardableResult static func insert( _ db: Database,