From af073657a2ed0b7abb17650e4061b4b8f19cbd5e Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Fri, 3 Jun 2022 15:47:16 +1000 Subject: [PATCH] Cleaned up received message handling and a few bugs with duplicate message handling Updated the YDB to GRDB migrations to include some progress when importing swarms & interactions (ie. the slow parts we can't properly show progress for) Changed the MessageReceiveJob into a MessageHandlingJob (when receiving a message we now parse and store everything immediately to avoid a number of weird edge-cases) Fixed a bug where the Poller would drop a Snode when returning from the background because it's last request would generally time out Fixed a few bugs with invalid attachments Added the ability to retry downloading a failed attachment Added back the search results limit --- .../ConversationVC+Interaction.swift | 29 +- .../Content Views/MediaView.swift | 22 +- .../GlobalSearchViewController.swift | 2 - .../MediaGalleryViewModel.swift | 12 +- Session/Notifications/AppNotifications.swift | 16 +- Session/Utilities/BackgroundPoller.swift | 32 +-- .../Migrations/_003_YDBToGRDBMigration.swift | 75 +++--- .../Database/Models/Attachment.swift | 2 +- .../Models/ControlMessageProcessRecord.swift | 9 +- .../Jobs/Types/FailedMessageSendsJob.swift | 2 +- .../Jobs/Types/MessageReceiveJob.swift | 107 +++++--- .../Jobs/Types/MessageSendJob.swift | 51 +--- SessionMessagingKit/Messages/Message.swift | 253 ++++++++++++++++++ .../Errors/MessageReceiverError.swift | 4 +- .../MessageReceiver+Handling.swift | 6 +- .../Sending & Receiving/MessageReceiver.swift | 113 ++++---- .../Sending & Receiving/MessageSender.swift | 3 +- .../Pollers/ClosedGroupPoller.swift | 31 +-- .../Sending & Receiving/Pollers/Poller.swift | 50 ++-- .../SessionThreadViewModel.swift | 4 + .../NotificationServiceExtension.swift | 27 +- .../Migrations/_003_YDBToGRDBMigration.swift | 35 +-- .../Database/LegacyDatabase/SUKLegacy.swift | 2 +- SessionUtilitiesKit/JobRunner/JobRunner.swift | 7 +- SessionUtilitiesKit/Media/NSData+Image.m | 1 + 25 files changed, 577 insertions(+), 318 deletions(-) diff --git a/Session/Conversations/ConversationVC+Interaction.swift b/Session/Conversations/ConversationVC+Interaction.swift index 9f7cdec20..945c7f319 100644 --- a/Session/Conversations/ConversationVC+Interaction.swift +++ b/Session/Conversations/ConversationVC+Interaction.swift @@ -713,15 +713,34 @@ extension ConversationVC: guard let mediaView = albumView.mediaView(forLocation: locationInAlbumView) else { return } switch mediaView.attachment.state { - case .pendingDownload, .downloading, .uploading: - // TODO: Tapped a failed incoming attachment - break + case .pendingDownload, .downloading, .uploading: break + + // Failed uploads should be handled via the "resend" process instead + case .failedUpload: break + + case .failedDownload: + let threadId: String = self.viewModel.threadData.threadId - case .failedDownload, .failedUpload: - // TODO: Tapped a failed incoming attachment + // Retry downloading the failed attachment + GRDBStorage.shared.writeAsync { db in + JobRunner.add( + db, + job: Job( + variant: .attachmentDownload, + threadId: threadId, + interactionId: cellViewModel.id, + details: AttachmentDownloadJob.Details( + attachmentId: mediaView.attachment.id + ) + ) + ) + } break default: + // Ignore invalid media + guard mediaView.attachment.isValid else { return } + let viewController: UIViewController? = MediaGalleryViewModel.createDetailViewController( for: self.viewModel.threadData.threadId, threadVariant: self.viewModel.threadData.threadVariant, diff --git a/Session/Conversations/Message Cells/Content Views/MediaView.swift b/Session/Conversations/Message Cells/Content Views/MediaView.swift index a22cce4c6..a194e6588 100644 --- a/Session/Conversations/Message Cells/Content Views/MediaView.swift +++ b/Session/Conversations/Message Cells/Content Views/MediaView.swift @@ -86,6 +86,10 @@ public class MediaView: UIView { configure(forError: .failed) return } + guard attachment.isValid else { + configure(forError: .invalid) + return + } if attachment.isAnimated { configureForAnimatedImage(attachment: attachment) @@ -144,6 +148,7 @@ public class MediaView: UIView { animatedImageView.layer.minificationFilter = .trilinear animatedImageView.layer.magnificationFilter = .trilinear animatedImageView.backgroundColor = Colors.unimportant + animatedImageView.isHidden = !attachment.isValid addSubview(animatedImageView) animatedImageView.autoPinEdgesToSuperviewEdges() _ = addUploadProgressIfNecessary(animatedImageView) @@ -159,10 +164,7 @@ public class MediaView: UIView { } strongSelf.tryToLoadMedia( loadMediaBlock: { applyMediaBlock in - guard attachment.isValid else { - Logger.warn("Ignoring invalid attachment.") - return - } + guard attachment.isValid else { return } guard let filePath: String = attachment.originalFilePath else { owsFailDebug("Attachment stream missing original file path.") return @@ -200,6 +202,7 @@ public class MediaView: UIView { stillImageView.layer.minificationFilter = .trilinear stillImageView.layer.magnificationFilter = .trilinear stillImageView.backgroundColor = Colors.unimportant + stillImageView.isHidden = !attachment.isValid addSubview(stillImageView) stillImageView.autoPinEdgesToSuperviewEdges() _ = addUploadProgressIfNecessary(stillImageView) @@ -213,10 +216,7 @@ public class MediaView: UIView { } self?.tryToLoadMedia( loadMediaBlock: { applyMediaBlock in - guard attachment.isValid else { - Logger.warn("Ignoring invalid attachment.") - return - } + guard attachment.isValid else { return } attachment.thumbnail( size: .large, @@ -254,6 +254,7 @@ public class MediaView: UIView { stillImageView.layer.minificationFilter = .trilinear stillImageView.layer.magnificationFilter = .trilinear stillImageView.backgroundColor = Colors.unimportant + stillImageView.isHidden = !attachment.isValid addSubview(stillImageView) stillImageView.autoPinEdgesToSuperviewEdges() @@ -276,10 +277,7 @@ public class MediaView: UIView { } self?.tryToLoadMedia( loadMediaBlock: { applyMediaBlock in - guard attachment.isValid else { - Logger.warn("Ignoring invalid attachment.") - return - } + guard attachment.isValid else { return } attachment.thumbnail( size: .medium, diff --git a/Session/Home/GlobalSearch/GlobalSearchViewController.swift b/Session/Home/GlobalSearch/GlobalSearchViewController.swift index 98ee2f928..6d41fe275 100644 --- a/Session/Home/GlobalSearch/GlobalSearchViewController.swift +++ b/Session/Home/GlobalSearch/GlobalSearchViewController.swift @@ -142,7 +142,6 @@ class GlobalSearchViewController: BaseVC, UITableViewDelegate, UITableViewDataSo let result: Result<[SectionModel], Error>? = GRDBStorage.shared.read { db -> Result<[SectionModel], Error> in do { let userPublicKey: String = getUserHexEncodedPublicKey(db) - let contactsAndGroupsResults: [SessionThreadViewModel] = try SessionThreadViewModel .contactsAndGroupsQuery( userPublicKey: userPublicKey, @@ -150,7 +149,6 @@ class GlobalSearchViewController: BaseVC, UITableViewDelegate, UITableViewDataSo searchTerm: searchText ) .fetchAll(db) - let messageResults: [SessionThreadViewModel] = try SessionThreadViewModel .messagesQuery( userPublicKey: userPublicKey, diff --git a/Session/Media Viewing & Editing/MediaGalleryViewModel.swift b/Session/Media Viewing & Editing/MediaGalleryViewModel.swift index 22f7249f6..86e2de343 100644 --- a/Session/Media Viewing & Editing/MediaGalleryViewModel.swift +++ b/Session/Media Viewing & Editing/MediaGalleryViewModel.swift @@ -322,13 +322,17 @@ public class MediaGalleryViewModel { .trackingConstantRegion { db -> [Item] in guard let interactionId: Int64 = interactionId else { return [] } + let attachment: TypedTableAlias = TypedTableAlias() let interaction: TypedTableAlias = TypedTableAlias() let interactionAttachment: TypedTableAlias = TypedTableAlias() return try Item .baseQuery( orderSQL: SQL(interactionAttachment[.albumIndex]), - baseFilterSQL: SQL("\(interaction[.id]) = \(interactionId)") + baseFilterSQL: SQL(""" + \(attachment[.isValid]) = true AND + \(interaction[.id]) = \(interactionId) + """) ) .fetchAll(db) } @@ -342,13 +346,17 @@ public class MediaGalleryViewModel { // but to avoid displaying stale data we re-fetch from the database anyway let maybeAlbumInfo: AlbumInfo? = GRDBStorage.shared .read { db -> AlbumInfo in + let attachment: TypedTableAlias = TypedTableAlias() let interaction: TypedTableAlias = TypedTableAlias() let interactionAttachment: TypedTableAlias = TypedTableAlias() let newAlbumData: [Item] = try Item .baseQuery( orderSQL: SQL(interactionAttachment[.albumIndex]), - baseFilterSQL: SQL("\(interaction[.id]) = \(interactionId)") + baseFilterSQL: SQL(""" + \(attachment[.isValid]) = true AND + \(interaction[.id]) = \(interactionId) + """) ) .fetchAll(db) diff --git a/Session/Notifications/AppNotifications.swift b/Session/Notifications/AppNotifications.swift index 48c792314..3c559c85e 100644 --- a/Session/Notifications/AppNotifications.swift +++ b/Session/Notifications/AppNotifications.swift @@ -125,15 +125,13 @@ public class NotificationPresenter: NSObject, NotificationsProtocol { AssertIsOnMainThread() switch notification.object { - case let incomingMessage as TSIncomingMessage: - Logger.debug("canceled notification for message: \(incomingMessage)") - if let identifier = incomingMessage.notificationIdentifier { - cancelNotification(identifier) - } else { - cancelNotifications(threadId: incomingMessage.uniqueThreadId) - } - default: - break + case let interaction as Interaction: + guard interaction.variant == .standardIncoming else { return } + + Logger.debug("canceled notification for message: \(interaction)") + cancelNotifications(identifiers: interaction.notificationIdentifiers) + + default: break } } diff --git a/Session/Utilities/BackgroundPoller.swift b/Session/Utilities/BackgroundPoller.swift index 4f0f0e329..4b4cd5166 100644 --- a/Session/Utilities/BackgroundPoller.swift +++ b/Session/Utilities/BackgroundPoller.swift @@ -73,27 +73,27 @@ public final class BackgroundPoller : NSObject { var threadMessages: [String: [MessageReceiveJob.Details.MessageInfo]] = [:] messages.forEach { message in - guard let envelope = SNProtoEnvelope.from(message) else { return } - - // Extract the threadId and add that to the messageReceive job for - // multi-threading and garbage collection purposes - let threadId: String? = MessageReceiver.extractSenderPublicKey(db, from: envelope) - do { - threadMessages[threadId ?? ""] = (threadMessages[threadId ?? ""] ?? []) - .appending( - MessageReceiveJob.Details.MessageInfo( - data: try envelope.serializedData(), - serverHash: message.info.hash, - serverExpirationTimestamp: (TimeInterval(message.info.expirationDateMs) / 1000) - ) - ) + let processedMessage: ProcessedMessage? = try Message.processRawReceivedMessage(db, rawMessage: message) + let key: String = (processedMessage?.threadId ?? Message.nonThreadMessageId) // Persist the received message after the MessageReceiveJob is created _ = try message.info.saved(db) + threadMessages[key] = (threadMessages[key] ?? []) + .appending(processedMessage?.messageInfo) } catch { - SNLog("Failed to deserialize envelope due to error: \(error).") + switch error { + // Ignore duplicate & selfSend message errors (and don't bother logging + // them as there will be a lot since we each service node duplicates messages) + case DatabaseError.SQLITE_CONSTRAINT_UNIQUE, + MessageReceiverError.duplicateMessage, + MessageReceiverError.duplicateControlMessage, + MessageReceiverError.selfSend: + break + + default: SNLog("Failed to deserialize envelope due to error: \(error).") + } } } @@ -105,7 +105,7 @@ public final class BackgroundPoller : NSObject { threadId: threadId, details: MessageReceiveJob.Details( messages: threadMessages, - isBackgroundPoll: false + isBackgroundPoll: true ) ) diff --git a/SessionMessagingKit/Database/Migrations/_003_YDBToGRDBMigration.swift b/SessionMessagingKit/Database/Migrations/_003_YDBToGRDBMigration.swift index 2b8cac9f7..715f99c68 100644 --- a/SessionMessagingKit/Database/Migrations/_003_YDBToGRDBMigration.swift +++ b/SessionMessagingKit/Database/Migrations/_003_YDBToGRDBMigration.swift @@ -188,6 +188,20 @@ enum _003_YDBToGRDBMigration: Migration { SNLog("[Migration Info] \(target.key(with: self)) - Processing Interactions") + /// **Note:** There is no index on the collection column so unfortunately it takes the same amount of time to enumerate through all + /// collections as it does to just get the count of collections, due to this, if the database is very large, importing thecollections can be + /// very slow (~15s with 2,000,000 rows) - we want to show some kind of progress while enumerating so the below code creates a + /// very rought guess of the number of collections based on the file size of the database (this shouldn't affect most users at all) + let roughKbPerRow: CGFloat = 2.25 + let oldDatabaseSizeBytes: CGFloat = (try? FileManager.default + .attributesOfItem(atPath: SUKLegacy.legacyDatabaseFilepath)[.size] + .asType(CGFloat.self)) + .defaulting(to: 0) + let roughNumRows: CGFloat = ((oldDatabaseSizeBytes / 1024) / roughKbPerRow) + let startProgress: CGFloat = 0.04 + let interactionsCompleteProgress: CGFloat = 0.19 + var rowIndex: CGFloat = 0 + transaction.enumerateKeysAndObjects(inCollection: SMKLegacy.interactionCollection) { _, object, _ in guard let interaction: SMKLegacy._DBInteraction = object as? SMKLegacy._DBInteraction else { SNLog("[Migration Error] Unable to process interaction") @@ -197,8 +211,19 @@ enum _003_YDBToGRDBMigration: Migration { interactions[interaction.uniqueThreadId] = (interactions[interaction.uniqueThreadId] ?? []) .appending(interaction) + + rowIndex += 1 + + GRDBStorage.shared.update( + progress: min( + interactionsCompleteProgress, + ((rowIndex / roughNumRows) * (interactionsCompleteProgress - startProgress)) + ), + for: self, + in: target + ) } - GRDBStorage.shared.update(progress: 0.19, for: self, in: target) + GRDBStorage.shared.update(progress: interactionsCompleteProgress, for: self, in: target) // MARK: --Attachments @@ -1066,39 +1091,21 @@ enum _003_YDBToGRDBMigration: Migration { return } - // We need to extract the `threadId` from the legacyJob data as the new - // MessageReceiveJob requires it for multi-threading and garbage collection purposes - guard let envelope: SNProtoEnvelope = try? SNProtoEnvelope.parseData(legacyJob.data) else { + // We have changed how messageReceive jobs work - we now parse the message upon receipt and + // the MessageReceiveJob only does the handling - as a result we need to do the same behaviour + // here so we don't need to support the legacy behaviour + guard let processedMessage: ProcessedMessage = try? Message.processRawReceivedMessage(db, serializedData: legacyJob.data, serverHash: legacyJob.serverHash) else { return } - let threadId: String? - - switch envelope.type { - // For closed group messages the 'groupPublicKey' is stored in the - // 'envelope.source' value and that should be used for the 'threadId' - case .closedGroupMessage: - threadId = envelope.source - break - - default: - threadId = MessageReceiver.extractSenderPublicKey(db, from: envelope) - } - _ = try Job( failureCount: legacyJob.failureCount, variant: .messageReceive, behaviour: .runOnce, nextRunTimestamp: 0, - threadId: threadId, + threadId: processedMessage.threadId, details: MessageReceiveJob.Details( - messages: [ - MessageReceiveJob.Details.MessageInfo( - data: legacyJob.data, - serverHash: legacyJob.serverHash, - serverExpirationTimestamp: (Date().timeIntervalSince1970 + ControlMessageProcessRecord.defaultExpirationSeconds) - ) - ], + messages: [processedMessage.messageInfo], isBackgroundPoll: legacyJob.isBackgroundPoll ) )?.inserted(db) @@ -1238,8 +1245,8 @@ enum _003_YDBToGRDBMigration: Migration { try autoreleasepool { try attachmentDownloadJobs.forEach { legacyJob in guard let interactionId: Int64 = legacyInteractionToIdMap[legacyJob.tsMessageID] else { - SNLog("[Migration Error] attachmentDownload job unable to find interaction") - throw StorageError.migrationFailed + SNLog("[Migration Warning] attachmentDownload job with no interaction found - ignoring") + return } guard processedAttachmentIds.contains(legacyJob.attachmentID) else { SNLog("[Migration Error] attachmentDownload job unable to find attachment") @@ -1422,7 +1429,7 @@ enum _003_YDBToGRDBMigration: Migration { return (attachmentVailidityInfo.isValid, attachmentVailidityInfo.duration) } - if stream.isVideo { + if stream.isVisualMedia { let attachmentVailidityInfo = Attachment.determineValidityAndDuration( contentType: stream.contentType, localRelativeFilePath: processedLocalRelativeFilePath, @@ -1432,10 +1439,6 @@ enum _003_YDBToGRDBMigration: Migration { return (attachmentVailidityInfo.isValid, attachmentVailidityInfo.duration) } - if stream.isVisualMedia { - return (stream.isValidVisualMedia, nil) - } - return (true, nil) }() @@ -1460,7 +1463,13 @@ enum _003_YDBToGRDBMigration: Migration { duration: duration, isValid: isValid, encryptionKey: legacyAttachment.encryptionKey, - digest: (legacyAttachment as? SMKLegacy._AttachmentStream)?.digest, + digest: { + switch legacyAttachment { + case let stream as SMKLegacy._AttachmentStream: return stream.digest + case let pointer as SMKLegacy._AttachmentPointer: return pointer.digest + default: return nil + } + }(), caption: legacyAttachment.caption ).inserted(db) diff --git a/SessionMessagingKit/Database/Models/Attachment.swift b/SessionMessagingKit/Database/Models/Attachment.swift index e8b08a686..dc1f7d662 100644 --- a/SessionMessagingKit/Database/Models/Attachment.swift +++ b/SessionMessagingKit/Database/Models/Attachment.swift @@ -662,7 +662,7 @@ extension Attachment { } // Process image attachments - if MIMETypeUtil.isImage(contentType) { + if MIMETypeUtil.isImage(contentType) || MIMETypeUtil.isAnimated(contentType) { return ( NSData.ows_isValidImage(atPath: targetPath, mimeType: contentType), nil diff --git a/SessionMessagingKit/Database/Models/ControlMessageProcessRecord.swift b/SessionMessagingKit/Database/Models/ControlMessageProcessRecord.swift index b2ed7024a..2d6e86a65 100644 --- a/SessionMessagingKit/Database/Models/ControlMessageProcessRecord.swift +++ b/SessionMessagingKit/Database/Models/ControlMessageProcessRecord.swift @@ -64,19 +64,12 @@ public struct ControlMessageProcessRecord: Codable, FetchableRecord, Persistable public init?( threadId: String, message: Message, - serverExpirationTimestamp: TimeInterval?, - isRetry: Bool = false + serverExpirationTimestamp: TimeInterval? ) { // All `VisibleMessage` values will have an associated `Interaction` so just let // the unique constraints on that table prevent duplicate messages if message is VisibleMessage { return nil } - // TODO: Need to allow duplicates for call messages - - // If the message failed to process and we are retrying then there will already - // be a `ControlMessageProcessRecord`, so return nil to prevent the insertion - // causing a unique constraint violation - if isRetry { return nil } // Allow '.new' and 'encryptionKeyPair' closed group control message duplicates to avoid // the following situation: diff --git a/SessionMessagingKit/Jobs/Types/FailedMessageSendsJob.swift b/SessionMessagingKit/Jobs/Types/FailedMessageSendsJob.swift index 0729b8445..3b6086d3d 100644 --- a/SessionMessagingKit/Jobs/Types/FailedMessageSendsJob.swift +++ b/SessionMessagingKit/Jobs/Types/FailedMessageSendsJob.swift @@ -25,7 +25,7 @@ public enum FailedMessageSendsJob: JobExecutor { .filter(Attachment.Columns.state == Attachment.State.uploading) .updateAll(db, Attachment.Columns.state.set(to: Attachment.State.failedUpload)) - Logger.debug("Marked \(changeCount) message\(changeCount == 1 ? "" : "s") as failed (\(attachmentChangeCount) upload\(attachmentChangeCount == 1 ? "" : "s") cancelled)") + SNLog("Marked \(changeCount) message\(changeCount == 1 ? "" : "s") as failed (\(attachmentChangeCount) upload\(attachmentChangeCount == 1 ? "" : "s") cancelled)") } success(job, false) diff --git a/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift b/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift index fc8076d96..a04d0fccb 100644 --- a/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift +++ b/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift @@ -32,46 +32,27 @@ public enum MessageReceiveJob: JobExecutor { for messageInfo in details.messages { do { - // Note: It generally shouldn't be possible for 'MessageReceiver.parse' to fail - // the main situation where this can happen is when the jobs run out of order (eg. - // a closed group message encrypted with a new key gets processed before the key - // gets added - this shouldn't be as possible with the updated JobRunner) - let isRetry: Bool = (job.failureCount > 0) - let (message, proto) = try MessageReceiver.parse( - db, - data: messageInfo.data, - serverExpirationTimestamp: messageInfo.serverExpirationTimestamp, - isRetry: isRetry - ) - message.serverHash = messageInfo.serverHash - try MessageReceiver.handle( db, - message: message, - associatedWithProto: proto, + message: messageInfo.message, + associatedWithProto: try SNProtoContent.parseData(messageInfo.serializedProtoData), openGroupId: nil, isBackgroundPoll: details.isBackgroundPoll ) } catch { - switch error { - // Note: This is the same as the 'MessageReceiverError.duplicateMessage' - // which is not retryable so just skip to the next message to process (no - // longer logging this because all de-duping happens here now rather than - // when parsing as it did previously - this change results in excessive - // logging which isn't useful) - case DatabaseError.SQLITE_CONSTRAINT_UNIQUE: continue - - default: break - } - // If the current message is a permanent failure then override it with the // new error (we want to retry if there is a single non-permanent error) switch error { - // Ignore self-send errors (they will be permanently failed but no need - // to log since we are going to have a lot of the due to the change to the - // de-duping logic) - case MessageReceiverError.selfSend: continue + // Ignore duplicate and self-send errors (these will usually be caught during + // parsing but sometimes can get past and conflict at database insertion - eg. + // for open group messages) we also don't bother logging as it results in + // excessive logging which isn't useful) + case DatabaseError.SQLITE_CONSTRAINT_UNIQUE, + MessageReceiverError.duplicateMessage, + MessageReceiverError.duplicateControlMessage, + MessageReceiverError.selfSend: + break case let receiverError as MessageReceiverError where !receiverError.isRetryable: SNLog("MessageReceiveJob permanently failed message due to error: \(error)") @@ -107,7 +88,7 @@ public enum MessageReceiveJob: JobExecutor { failure(updatedJob, error, true) case .some(let error): - failure(updatedJob, error, false) + failure(updatedJob, error, false) // TODO: Confirm the 'noKeyPair' errors here aren't an issue case .none: success(updatedJob, false) @@ -120,18 +101,64 @@ public enum MessageReceiveJob: JobExecutor { extension MessageReceiveJob { public struct Details: Codable { public struct MessageInfo: Codable { - public let data: Data - public let serverHash: String? - public let serverExpirationTimestamp: TimeInterval? + private enum CodingKeys: String, CodingKey { + case message + case variant + case serializedProtoData + } + + public let message: Message + public let variant: Message.Variant + public let serializedProtoData: Data public init( - data: Data, - serverHash: String?, - serverExpirationTimestamp: TimeInterval? + message: Message, + variant: Message.Variant, + proto: SNProtoContent + ) throws { + self.message = message + self.variant = variant + self.serializedProtoData = try proto.serializedData() + } + + private init( + message: Message, + variant: Message.Variant, + serializedProtoData: Data ) { - self.data = data - self.serverHash = serverHash - self.serverExpirationTimestamp = serverExpirationTimestamp + self.message = message + self.variant = variant + self.serializedProtoData = serializedProtoData + } + + // MARK: - Codable + + public init(from decoder: Decoder) throws { + let container: KeyedDecodingContainer = try decoder.container(keyedBy: CodingKeys.self) + + guard let variant: Message.Variant = try? container.decode(Message.Variant.self, forKey: .variant) else { + SNLog("Unable to decode messageReceive job due to missing variant") + throw StorageError.decodingFailed + } + + self = MessageInfo( + message: try variant.decode(from: container, forKey: .message), + variant: variant, + serializedProtoData: try container.decode(Data.self, forKey: .serializedProtoData) + ) + } + + public func encode(to encoder: Encoder) throws { + var container: KeyedEncodingContainer = encoder.container(keyedBy: CodingKeys.self) + + guard let variant: Message.Variant = Message.Variant(from: message) else { + SNLog("Unable to encode messageReceive job due to unsupported variant") + throw StorageError.objectNotFound + } + + try container.encode(message, forKey: .message) + try container.encode(variant, forKey: .variant) + try container.encode(serializedProtoData, forKey: .serializedProtoData) } } diff --git a/SessionMessagingKit/Jobs/Types/MessageSendJob.swift b/SessionMessagingKit/Jobs/Types/MessageSendJob.swift index 51fbe5ef6..44baf85e0 100644 --- a/SessionMessagingKit/Jobs/Types/MessageSendJob.swift +++ b/SessionMessagingKit/Jobs/Types/MessageSendJob.swift @@ -170,31 +170,15 @@ public enum MessageSendJob: JobExecutor { extension MessageSendJob { public struct Details: Codable { - // Note: This approach is less than ideal (since it needs to be manually maintained) but - // I couldn't think of an easy way to support a generic decoded type for the 'message' - // value in the database while using Codable - private static let supportedMessageTypes: [String: Message.Type] = [ - "VisibleMessage": VisibleMessage.self, - - "ReadReceipt": ReadReceipt.self, - "TypingIndicator": TypingIndicator.self, - "ClosedGroupControlMessage": ClosedGroupControlMessage.self, - "DataExtractionNotification": DataExtractionNotification.self, - "ExpirationTimerUpdate": ExpirationTimerUpdate.self, - "ConfigurationMessage": ConfigurationMessage.self, - "UnsendRequest": UnsendRequest.self, - "MessageRequestResponse": MessageRequestResponse.self - ] - private enum CodingKeys: String, CodingKey { - case interactionId case destination - case messageType case message + case variant } public let destination: Message.Destination public let message: Message + public let variant: Message.Variant? // MARK: - Initialization @@ -204,49 +188,36 @@ extension MessageSendJob { ) { self.destination = destination self.message = message + self.variant = Message.Variant(from: message) } + // MARK: - Codable + public init(from decoder: Decoder) throws { let container: KeyedDecodingContainer = try decoder.container(keyedBy: CodingKeys.self) - guard let messageType: String = try? container.decode(String.self, forKey: .messageType) else { - Logger.error("Unable to decode messageSend job due to missing messageType") + guard let variant: Message.Variant = try? container.decode(Message.Variant.self, forKey: .variant) else { + SNLog("Unable to decode messageSend job due to missing variant") throw StorageError.decodingFailed } - /// Note: This **MUST** be a `Codable.Type` rather than a `Message.Type` otherwise the decoding will result - /// in a `Message` object being returned rather than the desired subclass - guard let MessageType: Codable.Type = MessageSendJob.Details.supportedMessageTypes[messageType] else { - Logger.error("Unable to decode messageSend job due to unsupported messageType") - throw StorageError.decodingFailed - } - guard let message: Message = try MessageType.decoded(with: container, forKey: .message) as? Message else { - Logger.error("Unable to decode messageSend job due to message conversion issue") - throw StorageError.decodingFailed - } - self = Details( destination: try container.decode(Message.Destination.self, forKey: .destination), - message: message + message: try variant.decode(from: container, forKey: .message) ) } public func encode(to encoder: Encoder) throws { var container: KeyedEncodingContainer = encoder.container(keyedBy: CodingKeys.self) - let messageType: Codable.Type = type(of: message) - let maybeMessageTypeString: String? = MessageSendJob.Details.supportedMessageTypes - .first(where: { _, type in messageType == type })? - .key - - guard let messageTypeString: String = maybeMessageTypeString else { - Logger.error("Unable to encode messageSend job due to unsupported messageType") + guard let variant: Message.Variant = Message.Variant(from: message) else { + SNLog("Unable to encode messageSend job due to unsupported variant") throw StorageError.objectNotFound } try container.encode(destination, forKey: .destination) - try container.encode(messageTypeString, forKey: .messageType) try container.encode(message, forKey: .message) + try container.encode(variant, forKey: .variant) } } } diff --git a/SessionMessagingKit/Messages/Message.swift b/SessionMessagingKit/Messages/Message.swift index aeb747893..187fda7f4 100644 --- a/SessionMessagingKit/Messages/Message.swift +++ b/SessionMessagingKit/Messages/Message.swift @@ -2,6 +2,7 @@ import Foundation import GRDB +import SessionSnodeKit /// Abstract base class for `VisibleMessage` and `ControlMessage`. public class Message: Codable { @@ -76,6 +77,258 @@ public class Message: Codable { } } +// MARK: - Message Parsing/Processing + +public typealias ProcessedMessage = ( + threadId: String?, + proto: SNProtoContent, + messageInfo: MessageReceiveJob.Details.MessageInfo +) + +public extension Message { + static let nonThreadMessageId: String = "NON_THREAD_MESSAGE" + + enum Variant: String, Codable { + case readReceipt + case typingIndicator + case closedGroupControlMessage + case dataExtractionNotification + case expirationTimerUpdate + case configurationMessage + case unsendRequest + case messageRequestResponse + case visibleMessage + + init?(from type: Message) { + switch type { + case is ReadReceipt: self = .readReceipt + case is TypingIndicator: self = .typingIndicator + case is ClosedGroupControlMessage: self = .closedGroupControlMessage + case is DataExtractionNotification: self = .dataExtractionNotification + case is ExpirationTimerUpdate: self = .expirationTimerUpdate + case is ConfigurationMessage: self = .configurationMessage + case is UnsendRequest: self = .unsendRequest + case is MessageRequestResponse: self = .messageRequestResponse + case is VisibleMessage: self = .visibleMessage + default: return nil + } + } + + var messageType: Message.Type { + switch self { + case .readReceipt: return ReadReceipt.self + case .typingIndicator: return TypingIndicator.self + case .closedGroupControlMessage: return ClosedGroupControlMessage.self + case .dataExtractionNotification: return DataExtractionNotification.self + case .expirationTimerUpdate: return ExpirationTimerUpdate.self + case .configurationMessage: return ConfigurationMessage.self + case .unsendRequest: return UnsendRequest.self + case .messageRequestResponse: return MessageRequestResponse.self + case .visibleMessage: return VisibleMessage.self + } + } + + func decode(from container: KeyedDecodingContainer, forKey key: CodingKeys) throws -> Message { + switch self { + case .readReceipt: return try container.decode(ReadReceipt.self, forKey: key) + case .typingIndicator: return try container.decode(TypingIndicator.self, forKey: key) + + case .closedGroupControlMessage: + return try container.decode(ClosedGroupControlMessage.self, forKey: key) + + case .dataExtractionNotification: + return try container.decode(DataExtractionNotification.self, forKey: key) + + case .expirationTimerUpdate: return try container.decode(ExpirationTimerUpdate.self, forKey: key) + case .configurationMessage: return try container.decode(ConfigurationMessage.self, forKey: key) + case .unsendRequest: return try container.decode(UnsendRequest.self, forKey: key) + case .messageRequestResponse: return try container.decode(MessageRequestResponse.self, forKey: key) + case .visibleMessage: return try container.decode(VisibleMessage.self, forKey: key) + } + } + } + + static func createMessageFrom(_ proto: SNProtoContent, sender: String) -> Message? { + // Note: This array is ordered intentionally to ensure the correct types are processed + // and aren't parsed as the wrong type + let prioritisedVariants: [Variant] = [ + .readReceipt, + .typingIndicator, + .closedGroupControlMessage, + .dataExtractionNotification, + .expirationTimerUpdate, + .configurationMessage, + .unsendRequest, + .messageRequestResponse, + .visibleMessage + ] + + return prioritisedVariants + .reduce(nil) { prev, variant in + guard prev == nil else { return prev } + + return variant.messageType.fromProto(proto, sender: sender) + } + } + + static func processRawReceivedMessage( + _ db: Database, + rawMessage: SnodeReceivedMessage + ) throws -> ProcessedMessage? { + guard let envelope = SNProtoEnvelope.from(rawMessage) else { + throw MessageReceiverError.invalidMessage + } + + do { + let processedMessage: ProcessedMessage? = try processRawReceivedMessage( + db, + envelope: envelope, + serverExpirationTimestamp: (TimeInterval(rawMessage.info.expirationDateMs) / 1000), + serverHash: rawMessage.info.hash, + handleClosedGroupKeyUpdateMessages: true + ) + + // Retrieve the number of entries we have for the hash of this message + let numExistingHashes: Int = (try? SnodeReceivedMessageInfo + .filter(SnodeReceivedMessageInfo.Columns.hash == rawMessage.info.hash) + .fetchCount(db)) + .defaulting(to: 0) + + // Try to insert the raw message info into the database (used for both request paging and + // de-duping purposes) + _ = try rawMessage.info.inserted(db) + + // If the above insertion worked then we hadn't processed this message for this specific + // service node, but may have done so for another node - if the hash already existed in + // the database before we inserted it for this node then we can ignore this message as a + // duplicate + guard numExistingHashes == 0 else { throw MessageReceiverError.duplicateMessage } + + return processedMessage + } + catch { + // If we get 'selfSend' or 'duplicateControlMessage' errors then we still want to insert + // the SnodeReceivedMessageInfo to prevent retrieving and attempting to process the same + // message again (as well as ensure the next poll doesn't retrieve the same message) + switch error { + case MessageReceiverError.selfSend, MessageReceiverError.duplicateControlMessage: + _ = try? rawMessage.info.inserted(db) + break + + default: break + } + + throw error + } + } + + static func processRawReceivedMessage( + _ db: Database, + serializedData: Data, + serverHash: String? + ) throws -> ProcessedMessage? { + guard let envelope = try? SNProtoEnvelope.parseData(serializedData) else { + throw MessageReceiverError.invalidMessage + } + + return try processRawReceivedMessage( + db, + envelope: envelope, + serverExpirationTimestamp: (Date().timeIntervalSince1970 + ControlMessageProcessRecord.defaultExpirationSeconds), + serverHash: serverHash, + handleClosedGroupKeyUpdateMessages: true + ) + } + + /// This method behaves slightly differently from the other `processRawReceivedMessage` methods as it doesn't + /// insert the "message info" for deduping (we want the poller to re-process the message) and also avoids handling any + /// closed group key update messages (the `NotificationServiceExtension` does this itself) + static func processRawReceivedMessageAsNotification( + _ db: Database, + envelope: SNProtoEnvelope + ) throws -> ProcessedMessage? { + let processedMessage: ProcessedMessage? = try processRawReceivedMessage( + db, + envelope: envelope, + serverExpirationTimestamp: (Date().timeIntervalSince1970 + ControlMessageProcessRecord.defaultExpirationSeconds), + serverHash: nil, + handleClosedGroupKeyUpdateMessages: false + ) + + return processedMessage + } + + private static func processRawReceivedMessage( + _ db: Database, + envelope: SNProtoEnvelope, + serverExpirationTimestamp: TimeInterval, + serverHash: String?, + // TODO: These + openGroupId: String? = nil, + openGroupMessageServerId: UInt64? = nil, + handleClosedGroupKeyUpdateMessages: Bool + ) throws -> ProcessedMessage? { + let (message, proto, threadId) = try MessageReceiver.parse( + db, + envelope: envelope, + serverExpirationTimestamp: serverExpirationTimestamp, + openGroupId: openGroupId, + openGroupMessageServerId: openGroupMessageServerId + ) + message.serverHash = serverHash + + // Ignore invalid messages and hashes for messages we have previously handled + guard let variant: Message.Variant = Message.Variant(from: message) else { + throw MessageReceiverError.invalidMessage + } + + /// **Note:** We want to immediately handle any `ClosedGroupControlMessage` with the kind `encryptionKeyPair` as + /// we need the keyPair in storage in order to be able to parse and messages which were signed with the new key (also no need to add + /// these as jobs as they will be fully handled in here) + if handleClosedGroupKeyUpdateMessages { + switch message { + case let closedGroupControlMessage as ClosedGroupControlMessage: + switch closedGroupControlMessage.kind { + case .encryptionKeyPair: + try MessageReceiver.handleClosedGroupControlMessage(db, closedGroupControlMessage) + return nil + + default: break + } + + default: break + } + } + + // Prevent ControlMessages from being handled multiple times if not supported + do { + try ControlMessageProcessRecord( + threadId: threadId, + message: message, + serverExpirationTimestamp: serverExpirationTimestamp + )?.insert(db) + } + catch { + // We want to custom handle this + if case DatabaseError.SQLITE_CONSTRAINT_UNIQUE = error { + throw MessageReceiverError.duplicateControlMessage + } + + throw error + } + + return ( + threadId, + proto, + try MessageReceiveJob.Details.MessageInfo( + message: message, + variant: variant, + proto: proto + ) + ) + } +} + // MARK: - Mutation internal extension Message { diff --git a/SessionMessagingKit/Sending & Receiving/Errors/MessageReceiverError.swift b/SessionMessagingKit/Sending & Receiving/Errors/MessageReceiverError.swift index 9fcc17398..2d94b8946 100644 --- a/SessionMessagingKit/Sending & Receiving/Errors/MessageReceiverError.swift +++ b/SessionMessagingKit/Sending & Receiving/Errors/MessageReceiverError.swift @@ -4,6 +4,7 @@ import Foundation public enum MessageReceiverError: LocalizedError { case duplicateMessage + case duplicateControlMessage case invalidMessage case unknownMessage case unknownEnvelopeType @@ -20,7 +21,7 @@ public enum MessageReceiverError: LocalizedError { public var isRetryable: Bool { switch self { - case .duplicateMessage, .invalidMessage, .unknownMessage, .unknownEnvelopeType, + case .duplicateMessage, .duplicateControlMessage, .invalidMessage, .unknownMessage, .unknownEnvelopeType, .invalidSignature, .noData, .senderBlocked, .noThread, .selfSend, .decryptionFailed: return false @@ -31,6 +32,7 @@ public enum MessageReceiverError: LocalizedError { public var errorDescription: String? { switch self { case .duplicateMessage: return "Duplicate message." + case .duplicateControlMessage: return "Duplicate control message." case .invalidMessage: return "Invalid message." case .unknownMessage: return "Unknown message type." case .unknownEnvelopeType: return "Unknown envelope type." diff --git a/SessionMessagingKit/Sending & Receiving/MessageReceiver+Handling.swift b/SessionMessagingKit/Sending & Receiving/MessageReceiver+Handling.swift index 2a091b7a9..9a679f252 100644 --- a/SessionMessagingKit/Sending & Receiving/MessageReceiver+Handling.swift +++ b/SessionMessagingKit/Sending & Receiving/MessageReceiver+Handling.swift @@ -924,7 +924,11 @@ extension MessageReceiver { ).insert(db) } catch { - return SNLog("Ignoring duplicate closed group encryption key pair.") + if case DatabaseError.SQLITE_CONSTRAINT_UNIQUE = error { + return SNLog("Ignoring duplicate closed group encryption key pair.") + } + + throw error } SNLog("Received a new closed group encryption key pair.") diff --git a/SessionMessagingKit/Sending & Receiving/MessageReceiver.swift b/SessionMessagingKit/Sending & Receiving/MessageReceiver.swift index 312448d0a..c43122d3f 100644 --- a/SessionMessagingKit/Sending & Receiving/MessageReceiver.swift +++ b/SessionMessagingKit/Sending & Receiving/MessageReceiver.swift @@ -10,18 +10,14 @@ public enum MessageReceiver { public static func parse( _ db: Database, - data: Data, + envelope: SNProtoEnvelope, serverExpirationTimestamp: TimeInterval?, - openGroupId: String? = nil, - openGroupMessageServerId: UInt64? = nil, - isRetry: Bool = false - ) throws -> (Message, SNProtoContent) { + openGroupId: String?, + openGroupMessageServerId: UInt64? + ) throws -> (Message, SNProtoContent, String) { let userPublicKey: String = getUserHexEncodedPublicKey() let isOpenGroupMessage: Bool = (openGroupMessageServerId != nil) - // Parse the envelope - let envelope = try SNProtoEnvelope.parseData(data) - // Decrypt the contents guard let ciphertext = envelope.content else { throw MessageReceiverError.noData } @@ -118,69 +114,50 @@ public enum MessageReceiver { } // Parse the message - let message: Message? = { - if let readReceipt = ReadReceipt.fromProto(proto, sender: sender) { return readReceipt } - if let typingIndicator = TypingIndicator.fromProto(proto, sender: sender) { return typingIndicator } - if let closedGroupControlMessage = ClosedGroupControlMessage.fromProto(proto, sender: sender) { return closedGroupControlMessage } - if let dataExtractionNotification = DataExtractionNotification.fromProto(proto, sender: sender) { return dataExtractionNotification } - if let expirationTimerUpdate = ExpirationTimerUpdate.fromProto(proto, sender: sender) { return expirationTimerUpdate } - if let configurationMessage = ConfigurationMessage.fromProto(proto, sender: sender) { return configurationMessage } - if let unsendRequest = UnsendRequest.fromProto(proto, sender: sender) { return unsendRequest } - if let messageRequestResponse = MessageRequestResponse.fromProto(proto, sender: sender) { return messageRequestResponse } - if let visibleMessage = VisibleMessage.fromProto(proto, sender: sender) { return visibleMessage } - return nil - }() + guard let message: Message = Message.createMessageFrom(proto, sender: sender) else { + throw MessageReceiverError.unknownMessage + } - if let message = message { - // Ignore self sends if needed - if !message.isSelfSendValid { - guard sender != userPublicKey else { throw MessageReceiverError.selfSend } - } - - // Guard against control messages in open groups - if isOpenGroupMessage { - guard message is VisibleMessage else { throw MessageReceiverError.invalidMessage } - } - - // Finish parsing - message.sender = sender - message.recipient = userPublicKey - message.sentTimestamp = envelope.timestamp - message.receivedTimestamp = UInt64((Date().timeIntervalSince1970) * 1000) - message.groupPublicKey = groupPublicKey - message.openGroupServerMessageId = openGroupMessageServerId - - // Validate - var isValid: Bool = message.isValid - if message is VisibleMessage && !isValid && proto.dataMessage?.attachments.isEmpty == false { - isValid = true - } + // Ignore self sends if needed + guard message.isSelfSendValid || sender != userPublicKey else { + throw MessageReceiverError.selfSend + } + + // Guard against control messages in open groups + guard !isOpenGroupMessage || message is VisibleMessage else { + throw MessageReceiverError.invalidMessage + } + + // Finish parsing + message.sender = sender + message.recipient = userPublicKey + message.sentTimestamp = envelope.timestamp + message.receivedTimestamp = UInt64((Date().timeIntervalSince1970) * 1000) + message.groupPublicKey = groupPublicKey + message.openGroupServerMessageId = openGroupMessageServerId + + // Validate + var isValid: Bool = message.isValid + if message is VisibleMessage && !isValid && proto.dataMessage?.attachments.isEmpty == false { + isValid = true + } + + guard isValid else { + throw MessageReceiverError.invalidMessage + } + + // Extract the proper threadId for the message + let threadId: String = { + if let groupPublicKey: String = groupPublicKey { return groupPublicKey } + if let openGroupId: String = openGroupId { return openGroupId } - guard isValid else { - throw MessageReceiverError.invalidMessage + switch message { + case let message as VisibleMessage: return (message.syncTarget ?? sender) + case let message as ExpirationTimerUpdate: return (message.syncTarget ?? sender) + default: return sender } - - // Prevent ControlMessages from being handled multiple times if not supported - try ControlMessageProcessRecord( - threadId: { - if let groupPublicKey: String = groupPublicKey { return groupPublicKey } - if let openGroupId: String = openGroupId { return openGroupId } - - switch message { - case let message as VisibleMessage: return (message.syncTarget ?? sender) - case let message as ExpirationTimerUpdate: return (message.syncTarget ?? sender) - default: return sender - } - }(), - message: message, - serverExpirationTimestamp: serverExpirationTimestamp, - isRetry: false - )?.insert(db) - - // Return - return (message, proto) - } + }() - throw MessageReceiverError.unknownMessage + return (message, proto, threadId) } } diff --git a/SessionMessagingKit/Sending & Receiving/MessageSender.swift b/SessionMessagingKit/Sending & Receiving/MessageSender.swift index eb6672145..f757e2c29 100644 --- a/SessionMessagingKit/Sending & Receiving/MessageSender.swift +++ b/SessionMessagingKit/Sending & Receiving/MessageSender.swift @@ -468,8 +468,7 @@ public final class MessageSender { } }(), message: message, - serverExpirationTimestamp: (Date().timeIntervalSince1970 + ControlMessageProcessRecord.defaultExpirationSeconds), - isRetry: false + serverExpirationTimestamp: (Date().timeIntervalSince1970 + ControlMessageProcessRecord.defaultExpirationSeconds) )?.insert(db) // Sync the message if: diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift b/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift index 9eadf1d85..2a6ab401c 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift @@ -174,29 +174,22 @@ public final class ClosedGroupPoller: NSObject { var jobDetailMessages: [MessageReceiveJob.Details.MessageInfo] = [] messages.forEach { message in - guard let envelope = SNProtoEnvelope.from(message) else { return } - do { - let serialisedData: Data = try envelope.serializedData() - _ = try message.info.inserted(db) + let processedMessage: ProcessedMessage? = try Message.processRawReceivedMessage(db, rawMessage: message) - // Ignore hashes for messages we have previously handled - guard try SnodeReceivedMessageInfo.filter(SnodeReceivedMessageInfo.Columns.hash == message.info.hash).fetchCount(db) == 1 else { - throw MessageReceiverError.duplicateMessage - } - - jobDetailMessages.append( - MessageReceiveJob.Details.MessageInfo( - data: serialisedData, - serverHash: message.info.hash, - serverExpirationTimestamp: (TimeInterval(message.info.expirationDateMs) / 1000) - ) - ) + jobDetailMessages = jobDetailMessages + .appending(processedMessage?.messageInfo) } catch { switch error { - // Ignore duplicate messages - case .SQLITE_CONSTRAINT_UNIQUE, MessageReceiverError.duplicateMessage: break + // Ignore duplicate & selfSend message errors (and don't bother logging + // them as there will be a lot since we each service node duplicates messages) + case DatabaseError.SQLITE_CONSTRAINT_UNIQUE, + MessageReceiverError.duplicateMessage, + MessageReceiverError.duplicateControlMessage, + MessageReceiverError.selfSend: + break + default: SNLog("Failed to deserialize envelope due to error: \(error).") } } @@ -218,7 +211,7 @@ public final class ClosedGroupPoller: NSObject { ) } - SNLog("Received \(messageCount) new message\(messageCount == 1 ? "" : "s") in closed group with public key: \(groupPublicKey) (\(messages.count - messageCount) duplicates)") + SNLog("Received \(messageCount) new message\(messageCount == 1 ? "" : "s") in closed group with public key: \(groupPublicKey) (duplicates: \(messages.count - messageCount))") } } .map { _ in } diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift b/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift index 4aac480a2..bfc130d5e 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift @@ -109,6 +109,9 @@ public final class Poller { if let error = error as? Error, error == .pollLimitReached { self?.pollCount = 0 } + else if UserDefaults.sharedLokiProject?[.isMainAppActive] != true { + // Do nothing when an error gets throws right after returning from the background (happens frequently) + } else { SNLog("Polling \(nextSnode) failed; dropping it and switching to next snode.") SnodeAPI.dropSnodeFromSwarmIfNeeded(nextSnode, publicKey: userPublicKey) @@ -123,7 +126,7 @@ public final class Poller { private func poll(_ snode: Snode, seal longTermSeal: Resolver) -> Promise { guard isPolling.wrappedValue else { return Promise { $0.fulfill(()) } } - let userPublicKey = getUserHexEncodedPublicKey() + let userPublicKey: String = getUserHexEncodedPublicKey() return SnodeAPI.getMessages(from: snode, associatedWith: userPublicKey) .then(on: Threading.pollerQueue) { [weak self] messages -> Promise in @@ -136,43 +139,26 @@ public final class Poller { var threadMessages: [String: [MessageReceiveJob.Details.MessageInfo]] = [:] messages.forEach { message in - guard let envelope = SNProtoEnvelope.from(message) else { return } - - // Extract the threadId and add that to the messageReceive job for - // multi-threading and garbage collection purposes - let threadId: String? = MessageReceiver.extractSenderPublicKey(db, from: envelope) - - if threadId == nil { - // TODO: I assume a configuration message doesn't need a 'threadId' (confirm this and set the 'requiresThreadId' requirement accordingly) - // TODO: Does the configuration message come through here???? - print("RAWR WHAT CASES LETS THIS BE NIL????") - } - do { - let serialisedData: Data = try envelope.serializedData() - _ = try message.info.inserted(db) + let processedMessage: ProcessedMessage? = try Message.processRawReceivedMessage(db, rawMessage: message) + let key: String = (processedMessage?.threadId ?? Message.nonThreadMessageId) - // Ignore hashes for messages we have previously handled - guard try SnodeReceivedMessageInfo.filter(SnodeReceivedMessageInfo.Columns.hash == message.info.hash).fetchCount(db) == 1 else { - throw MessageReceiverError.duplicateMessage } - threadMessages[threadId ?? ""] = (threadMessages[threadId ?? ""] ?? []) - .appending( - MessageReceiveJob.Details.MessageInfo( - data: serialisedData, - serverHash: message.info.hash, - serverExpirationTimestamp: (TimeInterval(message.info.expirationDateMs) / 1000) - ) - ) + threadMessages[key] = (threadMessages[key] ?? []) + .appending(processedMessage?.messageInfo) } catch { switch error { - // Ignore duplicate messages - case .SQLITE_CONSTRAINT_UNIQUE, MessageReceiverError.duplicateMessage: break - - default: - SNLog("Failed to deserialize envelope due to error: \(error).") + // Ignore duplicate & selfSend message errors (and don't bother logging + // them as there will be a lot since we each service node duplicates messages) + case DatabaseError.SQLITE_CONSTRAINT_UNIQUE, + MessageReceiverError.duplicateMessage, + MessageReceiverError.duplicateControlMessage, + MessageReceiverError.selfSend: + break + + default: SNLog("Failed to deserialize envelope due to error: \(error).") } } } @@ -197,7 +183,7 @@ public final class Poller { } } - SNLog("Received \(messageCount) new message\(messageCount == 1 ? "" : "s") (\(messages.count - messageCount) duplicates)") + SNLog("Received \(messageCount) new message\(messageCount == 1 ? "" : "s") (duplicates: \(messages.count - messageCount))") } self?.pollCount += 1 diff --git a/SessionMessagingKit/Shared Models/SessionThreadViewModel.swift b/SessionMessagingKit/Shared Models/SessionThreadViewModel.swift index 51c288269..1267a2a71 100644 --- a/SessionMessagingKit/Shared Models/SessionThreadViewModel.swift +++ b/SessionMessagingKit/Shared Models/SessionThreadViewModel.swift @@ -710,6 +710,8 @@ public extension SessionThreadViewModel { // MARK: - Search Queries public extension SessionThreadViewModel { + fileprivate static let searchResultsLimit: Int = 500 + static func searchTermParts(_ searchTerm: String) -> [String] { /// Process the search term in order to extract the parts of the search pattern we want /// @@ -836,6 +838,7 @@ public extension SessionThreadViewModel { ) ORDER BY \(Column.rank), \(interaction[.timestampMs].desc) + LIMIT \(SQL("\(SessionThreadViewModel.searchResultsLimit)")) """ return request.adapted { db in @@ -1194,6 +1197,7 @@ public extension SessionThreadViewModel { \(ViewModel.closedGroupNameKey), \(ViewModel.openGroupNameKey), \(ViewModel.threadIdKey) + LIMIT \(SQL("\(SessionThreadViewModel.searchResultsLimit)")) """ // Construct the actual request diff --git a/SessionNotificationServiceExtension/NotificationServiceExtension.swift b/SessionNotificationServiceExtension/NotificationServiceExtension.swift index 77055d422..b22daa151 100644 --- a/SessionNotificationServiceExtension/NotificationServiceExtension.swift +++ b/SessionNotificationServiceExtension/NotificationServiceExtension.swift @@ -35,8 +35,11 @@ public final class NotificationServiceExtension : UNNotificationServiceExtension } } let notificationContent = self.notificationContent! - guard let base64EncodedData = notificationContent.userInfo["ENCRYPTED_DATA"] as! String?, let data = Data(base64Encoded: base64EncodedData), - let envelope = try? MessageWrapper.unwrap(data: data), let envelopeAsData = try? envelope.serializedData() else { + guard + let base64EncodedData: String = notificationContent.userInfo["ENCRYPTED_DATA"] as? String, + let data: Data = Data(base64Encoded: base64EncodedData), + let envelope = try? MessageWrapper.unwrap(data: data) + else { return self.handleFailure(for: notificationContent) } @@ -45,14 +48,20 @@ public final class NotificationServiceExtension : UNNotificationServiceExtension // is added to notification center GRDBStorage.shared.write { db in do { - let (message, proto) = try MessageReceiver.parse( - db, - data: envelopeAsData, - serverExpirationTimestamp: (Date().timeIntervalSince1970 + ControlMessageProcessRecord.defaultExpirationSeconds) - ) - switch message { + guard let processedMessage: ProcessedMessage = try Message.processRawReceivedMessageAsNotification(db, envelope: envelope) else { + self.handleFailure(for: notificationContent) + return + } + + switch processedMessage.messageInfo.message { case let visibleMessage as VisibleMessage: - let interactionId: Int64 = try MessageReceiver.handleVisibleMessage(db, message: visibleMessage, associatedWithProto: proto, openGroupId: nil, isBackgroundPoll: false) + let interactionId: Int64 = try MessageReceiver.handleVisibleMessage( + db, + message: visibleMessage, + associatedWithProto: processedMessage.proto, + openGroupId: nil, + isBackgroundPoll: false + ) // Remove the notifications if there is an outgoing messages from a linked device if diff --git a/SessionSnodeKit/Database/Migrations/_003_YDBToGRDBMigration.swift b/SessionSnodeKit/Database/Migrations/_003_YDBToGRDBMigration.swift index 2919396f3..7fad36bc4 100644 --- a/SessionSnodeKit/Database/Migrations/_003_YDBToGRDBMigration.swift +++ b/SessionSnodeKit/Database/Migrations/_003_YDBToGRDBMigration.swift @@ -78,20 +78,21 @@ enum _003_YDBToGRDBMigration: Migration { // MARK: --Swarms - // Note: There is no index on the collection column so unfortunately it takes the same amount of - // time to enumerate through all collections as it does to just get the count of collections, as - // a result if the database is very large this part can be slow (~15s with 2,000,000 rows) - we - // want to show some kind of progress while doing this enumeration so the below code includes a - // number of rough values to show some kind of progression while the enumeration occurs (most users - // won't run into issues with this at all) - var swarmCollections: Set = [] + /// **Note:** There is no index on the collection column so unfortunately it takes the same amount of time to enumerate through all + /// collections as it does to just get the count of collections, due to this, if the database is very large, importing thecollections can be + /// very slow (~15s with 2,000,000 rows) - we want to show some kind of progress while enumerating so the below code creates a + /// very rought guess of the number of collections based on the file size of the database (this shouldn't affect most users at all) + let roughMbPerCollection: CGFloat = 2.5 + let oldDatabaseSizeBytes: CGFloat = (try? FileManager.default + .attributesOfItem(atPath: SUKLegacy.legacyDatabaseFilepath)[.size] + .asType(CGFloat.self)) + .defaulting(to: 0) + let roughNumCollections: CGFloat = (((oldDatabaseSizeBytes / 1024) / 1024) / roughMbPerCollection) let startProgress: CGFloat = 0.02 let swarmCompleteProgress: CGFloat = 0.90 - let interEnumerationMaxProgress: CGFloat = ((swarmCompleteProgress - startProgress) * 0.8) - let maxCollectionsEstimate: CGFloat = 1000 - let numCollectionsToTriggerProgressUpdate: CGFloat = 20 + var swarmCollections: Set = [] var collectionIndex: CGFloat = 0 - var oldProgress: CGFloat = startProgress + transaction.enumerateCollections { collectionName, _ in if collectionName.starts(with: SSKLegacy.swarmCollectionPrefix) { swarmCollections.insert(collectionName.substring(from: SSKLegacy.swarmCollectionPrefix.count)) @@ -99,10 +100,14 @@ enum _003_YDBToGRDBMigration: Migration { collectionIndex += 1 - if collectionIndex.truncatingRemainder(dividingBy: numCollectionsToTriggerProgressUpdate) == 0 { - oldProgress = (startProgress + (interEnumerationMaxProgress * (collectionIndex / maxCollectionsEstimate))) - GRDBStorage.shared.update(progress: oldProgress, for: self, in: target) - } + GRDBStorage.shared.update( + progress: min( + swarmCompleteProgress, + ((collectionIndex / roughNumCollections) * (swarmCompleteProgress - startProgress)) + ), + for: self, + in: target + ) } GRDBStorage.shared.update(progress: swarmCompleteProgress, for: self, in: target) diff --git a/SessionUtilitiesKit/Database/LegacyDatabase/SUKLegacy.swift b/SessionUtilitiesKit/Database/LegacyDatabase/SUKLegacy.swift index a691334df..26cfa11a4 100644 --- a/SessionUtilitiesKit/Database/LegacyDatabase/SUKLegacy.swift +++ b/SessionUtilitiesKit/Database/LegacyDatabase/SUKLegacy.swift @@ -25,7 +25,7 @@ public enum SUKLegacy { // MARK: - Database Functions - private static var legacyDatabaseFilepath: String { + public static var legacyDatabaseFilepath: String { let sharedDirUrl: URL = URL(fileURLWithPath: OWSFileSystem.appSharedDataDirectoryPath()) return sharedDirUrl diff --git a/SessionUtilitiesKit/JobRunner/JobRunner.swift b/SessionUtilitiesKit/JobRunner/JobRunner.swift index 4f15da4d6..7b2073f3c 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunner.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunner.swift @@ -63,7 +63,12 @@ public final class JobRunner { ) let messageReceiveQueue: JobQueue = JobQueue( type: .messageReceive, - executionType: .concurrent, // Allow as many jobs to run at once as supported by the device + // Explicitly serial as executing concurrently means message receives getting processed at + // different speeds which can result in: + // • Small batches of messages appearing in the UI before larger batches + // • Closed group messages encrypted with updated keys could start parsing before it's key + // update message has been processed (ie. guaranteed to fail) + executionType: .serial, qos: .default, jobVariants: [ jobVariants.remove(.messageReceive) diff --git a/SessionUtilitiesKit/Media/NSData+Image.m b/SessionUtilitiesKit/Media/NSData+Image.m index dde75950b..9d2747809 100644 --- a/SessionUtilitiesKit/Media/NSData+Image.m +++ b/SessionUtilitiesKit/Media/NSData+Image.m @@ -15,6 +15,7 @@ typedef NS_ENUM(NSInteger, ImageFormat) { ImageFormat_Bmp, }; +// FIXME: Refactor all of these to be in Swift against 'Data' @implementation NSData (Image) + (BOOL)ows_isValidImageAtPath:(NSString *)filePath