// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.
import Foundation
import Combine
import GRDB
import DifferenceKit
import SessionMessagingKit
import SessionUtilitiesKit
public class ConversationViewModel: OWSAudioPlayerDelegate {
public typealias SectionModel = ArraySection<Section, MessageViewModel>
// MARK: - FocusBehaviour
public enum FocusBehaviour {
case none
case highlight
// MARK: - Action
public enum Action {
case none
case compose
case audioCall
case videoCall
// MARK: - Section
public enum Section: Differentiable, Equatable, Comparable, Hashable {
case loadOlder
case messages
case loadNewer
// MARK: - Variables
public static let pageSize: Int = 50
private var threadId: String
public let initialThreadVariant: SessionThread.Variant
public var sentMessageBeforeUpdate: Bool = false
public var lastSearchedText: String?
public let focusedInteractionInfo: Interaction.TimestampInfo? // Note: This is used for global search
public let focusBehaviour: FocusBehaviour
private let initialUnreadInteractionId: Int64?
private let markAsReadTrigger: PassthroughSubject<(SessionThreadViewModel.ReadTarget, Int64?), Never> = PassthroughSubject()
private var markAsReadPublisher: AnyPublisher<Void, Never>?
public lazy var blockedBannerMessage: String = {
let threadData: SessionThreadViewModel = self._threadData.wrappedValue
switch threadData.threadVariant {
case .contact:
let name: String = Profile.displayName(
id: threadData.threadId,
threadVariant: threadData.threadVariant
return "\(name) is blocked. Unblock them?"
default: return "Thread is blocked. Unblock it?"
// MARK: - Initialization
init(threadId: String, threadVariant: SessionThread.Variant, focusedInteractionInfo: Interaction.TimestampInfo?) {
typealias InitialData = (
currentUserPublicKey: String,
initialUnreadInteractionInfo: Interaction.TimestampInfo?,
threadIsBlocked: Bool,
currentUserIsClosedGroupMember: Bool?,
openGroupPermissions: OpenGroup.Permissions?,
blinded15Key: String?,
blinded25Key: String?
let initialData: InitialData? = { db -> InitialData in
let interaction: TypedTableAlias<Interaction> = TypedTableAlias()
let groupMember: TypedTableAlias<GroupMember> = TypedTableAlias()
let currentUserPublicKey: String = getUserHexEncodedPublicKey(db)
// If we have a specified 'focusedInteractionInfo' then use that, otherwise retrieve the oldest
// unread interaction and start focused around that one
let initialUnreadInteractionInfo: Interaction.TimestampInfo? = try Interaction
.select(.id, .timestampMs)
.filter(interaction[.wasRead] == false)
.filter(interaction[.threadId] == threadId)
.asRequest(of: Interaction.TimestampInfo.self)
let threadIsBlocked: Bool = (threadVariant != .contact ? false :
try Contact
.filter(id: threadId)
.asRequest(of: Bool.self)
.defaulting(to: false)
let currentUserIsClosedGroupMember: Bool? = (![.legacyGroup, .group].contains(threadVariant) ? nil :
.filter(groupMember[.groupId] == threadId)
.filter(groupMember[.profileId] == currentUserPublicKey)
.filter(groupMember[.role] == GroupMember.Role.standard)
let openGroupPermissions: OpenGroup.Permissions? = (threadVariant != .community ? nil :
try OpenGroup
.filter(id: threadId)
.asRequest(of: OpenGroup.Permissions.self)
let blinded15Key: String? = SessionThread.getUserHexEncodedBlindedKey(
threadId: threadId,
threadVariant: threadVariant,
blindingPrefix: .blinded15
let blinded25Key: String? = SessionThread.getUserHexEncodedBlindedKey(
threadId: threadId,
threadVariant: threadVariant,
blindingPrefix: .blinded25
return (
self.threadId = threadId
self.initialThreadVariant = threadVariant
self.focusedInteractionInfo = (focusedInteractionInfo ?? initialData?.initialUnreadInteractionInfo)
self.focusBehaviour = (focusedInteractionInfo == nil ? .none : .highlight)
self.initialUnreadInteractionId = initialData?.initialUnreadInteractionInfo?.id
self._threadData = Atomic(
threadId: threadId,
threadVariant: threadVariant,
threadIsNoteToSelf: (initialData?.currentUserPublicKey == threadId),
threadIsBlocked: initialData?.threadIsBlocked,
currentUserIsClosedGroupMember: initialData?.currentUserIsClosedGroupMember,
openGroupPermissions: initialData?.openGroupPermissions
currentUserBlinded15PublicKeyForThisThread: initialData?.blinded15Key,
currentUserBlinded25PublicKeyForThisThread: initialData?.blinded25Key
self.pagedDataObserver = nil
// Note: Since this references self we need to finish initializing before setting it, we
// also want to skip the initial query and trigger it async so that the push animation
// doesn't stutter (it should load basically immediately but without this there is a
// distinct stutter)
self.pagedDataObserver = self.setupPagedObserver(
for: threadId,
userPublicKey: (initialData?.currentUserPublicKey ?? getUserHexEncodedPublicKey()),
blinded15PublicKey: initialData?.blinded15Key,
blinded25PublicKey: initialData?.blinded25Key
// Run the initial query on a background thread so we don't block the push transition .userInitiated).async { [weak self] in
// If we don't have a `initialFocusedInfo` then default to `.pageBefore` (it'll query
// from a `0` offset)
guard let initialFocusedInfo: Interaction.TimestampInfo = (focusedInteractionInfo ?? initialData?.initialUnreadInteractionInfo) else {
// MARK: - Thread Data
private var _threadData: Atomic<SessionThreadViewModel>
/// This value is the current state of the view
public var threadData: SessionThreadViewModel { _threadData.wrappedValue }
/// This is all the data the screen needs to populate itself, please see the following link for tips to help optimise
/// performance
/// **Note:** The 'trackingConstantRegion' is optimised in such a way that the request needs to be static
/// otherwise there may be situations where it doesn't get updates, this means we can't have conditional queries
/// **Note:** This observation will be triggered twice immediately (and be de-duped by the `removeDuplicates`)
/// this is due to the behaviour of `ValueConcurrentObserver.asyncStartObservation` which triggers it's own
/// fetch (after the ones in `ValueConcurrentObserver.asyncStart`/`ValueConcurrentObserver.syncStart`)
/// just in case the database has changed between the two reads - unfortunately it doesn't look like there is a way to prevent this
public typealias ThreadObservation = ValueObservation<ValueReducers.Trace<ValueReducers.RemoveDuplicates<ValueReducers.Fetch<SessionThreadViewModel?>>>>
public lazy var observableThreadData: ThreadObservation = setupObservableThreadData(for: self.threadId)
private func setupObservableThreadData(for threadId: String) -> ThreadObservation {
return ValueObservation
.trackingConstantRegion { [weak self] db -> SessionThreadViewModel? in
let userPublicKey: String = getUserHexEncodedPublicKey(db)
let recentReactionEmoji: [String] = try Emoji.getRecent(db, withDefaultEmoji: true)
let oldThreadData: SessionThreadViewModel? = self?._threadData.wrappedValue
let threadViewModel: SessionThreadViewModel? = try SessionThreadViewModel
.conversationQuery(threadId: threadId, userPublicKey: userPublicKey)
return threadViewModel
.map { $0.with(recentReactionEmoji: recentReactionEmoji) }
.map { viewModel -> SessionThreadViewModel in
currentUserBlinded15PublicKeyForThisThread: oldThreadData?.currentUserBlinded15PublicKey,
currentUserBlinded25PublicKeyForThisThread: oldThreadData?.currentUserBlinded25PublicKey
.handleEvents(didFail: { SNLog("[ConversationViewModel] Observation failed with error: \($0)") })
public func updateThreadData(_ updatedData: SessionThreadViewModel) {
self._threadData.mutate { $0 = updatedData }
// MARK: - Interaction Data
private var lastInteractionIdMarkedAsRead: Int64? = nil
private var lastInteractionTimestampMsMarkedAsRead: Int64 = 0
public private(set) var unobservedInteractionDataChanges: ([SectionModel], StagedChangeset<[SectionModel]>)?
public private(set) var interactionData: [SectionModel] = []
public private(set) var reactionExpandedInteractionIds: Set<Int64> = []
public private(set) var pagedDataObserver: PagedDatabaseObserver<Interaction, MessageViewModel>?
public var onInteractionChange: (([SectionModel], StagedChangeset<[SectionModel]>) -> ())? {
didSet {
// When starting to observe interaction changes we want to trigger a UI update just in case the
// data was changed while we weren't observing
if let changes: ([SectionModel], StagedChangeset<[SectionModel]>) = self.unobservedInteractionDataChanges {
let performChange: (([SectionModel], StagedChangeset<[SectionModel]>) -> ())? = onInteractionChange
switch Thread.isMainThread {
case true: performChange?(changes.0, changes.1)
case false: DispatchQueue.main.async { performChange?(changes.0, changes.1) }
self.unobservedInteractionDataChanges = nil
private func setupPagedObserver(
for threadId: String,
userPublicKey: String,
blinded15PublicKey: String?,
blinded25PublicKey: String?
) -> PagedDatabaseObserver<Interaction, MessageViewModel> {
return PagedDatabaseObserver(
pagedTable: Interaction.self,
pageSize: ConversationViewModel.pageSize,
idColumn: .id,
observedChanges: [
table: Interaction.self,
columns: Interaction.Columns
.filter { $0 != .wasRead }
table: Attachment.self,
columns: [.state],
joinToPagedType: {
let interaction: TypedTableAlias<Interaction> = TypedTableAlias()
let linkPreview: TypedTableAlias<LinkPreview> = TypedTableAlias()
let linkPreviewAttachment: TypedTableAlias<Attachment> = TypedTableAlias()
return SQL("""
LEFT JOIN \(LinkPreview.self) ON (
\(linkPreview[.url]) = \(interaction[.linkPreviewUrl]) AND
LEFT JOIN \(linkPreviewAttachment) ON \(linkPreviewAttachment[.id]) = \(linkPreview[.attachmentId])
table: Contact.self,
columns: [.isTrusted],
joinToPagedType: {
let interaction: TypedTableAlias<Interaction> = TypedTableAlias()
let contact: TypedTableAlias<Contact> = TypedTableAlias()
return SQL("JOIN \(Contact.self) ON \(contact[.id]) = \(interaction[.threadId])")
table: Profile.self,
columns: [.profilePictureFileName],
joinToPagedType: {
let interaction: TypedTableAlias<Interaction> = TypedTableAlias()
let profile: TypedTableAlias<Profile> = TypedTableAlias()
return SQL("JOIN \(Profile.self) ON \(profile[.id]) = \(interaction[.authorId])")
table: RecipientState.self,
columns: [.state, .readTimestampMs, .mostRecentFailureText],
joinToPagedType: {
let interaction: TypedTableAlias<Interaction> = TypedTableAlias()
let recipientState: TypedTableAlias<RecipientState> = TypedTableAlias()
return SQL("LEFT JOIN \(RecipientState.self) ON \(recipientState[.interactionId]) = \(interaction[.id])")
table: DisappearingMessagesConfiguration.self,
columns: [ .isEnabled, .type, .durationSeconds ],
joinToPagedType: {
let interaction: TypedTableAlias<Interaction> = TypedTableAlias()
let disappearingMessagesConfiguration: TypedTableAlias<DisappearingMessagesConfiguration> = TypedTableAlias()
return SQL("LEFT JOIN \(DisappearingMessagesConfiguration.self) ON \(disappearingMessagesConfiguration[.threadId]) = \(interaction[.threadId])")
filterSQL: MessageViewModel.filterSQL(threadId: threadId),
groupSQL: MessageViewModel.groupSQL,
orderSQL: MessageViewModel.orderSQL,
dataQuery: MessageViewModel.baseQuery(
userPublicKey: userPublicKey,
blinded15PublicKey: blinded15PublicKey,
blinded25PublicKey: blinded25PublicKey,
orderSQL: MessageViewModel.orderSQL,
groupSQL: MessageViewModel.groupSQL
associatedRecords: [
AssociatedRecord<MessageViewModel.AttachmentInteractionInfo, MessageViewModel>(
trackedAgainst: Attachment.self,
observedChanges: [
table: Attachment.self,
columns: [.state]
dataQuery: MessageViewModel.AttachmentInteractionInfo.baseQuery,
joinToPagedType: MessageViewModel.AttachmentInteractionInfo.joinToViewModelQuerySQL,
associateData: MessageViewModel.AttachmentInteractionInfo.createAssociateDataClosure()
AssociatedRecord<MessageViewModel.ReactionInfo, MessageViewModel>(
trackedAgainst: Reaction.self,
observedChanges: [
table: Reaction.self,
columns: [.count]
dataQuery: MessageViewModel.ReactionInfo.baseQuery,
joinToPagedType: MessageViewModel.ReactionInfo.joinToViewModelQuerySQL,
associateData: MessageViewModel.ReactionInfo.createAssociateDataClosure()
AssociatedRecord<MessageViewModel.TypingIndicatorInfo, MessageViewModel>(
trackedAgainst: ThreadTypingIndicator.self,
observedChanges: [
table: ThreadTypingIndicator.self,
events: [.insert, .delete],
columns: []
dataQuery: MessageViewModel.TypingIndicatorInfo.baseQuery,
joinToPagedType: MessageViewModel.TypingIndicatorInfo.joinToViewModelQuerySQL,
associateData: MessageViewModel.TypingIndicatorInfo.createAssociateDataClosure()
onChangeUnsorted: { [weak self] updatedData, updatedPageInfo in
self?.resolveOptimisticUpdates(with: updatedData)
updatedData: self?.process(
data: updatedData,
for: updatedPageInfo,
optimisticMessages: (self?.optimisticallyInsertedMessages.wrappedValue.values)
.map { $ { $0.messageViewModel } },
initialUnreadInteractionId: self?.initialUnreadInteractionId
currentDataRetriever: { self?.interactionData },
onDataChange: self?.onInteractionChange,
onUnobservedDataChange: { updatedData, changeset in
self?.unobservedInteractionDataChanges = (changeset.isEmpty ?
nil :
(updatedData, changeset)
private func process(
data: [MessageViewModel],
for pageInfo: PagedData.PageInfo,
optimisticMessages: [MessageViewModel]?,
initialUnreadInteractionId: Int64?
) -> [SectionModel] {
let threadData: SessionThreadViewModel = self._threadData.wrappedValue
let typingIndicator: MessageViewModel? = data.first(where: { $0.isTypingIndicator == true })
let sortedData: [MessageViewModel] = data
.filter { $ != MessageViewModel.optimisticUpdateId } // Remove old optimistic updates
.appending(contentsOf: (optimisticMessages ?? [])) // Insert latest optimistic updates
.filter { !$0.cellType.isPostProcessed } // Remove headers and other
.sorted { lhs, rhs -> Bool in lhs.timestampMs < rhs.timestampMs }
// We load messages from newest to oldest so having a pageOffset larger than zero means
// there are newer pages to load
return [
(!data.isEmpty && (pageInfo.pageOffset + pageInfo.currentCount) < pageInfo.totalCount ?
[SectionModel(section: .loadOlder)] :
section: .messages,
elements: sortedData
.map { index, cellViewModel -> MessageViewModel in
prevModel: (index > 0 ? sortedData[index - 1] : nil),
nextModel: (index < (sortedData.count - 1) ? sortedData[index + 1] : nil),
isLast: (
// The database query sorts by timestampMs descending so the "last"
// interaction will actually have a 'pageOffset' of '0' even though
// it's the last element in the 'sortedData' array
index == (sortedData.count - 1) &&
pageInfo.pageOffset == 0
isLastOutgoing: ( == sortedData
.filter {
$0.authorId == threadData.currentUserPublicKey ||
$0.authorId == threadData.currentUserBlinded15PublicKey ||
$0.authorId == threadData.currentUserBlinded25PublicKey
currentUserBlinded15PublicKey: threadData.currentUserBlinded15PublicKey,
currentUserBlinded25PublicKey: threadData.currentUserBlinded25PublicKey
.reduce([]) { result, message in
let updatedResult: [MessageViewModel] = result
.appending(initialUnreadInteractionId == nil || != initialUnreadInteractionId ?
nil :
timestampMs: message.timestampMs,
cellType: .unreadMarker
guard message.shouldShowDateHeader else {
return updatedResult.appending(message)
return updatedResult
timestampMs: message.timestampMs,
cellType: .dateHeader
(!data.isEmpty && pageInfo.pageOffset > 0 ?
[SectionModel(section: .loadNewer)] :
].flatMap { $0 }
public func updateInteractionData(_ updatedData: [SectionModel]) {
self.interactionData = updatedData
// MARK: - Optimistic Message Handling
public typealias OptimisticMessageData = (
id: UUID,
messageViewModel: MessageViewModel,
interaction: Interaction,
attachmentData: Attachment.PreparedData?,
linkPreviewDraft: LinkPreviewDraft?,
linkPreviewAttachment: Attachment?,
quoteModel: QuotedReplyModel?
private var optimisticallyInsertedMessages: Atomic<[UUID: OptimisticMessageData]> = Atomic([:])
private var optimisticMessageAssociatedInteractionIds: Atomic<[Int64: UUID]> = Atomic([:])
public func optimisticallyAppendOutgoingMessage(
text: String?,
sentTimestampMs: Int64,
attachments: [SignalAttachment]?,
linkPreviewDraft: LinkPreviewDraft?,
quoteModel: QuotedReplyModel?
) -> OptimisticMessageData {
// Generate the optimistic data
let optimisticMessageId: UUID = UUID()
let threadData: SessionThreadViewModel = self._threadData.wrappedValue
let currentUserProfile: Profile = Profile.fetchOrCreateCurrentUser()
let interaction: Interaction = Interaction(
threadId: threadData.threadId,
authorId: (threadData.currentUserBlinded15PublicKey ?? threadData.currentUserPublicKey),
variant: .standardOutgoing,
body: text,
timestampMs: sentTimestampMs,
hasMention: Interaction.isUserMentioned(
publicKeysToCheck: [
].compactMap { $0 },
body: text
expiresInSeconds: threadData.disappearingMessagesConfiguration?.durationSeconds,
expiresStartedAtMs: (threadData.disappearingMessagesConfiguration?.type == .disappearAfterSend ? Double(sentTimestampMs) : nil),
linkPreviewUrl: linkPreviewDraft?.urlString
let optimisticAttachments: Attachment.PreparedData? = attachments
.map { Attachment.prepare(attachments: $0) }
let linkPreviewAttachment: Attachment? = { draft in
try? LinkPreview.generateAttachmentIfPossible(
imageData: draft.jpegImageData,
mimeType: OWSMimeTypeImageJpeg
// Generate the actual 'MessageViewModel'
let messageViewModel: MessageViewModel = MessageViewModel(
optimisticMessageId: optimisticMessageId,
threadId: threadData.threadId,
threadVariant: threadData.threadVariant,
threadExpirationType: threadData.disappearingMessagesConfiguration?.type,
threadExpirationTimer: threadData.disappearingMessagesConfiguration?.durationSeconds,
threadOpenGroupServer: threadData.openGroupServer,
threadOpenGroupPublicKey: threadData.openGroupPublicKey,
threadContactNameInternal: threadData.threadContactName(),
timestampMs: interaction.timestampMs,
receivedAtTimestampMs: interaction.receivedAtTimestampMs,
authorId: interaction.authorId,
authorNameInternal: currentUserProfile.displayName(),
body: interaction.body,
expiresStartedAtMs: interaction.expiresStartedAtMs,
expiresInSeconds: interaction.expiresInSeconds,
isSenderOpenGroupModerator: OpenGroupManager.isUserModeratorOrAdmin(
for: threadData.openGroupRoomToken,
on: threadData.openGroupServer
currentUserProfile: currentUserProfile,
quote: { model in
// Don't care about this optimistic quote (the proper one will be generated in the database)
interactionId: -1, // Can't save to db optimistically
authorId: model.authorId,
timestampMs: model.timestampMs,
body: model.body,
attachmentId: model.attachment?.id
quoteAttachment: quoteModel?.attachment,
linkPreview: { draft in
url: draft.urlString,
title: draft.title,
attachmentId: nil // Can't save to db optimistically
linkPreviewAttachment: linkPreviewAttachment,
attachments: optimisticAttachments?.attachments
let optimisticData: OptimisticMessageData = (
optimisticallyInsertedMessages.mutate { $0[optimisticMessageId] = optimisticData }
return optimisticData
public func failedToStoreOptimisticOutgoingMessage(id: UUID, error: Error) {
optimisticallyInsertedMessages.mutate {
$0[id] = $0[id].map {
state: .failed,
mostRecentFailureText: "FAILED_TO_STORE_OUTGOING_MESSAGE".localized()
/// Record an association between an `optimisticMessageId` and a specific `interactionId`
public func associate(optimisticMessageId: UUID, to interactionId: Int64?) {
guard let interactionId: Int64 = interactionId else { return }
optimisticMessageAssociatedInteractionIds.mutate { $0[interactionId] = optimisticMessageId }
public func optimisticMessageData(for optimisticMessageId: UUID) -> OptimisticMessageData? {
return optimisticallyInsertedMessages.wrappedValue[optimisticMessageId]
/// Remove any optimisticUpdate entries which have an associated interactionId in the provided data
private func resolveOptimisticUpdates(with data: [MessageViewModel]) {
let interactionIds: [Int64] = { $ }
let idsToRemove: [UUID] = optimisticMessageAssociatedInteractionIds
.mutate { associatedIds in interactionIds.compactMap { associatedIds.removeValue(forKey: $0) } }
optimisticallyInsertedMessages.mutate { messages in idsToRemove.forEach { messages.removeValue(forKey: $0) } }
private func forceUpdateDataIfPossible() {
// If we can't get the current page data then don't bother trying to update (it's not going to work)
guard let currentPageInfo: PagedData.PageInfo = self.pagedDataObserver?.pageInfo.wrappedValue else { return }
/// **MUST** have the same logic as in the 'PagedDataObserver.onChangeUnsorted' above
let currentData: [SectionModel] = (unobservedInteractionDataChanges?.0 ?? interactionData)
updatedData: process(
data: (currentData.first(where: { $0.model == .messages })?.elements ?? []),
for: currentPageInfo,
optimisticMessages: { $0.messageViewModel },
initialUnreadInteractionId: initialUnreadInteractionId
currentDataRetriever: { [weak self] in self?.interactionData },
onDataChange: self.onInteractionChange,
onUnobservedDataChange: { [weak self] updatedData, changeset in
self?.unobservedInteractionDataChanges = (changeset.isEmpty ?
nil :
(updatedData, changeset)
// MARK: - Mentions
public func mentions(for query: String = "") -> [MentionInfo] {
let threadData: SessionThreadViewModel = self._threadData.wrappedValue
return Storage.shared
.read { db -> [MentionInfo] in
let userPublicKey: String = getUserHexEncodedPublicKey(db)
let pattern: FTS5Pattern? = try? SessionThreadViewModel.pattern(db, searchTerm: query, forTable: Profile.self)
let capabilities: Set<Capability.Variant> = (threadData.threadVariant != .community ?
nil :
try? Capability
.filter(Capability.Columns.openGroupServer == threadData.openGroupServer)
.asRequest(of: Capability.Variant.self)
.defaulting(to: [])
let targetPrefixes: [SessionId.Prefix] = (capabilities.contains(.blind) ?
[.blinded15, .blinded25] :
return (try MentionInfo
userPublicKey: userPublicKey,
threadId: threadData.threadId,
threadVariant: threadData.threadVariant,
targetPrefixes: targetPrefixes,
pattern: pattern
.defaulting(to: [])
.defaulting(to: [])
// MARK: - Functions
public func updateDraft(to draft: String) {
let threadId: String = self.threadId
let currentDraft: String = Storage.shared
.read { db in
try SessionThread
.filter(id: threadId)
.asRequest(of: String.self)
.defaulting(to: "")
// Only write the updated draft to the database if it's changed (avoid unnecessary writes)
guard draft != currentDraft else { return }
Storage.shared.writeAsync { db in
try SessionThread
.filter(id: threadId)
.updateAll(db, SessionThread.Columns.messageDraft.set(to: draft))
/// This method marks a thread as read and depending on the target may also update the interactions within a thread as read
public func markAsRead(
target: SessionThreadViewModel.ReadTarget,
timestampMs: Int64?
) {
/// Since this method now gets triggered when scrolling we want to try to optimise it and avoid busying the database
/// write queue when it isn't needed, in order to do this we:
/// - Throttle the updates to 100ms (quick enough that users shouldn't notice, but will help the DB when the user flings the list)
/// - Only mark interactions as read if they have newer `timestampMs` or `id` values (ie. were sent later or were more-recent
/// entries in the database), **Note:** Old messages will be marked as read upon insertion so shouldn't be an issue
/// The `ThreadViewModel.markAsRead` method also tries to avoid marking as read if a conversation is already fully read
if markAsReadPublisher == nil {
markAsReadPublisher = markAsReadTrigger
.throttle(for: .milliseconds(100), scheduler: .userInitiated), latest: true)
receiveOutput: { [weak self] target, timestampMs in
let threadData: SessionThreadViewModel? = self?._threadData.wrappedValue
switch target {
case .thread: threadData?.markAsRead(target: target)
case .threadAndInteractions(let interactionId):
timestampMs == nil ||
(self?.lastInteractionTimestampMsMarkedAsRead ?? 0) < (timestampMs ?? 0) ||
(self?.lastInteractionIdMarkedAsRead ?? 0) < (interactionId ?? 0)
else {
threadData?.markAsRead(target: .thread)
// If we were given a timestamp then update the 'lastInteractionTimestampMsMarkedAsRead'
// to avoid needless updates
if let timestampMs: Int64 = timestampMs {
self?.lastInteractionTimestampMsMarkedAsRead = timestampMs
self?.lastInteractionIdMarkedAsRead = (interactionId ?? threadData?.interactionId)
threadData?.markAsRead(target: target)
.map { _ in () }
markAsReadTrigger.send((target, timestampMs))
public func swapToThread(updatedThreadId: String) {
let oldestMessageId: Int64? = self.interactionData
.filter { $0.model == .messages }
self.threadId = updatedThreadId
self.observableThreadData = self.setupObservableThreadData(for: updatedThreadId)
self.pagedDataObserver = self.setupPagedObserver(
for: updatedThreadId,
userPublicKey: getUserHexEncodedPublicKey(),
blinded15PublicKey: nil,
blinded25PublicKey: nil
// Try load everything up to the initial visible message, fallback to just the initial page of messages
// if we don't have one
switch oldestMessageId {
case .some(let id): self.pagedDataObserver?.load(.untilInclusive(id: id, padding: 0))
case .none: self.pagedDataObserver?.load(.pageBefore)
public func trustContact() {
guard self._threadData.wrappedValue.threadVariant == .contact else { return }
let threadId: String = self.threadId
Storage.shared.writeAsync { db in
try Contact
.filter(id: threadId)
.updateAll(db, Contact.Columns.isTrusted.set(to: true))
// Start downloading any pending attachments for this contact (UI will automatically be
// updated due to the database observation)
try Attachment
.stateInfo(authorId: threadId, state: .pendingDownload)
.forEach { attachmentDownloadInfo in
job: Job(
variant: .attachmentDownload,
threadId: threadId,
interactionId: attachmentDownloadInfo.interactionId,
details: AttachmentDownloadJob.Details(
attachmentId: attachmentDownloadInfo.attachmentId
public func unblockContact() {
guard self._threadData.wrappedValue.threadVariant == .contact else { return }
let threadId: String = self.threadId
Storage.shared.writeAsync { db in
try Contact
.filter(id: threadId)
.updateAllAndConfig(db, Contact.Columns.isBlocked.set(to: false))
public func expandReactions(for interactionId: Int64) {
public func collapseReactions(for interactionId: Int64) {
// MARK: - Audio Playback
public struct PlaybackInfo {
let state: AudioPlaybackState
let progress: TimeInterval
let playbackRate: Double
let oldPlaybackRate: Double
let updateCallback: (PlaybackInfo?, Error?) -> ()
public func with(
state: AudioPlaybackState? = nil,
progress: TimeInterval? = nil,
playbackRate: Double? = nil,
updateCallback: ((PlaybackInfo?, Error?) -> ())? = nil
) -> PlaybackInfo {
return PlaybackInfo(
state: (state ?? self.state),
progress: (progress ?? self.progress),
playbackRate: (playbackRate ?? self.playbackRate),
oldPlaybackRate: self.playbackRate,
updateCallback: (updateCallback ?? self.updateCallback)
private var audioPlayer: Atomic<OWSAudioPlayer?> = Atomic(nil)
private var currentPlayingInteraction: Atomic<Int64?> = Atomic(nil)
private var playbackInfo: Atomic<[Int64: PlaybackInfo]> = Atomic([:])
public func playbackInfo(for viewModel: MessageViewModel, updateCallback: ((PlaybackInfo?, Error?) -> ())? = nil) -> PlaybackInfo? {
// Use the existing info if it already exists (update it's callback if provided as that means
// the cell was reloaded)
if let currentPlaybackInfo: PlaybackInfo = playbackInfo.wrappedValue[] {
let updatedPlaybackInfo: PlaybackInfo = currentPlaybackInfo
.with(updateCallback: updateCallback)
playbackInfo.mutate { $0[] = updatedPlaybackInfo }
return updatedPlaybackInfo
// Validate the item is a valid audio item
let updateCallback: ((PlaybackInfo?, Error?) -> ()) = updateCallback,
let attachment: Attachment = viewModel.attachments?.first,
let originalFilePath: String = attachment.originalFilePath,
FileManager.default.fileExists(atPath: originalFilePath)
else { return nil }
// Create the info with the update callback
let newPlaybackInfo: PlaybackInfo = PlaybackInfo(
state: .stopped,
progress: 0,
playbackRate: 1,
oldPlaybackRate: 1,
updateCallback: updateCallback
// Cache the info
playbackInfo.mutate { $0[] = newPlaybackInfo }
return newPlaybackInfo
public func playOrPauseAudio(for viewModel: MessageViewModel) {
let attachment: Attachment = viewModel.attachments?.first,
let originalFilePath: String = attachment.originalFilePath,
FileManager.default.fileExists(atPath: originalFilePath)
else { return }
// If the user interacted with the currently playing item
guard currentPlayingInteraction.wrappedValue != else {
let currentPlaybackInfo: PlaybackInfo? = playbackInfo.wrappedValue[]
let updatedPlaybackInfo: PlaybackInfo? = currentPlaybackInfo?
state: (currentPlaybackInfo?.state != .playing ? .playing : .paused),
playbackRate: 1
audioPlayer.wrappedValue?.playbackRate = 1
switch currentPlaybackInfo?.state {
case .playing: audioPlayer.wrappedValue?.pause()
default: audioPlayer.wrappedValue?.play()
// Update the state and then update the UI with the updated state
playbackInfo.mutate { $0[] = updatedPlaybackInfo }
updatedPlaybackInfo?.updateCallback(updatedPlaybackInfo, nil)
// First stop any existing audio
// Then setup the state for the new audio
currentPlayingInteraction.mutate { $0 = }
let currentPlaybackTime: TimeInterval? = playbackInfo.wrappedValue[]?.progress
audioPlayer.mutate { [weak self] player in
// Note: We clear the delegate and explicitly set to nil here as when the OWSAudioPlayer
// gets deallocated it triggers state changes which cause UI bugs when auto-playing
player?.delegate = nil
player = nil
let audioPlayer: OWSAudioPlayer = OWSAudioPlayer(
mediaUrl: URL(fileURLWithPath: originalFilePath),
audioBehavior: .audioMessagePlayback,
delegate: self
audioPlayer.setCurrentTime(currentPlaybackTime ?? 0)
player = audioPlayer
public func speedUpAudio(for viewModel: MessageViewModel) {
// If we aren't playing the specified item then just start playing it
guard == currentPlayingInteraction.wrappedValue else {
playOrPauseAudio(for: viewModel)
let updatedPlaybackInfo: PlaybackInfo? = playbackInfo.wrappedValue[]?
.with(playbackRate: 1.5)
// Speed up the audio player
audioPlayer.wrappedValue?.playbackRate = 1.5
playbackInfo.mutate { $0[] = updatedPlaybackInfo }
updatedPlaybackInfo?.updateCallback(updatedPlaybackInfo, nil)
public func stopAudio() {
currentPlayingInteraction.mutate { $0 = nil }
audioPlayer.mutate {
// Note: We clear the delegate and explicitly set to nil here as when the OWSAudioPlayer
// gets deallocated it triggers state changes which cause UI bugs when auto-playing
$0?.delegate = nil
$0 = nil
// MARK: - OWSAudioPlayerDelegate
public func audioPlaybackState() -> AudioPlaybackState {
guard let interactionId: Int64 = currentPlayingInteraction.wrappedValue else { return .stopped }
return (playbackInfo.wrappedValue[interactionId]?.state ?? .stopped)
public func setAudioPlaybackState(_ state: AudioPlaybackState) {
guard let interactionId: Int64 = currentPlayingInteraction.wrappedValue else { return }
let updatedPlaybackInfo: PlaybackInfo? = playbackInfo.wrappedValue[interactionId]?
.with(state: state)
playbackInfo.mutate { $0[interactionId] = updatedPlaybackInfo }
updatedPlaybackInfo?.updateCallback(updatedPlaybackInfo, nil)
public func setAudioProgress(_ progress: CGFloat, duration: CGFloat) {
guard let interactionId: Int64 = currentPlayingInteraction.wrappedValue else { return }
let updatedPlaybackInfo: PlaybackInfo? = playbackInfo.wrappedValue[interactionId]?
.with(progress: TimeInterval(progress))
playbackInfo.mutate { $0[interactionId] = updatedPlaybackInfo }
updatedPlaybackInfo?.updateCallback(updatedPlaybackInfo, nil)
public func audioPlayerDidFinishPlaying(_ player: OWSAudioPlayer, successfully: Bool) {
guard let interactionId: Int64 = currentPlayingInteraction.wrappedValue else { return }
guard successfully else { return }
let updatedPlaybackInfo: PlaybackInfo? = playbackInfo.wrappedValue[interactionId]?
state: .stopped,
progress: 0,
playbackRate: 1
// Safe the changes and send one final update to the UI
playbackInfo.mutate { $0[interactionId] = updatedPlaybackInfo }
updatedPlaybackInfo?.updateCallback(updatedPlaybackInfo, nil)
// Clear out the currently playing record
currentPlayingInteraction.mutate { $0 = nil }
audioPlayer.mutate {
// Note: We clear the delegate and explicitly set to nil here as when the OWSAudioPlayer
// gets deallocated it triggers state changes which cause UI bugs when auto-playing
$0?.delegate = nil
$0 = nil
// If the next interaction is another voice message then autoplay it
let messageSection: SectionModel = self.interactionData
.first(where: { $0.model == .messages }),
let currentIndex: Int = messageSection.elements
.firstIndex(where: { $ == interactionId }),
currentIndex < (messageSection.elements.count - 1),
messageSection.elements[currentIndex + 1].cellType == .voiceMessage,
Storage.shared[.shouldAutoPlayConsecutiveAudioMessages] == true
else { return }
let nextItem: MessageViewModel = messageSection.elements[currentIndex + 1]
playOrPauseAudio(for: nextItem)
public func showInvalidAudioFileAlert() {
guard let interactionId: Int64 = currentPlayingInteraction.wrappedValue else { return }
let updatedPlaybackInfo: PlaybackInfo? = playbackInfo.wrappedValue[interactionId]?
state: .stopped,
progress: 0,
playbackRate: 1
currentPlayingInteraction.mutate { $0 = nil }
playbackInfo.mutate { $0[interactionId] = updatedPlaybackInfo }
updatedPlaybackInfo?.updateCallback(updatedPlaybackInfo, AttachmentError.invalidData)