diff --git a/Session/Conversations/ConversationVC.swift b/Session/Conversations/ConversationVC.swift index 5b22e7ae5..37a218c4f 100644 --- a/Session/Conversations/ConversationVC.swift +++ b/Session/Conversations/ConversationVC.swift @@ -1587,6 +1587,10 @@ final class ConversationVC: BaseVC, SessionUtilRespondingViewController, Convers self.scrollButton.alpha = self.getScrollButtonOpacity() self.unreadCountView.alpha = self.scrollButton.alpha + // The initial scroll can trigger this logic but we already mark the initially focused message + // as read so don't run the below until the user actually scrolls after the initial layout + guard self.didFinishInitialLayout else { return } + // We want to mark messages as read while we scroll, so grab the newest message and mark // everything older as read // diff --git a/Session/Home/GlobalSearch/GlobalSearchViewController.swift b/Session/Home/GlobalSearch/GlobalSearchViewController.swift index f36992c24..7d4103a85 100644 --- a/Session/Home/GlobalSearch/GlobalSearchViewController.swift +++ b/Session/Home/GlobalSearch/GlobalSearchViewController.swift @@ -311,6 +311,19 @@ extension GlobalSearchViewController { return } + // If it's a one-to-one thread then make sure the thread exists before pushing to it (in case the + // contact has been hidden) + if threadVariant == .contact { + Storage.shared.write { db in + try SessionThread.fetchOrCreate( + db, + id: threadId, + variant: threadVariant, + shouldBeVisible: nil // Don't change current state + ) + } + } + let viewController: ConversationVC = ConversationVC( threadId: threadId, threadVariant: threadVariant, diff --git a/Session/Open Groups/JoinOpenGroupVC.swift b/Session/Open Groups/JoinOpenGroupVC.swift index 0d36e4063..3f25f1175 100644 --- a/Session/Open Groups/JoinOpenGroupVC.swift +++ b/Session/Open Groups/JoinOpenGroupVC.swift @@ -159,10 +159,10 @@ final class JoinOpenGroupVC: BaseVC, UIPageViewControllerDataSource, UIPageViewC return } - joinOpenGroup(roomToken: room, server: server, publicKey: publicKey) + joinOpenGroup(roomToken: room, server: server, publicKey: publicKey, shouldOpenCommunity: true) } - fileprivate func joinOpenGroup(roomToken: String, server: String, publicKey: String, shouldOpenCommunity: Bool = false) { + fileprivate func joinOpenGroup(roomToken: String, server: String, publicKey: String, shouldOpenCommunity: Bool) { guard !isJoining, let navigationController: UINavigationController = navigationController else { return } isJoining = true diff --git a/SessionMessagingKit/Messages/Message.swift b/SessionMessagingKit/Messages/Message.swift index 97d6198e9..9e382e7a2 100644 --- a/SessionMessagingKit/Messages/Message.swift +++ b/SessionMessagingKit/Messages/Message.swift @@ -277,15 +277,9 @@ public extension Message { return processedMessage } catch { - // If we get 'selfSend' or 'duplicateControlMessage' errors then we still want to insert - // the SnodeReceivedMessageInfo to prevent retrieving and attempting to process the same - // message again (as well as ensure the next poll doesn't retrieve the same message) - switch error { - case MessageReceiverError.selfSend, MessageReceiverError.duplicateControlMessage: - _ = try? rawMessage.info.inserted(db) - break - - default: break + // For some error cases we want to update the last hash so do so + if (error as? MessageReceiverError)?.shouldUpdateLastHash == true { + _ = try? rawMessage.info.inserted(db) } throw error diff --git a/SessionMessagingKit/Open Groups/OpenGroupManager.swift b/SessionMessagingKit/Open Groups/OpenGroupManager.swift index 3716460df..0cb6f4a6b 100644 --- a/SessionMessagingKit/Open Groups/OpenGroupManager.swift +++ b/SessionMessagingKit/Open Groups/OpenGroupManager.swift @@ -605,18 +605,19 @@ public final class OpenGroupManager { return } - let seqNo: Int64? = messages.map { $0.seqNo }.max() let sortedMessages: [OpenGroupAPI.Message] = messages .filter { $0.deleted != true } .sorted { lhs, rhs in lhs.id < rhs.id } - var messageServerIdsToRemove: [Int64] = messages + var messageServerInfoToRemove: [(id: Int64, seqNo: Int64)] = messages .filter { $0.deleted == true } - .map { $0.id } - - if let seqNo: Int64 = seqNo { + .map { ($0.id, $0.seqNo) } + let updateSeqNo: (Database, String, inout Int64, Int64, OGMDependencies) -> () = { db, openGroupId, lastValidSeqNo, seqNo, dependencies in + // Only update the data if the 'seqNo' is larger than the lastValidSeqNo (only want it to increase) + guard seqNo > lastValidSeqNo else { return } + // Update the 'openGroupSequenceNumber' value (Note: SOGS V4 uses the 'seqNo' instead of the 'serverId') _ = try? OpenGroup - .filter(id: openGroup.id) + .filter(id: openGroupId) .updateAll(db, OpenGroup.Columns.sequenceNumber.set(to: seqNo)) // Update pendingChange cache @@ -624,13 +625,18 @@ public final class OpenGroupManager { $0.pendingChanges = $0.pendingChanges .filter { $0.seqNo == nil || $0.seqNo! > seqNo } } + + // Update the inout value + lastValidSeqNo = seqNo } // Process the messages + var lastValidSeqNo: Int64 = -1 sortedMessages.forEach { message in if message.base64EncodedData == nil && message.reactions == nil { - messageServerIdsToRemove.append(Int64(message.id)) - return + messageServerInfoToRemove.append((message.id, message.seqNo)) + + return updateSeqNo(db, openGroup.id, &lastValidSeqNo, message.seqNo, dependencies) } // Handle messages @@ -657,6 +663,7 @@ public final class OpenGroupManager { associatedWithProto: try SNProtoContent.parseData(messageInfo.serializedProtoData), dependencies: dependencies ) + updateSeqNo(db, openGroup.id, &lastValidSeqNo, message.seqNo, dependencies) } } catch { @@ -701,6 +708,7 @@ public final class OpenGroupManager { openGroupMessageServerId: message.id, openGroupReactions: reactions ) + updateSeqNo(db, openGroup.id, &lastValidSeqNo, message.seqNo, dependencies) } catch { SNLog("Couldn't handle open group reactions due to error: \(error).") @@ -709,12 +717,18 @@ public final class OpenGroupManager { } // Handle any deletions that are needed - guard !messageServerIdsToRemove.isEmpty else { return } + guard !messageServerInfoToRemove.isEmpty else { return } + let messageServerIdsToRemove: [Int64] = messageServerInfoToRemove.map { $0.id } _ = try? Interaction .filter(Interaction.Columns.threadId == openGroup.threadId) .filter(messageServerIdsToRemove.contains(Interaction.Columns.openGroupServerMessageId)) .deleteAll(db) + + // Update the seqNo for deletions + if let lastDeletionSeqNo: Int64 = messageServerInfoToRemove.map({ $0.seqNo }).max() { + updateSeqNo(db, openGroup.id, &lastValidSeqNo, lastDeletionSeqNo, dependencies) + } } internal static func handleDirectMessages( diff --git a/SessionMessagingKit/Sending & Receiving/Errors/MessageReceiverError.swift b/SessionMessagingKit/Sending & Receiving/Errors/MessageReceiverError.swift index bcc097efe..8cc59e6e4 100644 --- a/SessionMessagingKit/Sending & Receiving/Errors/MessageReceiverError.swift +++ b/SessionMessagingKit/Sending & Receiving/Errors/MessageReceiverError.swift @@ -34,6 +34,19 @@ public enum MessageReceiverError: LocalizedError { default: return true } } + + public var shouldUpdateLastHash: Bool { + switch self { + // If we get one of these errors then we still want to update the last hash to prevent + // retrieving and attempting to process the same messages again (as well as ensure the + // next poll doesn't retrieve the same message - these errors are essentially considered + // "already successfully processed") + case .selfSend, .duplicateControlMessage, .outdatedMessage: + return true + + default: return false + } + } public var errorDescription: String? { switch self { diff --git a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+Calls.swift b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+Calls.swift index dafc634d2..8737f2643 100644 --- a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+Calls.swift +++ b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+Calls.swift @@ -13,31 +13,8 @@ extension MessageReceiver { threadVariant: SessionThread.Variant, message: CallMessage ) throws { - let timestampMs: Int64 = (message.sentTimestamp.map { Int64($0) } ?? SnodeAPI.currentOffsetTimestampMs()) - // Only support calls from contact threads - guard - threadVariant == .contact, - /// Only process the message if the thread `shouldBeVisible` or it was sent after the libSession buffer period - ( - SessionThread - .filter(id: threadId) - .filter(SessionThread.Columns.shouldBeVisible == true) - .isNotEmpty(db) || - SessionUtil.conversationInConfig( - db, - threadId: threadId, - threadVariant: threadVariant, - visibleOnly: true - ) || - SessionUtil.canPerformChange( - db, - threadId: threadId, - targetConfig: .contacts, - changeTimestampMs: timestampMs - ) - ) - else { return } + guard threadVariant == .contact else { return } switch message.kind { case .preOffer: try MessageReceiver.handleNewCallMessage(db, message: message) diff --git a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+DataExtractionNotification.swift b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+DataExtractionNotification.swift index e9397be26..48b91fba4 100644 --- a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+DataExtractionNotification.swift +++ b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+DataExtractionNotification.swift @@ -3,7 +3,6 @@ import Foundation import GRDB import SessionSnodeKit -import SessionUtilitiesKit extension MessageReceiver { internal static func handleDataExtractionNotification( @@ -12,46 +11,12 @@ extension MessageReceiver { threadVariant: SessionThread.Variant, message: DataExtractionNotification ) throws { - let timestampMs: Int64 = ( - message.sentTimestamp.map { Int64($0) } ?? - SnodeAPI.currentOffsetTimestampMs() - ) - guard threadVariant == .contact, let sender: String = message.sender, let messageKind: DataExtractionNotification.Kind = message.kind else { throw MessageReceiverError.invalidMessage } - /// Only process the message if the thread `shouldBeVisible` or it was sent after the libSession buffer period - guard - SessionThread - .filter(id: threadId) - .filter(SessionThread.Columns.shouldBeVisible == true) - .isNotEmpty(db) || - SessionUtil.conversationInConfig( - db, - threadId: threadId, - threadVariant: threadVariant, - visibleOnly: true - ) || - SessionUtil.canPerformChange( - db, - threadId: threadId, - targetConfig: { - switch threadVariant { - case .contact: - let currentUserPublicKey: String = getUserHexEncodedPublicKey(db) - - return (threadId == currentUserPublicKey ? .userProfile : .contacts) - - default: return .userGroups - } - }(), - changeTimestampMs: timestampMs - ) - else { throw MessageReceiverError.outdatedMessage } - _ = try Interaction( serverHash: message.serverHash, threadId: threadId, @@ -62,7 +27,10 @@ extension MessageReceiver { case .mediaSaved: return .infoMediaSavedNotification } }(), - timestampMs: timestampMs + timestampMs: ( + message.sentTimestamp.map { Int64($0) } ?? + SnodeAPI.currentOffsetTimestampMs() + ) ).inserted(db) } } diff --git a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+MessageRequests.swift b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+MessageRequests.swift index fbaad1124..4921a2d48 100644 --- a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+MessageRequests.swift +++ b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+MessageRequests.swift @@ -13,10 +13,6 @@ extension MessageReceiver { dependencies: SMKDependencies ) throws { let userPublicKey = getUserHexEncodedPublicKey(db, dependencies: dependencies) - let timestampMs: Int64 = ( - message.sentTimestamp.map { Int64($0) } ?? - SnodeAPI.currentOffsetTimestampMs() - ) var blindedContactIds: [String] = [] // Ignore messages which were sent from the current user @@ -25,26 +21,6 @@ extension MessageReceiver { let senderId: String = message.sender else { throw MessageReceiverError.invalidMessage } - /// Only process the message if the thread `shouldBeVisible` or it was sent after the libSession buffer period - guard - SessionThread - .filter(id: senderId) - .filter(SessionThread.Columns.shouldBeVisible == true) - .isNotEmpty(db) || - SessionUtil.conversationInConfig( - db, - threadId: senderId, - threadVariant: .contact, - visibleOnly: true - ) || - SessionUtil.canPerformChange( - db, - threadId: senderId, - targetConfig: .contacts, - changeTimestampMs: timestampMs - ) - else { throw MessageReceiverError.outdatedMessage } - // Update profile if needed (want to do this regardless of whether the message exists or // not to ensure the profile info gets sync between a users devices at every chance) if let profile = message.profile { @@ -160,7 +136,10 @@ extension MessageReceiver { threadId: unblindedThread.id, authorId: senderId, variant: .infoMessageRequestAccepted, - timestampMs: timestampMs + timestampMs: ( + message.sentTimestamp.map { Int64($0) } ?? + SnodeAPI.currentOffsetTimestampMs() + ) ).inserted(db) } diff --git a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+VisibleMessages.swift b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+VisibleMessages.swift index d41cf4f40..1fc33de52 100644 --- a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+VisibleMessages.swift +++ b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+VisibleMessages.swift @@ -23,32 +23,6 @@ extension MessageReceiver { // seconds to maintain the accuracy) let messageSentTimestamp: TimeInterval = (TimeInterval(message.sentTimestamp ?? 0) / 1000) let isMainAppActive: Bool = (UserDefaults.sharedLokiProject?[.isMainAppActive]).defaulting(to: false) - let currentUserPublicKey: String = getUserHexEncodedPublicKey(db, dependencies: dependencies) - - /// Only process the message if the thread `shouldBeVisible` or it was sent after the libSession buffer period - guard - SessionThread - .filter(id: threadId) - .filter(SessionThread.Columns.shouldBeVisible == true) - .isNotEmpty(db) || - SessionUtil.conversationInConfig( - db, - threadId: threadId, - threadVariant: threadVariant, - visibleOnly: true - ) || - SessionUtil.canPerformChange( - db, - threadId: threadId, - targetConfig: { - switch threadVariant { - case .contact: return (threadId == currentUserPublicKey ? .userProfile : .contacts) - default: return .userGroups - } - }(), - changeTimestampMs: (message.sentTimestamp.map { Int64($0) } ?? SnodeAPI.currentOffsetTimestampMs()) - ) - else { throw MessageReceiverError.outdatedMessage } // Update profile if needed (want to do this regardless of whether the message exists or // not to ensure the profile info gets sync between a users devices at every chance) @@ -90,6 +64,7 @@ extension MessageReceiver { } // Store the message variant so we can run variant-specific behaviours + let currentUserPublicKey: String = getUserHexEncodedPublicKey(db, dependencies: dependencies) let thread: SessionThread = try SessionThread .fetchOrCreate(db, id: threadId, variant: threadVariant, shouldBeVisible: nil) let maybeOpenGroup: OpenGroup? = { diff --git a/SessionMessagingKit/Sending & Receiving/MessageReceiver.swift b/SessionMessagingKit/Sending & Receiving/MessageReceiver.swift index ba5f4263b..847f17805 100644 --- a/SessionMessagingKit/Sending & Receiving/MessageReceiver.swift +++ b/SessionMessagingKit/Sending & Receiving/MessageReceiver.swift @@ -192,6 +192,15 @@ public enum MessageReceiver { SessionUtil.conversationInConfig(db, threadId: threadId, threadVariant: threadVariant, visibleOnly: false) else { throw MessageReceiverError.requiredThreadNotInConfig } + // Throw if the message is outdated and shouldn't be processed + try throwIfMessageOutdated( + db, + message: message, + threadId: threadId, + threadVariant: threadVariant, + dependencies: dependencies + ) + switch message { case let message as ReadReceipt: try MessageReceiver.handleReadReceipt( @@ -315,7 +324,8 @@ public enum MessageReceiver { .filter(id: threadId) .updateAllAndConfig( db, - SessionThread.Columns.shouldBeVisible.set(to: true) + SessionThread.Columns.shouldBeVisible.set(to: true), + SessionThread.Columns.pinnedPriority.set(to: SessionUtil.visiblePriority) ) } } @@ -344,4 +354,44 @@ public enum MessageReceiver { try reaction.with(interactionId: interactionId).insert(db) } } + + public static func throwIfMessageOutdated( + _ db: Database, + message: Message, + threadId: String, + threadVariant: SessionThread.Variant, + dependencies: SMKDependencies = SMKDependencies() + ) throws { + switch message { + case is ReadReceipt: return // No visible artifact created so better to keep for more reliable read states + case is UnsendRequest: return // We should always process the removal of messages just in case + default: break + } + + // Determine the state of the conversation and the validity of the message + let currentUserPublicKey: String = getUserHexEncodedPublicKey(db, dependencies: dependencies) + let conversationVisibleInConfig: Bool = SessionUtil.conversationInConfig( + db, + threadId: threadId, + threadVariant: threadVariant, + visibleOnly: true + ) + let canPerformChange: Bool = SessionUtil.canPerformChange( + db, + threadId: threadId, + targetConfig: { + switch threadVariant { + case .contact: return (threadId == currentUserPublicKey ? .userProfile : .contacts) + default: return .userGroups + } + }(), + changeTimestampMs: (message.sentTimestamp.map { Int64($0) } ?? SnodeAPI.currentOffsetTimestampMs()) + ) + + // If the thread is visible or the message was sent more recently than the last config message (minus + // buffer period) then we should process the message, if not then throw as the message is outdated + guard !conversationVisibleInConfig && !canPerformChange else { return } + + throw MessageReceiverError.outdatedMessage + } } diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPoller.swift b/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPoller.swift index d4ff78b6c..7458ca7a8 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPoller.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPoller.swift @@ -53,16 +53,32 @@ extension OpenGroupAPI { .fetchOne(db) } .defaulting(to: 0) + let lastPollStart: TimeInterval = Date().timeIntervalSince1970 let nextPollInterval: TimeInterval = getInterval(for: minPollFailureCount, minInterval: Poller.minPollInterval, maxInterval: Poller.maxPollInterval) - poll(using: dependencies).sinkUntilComplete() - timer = Timer.scheduledTimerOnMainThread(withTimeInterval: nextPollInterval, repeats: false) { [weak self] timer in - timer.invalidate() - - Threading.pollerQueue.async { - self?.pollRecursively(using: dependencies) - } - } + // Wait until the last poll completes before polling again ensuring we don't poll any faster than + // the 'nextPollInterval' value + poll(using: dependencies) + .sinkUntilComplete( + receiveCompletion: { [weak self] _ in + let currentTime: TimeInterval = Date().timeIntervalSince1970 + let remainingInterval: TimeInterval = max(0, nextPollInterval - (currentTime - lastPollStart)) + + guard remainingInterval > 0 else { + return Threading.pollerQueue.async { + self?.pollRecursively(using: dependencies) + } + } + + self?.timer = Timer.scheduledTimerOnMainThread(withTimeInterval: remainingInterval, repeats: false) { timer in + timer.invalidate() + + Threading.pollerQueue.async { + self?.pollRecursively(using: dependencies) + } + } + } + ) } public func poll( diff --git a/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+Shared.swift b/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+Shared.swift index 8fb7d949e..a925cbcb0 100644 --- a/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+Shared.swift +++ b/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+Shared.swift @@ -34,11 +34,14 @@ internal extension SessionUtil { return !allColumnsThatTriggerConfigUpdate.isDisjoint(with: targetColumns) } + /// A `0` `priority` value indicates visible, but not pinned + static let visiblePriority: Int32 = 0 + /// A negative `priority` value indicates hidden static let hiddenPriority: Int32 = -1 static func shouldBeVisible(priority: Int32) -> Bool { - return (priority >= 0) + return (priority >= SessionUtil.visiblePriority) } static func performAndPushChange( @@ -127,8 +130,8 @@ internal extension SessionUtil { guard noteToSelf.shouldBeVisible else { return SessionUtil.hiddenPriority } return noteToSelf.pinnedPriority - .map { Int32($0 == 0 ? 0 : max($0, 1)) } - .defaulting(to: 0) + .map { Int32($0 == 0 ? SessionUtil.visiblePriority : max($0, 1)) } + .defaulting(to: SessionUtil.visiblePriority) }(), in: conf ) @@ -154,8 +157,8 @@ internal extension SessionUtil { guard thread.shouldBeVisible else { return SessionUtil.hiddenPriority } return thread.pinnedPriority - .map { Int32($0 == 0 ? 0 : max($0, 1)) } - .defaulting(to: 0) + .map { Int32($0 == 0 ? SessionUtil.visiblePriority : max($0, 1)) } + .defaulting(to: SessionUtil.visiblePriority) }() ) }, @@ -176,8 +179,8 @@ internal extension SessionUtil { CommunityInfo( urlInfo: urlInfo, priority: thread.pinnedPriority - .map { Int32($0 == 0 ? 0 : max($0, 1)) } - .defaulting(to: 0) + .map { Int32($0 == 0 ? SessionUtil.visiblePriority : max($0, 1)) } + .defaulting(to: SessionUtil.visiblePriority) ) } }, @@ -197,8 +200,8 @@ internal extension SessionUtil { LegacyGroupInfo( id: thread.id, priority: thread.pinnedPriority - .map { Int32($0 == 0 ? 0 : max($0, 1)) } - .defaulting(to: 0) + .map { Int32($0 == 0 ? SessionUtil.visiblePriority : max($0, 1)) } + .defaulting(to: SessionUtil.visiblePriority) ) }, in: conf diff --git a/SessionNotificationServiceExtension/NotificationServiceExtension.swift b/SessionNotificationServiceExtension/NotificationServiceExtension.swift index 3a61aa8b3..e423e2784 100644 --- a/SessionNotificationServiceExtension/NotificationServiceExtension.swift +++ b/SessionNotificationServiceExtension/NotificationServiceExtension.swift @@ -75,6 +75,14 @@ public final class NotificationServiceExtension: UNNotificationServiceExtension return } + // Throw if the message is outdated and shouldn't be processed + try MessageReceiver.throwIfMessageOutdated( + db, + message: processedMessage.messageInfo.message, + threadId: processedMessage.threadId, + threadVariant: processedMessage.threadVariant + ) + switch processedMessage.messageInfo.message { case let visibleMessage as VisibleMessage: let interactionId: Int64 = try MessageReceiver.handleVisibleMessage( @@ -174,7 +182,7 @@ public final class NotificationServiceExtension: UNNotificationServiceExtension catch { if let error = error as? MessageReceiverError, error.isRetryable { switch error { - case .invalidGroupPublicKey, .noGroupKeyPair: self.completeSilenty() + case .invalidGroupPublicKey, .noGroupKeyPair, .outdatedMessage: self.completeSilenty() default: self.handleFailure(for: notificationContent) } }