From 3f362a71f3f5e1026d71a3ea85d03d239c6647c6 Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Tue, 23 May 2023 09:42:10 +1000 Subject: [PATCH] Fixed a couple of QA issues Forced the user config feature to be on (for testing) Fixed a bug where triggering the 'Delete for everyone' functionality would incorrectly try to delete from the recipient swarm (not possible) Fixed a bug where the 'profileNamePublisher' could only be set once resulting in potential issues if you try to restore different accounts within the same session Re-added the limit to the number of reactions to display before collapsing to make it consistent with the designs and other platforms Updated the SnodeAPI to ensure that when it retries it will actually select a new snode --- .../ConversationVC+Interaction.swift | 31 ++- .../Content Views/ReactionContainerView.swift | 12 +- Session/Meta/SessionApp.swift | 3 +- Session/Onboarding/LinkDeviceVC.swift | 6 + Session/Onboarding/Onboarding.swift | 221 +++++++++++------- Session/Onboarding/PNModeVC.swift | 4 +- Session/Onboarding/RegisterVC.swift | 6 + Session/Onboarding/RestoreVC.swift | 7 + Session/Utilities/BackgroundPoller.swift | 6 +- .../Database/Models/Interaction.swift | 5 +- .../Jobs/Types/MessageSendJob.swift | 11 +- .../SessionUtil/SessionUtil.swift | 10 + SessionSnodeKit/Networking/SnodeAPI.swift | 166 ++++++------- .../Combine/Publisher+Utilities.swift | 4 +- .../General/Set+Utilities.swift | 7 + SessionUtilitiesKit/JobRunner/JobRunner.swift | 5 + .../Networking/BatchResponse.swift | 2 +- 17 files changed, 300 insertions(+), 206 deletions(-) diff --git a/Session/Conversations/ConversationVC+Interaction.swift b/Session/Conversations/ConversationVC+Interaction.swift index bdf4a4399..2b8ce7cf0 100644 --- a/Session/Conversations/ConversationVC+Interaction.swift +++ b/Session/Conversations/ConversationVC+Interaction.swift @@ -1921,6 +1921,10 @@ extension ConversationVC: } case .contact, .legacyGroup, .group: + let targetPublicKey: String = (cellViewModel.threadVariant == .contact ? + userPublicKey : + cellViewModel.threadId + ) let serverHash: String? = Storage.shared.read { db -> String? in try Interaction .select(.serverHash) @@ -1996,16 +2000,7 @@ extension ConversationVC: accessibilityIdentifier: "Delete for everyone", style: .destructive ) { [weak self] _ in - deleteRemotely( - from: self, - request: SnodeAPI - .deleteMessages( - publicKey: cellViewModel.threadId, - serverHashes: [serverHash] - ) - .map { _ in () } - .eraseToAnyPublisher() - ) { [weak self] in + let completeServerDeletion = { [weak self] in Storage.shared.writeAsync { db in try MessageSender .send( @@ -2019,6 +2014,22 @@ extension ConversationVC: self?.showInputAccessoryView() } + + // We can only delete messages on the server for `contact` and `group` conversations + guard cellViewModel.threadVariant == .contact || cellViewModel.threadVariant == .group else { + return completeServerDeletion() + } + + deleteRemotely( + from: self, + request: SnodeAPI + .deleteMessages( + publicKey: targetPublicKey, + serverHashes: [serverHash] + ) + .map { _ in () } + .eraseToAnyPublisher() + ) { completeServerDeletion() } }) actionSheet.addAction(UIAlertAction.init(title: "TXT_CANCEL_TITLE".localized(), style: .cancel) { [weak self] _ in diff --git a/Session/Conversations/Message Cells/Content Views/ReactionContainerView.swift b/Session/Conversations/Message Cells/Content Views/ReactionContainerView.swift index 94f52b6d5..1f2cd628f 100644 --- a/Session/Conversations/Message Cells/Content Views/ReactionContainerView.swift +++ b/Session/Conversations/Message Cells/Content Views/ReactionContainerView.swift @@ -9,6 +9,13 @@ final class ReactionContainerView: UIView { private static let arrowSize: CGSize = CGSize(width: 15, height: 13) private static let arrowSpacing: CGFloat = Values.verySmallSpacing + // We have explicit limits on the number of emoji which should be displayed before they + // automatically get collapsed, these values are consistent across platforms so are set + // here (even though the logic will automatically calculate and limit to a single line + // of reactions dynamically for the size of the view) + private static let numCollapsedEmoji: Int = 4 + private static let maxEmojiBeforeCollapse: Int = 6 + private var maxWidth: CGFloat = 0 private var collapsedCount: Int = 0 private var showingAllReactions: Bool = false @@ -173,7 +180,10 @@ final class ReactionContainerView: UIView { numReactions += 1 } - return numReactions + return (numReactions > ReactionContainerView.maxEmojiBeforeCollapse ? + ReactionContainerView.numCollapsedEmoji : + numReactions + ) }() self.showNumbers = showNumbers self.reactionViews = [] diff --git a/Session/Meta/SessionApp.swift b/Session/Meta/SessionApp.swift index c8c5e1eed..2adb9eb7c 100644 --- a/Session/Meta/SessionApp.swift +++ b/Session/Meta/SessionApp.swift @@ -71,7 +71,8 @@ public struct SessionApp { // This _should_ be wiped out below. Logger.error("") DDLog.flushLog() - + + SessionUtil.clearMemoryState() Storage.resetAllStorage() ProfileManager.resetProfileStorage() Attachment.resetAttachmentStorage() diff --git a/Session/Onboarding/LinkDeviceVC.swift b/Session/Onboarding/LinkDeviceVC.swift index faf0bc6c6..0c4b0af0a 100644 --- a/Session/Onboarding/LinkDeviceVC.swift +++ b/Session/Onboarding/LinkDeviceVC.swift @@ -86,6 +86,12 @@ final class LinkDeviceVC: BaseVC, UIPageViewControllerDataSource, UIPageViewCont scanQRCodePlaceholderVC.constrainHeight(to: height) } + override func viewWillAppear(_ animated: Bool) { + super.viewWillAppear(animated) + + Onboarding.Flow.register.unregister() + } + override func viewDidAppear(_ animated: Bool) { super.viewDidAppear(animated) tabBarTopConstraint.constant = navigationController!.navigationBar.height() diff --git a/Session/Onboarding/Onboarding.swift b/Session/Onboarding/Onboarding.swift index 902143c9c..8c664c045 100644 --- a/Session/Onboarding/Onboarding.swift +++ b/Session/Onboarding/Onboarding.swift @@ -8,111 +8,152 @@ import SessionUtilitiesKit import SessionMessagingKit enum Onboarding { - private static let profileNameRetrievalPublisher: Atomic> = { + private static let profileNameRetrievalIdentifier: Atomic = Atomic(nil) + private static let profileNameRetrievalPublisher: Atomic?> = Atomic(nil) + public static var profileNamePublisher: AnyPublisher { + guard let existingPublisher: AnyPublisher = profileNameRetrievalPublisher.wrappedValue else { + return profileNameRetrievalPublisher.mutate { value in + let requestId: UUID = UUID() + let result: AnyPublisher = createProfileNameRetrievalPublisher(requestId) + + value = result + profileNameRetrievalIdentifier.mutate { $0 = requestId } + return result + } + } + + return existingPublisher + } + + private static func createProfileNameRetrievalPublisher(_ requestId: UUID) -> AnyPublisher { // FIXME: Remove this once `useSharedUtilForUserConfig` is permanent guard SessionUtil.userConfigsEnabled else { - return Atomic( - Just(nil) - .setFailureType(to: Error.self) - .eraseToAnyPublisher() - ) + return Just(nil) + .setFailureType(to: Error.self) + .eraseToAnyPublisher() } let userPublicKey: String = getUserHexEncodedPublicKey() - return Atomic( - SnodeAPI.getSwarm(for: userPublicKey) - .subscribe(on: DispatchQueue.global(qos: .userInitiated)) - .tryFlatMap { swarm -> AnyPublisher in - guard let snode = swarm.randomElement() else { throw SnodeAPIError.generic } - - return CurrentUserPoller - .poll( - namespaces: [.configUserProfile], - from: snode, - for: userPublicKey, - on: DispatchQueue.global(qos: .userInitiated), - // Note: These values mean the received messages will be - // processed immediately rather than async as part of a Job - calledFromBackgroundPoller: true, - isBackgroundPollValid: { true } - ) - .tryFlatMap { receivedMessageTypes -> AnyPublisher in - // FIXME: Remove this entire 'tryFlatMap' once the updated user config has been released for long enough - guard receivedMessageTypes.isEmpty else { - return Just(()) - .setFailureType(to: Error.self) - .eraseToAnyPublisher() - } - - SNLog("Onboarding failed to retrieve user config, checking for legacy config") - - return CurrentUserPoller - .poll( - namespaces: [.default], - from: snode, - for: userPublicKey, - on: DispatchQueue.global(qos: .userInitiated), - // Note: These values mean the received messages will be - // processed immediately rather than async as part of a Job - calledFromBackgroundPoller: true, - isBackgroundPollValid: { true } - ) - .tryMap { receivedMessageTypes -> Void in - guard - let message: ConfigurationMessage = receivedMessageTypes - .last(where: { $0 is ConfigurationMessage }) - .asType(ConfigurationMessage.self), - let displayName: String = message.displayName - else { return () } - - // Handle user profile changes - Storage.shared.write { db in - try ProfileManager.updateProfileIfNeeded( - db, - publicKey: userPublicKey, - name: displayName, - avatarUpdate: { - guard - let profilePictureUrl: String = message.profilePictureUrl, - let profileKey: Data = message.profileKey - else { return .none } - - return .updateTo( - url: profilePictureUrl, - key: profileKey, - fileName: nil - ) - }(), - sentTimestamp: TimeInterval((message.sentTimestamp ?? 0) / 1000), - calledFromConfigHandling: false - ) - } - return () - } + return SnodeAPI.getSwarm(for: userPublicKey) + .subscribe(on: DispatchQueue.global(qos: .userInitiated)) + .tryFlatMapWithRandomSnode { snode -> AnyPublisher in + CurrentUserPoller + .poll( + namespaces: [.configUserProfile], + from: snode, + for: userPublicKey, + on: DispatchQueue.global(qos: .userInitiated), + // Note: These values mean the received messages will be + // processed immediately rather than async as part of a Job + calledFromBackgroundPoller: true, + isBackgroundPollValid: { true } + ) + .tryFlatMap { receivedMessageTypes -> AnyPublisher in + // FIXME: Remove this entire 'tryFlatMap' once the updated user config has been released for long enough + guard + receivedMessageTypes.isEmpty, + requestId == profileNameRetrievalIdentifier.wrappedValue + else { + return Just(()) + .setFailureType(to: Error.self) .eraseToAnyPublisher() } - } - .flatMap { _ -> AnyPublisher in - Storage.shared.readPublisher { db in - try Profile - .filter(id: userPublicKey) - .select(.name) - .asRequest(of: String.self) - .fetchOne(db) + + SNLog("Onboarding failed to retrieve user config, checking for legacy config") + + return CurrentUserPoller + .poll( + namespaces: [.default], + from: snode, + for: userPublicKey, + on: DispatchQueue.global(qos: .userInitiated), + // Note: These values mean the received messages will be + // processed immediately rather than async as part of a Job + calledFromBackgroundPoller: true, + isBackgroundPollValid: { true } + ) + .tryMap { receivedMessageTypes -> Void in + guard + let message: ConfigurationMessage = receivedMessageTypes + .last(where: { $0 is ConfigurationMessage }) + .asType(ConfigurationMessage.self), + let displayName: String = message.displayName, + requestId == profileNameRetrievalIdentifier.wrappedValue + else { return () } + + // Handle user profile changes + Storage.shared.write { db in + try ProfileManager.updateProfileIfNeeded( + db, + publicKey: userPublicKey, + name: displayName, + avatarUpdate: { + guard + let profilePictureUrl: String = message.profilePictureUrl, + let profileKey: Data = message.profileKey + else { return .none } + + return .updateTo( + url: profilePictureUrl, + key: profileKey, + fileName: nil + ) + }(), + sentTimestamp: TimeInterval((message.sentTimestamp ?? 0) / 1000), + calledFromConfigHandling: false + ) + } + return () + } + .eraseToAnyPublisher() } + } + .map { _ -> String? in + guard requestId == profileNameRetrievalIdentifier.wrappedValue else { + return nil } - .shareReplay(1) - .eraseToAnyPublisher() - ) - }() - public static var profileNamePublisher: AnyPublisher { - profileNameRetrievalPublisher.wrappedValue + + return Storage.shared.read { db in + try Profile + .filter(id: userPublicKey) + .select(.name) + .asRequest(of: String.self) + .fetchOne(db) + } + } + .shareReplay(1) + .eraseToAnyPublisher() } enum Flow { case register, recover, link + /// If the user returns to an earlier screen during Onboarding we might need to clear out a partially created + /// account (eg. returning from the PN setting screen to the seed entry screen when linking a device) + func unregister() { + // Clear the in-memory state from SessionUtil + SessionUtil.clearMemoryState() + + // Clear any data which gets set during Onboarding + Storage.shared.write { db in + db[.hasViewedSeed] = false + + try SessionThread.deleteAll(db) + try Profile.deleteAll(db) + try Contact.deleteAll(db) + try Identity.deleteAll(db) + try ConfigDump.deleteAll(db) + try SnodeReceivedMessageInfo.deleteAll(db) + } + + // Clear the profile name retrieve publisher + profileNameRetrievalIdentifier.mutate { $0 = nil } + profileNameRetrievalPublisher.mutate { $0 = nil } + + UserDefaults.standard[.hasSyncedInitialConfiguration] = false + } + func preregister(with seed: Data, ed25519KeyPair: KeyPair, x25519KeyPair: KeyPair) { let x25519PublicKey = x25519KeyPair.hexEncodedPublicKey diff --git a/Session/Onboarding/PNModeVC.swift b/Session/Onboarding/PNModeVC.swift index 6bc15d8d2..8c60ab51e 100644 --- a/Session/Onboarding/PNModeVC.swift +++ b/Session/Onboarding/PNModeVC.swift @@ -174,7 +174,7 @@ final class PNModeVC: BaseVC, OptionViewDelegate { } // If we don't have one then show a loading indicator and try to retrieve the existing name - ModalActivityIndicatorViewController.present(fromViewController: self) { viewController in + ModalActivityIndicatorViewController.present(fromViewController: self) { [weak self, flow = self.flow] viewController in Onboarding.profileNamePublisher .timeout(.seconds(15), scheduler: DispatchQueue.main, customError: { HTTPError.timeout }) .catch { _ -> AnyPublisher in @@ -185,7 +185,7 @@ final class PNModeVC: BaseVC, OptionViewDelegate { } .receive(on: DispatchQueue.main) .sinkUntilComplete( - receiveValue: { [weak self, flow = self.flow] value in + receiveValue: { value in // Hide the loading indicator viewController.dismiss(animated: true) diff --git a/Session/Onboarding/RegisterVC.swift b/Session/Onboarding/RegisterVC.swift index 56979231f..52cc441a6 100644 --- a/Session/Onboarding/RegisterVC.swift +++ b/Session/Onboarding/RegisterVC.swift @@ -151,6 +151,12 @@ final class RegisterVC : BaseVC { updateSeed() } + override func viewWillAppear(_ animated: Bool) { + super.viewWillAppear(animated) + + Onboarding.Flow.register.unregister() + } + // MARK: General @objc private func enableCopyButton() { copyPublicKeyButton.isUserInteractionEnabled = true diff --git a/Session/Onboarding/RestoreVC.swift b/Session/Onboarding/RestoreVC.swift index abddf58f6..e196c5f06 100644 --- a/Session/Onboarding/RestoreVC.swift +++ b/Session/Onboarding/RestoreVC.swift @@ -128,8 +128,15 @@ final class RestoreVC: BaseVC { notificationCenter.addObserver(self, selector: #selector(handleKeyboardWillHideNotification(_:)), name: UIResponder.keyboardWillHideNotification, object: nil) } + override func viewWillAppear(_ animated: Bool) { + super.viewWillAppear(animated) + + Onboarding.Flow.register.unregister() + } + override func viewDidAppear(_ animated: Bool) { super.viewDidAppear(animated) + // On small screens we hide the legal label when the keyboard is up, but it's important that the user sees it so // in those instances we don't make the keyboard come up automatically if !isIPhone5OrSmaller { diff --git a/Session/Utilities/BackgroundPoller.swift b/Session/Utilities/BackgroundPoller.swift index d468e8c0c..e00e46f94 100644 --- a/Session/Utilities/BackgroundPoller.swift +++ b/Session/Utilities/BackgroundPoller.swift @@ -67,10 +67,8 @@ public final class BackgroundPoller { return SnodeAPI.getSwarm(for: userPublicKey) .subscribeOnMain(immediately: true) .receiveOnMain(immediately: true) - .tryFlatMap { swarm -> AnyPublisher<[Message], Error> in - guard let snode = swarm.randomElement() else { throw SnodeAPIError.generic } - - return CurrentUserPoller.poll( + .tryFlatMapWithRandomSnode { snode -> AnyPublisher<[Message], Error> in + CurrentUserPoller.poll( namespaces: CurrentUserPoller.namespaces, from: snode, for: userPublicKey, diff --git a/SessionMessagingKit/Database/Models/Interaction.swift b/SessionMessagingKit/Database/Models/Interaction.swift index 6abfd584e..21fdd5456 100644 --- a/SessionMessagingKit/Database/Models/Interaction.swift +++ b/SessionMessagingKit/Database/Models/Interaction.swift @@ -846,10 +846,9 @@ public extension Interaction { .asRequest(of: Attachment.DescriptionInfo.self) .fetchOne(db), attachmentCount: try? attachments.fetchCount(db), - isOpenGroupInvitation: (try? linkPreview + isOpenGroupInvitation: linkPreview .filter(LinkPreview.Columns.variant == LinkPreview.Variant.openGroupInvitation) - .isNotEmpty(db)) - .defaulting(to: false) + .isNotEmpty(db) ) case .infoMediaSavedNotification, .infoScreenshotNotification, .infoCall: diff --git a/SessionMessagingKit/Jobs/Types/MessageSendJob.swift b/SessionMessagingKit/Jobs/Types/MessageSendJob.swift index 8a9e3e218..50a462f9a 100644 --- a/SessionMessagingKit/Jobs/Types/MessageSendJob.swift +++ b/SessionMessagingKit/Jobs/Types/MessageSendJob.swift @@ -31,7 +31,16 @@ public enum MessageSendJob: JobExecutor { // so extract them from any associated attachments var messageFileIds: [String] = [] - if details.message is VisibleMessage { + /// Ensure any associated attachments have already been uploaded before sending the message + /// + /// **Note:** Reactions reference their original message so we need to ignore this logic for reaction messages to ensure we don't + /// incorrectly re-upload incoming attachments that the user reacted to, we also want to exclude "sync" messages since they should + /// already have attachments in a valid state + if + details.message is VisibleMessage, + (details.message as? VisibleMessage)?.reaction == nil && + details.isSyncMessage == false + { guard let jobId: Int64 = job.id, let interactionId: Int64 = job.interactionId diff --git a/SessionMessagingKit/SessionUtil/SessionUtil.swift b/SessionMessagingKit/SessionUtil/SessionUtil.swift index bcc1e67aa..1735f6b6c 100644 --- a/SessionMessagingKit/SessionUtil/SessionUtil.swift +++ b/SessionMessagingKit/SessionUtil/SessionUtil.swift @@ -10,6 +10,7 @@ import SessionUtilitiesKit public extension Features { static func useSharedUtilForUserConfig(_ db: Database? = nil) -> Bool { + return true // TODO: Need to set this timestamp to the correct date (currently start of 2030) // guard Date().timeIntervalSince1970 < 1893456000 else { return true } guard !SessionUtil.hasCheckedMigrationsCompleted.wrappedValue else { @@ -141,6 +142,15 @@ public enum SessionUtil { // MARK: - Loading + public static func clearMemoryState() { + // Ensure we have a loaded state before we continue + guard !SessionUtil.configStore.wrappedValue.isEmpty else { return } + + SessionUtil.configStore.mutate { confStore in + confStore.removeAll() + } + } + public static func loadState( _ db: Database? = nil, userPublicKey: String, diff --git a/SessionSnodeKit/Networking/SnodeAPI.swift b/SessionSnodeKit/Networking/SnodeAPI.swift index 39cf4d59e..b0d7112bd 100644 --- a/SessionSnodeKit/Networking/SnodeAPI.swift +++ b/SessionSnodeKit/Networking/SnodeAPI.swift @@ -639,10 +639,8 @@ public final class SnodeAPI { } return getSwarm(for: publicKey) - .tryFlatMap { swarm -> AnyPublisher<(ResponseInfoType, SendMessagesResponse), Error> in - guard let snode: Snode = swarm.randomElement() else { throw SnodeAPIError.generic } - - return try sendMessage(to: snode) + .tryFlatMapWithRandomSnode(retry: maxRetryCount) { snode -> AnyPublisher<(ResponseInfoType, SendMessagesResponse), Error> in + try sendMessage(to: snode) .tryMap { info, response -> (ResponseInfoType, SendMessagesResponse) in try response.validateResultMap( sodium: sodium.wrappedValue, @@ -651,11 +649,8 @@ public final class SnodeAPI { return (info, response) } - .retry(maxRetryCount) .eraseToAnyPublisher() } - .retry(maxRetryCount) - .eraseToAnyPublisher() } public static func sendConfigMessages( @@ -732,10 +727,8 @@ public final class SnodeAPI { let responseTypes = requests.map { $0.responseType } return getSwarm(for: publicKey) - .tryFlatMap { swarm -> AnyPublisher in - guard let snode: Snode = swarm.randomElement() else { throw SnodeAPIError.generic } - - return SnodeAPI + .tryFlatMapWithRandomSnode(retry: maxRetryCount) { snode -> AnyPublisher in + SnodeAPI .send( request: SnodeRequest( endpoint: .sequence, @@ -749,8 +742,6 @@ public final class SnodeAPI { .decoded(as: responseTypes, requireAllResults: false, using: dependencies) .eraseToAnyPublisher() } - .retry(maxRetryCount) - .eraseToAnyPublisher() } // MARK: - Edit @@ -768,10 +759,8 @@ public final class SnodeAPI { return getSwarm(for: publicKey) .subscribe(on: Threading.workQueue) - .tryFlatMap { swarm -> AnyPublisher<[String: [(hash: String, expiry: UInt64)]], Error> in - guard let snode: Snode = swarm.randomElement() else { throw SnodeAPIError.generic } - - return SnodeAPI + .tryFlatMapWithRandomSnode(retry: maxRetryCount) { snode -> AnyPublisher<[String: [(hash: String, expiry: UInt64)]], Error> in + SnodeAPI .send( request: SnodeRequest( endpoint: .expire, @@ -796,11 +785,8 @@ public final class SnodeAPI { validationData: serverHashes ) } - .retry(maxRetryCount) .eraseToAnyPublisher() } - .retry(maxRetryCount) - .eraseToAnyPublisher() } public static func revokeSubkey( @@ -815,10 +801,8 @@ public final class SnodeAPI { return getSwarm(for: publicKey) .subscribe(on: Threading.workQueue) - .tryFlatMap { swarm -> AnyPublisher in - guard let snode: Snode = swarm.randomElement() else { throw SnodeAPIError.generic } - - return SnodeAPI + .tryFlatMapWithRandomSnode(retry: maxRetryCount) { snode -> AnyPublisher in + SnodeAPI .send( request: SnodeRequest( endpoint: .revokeSubkey, @@ -843,11 +827,8 @@ public final class SnodeAPI { return () } - .retry(maxRetryCount) .eraseToAnyPublisher() } - .retry(maxRetryCount) - .eraseToAnyPublisher() } // MARK: Delete @@ -866,61 +847,46 @@ public final class SnodeAPI { return getSwarm(for: publicKey) .subscribe(on: Threading.workQueue) - .flatMap { swarm -> AnyPublisher<[String: Bool], Error> in - Just(()) - .setFailureType(to: Error.self) - .tryMap { _ -> Snode in - guard let snode: Snode = swarm.randomElement() else { throw SnodeAPIError.generic } + .tryFlatMapWithRandomSnode(retry: maxRetryCount) { snode -> AnyPublisher<[String: Bool], Error> in + SnodeAPI + .send( + request: SnodeRequest( + endpoint: .deleteMessages, + body: DeleteMessagesRequest( + messageHashes: serverHashes, + requireSuccessfulDeletion: false, + pubkey: userX25519PublicKey, + ed25519PublicKey: userED25519KeyPair.publicKey, + ed25519SecretKey: userED25519KeyPair.secretKey + ) + ), + to: snode, + associatedWith: publicKey, + using: dependencies + ) + .decoded(as: DeleteMessagesResponse.self, using: dependencies) + .tryMap { _, response -> [String: Bool] in + let validResultMap: [String: Bool] = try response.validResultMap( + sodium: sodium.wrappedValue, + userX25519PublicKey: userX25519PublicKey, + validationData: serverHashes + ) - return snode - } - .flatMap { snode -> AnyPublisher<[String: Bool], Error> in - SnodeAPI - .send( - request: SnodeRequest( - endpoint: .deleteMessages, - body: DeleteMessagesRequest( - messageHashes: serverHashes, - requireSuccessfulDeletion: false, - pubkey: userX25519PublicKey, - ed25519PublicKey: userED25519KeyPair.publicKey, - ed25519SecretKey: userED25519KeyPair.secretKey - ) - ), - to: snode, - associatedWith: publicKey, - using: dependencies + // If `validResultMap` didn't throw then at least one service node + // deleted successfully so we should mark the hash as invalid so we + // don't try to fetch updates using that hash going forward (if we + // do we would end up re-fetching all old messages) + Storage.shared.writeAsync { db in + try? SnodeReceivedMessageInfo.handlePotentialDeletedOrInvalidHash( + db, + potentiallyInvalidHashes: serverHashes ) - .subscribe(on: Threading.workQueue) - .eraseToAnyPublisher() - .decoded(as: DeleteMessagesResponse.self, using: dependencies) - .tryMap { _, response -> [String: Bool] in - let validResultMap: [String: Bool] = try response.validResultMap( - sodium: sodium.wrappedValue, - userX25519PublicKey: userX25519PublicKey, - validationData: serverHashes - ) - - // If `validResultMap` didn't throw then at least one service node - // deleted successfully so we should mark the hash as invalid so we - // don't try to fetch updates using that hash going forward (if we - // do we would end up re-fetching all old messages) - Storage.shared.writeAsync { db in - try? SnodeReceivedMessageInfo.handlePotentialDeletedOrInvalidHash( - db, - potentiallyInvalidHashes: serverHashes - ) - } - - return validResultMap - } - .retry(maxRetryCount) - .eraseToAnyPublisher() + } + + return validResultMap } .eraseToAnyPublisher() } - .retry(maxRetryCount) - .eraseToAnyPublisher() } /// Clears all the user's data from their swarm. Returns a dictionary of snode public key to deletion confirmation. @@ -937,10 +903,8 @@ public final class SnodeAPI { return getSwarm(for: userX25519PublicKey) .subscribe(on: Threading.workQueue) - .tryFlatMap { swarm -> AnyPublisher<[String: Bool], Error> in - guard let snode: Snode = swarm.randomElement() else { throw SnodeAPIError.generic } - - return getNetworkTime(from: snode) + .tryFlatMapWithRandomSnode(retry: maxRetryCount) { snode -> AnyPublisher<[String: Bool], Error> in + getNetworkTime(from: snode) .flatMap { timestampMs -> AnyPublisher<[String: Bool], Error> in SnodeAPI .send( @@ -968,11 +932,8 @@ public final class SnodeAPI { } .eraseToAnyPublisher() } - .retry(maxRetryCount) .eraseToAnyPublisher() } - .retry(maxRetryCount) - .eraseToAnyPublisher() } /// Clears all the user's data from their swarm. Returns a dictionary of snode public key to deletion confirmation. @@ -990,10 +951,8 @@ public final class SnodeAPI { return getSwarm(for: userX25519PublicKey) .subscribe(on: Threading.workQueue) - .tryFlatMap { swarm -> AnyPublisher<[String: Bool], Error> in - guard let snode: Snode = swarm.randomElement() else { throw SnodeAPIError.generic } - - return getNetworkTime(from: snode) + .tryFlatMapWithRandomSnode(retry: maxRetryCount) { snode -> AnyPublisher<[String: Bool], Error> in + getNetworkTime(from: snode) .flatMap { timestampMs -> AnyPublisher<[String: Bool], Error> in SnodeAPI .send( @@ -1022,11 +981,8 @@ public final class SnodeAPI { } .eraseToAnyPublisher() } - .retry(maxRetryCount) .eraseToAnyPublisher() } - .retry(maxRetryCount) - .eraseToAnyPublisher() } // MARK: - Internal API @@ -1377,3 +1333,31 @@ public final class SNSnodeAPI: NSObject { return UInt64(SnodeAPI.currentOffsetTimestampMs()) } } + +// MARK: - Convenience + +public extension Publisher where Output == Set { + func tryFlatMapWithRandomSnode( + maxPublishers: Subscribers.Demand = .unlimited, + retry retries: Int = 0, + _ transform: @escaping (Snode) throws -> P + ) -> AnyPublisher where T == P.Output, P: Publisher, P.Failure == Error { + return self + .mapError { $0 } + .flatMap(maxPublishers: maxPublishers) { swarm -> AnyPublisher in + var remainingSnodes: Set = swarm + + return Just(()) + .setFailureType(to: Error.self) + .tryFlatMap(maxPublishers: maxPublishers) { _ -> AnyPublisher in + let snode: Snode = try remainingSnodes.popRandomElement() ?? { throw SnodeAPIError.generic }() + + return try transform(snode) + .eraseToAnyPublisher() + } + .retry(retries) + .eraseToAnyPublisher() + } + .eraseToAnyPublisher() + } +} diff --git a/SessionUtilitiesKit/Combine/Publisher+Utilities.swift b/SessionUtilitiesKit/Combine/Publisher+Utilities.swift index a49341ae4..4df5f143f 100644 --- a/SessionUtilitiesKit/Combine/Publisher+Utilities.swift +++ b/SessionUtilitiesKit/Combine/Publisher+Utilities.swift @@ -137,7 +137,7 @@ public extension AnyPublisher { // MARK: - Data Decoding -public extension AnyPublisher where Output == Data, Failure == Error { +public extension Publisher where Output == Data, Failure == Error { func decoded( as type: R.Type, using dependencies: Dependencies = Dependencies() @@ -148,7 +148,7 @@ public extension AnyPublisher where Output == Data, Failure == Error { } } -public extension AnyPublisher where Output == (ResponseInfoType, Data?), Failure == Error { +public extension Publisher where Output == (ResponseInfoType, Data?), Failure == Error { func decoded( as type: R.Type, using dependencies: Dependencies = Dependencies() diff --git a/SessionUtilitiesKit/General/Set+Utilities.swift b/SessionUtilitiesKit/General/Set+Utilities.swift index 5fb2d416b..c15f96f6a 100644 --- a/SessionUtilitiesKit/General/Set+Utilities.swift +++ b/SessionUtilitiesKit/General/Set+Utilities.swift @@ -29,4 +29,11 @@ public extension Set { return updatedSet } + + mutating func popRandomElement() -> Element? { + guard let value: Element = randomElement() else { return nil } + + self.remove(value) + return value + } } diff --git a/SessionUtilitiesKit/JobRunner/JobRunner.swift b/SessionUtilitiesKit/JobRunner/JobRunner.swift index 616ab687f..6364f8d77 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunner.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunner.swift @@ -537,6 +537,11 @@ private final class JobQueue { } queue.mutate { $0.append(job) } + + // If this is a concurrent queue then we should immediately start the next job + guard executionType == .concurrent else { return } + + runNextJob() } /// Upsert a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start diff --git a/SessionUtilitiesKit/Networking/BatchResponse.swift b/SessionUtilitiesKit/Networking/BatchResponse.swift index 78575a775..4b0e244e8 100644 --- a/SessionUtilitiesKit/Networking/BatchResponse.swift +++ b/SessionUtilitiesKit/Networking/BatchResponse.swift @@ -78,7 +78,7 @@ public extension Decodable { } } -public extension AnyPublisher where Output == (ResponseInfoType, Data?), Failure == Error { +public extension Publisher where Output == (ResponseInfoType, Data?), Failure == Error { func decoded( as types: HTTP.BatchResponseTypes, requireAllResults: Bool = true,