diff --git a/Session/Notifications/SyncPushTokensJob.swift b/Session/Notifications/SyncPushTokensJob.swift index 5ca477831..b9d3cdf3a 100644 --- a/Session/Notifications/SyncPushTokensJob.swift +++ b/Session/Notifications/SyncPushTokensJob.swift @@ -21,8 +21,11 @@ public enum SyncPushTokensJob: JobExecutor { failure: @escaping (Job, Error?, Bool) -> (), deferred: @escaping (Job) -> () ) { - // Don't run when inactive or not in main app - guard (UserDefaults.sharedLokiProject?[.isMainAppActive]).defaulting(to: false) else { + // Don't run when inactive or not in main app or if the user doesn't exist yet + guard + (UserDefaults.sharedLokiProject?[.isMainAppActive]).defaulting(to: false), + Identity.userExists() + else { deferred(job) // Don't need to do anything if it's not the main app return } diff --git a/Session/Onboarding/Onboarding.swift b/Session/Onboarding/Onboarding.swift index acdf75b68..9d4913b2f 100644 --- a/Session/Onboarding/Onboarding.swift +++ b/Session/Onboarding/Onboarding.swift @@ -26,16 +26,72 @@ enum Onboarding { .tryFlatMap { swarm -> AnyPublisher in guard let snode = swarm.randomElement() else { throw SnodeAPIError.generic } - return CurrentUserPoller.poll( - namespaces: [.configUserProfile], - from: snode, - for: userPublicKey, - on: DispatchQueue.global(qos: .userInitiated), - // Note: These values mean the received messages will be - // processed immediately rather than async as part of a Job - calledFromBackgroundPoller: true, - isBackgroundPollValid: { true } - ) + return CurrentUserPoller + .poll( + namespaces: [.configUserProfile], + from: snode, + for: userPublicKey, + on: DispatchQueue.global(qos: .userInitiated), + // Note: These values mean the received messages will be + // processed immediately rather than async as part of a Job + calledFromBackgroundPoller: true, + isBackgroundPollValid: { true } + ) + .tryFlatMap { receivedMessageTypes -> AnyPublisher in + // FIXME: Remove this entire 'tryFlatMap' once the updated user config has been released for long enough + guard !receivedMessageTypes.isEmpty else { + return Just(()) + .setFailureType(to: Error.self) + .eraseToAnyPublisher() + } + + SNLog("Onboarding failed to retrieve user config, checking for legacy config") + + return CurrentUserPoller + .poll( + namespaces: [.default], + from: snode, + for: userPublicKey, + on: DispatchQueue.global(qos: .userInitiated), + // Note: These values mean the received messages will be + // processed immediately rather than async as part of a Job + calledFromBackgroundPoller: true, + isBackgroundPollValid: { true } + ) + .tryMap { receivedMessageTypes -> Void in + guard + let message: ConfigurationMessage = receivedMessageTypes + .last(where: { $0 is ConfigurationMessage }) + .asType(ConfigurationMessage.self), + let displayName: String = message.displayName + else { return () } + + // Handle user profile changes + Storage.shared.write { db in + try ProfileManager.updateProfileIfNeeded( + db, + publicKey: userPublicKey, + name: displayName, + avatarUpdate: { + guard + let profilePictureUrl: String = message.profilePictureUrl, + let profileKey: Data = message.profileKey + else { return .none } + + return .updateTo( + url: profilePictureUrl, + key: profileKey, + fileName: nil + ) + }(), + sentTimestamp: TimeInterval((message.sentTimestamp ?? 0) / 1000), + calledFromConfigHandling: false + ) + } + return () + } + .eraseToAnyPublisher() + } } .flatMap { _ -> AnyPublisher in Storage.shared.readPublisher(receiveOn: DispatchQueue.global(qos: .userInitiated)) { db in diff --git a/Session/Onboarding/PNModeVC.swift b/Session/Onboarding/PNModeVC.swift index 8906f75cb..6bc15d8d2 100644 --- a/Session/Onboarding/PNModeVC.swift +++ b/Session/Onboarding/PNModeVC.swift @@ -176,7 +176,7 @@ final class PNModeVC: BaseVC, OptionViewDelegate { // If we don't have one then show a loading indicator and try to retrieve the existing name ModalActivityIndicatorViewController.present(fromViewController: self) { viewController in Onboarding.profileNamePublisher - .timeout(.seconds(10), scheduler: DispatchQueue.main, customError: { HTTPError.timeout }) + .timeout(.seconds(15), scheduler: DispatchQueue.main, customError: { HTTPError.timeout }) .catch { _ -> AnyPublisher in SNLog("Onboarding failed to retrieve existing profile information") return Just(nil) diff --git a/Session/Utilities/BackgroundPoller.swift b/Session/Utilities/BackgroundPoller.swift index b5391a23a..d468e8c0c 100644 --- a/Session/Utilities/BackgroundPoller.swift +++ b/Session/Utilities/BackgroundPoller.swift @@ -67,7 +67,7 @@ public final class BackgroundPoller { return SnodeAPI.getSwarm(for: userPublicKey) .subscribeOnMain(immediately: true) .receiveOnMain(immediately: true) - .tryFlatMap { swarm -> AnyPublisher in + .tryFlatMap { swarm -> AnyPublisher<[Message], Error> in guard let snode = swarm.randomElement() else { throw SnodeAPIError.generic } return CurrentUserPoller.poll( @@ -79,6 +79,7 @@ public final class BackgroundPoller { isBackgroundPollValid: { BackgroundPoller.isValid } ) } + .map { _ in () } .eraseToAnyPublisher() } @@ -101,7 +102,7 @@ public final class BackgroundPoller { SnodeAPI.getSwarm(for: groupPublicKey) .subscribeOnMain(immediately: true) .receiveOnMain(immediately: true) - .tryFlatMap { swarm -> AnyPublisher in + .tryFlatMap { swarm -> AnyPublisher<[Message], Error> in guard let snode: Snode = swarm.randomElement() else { throw OnionRequestAPIError.insufficientSnodes } @@ -115,6 +116,7 @@ public final class BackgroundPoller { isBackgroundPollValid: { BackgroundPoller.isValid } ) } + .map { _ in () } .eraseToAnyPublisher() } } diff --git a/SessionMessagingKit/Database/Models/ControlMessageProcessRecord.swift b/SessionMessagingKit/Database/Models/ControlMessageProcessRecord.swift index 481a5f59e..3890df232 100644 --- a/SessionMessagingKit/Database/Models/ControlMessageProcessRecord.swift +++ b/SessionMessagingKit/Database/Models/ControlMessageProcessRecord.swift @@ -83,6 +83,12 @@ public struct ControlMessageProcessRecord: Codable, FetchableRecord, Persistable // message handling to make sure the messages are for the same ongoing call if message is CallMessage { return nil } + // We don't want to do any de-duping for SharedConfigMessages as libSession will handle + // the deduping for us, it also gives libSession more options to potentially recover from + // invalid data, conflicts or even process new changes which weren't supported from older + // versions of the library as it will always re-process messages + if message is SharedConfigMessage { return nil } + // Allow '.new' and 'encryptionKeyPair' closed group control message duplicates to avoid // the following situation: // • The app performed a background poll or received a push notification @@ -103,7 +109,7 @@ public struct ControlMessageProcessRecord: Codable, FetchableRecord, Persistable case is ClosedGroupControlMessage: return .closedGroupControlMessage case is DataExtractionNotification: return .dataExtractionNotification case is ExpirationTimerUpdate: return .expirationTimerUpdate - case is ConfigurationMessage, is SharedConfigMessage: return .configurationMessage // TODO: Confirm this is desired + case is ConfigurationMessage, is SharedConfigMessage: return .configurationMessage case is UnsendRequest: return .unsendRequest case is MessageRequestResponse: return .messageRequestResponse case is CallMessage: return .call diff --git a/SessionMessagingKit/Database/Models/SharedConfigDump.swift b/SessionMessagingKit/Database/Models/SharedConfigDump.swift index db2fae388..8468dcf74 100644 --- a/SessionMessagingKit/Database/Models/SharedConfigDump.swift +++ b/SessionMessagingKit/Database/Models/SharedConfigDump.swift @@ -66,7 +66,9 @@ public extension ConfigDump { } public extension ConfigDump.Variant { - static let userVariants: [ConfigDump.Variant] = [ .userProfile, .contacts, .convoInfoVolatile, .userGroups ] + static let userVariants: [ConfigDump.Variant] = [ + .userProfile, .contacts, .convoInfoVolatile, .userGroups + ] var configMessageKind: SharedConfigMessage.Kind { switch self { @@ -85,4 +87,15 @@ public extension ConfigDump.Variant { case .userGroups: return SnodeAPI.Namespace.configUserGroups } } + + /// This value defines the order that the SharedConfigMessages should be processed in, while we re-process config + /// messages every time we poll this will prevent an edge-case where we have `convoInfoVolatile` data related + /// to a new conversation which hasn't been created yet because it's associated `contacts`/`userGroups` message + /// hasn't yet been processed (without this we would have to wait until the next poll for it to be processed correctly) + var processingOrder: Int { + switch self { + case .userProfile, .contacts, .userGroups: return 0 + case .convoInfoVolatile: return 1 + } + } } diff --git a/SessionMessagingKit/LibSessionUtil/SessionUtil.swift b/SessionMessagingKit/LibSessionUtil/SessionUtil.swift index b16038aa2..dedc595f8 100644 --- a/SessionMessagingKit/LibSessionUtil/SessionUtil.swift +++ b/SessionMessagingKit/LibSessionUtil/SessionUtil.swift @@ -317,13 +317,13 @@ public enum SessionUtil { guard !messages.isEmpty else { return } guard !publicKey.isEmpty else { throw MessageReceiverError.noThread } - let groupedMessages: [SharedConfigMessage.Kind: [SharedConfigMessage]] = messages - .grouped(by: \.kind) - + let groupedMessages: [ConfigDump.Variant: [SharedConfigMessage]] = messages + .grouped(by: \.kind.configDumpVariant) // Merge the config messages into the current state let mergeResults: [ConfigDump.Variant: IncomingConfResult] = groupedMessages + .sorted { lhs, rhs in lhs.key.processingOrder < rhs.key.processingOrder } .reduce(into: [:]) { result, next in - let key: ConfigKey = ConfigKey(variant: next.key.configDumpVariant, publicKey: publicKey) + let key: ConfigKey = ConfigKey(variant: next.key, publicKey: publicKey) let atomicConf: Atomic?> = ( SessionUtil.configStore.wrappedValue[key] ?? Atomic(nil) @@ -340,8 +340,8 @@ public enum SessionUtil { var mergeData: [UnsafePointer?] = next.value .map { message -> [UInt8] in message.data.bytes } .unsafeCopy() - var mergeSize: [Int] = messages.map { $0.data.count } - config_merge(conf, &mergeData, &mergeSize, messages.count) + var mergeSize: [Int] = next.value.map { $0.data.count } + config_merge(conf, &mergeData, &mergeSize, next.value.count) mergeData.forEach { $0?.deallocate() } // Get the state of this variant @@ -350,7 +350,7 @@ public enum SessionUtil { } // Return the current state of the config - result[next.key.configDumpVariant] = IncomingConfResult( + result[next.key] = IncomingConfResult( needsPush: needsPush, needsDump: needsDump, messageHashes: messageHashes, diff --git a/SessionMessagingKit/LibSessionUtil/libsession-util.xcframework/Info.plist b/SessionMessagingKit/LibSessionUtil/libsession-util.xcframework/Info.plist index c9a2d9b3e..97310ed4d 100644 --- a/SessionMessagingKit/LibSessionUtil/libsession-util.xcframework/Info.plist +++ b/SessionMessagingKit/LibSessionUtil/libsession-util.xcframework/Info.plist @@ -6,30 +6,30 @@ LibraryIdentifier - ios-arm64 + ios-arm64_x86_64-simulator LibraryPath libsession-util.a SupportedArchitectures arm64 + x86_64 SupportedPlatform ios + SupportedPlatformVariant + simulator LibraryIdentifier - ios-arm64_x86_64-simulator + ios-arm64 LibraryPath libsession-util.a SupportedArchitectures arm64 - x86_64 SupportedPlatform ios - SupportedPlatformVariant - simulator CFBundlePackageType diff --git a/SessionMessagingKit/LibSessionUtil/libsession-util.xcframework/ios-arm64/libsession-util.a b/SessionMessagingKit/LibSessionUtil/libsession-util.xcframework/ios-arm64/libsession-util.a index 3f7bee486..2ad2f5c11 100644 Binary files a/SessionMessagingKit/LibSessionUtil/libsession-util.xcframework/ios-arm64/libsession-util.a and b/SessionMessagingKit/LibSessionUtil/libsession-util.xcframework/ios-arm64/libsession-util.a differ diff --git a/SessionMessagingKit/LibSessionUtil/libsession-util.xcframework/ios-arm64_x86_64-simulator/libsession-util.a b/SessionMessagingKit/LibSessionUtil/libsession-util.xcframework/ios-arm64_x86_64-simulator/libsession-util.a index de85c6780..496ccbf15 100644 Binary files a/SessionMessagingKit/LibSessionUtil/libsession-util.xcframework/ios-arm64_x86_64-simulator/libsession-util.a and b/SessionMessagingKit/LibSessionUtil/libsession-util.xcframework/ios-arm64_x86_64-simulator/libsession-util.a differ diff --git a/SessionMessagingKit/LibSessionUtil/libsession-util.xcframework/session/config/contacts.h b/SessionMessagingKit/LibSessionUtil/libsession-util.xcframework/session/config/contacts.h index 76fd8acf5..a8303b41c 100644 --- a/SessionMessagingKit/LibSessionUtil/libsession-util.xcframework/session/config/contacts.h +++ b/SessionMessagingKit/LibSessionUtil/libsession-util.xcframework/session/config/contacts.h @@ -9,6 +9,9 @@ extern "C" { #include "profile_pic.h" #include "util.h" +// Maximum length of a contact name/nickname, in bytes (not including the null terminator). +extern const size_t CONTACT_MAX_NAME_LENGTH; + typedef struct contacts_contact { char session_id[67]; // in hex; 66 hex chars + null terminator. diff --git a/SessionMessagingKit/LibSessionUtil/libsession-util.xcframework/session/config/profile_pic.h b/SessionMessagingKit/LibSessionUtil/libsession-util.xcframework/session/config/profile_pic.h index 590df2117..204c87318 100644 --- a/SessionMessagingKit/LibSessionUtil/libsession-util.xcframework/session/config/profile_pic.h +++ b/SessionMessagingKit/LibSessionUtil/libsession-util.xcframework/session/config/profile_pic.h @@ -6,6 +6,9 @@ extern "C" { #include +// Maximum length of the profile pic URL (not including the null terminator) +extern const size_t PROFILE_PIC_MAX_URL_LENGTH; + typedef struct user_profile_pic { // Null-terminated C string containing the uploaded URL of the pic. Will be length 0 if there // is no profile pic. diff --git a/SessionMessagingKit/LibSessionUtil/libsession-util.xcframework/session/config/user_groups.h b/SessionMessagingKit/LibSessionUtil/libsession-util.xcframework/session/config/user_groups.h index a4897da6f..232e67977 100644 --- a/SessionMessagingKit/LibSessionUtil/libsession-util.xcframework/session/config/user_groups.h +++ b/SessionMessagingKit/LibSessionUtil/libsession-util.xcframework/session/config/user_groups.h @@ -7,15 +7,26 @@ extern "C" { #include "base.h" #include "util.h" +// Maximum length of a group name, in bytes +extern const size_t GROUP_NAME_MAX_LENGTH; + +// Maximum length of a community full URL +extern const size_t COMMUNITY_URL_MAX_LENGTH; + +// Maximum length of a community room token +extern const size_t COMMUNITY_ROOM_MAX_LENGTH; + typedef struct ugroups_legacy_group_info { char session_id[67]; // in hex; 66 hex chars + null terminator. - char name[101]; // Null-terminated C string (human-readable). Max length is 511. Will always - // be set (even if an empty string). + char name[101]; // Null-terminated C string (human-readable). Max length is 100 (plus 1 for + // null). Will always be set (even if an empty string). bool have_enc_keys; // Will be true if we have an encryption keypair, false if not. - unsigned char enc_pubkey[32]; // If `have_enc_keys`, this is the 32-byte pubkey - unsigned char enc_seckey[32]; // If `have_enc_keys`, this is the 32-byte secret key + unsigned char enc_pubkey[32]; // If `have_enc_keys`, this is the 32-byte pubkey (no NULL + // terminator). + unsigned char enc_seckey[32]; // If `have_enc_keys`, this is the 32-byte secret key (no NULL + // terminator). int64_t disappearing_timer; // Minutes. 0 == disabled. bool hidden; // true if hidden from the convo list diff --git a/SessionMessagingKit/Messages/Message.swift b/SessionMessagingKit/Messages/Message.swift index 2f2767632..4c6333c45 100644 --- a/SessionMessagingKit/Messages/Message.swift +++ b/SessionMessagingKit/Messages/Message.swift @@ -209,6 +209,10 @@ public extension Message { handleClosedGroupKeyUpdateMessages: true ) + // Ensure we actually want to de-dupe messages for this namespace, otherwise just + // succeed early + guard rawMessage.namespace.shouldDedupeMessages else { return processedMessage } + // Retrieve the number of entries we have for the hash of this message let numExistingHashes: Int = (try? SnodeReceivedMessageInfo .filter(SnodeReceivedMessageInfo.Columns.hash == rawMessage.info.hash) diff --git a/SessionMessagingKit/Sending & Receiving/MessageSender.swift b/SessionMessagingKit/Sending & Receiving/MessageSender.swift index 22143d133..6b409b121 100644 --- a/SessionMessagingKit/Sending & Receiving/MessageSender.swift +++ b/SessionMessagingKit/Sending & Receiving/MessageSender.swift @@ -1088,21 +1088,19 @@ public final class MessageSender { if let message = message as? VisibleMessage { message.syncTarget = publicKey } if let message = message as? ExpirationTimerUpdate { message.syncTarget = publicKey } - Storage.shared.write { db in - JobRunner.add( - db, - job: Job( - variant: .messageSend, - threadId: threadId, - interactionId: interactionId, - details: MessageSendJob.Details( - destination: .contact(publicKey: currentUserPublicKey), - message: message, - isSyncMessage: true - ) + JobRunner.add( + db, + job: Job( + variant: .messageSend, + threadId: threadId, + interactionId: interactionId, + details: MessageSendJob.Details( + destination: .contact(publicKey: currentUserPublicKey), + message: message, + isSyncMessage: true ) ) - } + ) } } } diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift b/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift index 230218237..799c9f0a5 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift @@ -88,7 +88,7 @@ public class Poller { getSnodeForPolling(for: publicKey) .subscribe(on: Threading.pollerQueue) - .flatMap { snode -> AnyPublisher in + .flatMap { snode -> AnyPublisher<[Message], Error> in Poller.poll( namespaces: namespaces, from: snode, @@ -126,7 +126,7 @@ public class Poller { self?.getSnodeForPolling(for: publicKey) .subscribe(on: Threading.pollerQueue) - .flatMap { snode -> AnyPublisher in + .flatMap { snode -> AnyPublisher<[Message], Error> in Poller.poll( namespaces: namespaces, from: snode, @@ -177,6 +177,11 @@ public class Poller { } } + /// Polls the specified namespaces and processes any messages, returning an array of messages that were + /// successfully processed + /// + /// **Note:** The returned messages will have already been processed by the `Poller`, they are only returned + /// for cases where we need explicit/custom behaviours to occur (eg. Onboarding) public static func poll( namespaces: [SnodeAPI.Namespace], from snode: Snode, @@ -185,13 +190,13 @@ public class Poller { calledFromBackgroundPoller: Bool = false, isBackgroundPollValid: @escaping (() -> Bool) = { true }, poller: Poller? = nil - ) -> AnyPublisher { + ) -> AnyPublisher<[Message], Error> { // If the polling has been cancelled then don't continue guard (calledFromBackgroundPoller && isBackgroundPollValid()) || poller?.isPolling.wrappedValue[publicKey] == true else { - return Just(()) + return Just([]) .setFailureType(to: Error.self) .eraseToAnyPublisher() } @@ -210,26 +215,25 @@ public class Poller { from: snode, associatedWith: publicKey ) - .flatMap { namespacedResults -> AnyPublisher in + .flatMap { namespacedResults -> AnyPublisher<[Message], Error> in guard (calledFromBackgroundPoller && isBackgroundPollValid()) || poller?.isPolling.wrappedValue[publicKey] == true else { - return Just(()) + return Just([]) .setFailureType(to: Error.self) .eraseToAnyPublisher() } - let allMessagesCount: Int = namespacedResults - .map { $0.value.data?.messages.count ?? 0 } - .reduce(0, +) + let allMessages: [SnodeReceivedMessage] = namespacedResults + .compactMap { _, result -> [SnodeReceivedMessage]? in result.data?.messages } + .flatMap { $0 } // No need to do anything if there are no messages - guard allMessagesCount > 0 else { - if !calledFromBackgroundPoller { - SNLog("Received no new messages in \(pollerName)") - } - return Just(()) + guard !allMessages.isEmpty else { + if !calledFromBackgroundPoller { SNLog("Received no new messages in \(pollerName)") } + + return Just([]) .setFailureType(to: Error.self) .eraseToAnyPublisher() } @@ -237,90 +241,92 @@ public class Poller { // Otherwise process the messages and add them to the queue for handling let lastHashes: [String] = namespacedResults .compactMap { $0.value.data?.lastHash } + let otherKnownHashes: [String] = namespacedResults + .filter { $0.key.shouldDedupeMessages } + .compactMap { $0.value.data?.messages.map { $0.info.hash } } + .reduce([], +) var messageCount: Int = 0 + var processedMessages: [Message] = [] var hadValidHashUpdate: Bool = false var jobsToRun: [Job] = [] Storage.shared.write { db in - namespacedResults.forEach { namespace, result in - result.data?.messages - .compactMap { message -> ProcessedMessage? in - do { - return try Message.processRawReceivedMessage(db, rawMessage: message) - } - catch { - switch error { - // Ignore duplicate & selfSend message errors (and don't bother logging - // them as there will be a lot since we each service node duplicates messages) - case DatabaseError.SQLITE_CONSTRAINT_UNIQUE, - MessageReceiverError.duplicateMessage, - MessageReceiverError.duplicateControlMessage, - MessageReceiverError.selfSend: - break - - case MessageReceiverError.duplicateMessageNewSnode: - hadValidHashUpdate = true - break - - case DatabaseError.SQLITE_ABORT: - // In the background ignore 'SQLITE_ABORT' (it generally means - // the BackgroundPoller has timed out - if !calledFromBackgroundPoller { - SNLog("Failed to the database being suspended (running in background with no background task).") - } - break - - default: SNLog("Failed to deserialize envelope due to error: \(error).") - } - - return nil - } + allMessages + .compactMap { message -> ProcessedMessage? in + do { + return try Message.processRawReceivedMessage(db, rawMessage: message) } - .grouped { threadId, _, _ in (threadId ?? Message.nonThreadMessageId) } - .forEach { threadId, threadMessages in - messageCount += threadMessages.count - - let jobToRun: Job? = Job( - variant: .messageReceive, - behaviour: .runOnce, - threadId: threadId, - details: MessageReceiveJob.Details( - messages: threadMessages.map { $0.messageInfo }, - calledFromBackgroundPoller: calledFromBackgroundPoller - ) - ) - jobsToRun = jobsToRun.appending(jobToRun) + catch { + switch error { + // Ignore duplicate & selfSend message errors (and don't bother logging + // them as there will be a lot since we each service node duplicates messages) + case DatabaseError.SQLITE_CONSTRAINT_UNIQUE, + MessageReceiverError.duplicateMessage, + MessageReceiverError.duplicateControlMessage, + MessageReceiverError.selfSend: + break + + case MessageReceiverError.duplicateMessageNewSnode: + hadValidHashUpdate = true + break + + case DatabaseError.SQLITE_ABORT: + // In the background ignore 'SQLITE_ABORT' (it generally means + // the BackgroundPoller has timed out + if !calledFromBackgroundPoller { + SNLog("Failed to the database being suspended (running in background with no background task).") + } + break + + default: SNLog("Failed to deserialize envelope due to error: \(error).") + } - // If we are force-polling then add to the JobRunner so they are - // persistent and will retry on the next app run if they fail but - // don't let them auto-start - JobRunner.add(db, job: jobToRun, canStartJob: !calledFromBackgroundPoller) + return nil } - } + } + .grouped { threadId, _, _ in (threadId ?? Message.nonThreadMessageId) } + .forEach { threadId, threadMessages in + messageCount += threadMessages.count + processedMessages += threadMessages.map { $0.messageInfo.message } + + let jobToRun: Job? = Job( + variant: .messageReceive, + behaviour: .runOnce, + threadId: threadId, + details: MessageReceiveJob.Details( + messages: threadMessages.map { $0.messageInfo }, + calledFromBackgroundPoller: calledFromBackgroundPoller + ) + ) + jobsToRun = jobsToRun.appending(jobToRun) + + // If we are force-polling then add to the JobRunner so they are + // persistent and will retry on the next app run if they fail but + // don't let them auto-start + JobRunner.add(db, job: jobToRun, canStartJob: !calledFromBackgroundPoller) + } // Clean up message hashes and add some logs about the poll results - if allMessagesCount == 0 && !hadValidHashUpdate { + if allMessages.isEmpty && !hadValidHashUpdate { if !calledFromBackgroundPoller { - SNLog("Received \(allMessagesCount) new message\(allMessagesCount == 1 ? "" : "s"), all duplicates - marking the hash we polled with as invalid") + SNLog("Received \(allMessages.count) new message\(allMessages.count == 1 ? "" : "s"), all duplicates - marking the hash we polled with as invalid") } // Update the cached validity of the messages try SnodeReceivedMessageInfo.handlePotentialDeletedOrInvalidHash( db, potentiallyInvalidHashes: lastHashes, - otherKnownValidHashes: namespacedResults - .compactMap { $0.value.data?.messages.map { $0.info.hash } } - .reduce([], +) + otherKnownValidHashes: otherKnownHashes ) } else if !calledFromBackgroundPoller { - SNLog("Received \(messageCount) new message\(messageCount == 1 ? "" : "s") in \(pollerName) (duplicates: \(allMessagesCount - messageCount))") + SNLog("Received \(messageCount) new message\(messageCount == 1 ? "" : "s") in \(pollerName) (duplicates: \(allMessages.count - messageCount))") } } // If we aren't runing in a background poller then just finish immediately guard calledFromBackgroundPoller else { - return Just(()) + return Just(processedMessages) .setFailureType(to: Error.self) .eraseToAnyPublisher() } @@ -345,7 +351,7 @@ public class Poller { } ) .collect() - .map { _ in () } + .map { _ in processedMessages } .eraseToAnyPublisher() } .eraseToAnyPublisher() diff --git a/SessionMessagingKit/Utilities/ProfileManager.swift b/SessionMessagingKit/Utilities/ProfileManager.swift index def1a651a..d2b7b368e 100644 --- a/SessionMessagingKit/Utilities/ProfileManager.swift +++ b/SessionMessagingKit/Utilities/ProfileManager.swift @@ -533,6 +533,7 @@ public struct ProfileManager { // Profile picture & profile key var avatarNeedsDownload: Bool = false + var targetAvatarUrl: String? = nil let shouldUpdateAvatar: Bool = { guard isCurrentUser else { return true } @@ -568,6 +569,7 @@ public struct ProfileManager { profileChanges.append(Profile.Columns.profilePictureUrl.set(to: url)) profileChanges.append(Profile.Columns.profileEncryptionKey.set(to: key)) avatarNeedsDownload = true + targetAvatarUrl = url } // Profile filename (this isn't synchronized between devices) @@ -618,7 +620,9 @@ public struct ProfileManager { // Download the profile picture if needed guard avatarNeedsDownload else { return } - db.afterNextTransactionNested { db in + let dedupeIdentifier: String = "AvatarDownload-\(publicKey)-\(targetAvatarUrl ?? "remove")" + + db.afterNextTransactionNestedOnce(dedupeIdentifier: dedupeIdentifier) { db in // Need to refetch to ensure the db changes have occurred ProfileManager.downloadAvatar(for: Profile.fetchOrCreate(db, id: publicKey)) } diff --git a/SessionSnodeKit/Models/SnodeMessage.swift b/SessionSnodeKit/Models/SnodeMessage.swift index fea1d5bf7..69cff264e 100644 --- a/SessionSnodeKit/Models/SnodeMessage.swift +++ b/SessionSnodeKit/Models/SnodeMessage.swift @@ -53,10 +53,7 @@ extension SnodeMessage { public func encode(to encoder: Encoder) throws { var container: KeyedEncodingContainer = encoder.container(keyedBy: CodingKeys.self) - try container.encode( - (Features.useTestnet ? recipient.removingIdPrefixIfNeeded() : recipient), - forKey: .recipient - ) + try container.encode(recipient, forKey: .recipient) try container.encode(data, forKey: .data) try container.encode(ttl, forKey: .ttl) try container.encode(timestampMs, forKey: .timestampMs) diff --git a/SessionSnodeKit/Models/SnodeReceivedMessage.swift b/SessionSnodeKit/Models/SnodeReceivedMessage.swift index a4659cb78..6e20921d1 100644 --- a/SessionSnodeKit/Models/SnodeReceivedMessage.swift +++ b/SessionSnodeKit/Models/SnodeReceivedMessage.swift @@ -9,6 +9,7 @@ public struct SnodeReceivedMessage: CustomDebugStringConvertible { public static let defaultExpirationSeconds: Int64 = ((15 * 24 * 60 * 60) * 1000) public let info: SnodeReceivedMessageInfo + public let namespace: SnodeAPI.Namespace public let data: Data init?( @@ -29,6 +30,7 @@ public struct SnodeReceivedMessage: CustomDebugStringConvertible { hash: rawMessage.hash, expirationDateMs: (rawMessage.expiration ?? SnodeReceivedMessage.defaultExpirationSeconds) ) + self.namespace = namespace self.data = data } diff --git a/SessionSnodeKit/Networking/SnodeAPI.swift b/SessionSnodeKit/Networking/SnodeAPI.swift index 7489e4011..2f5e1d289 100644 --- a/SessionSnodeKit/Networking/SnodeAPI.swift +++ b/SessionSnodeKit/Networking/SnodeAPI.swift @@ -264,17 +264,13 @@ public final class SnodeAPI { } SNLog("Getting swarm for: \((publicKey == getUserHexEncodedPublicKey()) ? "self" : publicKey).") - let targetPublicKey: String = (Features.useTestnet ? - publicKey.removingIdPrefixIfNeeded() : - publicKey - ) return getRandomSnode() .flatMap { snode in SnodeAPI.send( request: SnodeRequest( endpoint: .getSwarm, - body: GetSwarmRequest(pubkey: targetPublicKey) + body: GetSwarmRequest(pubkey: publicKey) ), to: snode, associatedWith: publicKey, @@ -305,16 +301,14 @@ public final class SnodeAPI { } let userX25519PublicKey: String = getUserHexEncodedPublicKey() - let targetPublicKey: String = (Features.useTestnet ? - publicKey.removingIdPrefixIfNeeded() : - publicKey - ) return Just(()) .setFailureType(to: Error.self) .map { _ -> [SnodeAPI.Namespace: String] in namespaces .reduce(into: [:]) { result, namespace in + guard namespace.shouldDedupeMessages else { return } + // Prune expired message hashes for this namespace on this service node SnodeReceivedMessageInfo.pruneExpiredMessageHashInfo( for: snode, @@ -375,7 +369,7 @@ public final class SnodeAPI { request: SnodeRequest( endpoint: .getMessages, body: LegacyGetMessagesRequest( - pubkey: targetPublicKey, + pubkey: publicKey, lastHash: (namespaceLastHash[namespace] ?? ""), namespace: namespace, maxCount: nil, @@ -393,7 +387,7 @@ public final class SnodeAPI { body: GetMessagesRequest( lastHash: (namespaceLastHash[namespace] ?? ""), namespace: namespace, - pubkey: targetPublicKey, + pubkey: publicKey, subkey: nil, // TODO: Need to get this timestampMs: UInt64(SnodeAPI.currentOffsetTimestampMs()), ed25519PublicKey: userED25519KeyPair.publicKey, @@ -462,11 +456,6 @@ public final class SnodeAPI { associatedWith publicKey: String, using dependencies: SSKDependencies = SSKDependencies() ) -> AnyPublisher<(info: ResponseInfoType, data: (messages: [SnodeReceivedMessage], lastHash: String?)?), Error> { - let targetPublicKey: String = (Features.useTestnet ? - publicKey.removingIdPrefixIfNeeded() : - publicKey - ) - return Deferred { Future { resolver in // Prune expired message hashes for this namespace on this service node @@ -495,7 +484,7 @@ public final class SnodeAPI { request: SnodeRequest( endpoint: .getMessages, body: LegacyGetMessagesRequest( - pubkey: targetPublicKey, + pubkey: publicKey, lastHash: (lastHash ?? ""), namespace: namespace, maxCount: nil, @@ -522,7 +511,7 @@ public final class SnodeAPI { body: GetMessagesRequest( lastHash: (lastHash ?? ""), namespace: namespace, - pubkey: targetPublicKey, + pubkey: publicKey, subkey: nil, timestampMs: UInt64(SnodeAPI.currentOffsetTimestampMs()), ed25519PublicKey: userED25519KeyPair.publicKey, @@ -566,10 +555,7 @@ public final class SnodeAPI { in namespace: Namespace, using dependencies: SSKDependencies = SSKDependencies() ) -> AnyPublisher<(ResponseInfoType, SendMessagesResponse), Error> { - let publicKey: String = (Features.useTestnet ? - message.recipient.removingIdPrefixIfNeeded() : - message.recipient - ) + let publicKey: String = message.recipient let userX25519PublicKey: String = getUserHexEncodedPublicKey() let sendTimestamp: UInt64 = UInt64(SnodeAPI.currentOffsetTimestampMs()) @@ -657,10 +643,7 @@ public final class SnodeAPI { } let userX25519PublicKey: String = getUserHexEncodedPublicKey() - let publicKey: String = (Features.useTestnet ? - recipient.removingIdPrefixIfNeeded() : - recipient - ) + let publicKey: String = recipient var requests: [SnodeAPI.BatchRequest.Info] = targetedMessages .map { message, namespace in // Check if this namespace requires authentication @@ -749,11 +732,6 @@ public final class SnodeAPI { .eraseToAnyPublisher() } - let publicKey: String = (Features.useTestnet ? - publicKey.removingIdPrefixIfNeeded() : - publicKey - ) - return getSwarm(for: publicKey) .subscribe(on: Threading.workQueue) .tryFlatMap { swarm -> AnyPublisher<[String: (hashes: [String], expiry: UInt64)], Error> in @@ -801,11 +779,6 @@ public final class SnodeAPI { .eraseToAnyPublisher() } - let publicKey: String = (Features.useTestnet ? - publicKey.removingIdPrefixIfNeeded() : - publicKey - ) - return getSwarm(for: publicKey) .subscribe(on: Threading.workQueue) .tryFlatMap { swarm -> AnyPublisher in @@ -855,10 +828,6 @@ public final class SnodeAPI { .eraseToAnyPublisher() } - let publicKey: String = (Features.useTestnet ? - publicKey.removingIdPrefixIfNeeded() : - publicKey - ) let userX25519PublicKey: String = getUserHexEncodedPublicKey() return getSwarm(for: publicKey) diff --git a/SessionSnodeKit/Types/SnodeAPINamespace.swift b/SessionSnodeKit/Types/SnodeAPINamespace.swift index 84ee98a3e..4d75897f7 100644 --- a/SessionSnodeKit/Types/SnodeAPINamespace.swift +++ b/SessionSnodeKit/Types/SnodeAPINamespace.swift @@ -31,6 +31,24 @@ public extension SnodeAPI { } } + /// This flag indicates whether we should dedupe messages from the specified namespace, when `true` we will + /// store a `SnodeReceivedMessageInfo` record for the message and check for a matching record whenever + /// we receive a message from this namespace + /// + /// **Note:** An additional side-effect of this flag is that when we poll for messages from the specified namespace + /// we will always retrieve **all** messages from the namespace (instead of just new messages since the last one + /// we have seen) + public var shouldDedupeMessages: Bool { + switch self { + case .`default`, .legacyClosedGroup: return true + + case .configUserProfile, .configContacts, + .configConvoInfoVolatile, .configUserGroups, + .configClosedGroupInfo: + return false + } + } + var verificationString: String { switch self { case .`default`: return "" diff --git a/SessionUtilitiesKit/JobRunner/JobRunner.swift b/SessionUtilitiesKit/JobRunner/JobRunner.swift index ed058a229..c6928b245 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunner.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunner.swift @@ -143,7 +143,7 @@ public final class JobRunner { guard canStartJob else { return } // Start the job runner if needed - db.afterNextTransactionNested { _ in + db.afterNextTransactionNestedOnce(dedupeIdentifier: "JobRunner-Start: \(updatedJob.variant)") { _ in queues.wrappedValue[updatedJob.variant]?.start() } } @@ -166,7 +166,7 @@ public final class JobRunner { guard canStartJob else { return } // Start the job runner if needed - db.afterNextTransactionNested { _ in + db.afterNextTransactionNestedOnce(dedupeIdentifier: "JobRunner-Start: \(job.variant)") { _ in queues.wrappedValue[job.variant]?.start() } }