diff --git a/Session/Conversations/ConversationVC+Interaction.swift b/Session/Conversations/ConversationVC+Interaction.swift index bdf4a4399..2b8ce7cf0 100644 --- a/Session/Conversations/ConversationVC+Interaction.swift +++ b/Session/Conversations/ConversationVC+Interaction.swift @@ -1921,6 +1921,10 @@ extension ConversationVC: } case .contact, .legacyGroup, .group: + let targetPublicKey: String = (cellViewModel.threadVariant == .contact ? + userPublicKey : + cellViewModel.threadId + ) let serverHash: String? = Storage.shared.read { db -> String? in try Interaction .select(.serverHash) @@ -1996,16 +2000,7 @@ extension ConversationVC: accessibilityIdentifier: "Delete for everyone", style: .destructive ) { [weak self] _ in - deleteRemotely( - from: self, - request: SnodeAPI - .deleteMessages( - publicKey: cellViewModel.threadId, - serverHashes: [serverHash] - ) - .map { _ in () } - .eraseToAnyPublisher() - ) { [weak self] in + let completeServerDeletion = { [weak self] in Storage.shared.writeAsync { db in try MessageSender .send( @@ -2019,6 +2014,22 @@ extension ConversationVC: self?.showInputAccessoryView() } + + // We can only delete messages on the server for `contact` and `group` conversations + guard cellViewModel.threadVariant == .contact || cellViewModel.threadVariant == .group else { + return completeServerDeletion() + } + + deleteRemotely( + from: self, + request: SnodeAPI + .deleteMessages( + publicKey: targetPublicKey, + serverHashes: [serverHash] + ) + .map { _ in () } + .eraseToAnyPublisher() + ) { completeServerDeletion() } }) actionSheet.addAction(UIAlertAction.init(title: "TXT_CANCEL_TITLE".localized(), style: .cancel) { [weak self] _ in diff --git a/Session/Conversations/Message Cells/Content Views/ReactionContainerView.swift b/Session/Conversations/Message Cells/Content Views/ReactionContainerView.swift index 94f52b6d5..1f2cd628f 100644 --- a/Session/Conversations/Message Cells/Content Views/ReactionContainerView.swift +++ b/Session/Conversations/Message Cells/Content Views/ReactionContainerView.swift @@ -9,6 +9,13 @@ final class ReactionContainerView: UIView { private static let arrowSize: CGSize = CGSize(width: 15, height: 13) private static let arrowSpacing: CGFloat = Values.verySmallSpacing + // We have explicit limits on the number of emoji which should be displayed before they + // automatically get collapsed, these values are consistent across platforms so are set + // here (even though the logic will automatically calculate and limit to a single line + // of reactions dynamically for the size of the view) + private static let numCollapsedEmoji: Int = 4 + private static let maxEmojiBeforeCollapse: Int = 6 + private var maxWidth: CGFloat = 0 private var collapsedCount: Int = 0 private var showingAllReactions: Bool = false @@ -173,7 +180,10 @@ final class ReactionContainerView: UIView { numReactions += 1 } - return numReactions + return (numReactions > ReactionContainerView.maxEmojiBeforeCollapse ? + ReactionContainerView.numCollapsedEmoji : + numReactions + ) }() self.showNumbers = showNumbers self.reactionViews = [] diff --git a/Session/Meta/SessionApp.swift b/Session/Meta/SessionApp.swift index c8c5e1eed..2adb9eb7c 100644 --- a/Session/Meta/SessionApp.swift +++ b/Session/Meta/SessionApp.swift @@ -71,7 +71,8 @@ public struct SessionApp { // This _should_ be wiped out below. Logger.error("") DDLog.flushLog() - + + SessionUtil.clearMemoryState() Storage.resetAllStorage() ProfileManager.resetProfileStorage() Attachment.resetAttachmentStorage() diff --git a/Session/Onboarding/LinkDeviceVC.swift b/Session/Onboarding/LinkDeviceVC.swift index faf0bc6c6..0c4b0af0a 100644 --- a/Session/Onboarding/LinkDeviceVC.swift +++ b/Session/Onboarding/LinkDeviceVC.swift @@ -86,6 +86,12 @@ final class LinkDeviceVC: BaseVC, UIPageViewControllerDataSource, UIPageViewCont scanQRCodePlaceholderVC.constrainHeight(to: height) } + override func viewWillAppear(_ animated: Bool) { + super.viewWillAppear(animated) + + Onboarding.Flow.register.unregister() + } + override func viewDidAppear(_ animated: Bool) { super.viewDidAppear(animated) tabBarTopConstraint.constant = navigationController!.navigationBar.height() diff --git a/Session/Onboarding/Onboarding.swift b/Session/Onboarding/Onboarding.swift index 902143c9c..8c664c045 100644 --- a/Session/Onboarding/Onboarding.swift +++ b/Session/Onboarding/Onboarding.swift @@ -8,111 +8,152 @@ import SessionUtilitiesKit import SessionMessagingKit enum Onboarding { - private static let profileNameRetrievalPublisher: Atomic> = { + private static let profileNameRetrievalIdentifier: Atomic = Atomic(nil) + private static let profileNameRetrievalPublisher: Atomic?> = Atomic(nil) + public static var profileNamePublisher: AnyPublisher { + guard let existingPublisher: AnyPublisher = profileNameRetrievalPublisher.wrappedValue else { + return profileNameRetrievalPublisher.mutate { value in + let requestId: UUID = UUID() + let result: AnyPublisher = createProfileNameRetrievalPublisher(requestId) + + value = result + profileNameRetrievalIdentifier.mutate { $0 = requestId } + return result + } + } + + return existingPublisher + } + + private static func createProfileNameRetrievalPublisher(_ requestId: UUID) -> AnyPublisher { // FIXME: Remove this once `useSharedUtilForUserConfig` is permanent guard SessionUtil.userConfigsEnabled else { - return Atomic( - Just(nil) - .setFailureType(to: Error.self) - .eraseToAnyPublisher() - ) + return Just(nil) + .setFailureType(to: Error.self) + .eraseToAnyPublisher() } let userPublicKey: String = getUserHexEncodedPublicKey() - return Atomic( - SnodeAPI.getSwarm(for: userPublicKey) - .subscribe(on: DispatchQueue.global(qos: .userInitiated)) - .tryFlatMap { swarm -> AnyPublisher in - guard let snode = swarm.randomElement() else { throw SnodeAPIError.generic } - - return CurrentUserPoller - .poll( - namespaces: [.configUserProfile], - from: snode, - for: userPublicKey, - on: DispatchQueue.global(qos: .userInitiated), - // Note: These values mean the received messages will be - // processed immediately rather than async as part of a Job - calledFromBackgroundPoller: true, - isBackgroundPollValid: { true } - ) - .tryFlatMap { receivedMessageTypes -> AnyPublisher in - // FIXME: Remove this entire 'tryFlatMap' once the updated user config has been released for long enough - guard receivedMessageTypes.isEmpty else { - return Just(()) - .setFailureType(to: Error.self) - .eraseToAnyPublisher() - } - - SNLog("Onboarding failed to retrieve user config, checking for legacy config") - - return CurrentUserPoller - .poll( - namespaces: [.default], - from: snode, - for: userPublicKey, - on: DispatchQueue.global(qos: .userInitiated), - // Note: These values mean the received messages will be - // processed immediately rather than async as part of a Job - calledFromBackgroundPoller: true, - isBackgroundPollValid: { true } - ) - .tryMap { receivedMessageTypes -> Void in - guard - let message: ConfigurationMessage = receivedMessageTypes - .last(where: { $0 is ConfigurationMessage }) - .asType(ConfigurationMessage.self), - let displayName: String = message.displayName - else { return () } - - // Handle user profile changes - Storage.shared.write { db in - try ProfileManager.updateProfileIfNeeded( - db, - publicKey: userPublicKey, - name: displayName, - avatarUpdate: { - guard - let profilePictureUrl: String = message.profilePictureUrl, - let profileKey: Data = message.profileKey - else { return .none } - - return .updateTo( - url: profilePictureUrl, - key: profileKey, - fileName: nil - ) - }(), - sentTimestamp: TimeInterval((message.sentTimestamp ?? 0) / 1000), - calledFromConfigHandling: false - ) - } - return () - } + return SnodeAPI.getSwarm(for: userPublicKey) + .subscribe(on: DispatchQueue.global(qos: .userInitiated)) + .tryFlatMapWithRandomSnode { snode -> AnyPublisher in + CurrentUserPoller + .poll( + namespaces: [.configUserProfile], + from: snode, + for: userPublicKey, + on: DispatchQueue.global(qos: .userInitiated), + // Note: These values mean the received messages will be + // processed immediately rather than async as part of a Job + calledFromBackgroundPoller: true, + isBackgroundPollValid: { true } + ) + .tryFlatMap { receivedMessageTypes -> AnyPublisher in + // FIXME: Remove this entire 'tryFlatMap' once the updated user config has been released for long enough + guard + receivedMessageTypes.isEmpty, + requestId == profileNameRetrievalIdentifier.wrappedValue + else { + return Just(()) + .setFailureType(to: Error.self) .eraseToAnyPublisher() } - } - .flatMap { _ -> AnyPublisher in - Storage.shared.readPublisher { db in - try Profile - .filter(id: userPublicKey) - .select(.name) - .asRequest(of: String.self) - .fetchOne(db) + + SNLog("Onboarding failed to retrieve user config, checking for legacy config") + + return CurrentUserPoller + .poll( + namespaces: [.default], + from: snode, + for: userPublicKey, + on: DispatchQueue.global(qos: .userInitiated), + // Note: These values mean the received messages will be + // processed immediately rather than async as part of a Job + calledFromBackgroundPoller: true, + isBackgroundPollValid: { true } + ) + .tryMap { receivedMessageTypes -> Void in + guard + let message: ConfigurationMessage = receivedMessageTypes + .last(where: { $0 is ConfigurationMessage }) + .asType(ConfigurationMessage.self), + let displayName: String = message.displayName, + requestId == profileNameRetrievalIdentifier.wrappedValue + else { return () } + + // Handle user profile changes + Storage.shared.write { db in + try ProfileManager.updateProfileIfNeeded( + db, + publicKey: userPublicKey, + name: displayName, + avatarUpdate: { + guard + let profilePictureUrl: String = message.profilePictureUrl, + let profileKey: Data = message.profileKey + else { return .none } + + return .updateTo( + url: profilePictureUrl, + key: profileKey, + fileName: nil + ) + }(), + sentTimestamp: TimeInterval((message.sentTimestamp ?? 0) / 1000), + calledFromConfigHandling: false + ) + } + return () + } + .eraseToAnyPublisher() } + } + .map { _ -> String? in + guard requestId == profileNameRetrievalIdentifier.wrappedValue else { + return nil } - .shareReplay(1) - .eraseToAnyPublisher() - ) - }() - public static var profileNamePublisher: AnyPublisher { - profileNameRetrievalPublisher.wrappedValue + + return Storage.shared.read { db in + try Profile + .filter(id: userPublicKey) + .select(.name) + .asRequest(of: String.self) + .fetchOne(db) + } + } + .shareReplay(1) + .eraseToAnyPublisher() } enum Flow { case register, recover, link + /// If the user returns to an earlier screen during Onboarding we might need to clear out a partially created + /// account (eg. returning from the PN setting screen to the seed entry screen when linking a device) + func unregister() { + // Clear the in-memory state from SessionUtil + SessionUtil.clearMemoryState() + + // Clear any data which gets set during Onboarding + Storage.shared.write { db in + db[.hasViewedSeed] = false + + try SessionThread.deleteAll(db) + try Profile.deleteAll(db) + try Contact.deleteAll(db) + try Identity.deleteAll(db) + try ConfigDump.deleteAll(db) + try SnodeReceivedMessageInfo.deleteAll(db) + } + + // Clear the profile name retrieve publisher + profileNameRetrievalIdentifier.mutate { $0 = nil } + profileNameRetrievalPublisher.mutate { $0 = nil } + + UserDefaults.standard[.hasSyncedInitialConfiguration] = false + } + func preregister(with seed: Data, ed25519KeyPair: KeyPair, x25519KeyPair: KeyPair) { let x25519PublicKey = x25519KeyPair.hexEncodedPublicKey diff --git a/Session/Onboarding/PNModeVC.swift b/Session/Onboarding/PNModeVC.swift index 6bc15d8d2..8c60ab51e 100644 --- a/Session/Onboarding/PNModeVC.swift +++ b/Session/Onboarding/PNModeVC.swift @@ -174,7 +174,7 @@ final class PNModeVC: BaseVC, OptionViewDelegate { } // If we don't have one then show a loading indicator and try to retrieve the existing name - ModalActivityIndicatorViewController.present(fromViewController: self) { viewController in + ModalActivityIndicatorViewController.present(fromViewController: self) { [weak self, flow = self.flow] viewController in Onboarding.profileNamePublisher .timeout(.seconds(15), scheduler: DispatchQueue.main, customError: { HTTPError.timeout }) .catch { _ -> AnyPublisher in @@ -185,7 +185,7 @@ final class PNModeVC: BaseVC, OptionViewDelegate { } .receive(on: DispatchQueue.main) .sinkUntilComplete( - receiveValue: { [weak self, flow = self.flow] value in + receiveValue: { value in // Hide the loading indicator viewController.dismiss(animated: true) diff --git a/Session/Onboarding/RegisterVC.swift b/Session/Onboarding/RegisterVC.swift index 56979231f..52cc441a6 100644 --- a/Session/Onboarding/RegisterVC.swift +++ b/Session/Onboarding/RegisterVC.swift @@ -151,6 +151,12 @@ final class RegisterVC : BaseVC { updateSeed() } + override func viewWillAppear(_ animated: Bool) { + super.viewWillAppear(animated) + + Onboarding.Flow.register.unregister() + } + // MARK: General @objc private func enableCopyButton() { copyPublicKeyButton.isUserInteractionEnabled = true diff --git a/Session/Onboarding/RestoreVC.swift b/Session/Onboarding/RestoreVC.swift index abddf58f6..e196c5f06 100644 --- a/Session/Onboarding/RestoreVC.swift +++ b/Session/Onboarding/RestoreVC.swift @@ -128,8 +128,15 @@ final class RestoreVC: BaseVC { notificationCenter.addObserver(self, selector: #selector(handleKeyboardWillHideNotification(_:)), name: UIResponder.keyboardWillHideNotification, object: nil) } + override func viewWillAppear(_ animated: Bool) { + super.viewWillAppear(animated) + + Onboarding.Flow.register.unregister() + } + override func viewDidAppear(_ animated: Bool) { super.viewDidAppear(animated) + // On small screens we hide the legal label when the keyboard is up, but it's important that the user sees it so // in those instances we don't make the keyboard come up automatically if !isIPhone5OrSmaller { diff --git a/Session/Utilities/BackgroundPoller.swift b/Session/Utilities/BackgroundPoller.swift index d468e8c0c..e00e46f94 100644 --- a/Session/Utilities/BackgroundPoller.swift +++ b/Session/Utilities/BackgroundPoller.swift @@ -67,10 +67,8 @@ public final class BackgroundPoller { return SnodeAPI.getSwarm(for: userPublicKey) .subscribeOnMain(immediately: true) .receiveOnMain(immediately: true) - .tryFlatMap { swarm -> AnyPublisher<[Message], Error> in - guard let snode = swarm.randomElement() else { throw SnodeAPIError.generic } - - return CurrentUserPoller.poll( + .tryFlatMapWithRandomSnode { snode -> AnyPublisher<[Message], Error> in + CurrentUserPoller.poll( namespaces: CurrentUserPoller.namespaces, from: snode, for: userPublicKey, diff --git a/SessionMessagingKit/Database/Models/Interaction.swift b/SessionMessagingKit/Database/Models/Interaction.swift index 6abfd584e..21fdd5456 100644 --- a/SessionMessagingKit/Database/Models/Interaction.swift +++ b/SessionMessagingKit/Database/Models/Interaction.swift @@ -846,10 +846,9 @@ public extension Interaction { .asRequest(of: Attachment.DescriptionInfo.self) .fetchOne(db), attachmentCount: try? attachments.fetchCount(db), - isOpenGroupInvitation: (try? linkPreview + isOpenGroupInvitation: linkPreview .filter(LinkPreview.Columns.variant == LinkPreview.Variant.openGroupInvitation) - .isNotEmpty(db)) - .defaulting(to: false) + .isNotEmpty(db) ) case .infoMediaSavedNotification, .infoScreenshotNotification, .infoCall: diff --git a/SessionMessagingKit/Jobs/Types/MessageSendJob.swift b/SessionMessagingKit/Jobs/Types/MessageSendJob.swift index 8a9e3e218..50a462f9a 100644 --- a/SessionMessagingKit/Jobs/Types/MessageSendJob.swift +++ b/SessionMessagingKit/Jobs/Types/MessageSendJob.swift @@ -31,7 +31,16 @@ public enum MessageSendJob: JobExecutor { // so extract them from any associated attachments var messageFileIds: [String] = [] - if details.message is VisibleMessage { + /// Ensure any associated attachments have already been uploaded before sending the message + /// + /// **Note:** Reactions reference their original message so we need to ignore this logic for reaction messages to ensure we don't + /// incorrectly re-upload incoming attachments that the user reacted to, we also want to exclude "sync" messages since they should + /// already have attachments in a valid state + if + details.message is VisibleMessage, + (details.message as? VisibleMessage)?.reaction == nil && + details.isSyncMessage == false + { guard let jobId: Int64 = job.id, let interactionId: Int64 = job.interactionId diff --git a/SessionMessagingKit/SessionUtil/SessionUtil.swift b/SessionMessagingKit/SessionUtil/SessionUtil.swift index bcc1e67aa..1735f6b6c 100644 --- a/SessionMessagingKit/SessionUtil/SessionUtil.swift +++ b/SessionMessagingKit/SessionUtil/SessionUtil.swift @@ -10,6 +10,7 @@ import SessionUtilitiesKit public extension Features { static func useSharedUtilForUserConfig(_ db: Database? = nil) -> Bool { + return true // TODO: Need to set this timestamp to the correct date (currently start of 2030) // guard Date().timeIntervalSince1970 < 1893456000 else { return true } guard !SessionUtil.hasCheckedMigrationsCompleted.wrappedValue else { @@ -141,6 +142,15 @@ public enum SessionUtil { // MARK: - Loading + public static func clearMemoryState() { + // Ensure we have a loaded state before we continue + guard !SessionUtil.configStore.wrappedValue.isEmpty else { return } + + SessionUtil.configStore.mutate { confStore in + confStore.removeAll() + } + } + public static func loadState( _ db: Database? = nil, userPublicKey: String, diff --git a/SessionSnodeKit/Networking/SnodeAPI.swift b/SessionSnodeKit/Networking/SnodeAPI.swift index 39cf4d59e..b0d7112bd 100644 --- a/SessionSnodeKit/Networking/SnodeAPI.swift +++ b/SessionSnodeKit/Networking/SnodeAPI.swift @@ -639,10 +639,8 @@ public final class SnodeAPI { } return getSwarm(for: publicKey) - .tryFlatMap { swarm -> AnyPublisher<(ResponseInfoType, SendMessagesResponse), Error> in - guard let snode: Snode = swarm.randomElement() else { throw SnodeAPIError.generic } - - return try sendMessage(to: snode) + .tryFlatMapWithRandomSnode(retry: maxRetryCount) { snode -> AnyPublisher<(ResponseInfoType, SendMessagesResponse), Error> in + try sendMessage(to: snode) .tryMap { info, response -> (ResponseInfoType, SendMessagesResponse) in try response.validateResultMap( sodium: sodium.wrappedValue, @@ -651,11 +649,8 @@ public final class SnodeAPI { return (info, response) } - .retry(maxRetryCount) .eraseToAnyPublisher() } - .retry(maxRetryCount) - .eraseToAnyPublisher() } public static func sendConfigMessages( @@ -732,10 +727,8 @@ public final class SnodeAPI { let responseTypes = requests.map { $0.responseType } return getSwarm(for: publicKey) - .tryFlatMap { swarm -> AnyPublisher in - guard let snode: Snode = swarm.randomElement() else { throw SnodeAPIError.generic } - - return SnodeAPI + .tryFlatMapWithRandomSnode(retry: maxRetryCount) { snode -> AnyPublisher in + SnodeAPI .send( request: SnodeRequest( endpoint: .sequence, @@ -749,8 +742,6 @@ public final class SnodeAPI { .decoded(as: responseTypes, requireAllResults: false, using: dependencies) .eraseToAnyPublisher() } - .retry(maxRetryCount) - .eraseToAnyPublisher() } // MARK: - Edit @@ -768,10 +759,8 @@ public final class SnodeAPI { return getSwarm(for: publicKey) .subscribe(on: Threading.workQueue) - .tryFlatMap { swarm -> AnyPublisher<[String: [(hash: String, expiry: UInt64)]], Error> in - guard let snode: Snode = swarm.randomElement() else { throw SnodeAPIError.generic } - - return SnodeAPI + .tryFlatMapWithRandomSnode(retry: maxRetryCount) { snode -> AnyPublisher<[String: [(hash: String, expiry: UInt64)]], Error> in + SnodeAPI .send( request: SnodeRequest( endpoint: .expire, @@ -796,11 +785,8 @@ public final class SnodeAPI { validationData: serverHashes ) } - .retry(maxRetryCount) .eraseToAnyPublisher() } - .retry(maxRetryCount) - .eraseToAnyPublisher() } public static func revokeSubkey( @@ -815,10 +801,8 @@ public final class SnodeAPI { return getSwarm(for: publicKey) .subscribe(on: Threading.workQueue) - .tryFlatMap { swarm -> AnyPublisher in - guard let snode: Snode = swarm.randomElement() else { throw SnodeAPIError.generic } - - return SnodeAPI + .tryFlatMapWithRandomSnode(retry: maxRetryCount) { snode -> AnyPublisher in + SnodeAPI .send( request: SnodeRequest( endpoint: .revokeSubkey, @@ -843,11 +827,8 @@ public final class SnodeAPI { return () } - .retry(maxRetryCount) .eraseToAnyPublisher() } - .retry(maxRetryCount) - .eraseToAnyPublisher() } // MARK: Delete @@ -866,61 +847,46 @@ public final class SnodeAPI { return getSwarm(for: publicKey) .subscribe(on: Threading.workQueue) - .flatMap { swarm -> AnyPublisher<[String: Bool], Error> in - Just(()) - .setFailureType(to: Error.self) - .tryMap { _ -> Snode in - guard let snode: Snode = swarm.randomElement() else { throw SnodeAPIError.generic } + .tryFlatMapWithRandomSnode(retry: maxRetryCount) { snode -> AnyPublisher<[String: Bool], Error> in + SnodeAPI + .send( + request: SnodeRequest( + endpoint: .deleteMessages, + body: DeleteMessagesRequest( + messageHashes: serverHashes, + requireSuccessfulDeletion: false, + pubkey: userX25519PublicKey, + ed25519PublicKey: userED25519KeyPair.publicKey, + ed25519SecretKey: userED25519KeyPair.secretKey + ) + ), + to: snode, + associatedWith: publicKey, + using: dependencies + ) + .decoded(as: DeleteMessagesResponse.self, using: dependencies) + .tryMap { _, response -> [String: Bool] in + let validResultMap: [String: Bool] = try response.validResultMap( + sodium: sodium.wrappedValue, + userX25519PublicKey: userX25519PublicKey, + validationData: serverHashes + ) - return snode - } - .flatMap { snode -> AnyPublisher<[String: Bool], Error> in - SnodeAPI - .send( - request: SnodeRequest( - endpoint: .deleteMessages, - body: DeleteMessagesRequest( - messageHashes: serverHashes, - requireSuccessfulDeletion: false, - pubkey: userX25519PublicKey, - ed25519PublicKey: userED25519KeyPair.publicKey, - ed25519SecretKey: userED25519KeyPair.secretKey - ) - ), - to: snode, - associatedWith: publicKey, - using: dependencies + // If `validResultMap` didn't throw then at least one service node + // deleted successfully so we should mark the hash as invalid so we + // don't try to fetch updates using that hash going forward (if we + // do we would end up re-fetching all old messages) + Storage.shared.writeAsync { db in + try? SnodeReceivedMessageInfo.handlePotentialDeletedOrInvalidHash( + db, + potentiallyInvalidHashes: serverHashes ) - .subscribe(on: Threading.workQueue) - .eraseToAnyPublisher() - .decoded(as: DeleteMessagesResponse.self, using: dependencies) - .tryMap { _, response -> [String: Bool] in - let validResultMap: [String: Bool] = try response.validResultMap( - sodium: sodium.wrappedValue, - userX25519PublicKey: userX25519PublicKey, - validationData: serverHashes - ) - - // If `validResultMap` didn't throw then at least one service node - // deleted successfully so we should mark the hash as invalid so we - // don't try to fetch updates using that hash going forward (if we - // do we would end up re-fetching all old messages) - Storage.shared.writeAsync { db in - try? SnodeReceivedMessageInfo.handlePotentialDeletedOrInvalidHash( - db, - potentiallyInvalidHashes: serverHashes - ) - } - - return validResultMap - } - .retry(maxRetryCount) - .eraseToAnyPublisher() + } + + return validResultMap } .eraseToAnyPublisher() } - .retry(maxRetryCount) - .eraseToAnyPublisher() } /// Clears all the user's data from their swarm. Returns a dictionary of snode public key to deletion confirmation. @@ -937,10 +903,8 @@ public final class SnodeAPI { return getSwarm(for: userX25519PublicKey) .subscribe(on: Threading.workQueue) - .tryFlatMap { swarm -> AnyPublisher<[String: Bool], Error> in - guard let snode: Snode = swarm.randomElement() else { throw SnodeAPIError.generic } - - return getNetworkTime(from: snode) + .tryFlatMapWithRandomSnode(retry: maxRetryCount) { snode -> AnyPublisher<[String: Bool], Error> in + getNetworkTime(from: snode) .flatMap { timestampMs -> AnyPublisher<[String: Bool], Error> in SnodeAPI .send( @@ -968,11 +932,8 @@ public final class SnodeAPI { } .eraseToAnyPublisher() } - .retry(maxRetryCount) .eraseToAnyPublisher() } - .retry(maxRetryCount) - .eraseToAnyPublisher() } /// Clears all the user's data from their swarm. Returns a dictionary of snode public key to deletion confirmation. @@ -990,10 +951,8 @@ public final class SnodeAPI { return getSwarm(for: userX25519PublicKey) .subscribe(on: Threading.workQueue) - .tryFlatMap { swarm -> AnyPublisher<[String: Bool], Error> in - guard let snode: Snode = swarm.randomElement() else { throw SnodeAPIError.generic } - - return getNetworkTime(from: snode) + .tryFlatMapWithRandomSnode(retry: maxRetryCount) { snode -> AnyPublisher<[String: Bool], Error> in + getNetworkTime(from: snode) .flatMap { timestampMs -> AnyPublisher<[String: Bool], Error> in SnodeAPI .send( @@ -1022,11 +981,8 @@ public final class SnodeAPI { } .eraseToAnyPublisher() } - .retry(maxRetryCount) .eraseToAnyPublisher() } - .retry(maxRetryCount) - .eraseToAnyPublisher() } // MARK: - Internal API @@ -1377,3 +1333,31 @@ public final class SNSnodeAPI: NSObject { return UInt64(SnodeAPI.currentOffsetTimestampMs()) } } + +// MARK: - Convenience + +public extension Publisher where Output == Set { + func tryFlatMapWithRandomSnode( + maxPublishers: Subscribers.Demand = .unlimited, + retry retries: Int = 0, + _ transform: @escaping (Snode) throws -> P + ) -> AnyPublisher where T == P.Output, P: Publisher, P.Failure == Error { + return self + .mapError { $0 } + .flatMap(maxPublishers: maxPublishers) { swarm -> AnyPublisher in + var remainingSnodes: Set = swarm + + return Just(()) + .setFailureType(to: Error.self) + .tryFlatMap(maxPublishers: maxPublishers) { _ -> AnyPublisher in + let snode: Snode = try remainingSnodes.popRandomElement() ?? { throw SnodeAPIError.generic }() + + return try transform(snode) + .eraseToAnyPublisher() + } + .retry(retries) + .eraseToAnyPublisher() + } + .eraseToAnyPublisher() + } +} diff --git a/SessionUtilitiesKit/Combine/Publisher+Utilities.swift b/SessionUtilitiesKit/Combine/Publisher+Utilities.swift index a49341ae4..4df5f143f 100644 --- a/SessionUtilitiesKit/Combine/Publisher+Utilities.swift +++ b/SessionUtilitiesKit/Combine/Publisher+Utilities.swift @@ -137,7 +137,7 @@ public extension AnyPublisher { // MARK: - Data Decoding -public extension AnyPublisher where Output == Data, Failure == Error { +public extension Publisher where Output == Data, Failure == Error { func decoded( as type: R.Type, using dependencies: Dependencies = Dependencies() @@ -148,7 +148,7 @@ public extension AnyPublisher where Output == Data, Failure == Error { } } -public extension AnyPublisher where Output == (ResponseInfoType, Data?), Failure == Error { +public extension Publisher where Output == (ResponseInfoType, Data?), Failure == Error { func decoded( as type: R.Type, using dependencies: Dependencies = Dependencies() diff --git a/SessionUtilitiesKit/General/Set+Utilities.swift b/SessionUtilitiesKit/General/Set+Utilities.swift index 5fb2d416b..c15f96f6a 100644 --- a/SessionUtilitiesKit/General/Set+Utilities.swift +++ b/SessionUtilitiesKit/General/Set+Utilities.swift @@ -29,4 +29,11 @@ public extension Set { return updatedSet } + + mutating func popRandomElement() -> Element? { + guard let value: Element = randomElement() else { return nil } + + self.remove(value) + return value + } } diff --git a/SessionUtilitiesKit/JobRunner/JobRunner.swift b/SessionUtilitiesKit/JobRunner/JobRunner.swift index 616ab687f..6364f8d77 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunner.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunner.swift @@ -537,6 +537,11 @@ private final class JobQueue { } queue.mutate { $0.append(job) } + + // If this is a concurrent queue then we should immediately start the next job + guard executionType == .concurrent else { return } + + runNextJob() } /// Upsert a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start diff --git a/SessionUtilitiesKit/Networking/BatchResponse.swift b/SessionUtilitiesKit/Networking/BatchResponse.swift index 78575a775..4b0e244e8 100644 --- a/SessionUtilitiesKit/Networking/BatchResponse.swift +++ b/SessionUtilitiesKit/Networking/BatchResponse.swift @@ -78,7 +78,7 @@ public extension Decodable { } } -public extension AnyPublisher where Output == (ResponseInfoType, Data?), Failure == Error { +public extension Publisher where Output == (ResponseInfoType, Data?), Failure == Error { func decoded( as types: HTTP.BatchResponseTypes, requireAllResults: Bool = true,