From 3b772b7f902f690f4d12f876a3d5c06ba0a07a14 Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Tue, 30 May 2023 17:45:46 +1000 Subject: [PATCH] [WIP] started adding logic to ignore messages invalidated by config Created a ConfigMessageReceiveJob just to clean up the logs a bit Updated the poller to make any MessageReceive jobs dependant on any ConfigMessageReceive jobs which are created Updated legacy groups to delete the group content when you are removed Fixed an issue where the JobRunner wouldn't stop pending jobs while clearing data Fixed another issue with the profile view in the message cell --- Session.xcodeproj/project.pbxproj | 4 + .../Message Cells/VisibleMessageCell.swift | 8 +- Session/Settings/NukeDataModal.swift | 6 ++ SessionMessagingKit/Configuration.swift | 1 + .../Migrations/_013_SessionUtilChanges.swift | 2 + .../_014_GenerateInitialUserConfigDumps.swift | 13 ++- .../Database/Models/ConfigDump.swift | 8 +- .../Jobs/Types/ConfigMessageReceiveJob.swift | 63 ++++++++++++++ .../Jobs/Types/MessageReceiveJob.swift | 23 +++-- .../SharedConfigMessage.swift | 5 +- .../MessageReceiver+ClosedGroups.swift | 75 ++++++++++------- .../Sending & Receiving/Pollers/Poller.swift | 84 +++++++++++++++++-- .../Config Handling/SessionUtil+Shared.swift | 37 +++++++- .../SessionUtil+UserGroups.swift | 4 +- .../SessionUtil+UserProfile.swift | 4 +- .../SessionUtil/SessionUtil.swift | 34 +++++--- .../Shared Models/MessageViewModel.swift | 12 +++ SessionUtilitiesKit/Database/Models/Job.swift | 5 ++ SessionUtilitiesKit/JobRunner/JobRunner.swift | 11 ++- 19 files changed, 322 insertions(+), 77 deletions(-) create mode 100644 SessionMessagingKit/Jobs/Types/ConfigMessageReceiveJob.swift diff --git a/Session.xcodeproj/project.pbxproj b/Session.xcodeproj/project.pbxproj index d515422c3..cfa0cdded 100644 --- a/Session.xcodeproj/project.pbxproj +++ b/Session.xcodeproj/project.pbxproj @@ -587,6 +587,7 @@ FD2B4AFD294688D000AB4848 /* SessionUtil+Contacts.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD2B4AFC294688D000AB4848 /* SessionUtil+Contacts.swift */; }; FD2B4AFF2946C93200AB4848 /* ConfigurationSyncJob.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD2B4AFE2946C93200AB4848 /* ConfigurationSyncJob.swift */; }; FD2B4B042949887A00AB4848 /* QueryInterfaceRequest+Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD2B4B032949887A00AB4848 /* QueryInterfaceRequest+Utilities.swift */; }; + FD3003662A25D5B300B5A5FB /* ConfigMessageReceiveJob.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD3003652A25D5B300B5A5FB /* ConfigMessageReceiveJob.swift */; }; FD368A6829DE8F9C000DBF1E /* _012_AddFTSIfNeeded.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD368A6729DE8F9B000DBF1E /* _012_AddFTSIfNeeded.swift */; }; FD368A6A29DE9E30000DBF1E /* UIContextualAction+Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD368A6929DE9E30000DBF1E /* UIContextualAction+Utilities.swift */; }; FD37E9C328A1C6F3003AE748 /* ThemeManager.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD37E9C228A1C6F3003AE748 /* ThemeManager.swift */; }; @@ -1725,6 +1726,7 @@ FD2B4AFC294688D000AB4848 /* SessionUtil+Contacts.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "SessionUtil+Contacts.swift"; sourceTree = ""; }; FD2B4AFE2946C93200AB4848 /* ConfigurationSyncJob.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ConfigurationSyncJob.swift; sourceTree = ""; }; FD2B4B032949887A00AB4848 /* QueryInterfaceRequest+Utilities.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "QueryInterfaceRequest+Utilities.swift"; sourceTree = ""; }; + FD3003652A25D5B300B5A5FB /* ConfigMessageReceiveJob.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ConfigMessageReceiveJob.swift; sourceTree = ""; }; FD368A6729DE8F9B000DBF1E /* _012_AddFTSIfNeeded.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = _012_AddFTSIfNeeded.swift; sourceTree = ""; }; FD368A6929DE9E30000DBF1E /* UIContextualAction+Utilities.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "UIContextualAction+Utilities.swift"; sourceTree = ""; }; FD37E9C228A1C6F3003AE748 /* ThemeManager.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ThemeManager.swift; sourceTree = ""; }; @@ -4296,6 +4298,7 @@ FDD2506F2837199200198BDA /* GarbageCollectionJob.swift */, C352A2FE25574B6300338F3E /* MessageSendJob.swift */, C352A31225574F5200338F3E /* MessageReceiveJob.swift */, + FD3003652A25D5B300B5A5FB /* ConfigMessageReceiveJob.swift */, C352A32E2557549C00338F3E /* NotifyPushServerJob.swift */, FDF0B74E28079E5E004C14C5 /* SendReadReceiptsJob.swift */, C352A348255781F400338F3E /* AttachmentDownloadJob.swift */, @@ -5729,6 +5732,7 @@ C3471ECB2555356A00297E91 /* MessageSender+Encryption.swift in Sources */, FDF40CDE2897A1BC006A0CC4 /* _004_RemoveLegacyYDB.swift in Sources */, FDF0B74928060D13004C14C5 /* QuotedReplyModel.swift in Sources */, + FD3003662A25D5B300B5A5FB /* ConfigMessageReceiveJob.swift in Sources */, 7B81682C28B72F480069F315 /* PendingChange.swift in Sources */, FD77289A284AF1BD0018502F /* Sodium+Utilities.swift in Sources */, FD5C7309285007920029977D /* BlindedIdLookup.swift in Sources */, diff --git a/Session/Conversations/Message Cells/VisibleMessageCell.swift b/Session/Conversations/Message Cells/VisibleMessageCell.swift index a89b3d80e..79b137e5d 100644 --- a/Session/Conversations/Message Cells/VisibleMessageCell.swift +++ b/Session/Conversations/Message Cells/VisibleMessageCell.swift @@ -299,8 +299,14 @@ final class VisibleMessageCell: MessageCell, TappableLabelDelegate { ) // Profile picture view (should always be handled as a standard 'contact' profile picture) + let profileShouldBeVisible: Bool = ( + cellViewModel.canHaveProfile && + cellViewModel.shouldShowProfile && + cellViewModel.profile != nil + ) profilePictureViewLeadingConstraint.constant = (isGroupThread ? VisibleMessageCell.groupThreadHSpacing : 0) - profilePictureView.isHidden = (!cellViewModel.shouldShowProfile || cellViewModel.profile == nil) + profilePictureView.isHidden = !cellViewModel.canHaveProfile + profilePictureView.alpha = (profileShouldBeVisible ? 1 : 0) profilePictureView.update( publicKey: cellViewModel.authorId, threadVariant: .contact, // Always show the display picture in 'contact' mode diff --git a/Session/Settings/NukeDataModal.swift b/Session/Settings/NukeDataModal.swift index 2d8ceeaff..1564f2441 100644 --- a/Session/Settings/NukeDataModal.swift +++ b/Session/Settings/NukeDataModal.swift @@ -229,6 +229,12 @@ final class NukeDataModal: Modal { PushNotificationAPI.unregister(data).sinkUntilComplete() } + /// Stop and cancel all current jobs (don't want to inadvertantly have a job store data after it's table has already been cleared) + /// + /// **Note:** This is file as long as this process kills the app, if it doesn't then we need an alternate mechanism to flag that + /// the `JobRunner` is allowed to start it's queues again + JobRunner.stopAndClearPendingJobs() + // Clear the app badge and notifications AppEnvironment.shared.notificationPresenter.clearAllNotifications() CurrentAppContext().setMainAppBadgeNumber(0) diff --git a/SessionMessagingKit/Configuration.swift b/SessionMessagingKit/Configuration.swift index de6575edb..c6d92487c 100644 --- a/SessionMessagingKit/Configuration.swift +++ b/SessionMessagingKit/Configuration.swift @@ -58,5 +58,6 @@ public enum SNMessagingKit { // Just to make the external API nice JobRunner.add(executor: GroupLeavingJob.self, for: .groupLeaving) JobRunner.add(executor: AttachmentDownloadJob.self, for: .attachmentDownload) JobRunner.add(executor: ConfigurationSyncJob.self, for: .configurationSync) + JobRunner.add(executor: ConfigMessageReceiveJob.self, for: .configMessageReceive) } } diff --git a/SessionMessagingKit/Database/Migrations/_013_SessionUtilChanges.swift b/SessionMessagingKit/Database/Migrations/_013_SessionUtilChanges.swift index 8445948f9..2412e0970 100644 --- a/SessionMessagingKit/Database/Migrations/_013_SessionUtilChanges.swift +++ b/SessionMessagingKit/Database/Migrations/_013_SessionUtilChanges.swift @@ -154,6 +154,8 @@ enum _013_SessionUtilChanges: Migration { .indexed() t.column(.data, .blob) .notNull() + t.column(.timestampMs, .integer) + .notNull() t.primaryKey([.variant, .publicKey]) } diff --git a/SessionMessagingKit/Database/Migrations/_014_GenerateInitialUserConfigDumps.swift b/SessionMessagingKit/Database/Migrations/_014_GenerateInitialUserConfigDumps.swift index 703acf6a1..3091e2531 100644 --- a/SessionMessagingKit/Database/Migrations/_014_GenerateInitialUserConfigDumps.swift +++ b/SessionMessagingKit/Database/Migrations/_014_GenerateInitialUserConfigDumps.swift @@ -23,6 +23,7 @@ enum _014_GenerateInitialUserConfigDumps: Migration { // Create the initial config state let userPublicKey: String = getUserHexEncodedPublicKey(db) + let timestampMs: Int64 = Int64(Date().timeIntervalSince1970 * 1000) SessionUtil.loadState(db, userPublicKey: userPublicKey, ed25519SecretKey: secretKey) @@ -56,7 +57,8 @@ enum _014_GenerateInitialUserConfigDumps: Migration { .createDump( conf: conf, for: .userProfile, - publicKey: userPublicKey + publicKey: userPublicKey, + timestampMs: timestampMs )? .save(db) } @@ -120,7 +122,8 @@ enum _014_GenerateInitialUserConfigDumps: Migration { .createDump( conf: conf, for: .contacts, - publicKey: userPublicKey + publicKey: userPublicKey, + timestampMs: timestampMs )? .save(db) } @@ -144,7 +147,8 @@ enum _014_GenerateInitialUserConfigDumps: Migration { .createDump( conf: conf, for: .convoInfoVolatile, - publicKey: userPublicKey + publicKey: userPublicKey, + timestampMs: timestampMs )? .save(db) } @@ -179,7 +183,8 @@ enum _014_GenerateInitialUserConfigDumps: Migration { .createDump( conf: conf, for: .userGroups, - publicKey: userPublicKey + publicKey: userPublicKey, + timestampMs: timestampMs )? .save(db) } diff --git a/SessionMessagingKit/Database/Models/ConfigDump.swift b/SessionMessagingKit/Database/Models/ConfigDump.swift index cca17d29f..8f20f1466 100644 --- a/SessionMessagingKit/Database/Models/ConfigDump.swift +++ b/SessionMessagingKit/Database/Models/ConfigDump.swift @@ -13,6 +13,7 @@ public struct ConfigDump: Codable, Equatable, Hashable, FetchableRecord, Persist case variant case publicKey case data + case timestampMs } public enum Variant: String, Codable, DatabaseValueConvertible { @@ -33,14 +34,19 @@ public struct ConfigDump: Codable, Equatable, Hashable, FetchableRecord, Persist /// The data for this dump public let data: Data + /// When the configDump was created in milliseconds since epoch + public let timestampMs: Int64 + internal init( variant: Variant, publicKey: String, - data: Data + data: Data, + timestampMs: Int64 ) { self.variant = variant self.publicKey = publicKey self.data = data + self.timestampMs = timestampMs } } diff --git a/SessionMessagingKit/Jobs/Types/ConfigMessageReceiveJob.swift b/SessionMessagingKit/Jobs/Types/ConfigMessageReceiveJob.swift new file mode 100644 index 000000000..c5f0ae21f --- /dev/null +++ b/SessionMessagingKit/Jobs/Types/ConfigMessageReceiveJob.swift @@ -0,0 +1,63 @@ +// Copyright © 2023 Rangeproof Pty Ltd. All rights reserved. + +import Foundation +import GRDB +import SessionUtilitiesKit + +public enum ConfigMessageReceiveJob: JobExecutor { + public static var maxFailureCount: Int = 0 + public static var requiresThreadId: Bool = true + public static let requiresInteractionId: Bool = false + + public static func run( + _ job: Job, + queue: DispatchQueue, + success: @escaping (Job, Bool) -> (), + failure: @escaping (Job, Error?, Bool) -> (), + deferred: @escaping (Job) -> () + ) { + guard + let detailsData: Data = job.details, + let details: Details = try? JSONDecoder().decode(Details.self, from: detailsData) + else { + failure(job, JobRunnerError.missingRequiredDetails, true) + return + } + + // Ensure no standard messages are sent through this job + guard !details.messages.contains(where: { $0.variant != .sharedConfigMessage }) else { + SNLog("[ConfigMessageReceiveJob] Standard messages incorrectly sent to the 'configMessageReceive' job") + failure(job, MessageReceiverError.invalidMessage, true) + return + } + + var lastError: Error? + let sharedConfigMessages: [SharedConfigMessage] = details.messages + .compactMap { $0.message as? SharedConfigMessage } + + Storage.shared.write { db in + // Send any SharedConfigMessages to the SessionUtil to handle it + do { + try SessionUtil.handleConfigMessages( + db, + messages: sharedConfigMessages, + publicKey: (job.threadId ?? "") + ) + } + catch { lastError = error } + } + + // Handle the result + switch lastError { + case let error as MessageReceiverError where !error.isRetryable: failure(job, error, true) + case .some(let error): failure(job, error, false) + case .none: success(job, false) + } + } +} + +// MARK: - ConfigMessageReceiveJob.Details + +extension ConfigMessageReceiveJob { + typealias Details = MessageReceiveJob.Details +} diff --git a/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift b/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift index 726624dcd..10e3fb15d 100644 --- a/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift +++ b/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift @@ -25,10 +25,17 @@ public enum MessageReceiveJob: JobExecutor { return } + // Ensure no config messages are sent through this job + guard !details.messages.contains(where: { $0.variant == .sharedConfigMessage }) else { + SNLog("[MessageReceiveJob] Config messages incorrectly sent to the 'messageReceive' job") + failure(job, MessageReceiverError.invalidSharedConfigMessageHandling, true) + return + } + var updatedJob: Job = job var lastError: Error? var remainingMessagesToProcess: [Details.MessageInfo] = [] - let nonConfigMessages: [(info: Details.MessageInfo, proto: SNProtoContent)] = details.messages + let messageData: [(info: Details.MessageInfo, proto: SNProtoContent)] = details.messages .filter { $0.variant != .sharedConfigMessage } .compactMap { messageInfo -> (info: Details.MessageInfo, proto: SNProtoContent)? in do { @@ -44,19 +51,9 @@ public enum MessageReceiveJob: JobExecutor { return nil } } - let sharedConfigMessages: [SharedConfigMessage] = details.messages - .compactMap { $0.message as? SharedConfigMessage } Storage.shared.write { db in - // Send any SharedConfigMessages to the SessionUtil to handle it - try SessionUtil.handleConfigMessages( - db, - messages: sharedConfigMessages, - publicKey: (job.threadId ?? "") - ) - - // Handle the remaining messages - for (messageInfo, protoContent) in nonConfigMessages { + for (messageInfo, protoContent) in messageData { do { try MessageReceiver.handle( db, @@ -98,6 +95,8 @@ public enum MessageReceiveJob: JobExecutor { // If any messages failed to process then we want to update the job to only include // those failed messages + guard !remainingMessagesToProcess.isEmpty else { return } + updatedJob = try job .with( details: Details( diff --git a/SessionMessagingKit/Messages/Control Messages/SharedConfigMessage.swift b/SessionMessagingKit/Messages/Control Messages/SharedConfigMessage.swift index 2e67ea96e..09aecc1c5 100644 --- a/SessionMessagingKit/Messages/Control Messages/SharedConfigMessage.swift +++ b/SessionMessagingKit/Messages/Control Messages/SharedConfigMessage.swift @@ -42,13 +42,14 @@ public final class SharedConfigMessage: ControlMessage { public init( kind: Kind, seqNo: Int64, - data: Data + data: Data, + sentTimestamp: UInt64? = nil ) { self.kind = kind self.seqNo = seqNo self.data = data - super.init() + super.init(sentTimestamp: sentTimestamp) } // MARK: - Codable diff --git a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+ClosedGroups.swift b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+ClosedGroups.swift index ae9b3a53d..0fb862067 100644 --- a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+ClosedGroups.swift +++ b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+ClosedGroups.swift @@ -69,7 +69,15 @@ extension MessageReceiver { guard case let .new(publicKeyAsData, name, encryptionKeyPair, membersAsData, adminsAsData, expirationTimer) = message.kind else { return } - guard let sentTimestamp: UInt64 = message.sentTimestamp else { return } + guard + let sentTimestamp: UInt64 = message.sentTimestamp, + SessionUtil.canPerformChange( + db, + threadId: publicKeyAsData.toHexString(), + targetConfig: .userGroups, + changeTimestampMs: Int64(sentTimestamp) + ) + else { return SNLog("Ignoring outdated NEW legacy group message due to more recent config state") } try handleNewClosedGroup( db, @@ -473,16 +481,11 @@ extension MessageReceiver { let wasCurrentUserRemoved: Bool = !members.contains(userPublicKey) if wasCurrentUserRemoved { - ClosedGroupPoller.shared.stopPolling(for: threadId) - - _ = try closedGroup - .keyPairs - .deleteAll(db) - - let _ = PushNotificationAPI.performOperation( - .unsubscribe, - for: threadId, - publicKey: userPublicKey + try ClosedGroup.removeKeysAndUnsubscribe( + db, + threadId: threadId, + removeGroupData: true, + calledFromConfigHandling: false ) } } @@ -584,27 +587,35 @@ extension MessageReceiver { return SNLog("Ignoring group update for nonexistent group.") } - // Legacy groups used these control messages for making changes, new groups only use them - // for information purposes - switch threadVariant { - case .legacyGroup: - // Check that the message isn't from before the group was created - guard Double(message.sentTimestamp ?? 0) > closedGroup.formationTimestamp else { - return SNLog("Ignoring legacy group update from before thread was created.") - } - - // If these values are missing then we probably won't be able to validly handle the message - guard - let allMembers: [GroupMember] = try? closedGroup.allMembers.fetchAll(db), - allMembers.contains(where: { $0.profileId == sender }) - else { return SNLog("Ignoring legacy group update from non-member.") } - - try legacyGroupChanges(sender, closedGroup, allMembers) - - case .group: - break - - default: return // Ignore as invalid + let timestampMs: Int64 = ( + message.sentTimestamp.map { Int64($0) } ?? + SnodeAPI.currentOffsetTimestampMs() + ) + // Only actually make the change if SessionUtil says we can (we always want to insert the info + // message though) + if SessionUtil.canPerformChange(db, threadId: threadId, targetConfig: .userGroups, changeTimestampMs: timestampMs ) { + // Legacy groups used these control messages for making changes, new groups only use them + // for information purposes + switch threadVariant { + case .legacyGroup: + // Check that the message isn't from before the group was created + guard Double(message.sentTimestamp ?? 0) > closedGroup.formationTimestamp else { + return SNLog("Ignoring legacy group update from before thread was created.") + } + + // If these values are missing then we probably won't be able to validly handle the message + guard + let allMembers: [GroupMember] = try? closedGroup.allMembers.fetchAll(db), + allMembers.contains(where: { $0.profileId == sender }) + else { return SNLog("Ignoring legacy group update from non-member.") } + + try legacyGroupChanges(sender, closedGroup, allMembers) + + case .group: + break + + default: return // Ignore as invalid + } } // Insert the info message for this group control message diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift b/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift index 52fa670f1..8dbcbdd91 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift @@ -248,10 +248,11 @@ public class Poller { var messageCount: Int = 0 var processedMessages: [Message] = [] var hadValidHashUpdate: Bool = false - var jobsToRun: [Job] = [] + var configMessageJobsToRun: [Job] = [] + var standardMessageJobsToRun: [Job] = [] Storage.shared.write { db in - allMessages + let allProcessedMessages: [ProcessedMessage] = allMessages .compactMap { message -> ProcessedMessage? in do { return try Message.processRawReceivedMessage(db, rawMessage: message) @@ -284,6 +285,39 @@ public class Poller { return nil } } + + // Add a job to process the config messages first + let configJobIds: [Int64] = allProcessedMessages + .filter { $0.messageInfo.variant == .sharedConfigMessage } + .grouped { threadId, _, _, _ in threadId } + .compactMap { threadId, threadMessages in + messageCount += threadMessages.count + processedMessages += threadMessages.map { $0.messageInfo.message } + + let jobToRun: Job? = Job( + variant: .configMessageReceive, + behaviour: .runOnce, + threadId: threadId, + details: ConfigMessageReceiveJob.Details( + messages: threadMessages.map { $0.messageInfo }, + calledFromBackgroundPoller: calledFromBackgroundPoller + ) + ) + configMessageJobsToRun = configMessageJobsToRun.appending(jobToRun) + + // If we are force-polling then add to the JobRunner so they are + // persistent and will retry on the next app run if they fail but + // don't let them auto-start + let updatedJob: Job? = JobRunner + .add(db, job: jobToRun, canStartJob: !calledFromBackgroundPoller) + + return updatedJob?.id + } + + // Add jobs for processing non-config messages which are dependant on the config message + // processing jobs + allProcessedMessages + .filter { $0.messageInfo.variant != .sharedConfigMessage } .grouped { threadId, _, _, _ in threadId } .forEach { threadId, threadMessages in messageCount += threadMessages.count @@ -298,12 +332,29 @@ public class Poller { calledFromBackgroundPoller: calledFromBackgroundPoller ) ) - jobsToRun = jobsToRun.appending(jobToRun) + standardMessageJobsToRun = standardMessageJobsToRun.appending(jobToRun) // If we are force-polling then add to the JobRunner so they are // persistent and will retry on the next app run if they fail but // don't let them auto-start - JobRunner.add(db, job: jobToRun, canStartJob: !calledFromBackgroundPoller) + let updatedJob: Job? = JobRunner + .add(db, job: jobToRun, canStartJob: !calledFromBackgroundPoller) + + // Create the dependency between the jobs + if let updatedJobId: Int64 = updatedJob?.id { + do { + try configJobIds.forEach { configJobId in + try JobDependencies( + jobId: updatedJobId, + dependantId: configJobId + ) + .insert(db) + } + } + catch { + SNLog("Failed to add dependency between config processing and non-config processing messageReceive jobs.") + } + } } // Clean up message hashes and add some logs about the poll results @@ -334,11 +385,11 @@ public class Poller { // We want to try to handle the receive jobs immediately in the background return Publishers .MergeMany( - jobsToRun.map { job -> AnyPublisher in + configMessageJobsToRun.map { job -> AnyPublisher in Deferred { Future { resolver in // Note: In the background we just want jobs to fail silently - MessageReceiveJob.run( + ConfigMessageReceiveJob.run( job, queue: queue, success: { _, _ in resolver(Result.success(())) }, @@ -351,6 +402,27 @@ public class Poller { } ) .collect() + .flatMap { _ in + Publishers + .MergeMany( + standardMessageJobsToRun.map { job -> AnyPublisher in + Deferred { + Future { resolver in + // Note: In the background we just want jobs to fail silently + MessageReceiveJob.run( + job, + queue: queue, + success: { _, _ in resolver(Result.success(())) }, + failure: { _, _, _ in resolver(Result.success(())) }, + deferred: { _ in resolver(Result.success(())) } + ) + } + } + .eraseToAnyPublisher() + } + ) + .collect() + } .map { _ in processedMessages } .eraseToAnyPublisher() } diff --git a/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+Shared.swift b/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+Shared.swift index 4a8c7e9d2..3ea0d9194 100644 --- a/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+Shared.swift +++ b/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+Shared.swift @@ -9,6 +9,11 @@ import SessionUtilitiesKit // MARK: - Convenience internal extension SessionUtil { + /// This is a buffer period within which we will process messages which would result in a config change, any message which would normally + /// result in a config change which was sent before `lastConfigMessage.timestamp - configChangeBufferPeriod` will not + /// actually have it's changes applied (info messages would still be inserted though) + static let configChangeBufferPeriod: TimeInterval = (2 * 60) + static let columnsRelatedToThreads: [ColumnExpression] = [ SessionThread.Columns.pinnedPriority, SessionThread.Columns.shouldBeVisible @@ -66,7 +71,8 @@ internal extension SessionUtil { try SessionUtil.createDump( conf: conf, for: variant, - publicKey: publicKey + publicKey: publicKey, + timestampMs: Int64(Date().timeIntervalSince1970 * 1000) )?.save(db) return config_needs_push(conf) @@ -293,6 +299,35 @@ internal extension SessionUtil { } } } + + static func canPerformChange( + _ db: Database, + threadId: String, + targetConfig: ConfigDump.Variant, + changeTimestampMs: Int64 + ) -> Bool { + // FIXME: Remove this once `useSharedUtilForUserConfig` is permanent + guard SessionUtil.userConfigsEnabled(db) else { return true } + + let targetPublicKey: String = { + switch targetConfig { + default: return getUserHexEncodedPublicKey(db) + } + }() + + let configDumpTimestampMs: Int64 = (try? ConfigDump + .filter( + ConfigDump.Columns.variant == targetConfig && + ConfigDump.Columns.publicKey == targetPublicKey + ) + .select(.timestampMs) + .asRequest(of: Int64.self) + .fetchOne(db)) + .defaulting(to: 0) + + // Ensure the change occurred after the last config message was handled (minus the buffer period) + return (changeTimestampMs > (configDumpTimestampMs - Int64(SessionUtil.configChangeBufferPeriod * 1000))) + } } // MARK: - External Outgoing Changes diff --git a/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+UserGroups.swift b/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+UserGroups.swift index 0ba663d29..3433ea5dd 100644 --- a/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+UserGroups.swift +++ b/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+UserGroups.swift @@ -29,7 +29,7 @@ internal extension SessionUtil { _ db: Database, in conf: UnsafeMutablePointer?, mergeNeedsDump: Bool, - latestConfigUpdateSentTimestamp: TimeInterval + latestConfigSentTimestampMs: Int64 ) throws { guard mergeNeedsDump else { return } guard conf != nil else { throw SessionUtilError.nilConfigObject } @@ -219,7 +219,7 @@ internal extension SessionUtil { .map { $0.profileId }, admins: updatedAdmins.map { $0.profileId }, expirationTimer: UInt32(group.disappearingConfig?.durationSeconds ?? 0), - formationTimestampMs: UInt64((group.joinedAt ?? Int64(latestConfigUpdateSentTimestamp)) * 1000), + formationTimestampMs: UInt64((group.joinedAt.map { $0 * 1000 } ?? latestConfigSentTimestampMs)), calledFromConfigHandling: true ) } diff --git a/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+UserProfile.swift b/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+UserProfile.swift index 920eb3e47..2bbe3cc4d 100644 --- a/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+UserProfile.swift +++ b/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+UserProfile.swift @@ -18,7 +18,7 @@ internal extension SessionUtil { _ db: Database, in conf: UnsafeMutablePointer?, mergeNeedsDump: Bool, - latestConfigUpdateSentTimestamp: TimeInterval + latestConfigSentTimestampMs: Int64 ) throws { typealias ProfileData = (profileName: String, profilePictureUrl: String?, profilePictureKey: Data?) @@ -50,7 +50,7 @@ internal extension SessionUtil { fileName: nil ) }(), - sentTimestamp: latestConfigUpdateSentTimestamp, + sentTimestamp: (TimeInterval(latestConfigSentTimestampMs) / 1000), calledFromConfigHandling: true ) diff --git a/SessionMessagingKit/SessionUtil/SessionUtil.swift b/SessionMessagingKit/SessionUtil/SessionUtil.swift index 1735f6b6c..391a4a45a 100644 --- a/SessionMessagingKit/SessionUtil/SessionUtil.swift +++ b/SessionMessagingKit/SessionUtil/SessionUtil.swift @@ -260,7 +260,8 @@ public enum SessionUtil { internal static func createDump( conf: UnsafeMutablePointer?, for variant: ConfigDump.Variant, - publicKey: String + publicKey: String, + timestampMs: Int64 ) throws -> ConfigDump? { guard conf != nil else { throw SessionUtilError.nilConfigObject } @@ -279,7 +280,8 @@ public enum SessionUtil { return ConfigDump( variant: variant, publicKey: publicKey, - data: dumpData + data: dumpData, + timestampMs: timestampMs ) } @@ -363,7 +365,8 @@ public enum SessionUtil { return try? SessionUtil.createDump( conf: conf, for: message.kind.configDumpVariant, - publicKey: publicKey + publicKey: publicKey, + timestampMs: (message.sentTimestamp.map { Int64($0) } ?? 0) ) } } @@ -427,9 +430,7 @@ public enum SessionUtil { let needsPush: Bool = try groupedMessages .sorted { lhs, rhs in lhs.key.processingOrder < rhs.key.processingOrder } .reduce(false) { prevNeedsPush, next -> Bool in - let messageSentTimestamp: TimeInterval = TimeInterval( - (next.value.compactMap { $0.sentTimestamp }.max() ?? 0) / 1000 - ) + let latestConfigSentTimestampMs: Int64 = Int64(next.value.compactMap { $0.sentTimestamp }.max() ?? 0) let needsPush: Bool = try SessionUtil .config(for: next.key, publicKey: publicKey) .mutate { conf in @@ -453,7 +454,7 @@ public enum SessionUtil { db, in: conf, mergeNeedsDump: config_needs_dump(conf), - latestConfigUpdateSentTimestamp: messageSentTimestamp + latestConfigSentTimestampMs: latestConfigSentTimestampMs ) case .contacts: @@ -475,7 +476,7 @@ public enum SessionUtil { db, in: conf, mergeNeedsDump: config_needs_dump(conf), - latestConfigUpdateSentTimestamp: messageSentTimestamp + latestConfigSentTimestampMs: latestConfigSentTimestampMs ) } } @@ -486,12 +487,25 @@ public enum SessionUtil { // Need to check if the config needs to be dumped (this might have changed // after handling the merge changes) - guard config_needs_dump(conf) else { return config_needs_push(conf) } + guard config_needs_dump(conf) else { + try ConfigDump + .filter( + ConfigDump.Columns.variant == next.key && + ConfigDump.Columns.publicKey == publicKey + ) + .updateAll( + db, + ConfigDump.Columns.timestampMs.set(to: latestConfigSentTimestampMs) + ) + + return config_needs_push(conf) + } try SessionUtil.createDump( conf: conf, for: next.key, - publicKey: publicKey + publicKey: publicKey, + timestampMs: latestConfigSentTimestampMs )?.save(db) return config_needs_push(conf) diff --git a/SessionMessagingKit/Shared Models/MessageViewModel.swift b/SessionMessagingKit/Shared Models/MessageViewModel.swift index 880f9c0bb..177dcbbbb 100644 --- a/SessionMessagingKit/Shared Models/MessageViewModel.swift +++ b/SessionMessagingKit/Shared Models/MessageViewModel.swift @@ -34,6 +34,7 @@ public struct MessageViewModel: FetchableRecordWithRowId, Decodable, Equatable, public static let currentUserPublicKeyKey: SQL = SQL(stringLiteral: CodingKeys.currentUserPublicKey.stringValue) public static let cellTypeKey: SQL = SQL(stringLiteral: CodingKeys.cellType.stringValue) public static let authorNameKey: SQL = SQL(stringLiteral: CodingKeys.authorName.stringValue) + public static let canHaveProfileKey: SQL = SQL(stringLiteral: CodingKeys.canHaveProfile.stringValue) public static let shouldShowProfileKey: SQL = SQL(stringLiteral: CodingKeys.shouldShowProfile.stringValue) public static let shouldShowDateHeaderKey: SQL = SQL(stringLiteral: CodingKeys.shouldShowDateHeader.stringValue) public static let positionInClusterKey: SQL = SQL(stringLiteral: CodingKeys.positionInCluster.stringValue) @@ -115,6 +116,9 @@ public struct MessageViewModel: FetchableRecordWithRowId, Decodable, Equatable, /// **Note:** This will only be populated for incoming messages public let senderName: String? + /// A flag indicating whether the profile view can be displayed + public let canHaveProfile: Bool + /// A flag indicating whether the profile view should be displayed public let shouldShowProfile: Bool @@ -191,6 +195,7 @@ public struct MessageViewModel: FetchableRecordWithRowId, Decodable, Equatable, cellType: self.cellType, authorName: self.authorName, senderName: self.senderName, + canHaveProfile: self.canHaveProfile, shouldShowProfile: self.shouldShowProfile, shouldShowDateHeader: self.shouldShowDateHeader, containsOnlyEmoji: self.containsOnlyEmoji, @@ -393,6 +398,11 @@ public struct MessageViewModel: FetchableRecordWithRowId, Decodable, Equatable, return authorDisplayName }(), + canHaveProfile: ( + // Only group threads and incoming messages + isGroupThread && + self.variant == .standardIncoming + ), shouldShowProfile: ( // Only group threads isGroupThread && @@ -564,6 +574,7 @@ public extension MessageViewModel { self.cellType = cellType self.authorName = "" self.senderName = nil + self.canHaveProfile = false self.shouldShowProfile = false self.shouldShowDateHeader = false self.containsOnlyEmoji = nil @@ -733,6 +744,7 @@ public extension MessageViewModel { -- query from crashing when decoding we need to provide default values \(CellType.textOnlyMessage) AS \(ViewModel.cellTypeKey), '' AS \(ViewModel.authorNameKey), + false AS \(ViewModel.canHaveProfileKey), false AS \(ViewModel.shouldShowProfileKey), false AS \(ViewModel.shouldShowDateHeaderKey), \(Position.middle) AS \(ViewModel.positionInClusterKey), diff --git a/SessionUtilitiesKit/Database/Models/Job.swift b/SessionUtilitiesKit/Database/Models/Job.swift index 8fd845cba..d1c588c39 100644 --- a/SessionUtilitiesKit/Database/Models/Job.swift +++ b/SessionUtilitiesKit/Database/Models/Job.swift @@ -111,6 +111,11 @@ public struct Job: Codable, Equatable, Hashable, Identifiable, FetchableRecord, /// This is a job that runs once whenever the user config or a closed group config changes, it retrieves the /// state of all config objects and syncs any that are flagged as needing to be synced case configurationSync + + /// This is a job that runs once whenever a config message is received to attempt to decode it and update the + /// config state with the changes; this job will generally be scheduled along since a `messageReceive` job + /// and will block the standard message receive job + case configMessageReceive } public enum Behaviour: Int, Codable, DatabaseValueConvertible, CaseIterable { diff --git a/SessionUtilitiesKit/JobRunner/JobRunner.swift b/SessionUtilitiesKit/JobRunner/JobRunner.swift index c6b0bad56..19a750f26 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunner.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunner.swift @@ -80,7 +80,8 @@ public final class JobRunner { executionType: .serial, qos: .default, jobVariants: [ - jobVariants.remove(.messageReceive) + jobVariants.remove(.messageReceive), + jobVariants.remove(.configMessageReceive) ].compactMap { $0 } ) let attachmentDownloadQueue: JobQueue = JobQueue( @@ -127,15 +128,15 @@ public final class JobRunner { /// /// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp` /// is in the future then the job won't be started - public static func add(_ db: Database, job: Job?, canStartJob: Bool = true) { + @discardableResult public static func add(_ db: Database, job: Job?, canStartJob: Bool = true) -> Job? { // Store the job into the database (getting an id for it) guard let updatedJob: Job = try? job?.inserted(db) else { SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job") - return + return nil } guard !canStartJob || updatedJob.id != nil else { SNLog("[JobRunner] Not starting \(job.map { "\($0.variant)" } ?? "unknown") job due to missing id") - return + return nil } // Wait until the transaction has been completed before updating the queue (to ensure anything @@ -149,6 +150,8 @@ public final class JobRunner { queues.wrappedValue[updatedJob.variant]?.start() } + + return updatedJob } /// Upsert a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start