From 01bb200b24f7bd934587ddd42c4b596397f1c4d3 Mon Sep 17 00:00:00 2001 From: Audric Ackermann Date: Wed, 13 Apr 2022 16:51:53 +1000 Subject: [PATCH] reduce number of commits during opengroup handling of message --- .../message/message-item/ReadableMessage.tsx | 9 +- ts/models/conversation.ts | 50 ++---- ts/node/sql.ts | 5 +- ts/receiver/contentMessage.ts | 11 +- ts/receiver/dataMessage.ts | 5 +- ts/receiver/opengroup.ts | 2 +- ts/receiver/queuedJob.ts | 160 +++++++++--------- ts/receiver/receiver.ts | 4 +- 8 files changed, 114 insertions(+), 132 deletions(-) diff --git a/ts/components/conversation/message/message-item/ReadableMessage.tsx b/ts/components/conversation/message/message-item/ReadableMessage.tsx index 174482ba0..90c5bcb0c 100644 --- a/ts/components/conversation/message/message-item/ReadableMessage.tsx +++ b/ts/components/conversation/message/message-item/ReadableMessage.tsx @@ -104,12 +104,11 @@ export const ReadableMessage = (props: ReadableMessageProps) => { // make sure the app is focused, because we mark message as read here if (inView === true && isAppFocused) { dispatch(showScrollToBottomButton(false)); - void getConversationController() + getConversationController() .get(selectedConversationKey) - ?.markRead(receivedAt || 0) - .then(() => { - dispatch(markConversationFullyRead(selectedConversationKey)); - }); + ?.markRead(receivedAt || 0); + + dispatch(markConversationFullyRead(selectedConversationKey)); } else if (inView === false) { dispatch(showScrollToBottomButton(true)); } diff --git a/ts/models/conversation.ts b/ts/models/conversation.ts index 5e2b58bca..42680e856 100644 --- a/ts/models/conversation.ts +++ b/ts/models/conversation.ts @@ -196,7 +196,7 @@ export class ConversationModel extends Backbone.Model { public updateLastMessage: () => any; public throttledBumpTyping: () => void; public throttledNotify: (message: MessageModel) => void; - public markRead: (newestUnreadDate: number, providedOptions?: any) => Promise; + public markRead: (newestUnreadDate: number, providedOptions?: any) => void; public initialPromise: any; private typingRefreshTimer?: NodeJS.Timeout | null; @@ -226,8 +226,7 @@ export class ConversationModel extends Backbone.Model { leading: true, trailing: true, }); - // tslint:disable-next-line: no-async-without-await - this.markRead = async (newestUnreadDate: number) => { + this.markRead = (newestUnreadDate: number) => { const lastReadTimestamp = this.lastReadTimestamp; if (newestUnreadDate > lastReadTimestamp) { this.lastReadTimestamp = newestUnreadDate; @@ -901,8 +900,9 @@ export class ConversationModel extends Backbone.Model { receivedAt?: number, // is set if it comes from outside options: { fromSync?: boolean; - } = {} - ) { + } = {}, + shouldCommit = true + ): Promise { let expireTimer = providedExpireTimer; let source = providedSource; @@ -912,7 +912,7 @@ export class ConversationModel extends Backbone.Model { expireTimer = 0; } if (this.get('expireTimer') === expireTimer || (!expireTimer && !this.get('expireTimer'))) { - return null; + return; } window?.log?.info("Update conversation 'expireTimer'", { @@ -964,12 +964,13 @@ export class ConversationModel extends Backbone.Model { this.set('active_at', timestamp); } - // tell the UI this conversation was updated - await this.commit(); - + if (shouldCommit) { + // tell the UI this conversation was updated + await this.commit(); + } // if change was made remotely, don't send it to the number/group if (receivedAt) { - return message; + return; } const expireUpdate = { @@ -998,7 +999,7 @@ export class ConversationModel extends Backbone.Model { await getMessageQueue().sendToGroup(expirationTimerMessage); } - return message; + return; } public triggerUIRefresh() { @@ -1577,7 +1578,7 @@ export class ConversationModel extends Backbone.Model { }); } - public async notifyTyping({ isTyping, sender }: any) { + public async notifyTypingNoCommit({ isTyping, sender }: { isTyping: boolean; sender: string }) { // We don't do anything with typing messages from our other devices if (UserUtils.isUsFromCache(sender)) { return; @@ -1588,32 +1589,15 @@ export class ConversationModel extends Backbone.Model { return; } - const wasTyping = !!this.typingTimer; if (this.typingTimer) { global.clearTimeout(this.typingTimer); this.typingTimer = null; } - // Note: We trigger two events because: - // 'change' causes a re-render of this conversation's list item in the left pane - - if (isTyping) { - this.typingTimer = global.setTimeout( - this.clearContactTypingTimer.bind(this, sender), - 15 * 1000 - ); - - if (!wasTyping) { - // User was not previously typing before. State change! - await this.commit(); - } - } else { - this.typingTimer = null; - if (wasTyping) { - // User was previously typing, and is no longer. State change! - await this.commit(); - } - } + // we do not trigger a state change here, instead we rely on the caller to do the commit once it is done with the queue of messages + this.typingTimer = isTyping + ? global.setTimeout(this.clearContactTypingTimer.bind(this, sender), 15 * 1000) + : null; } private async addSingleMessage(messageAttributes: MessageAttributesOptionals) { diff --git a/ts/node/sql.ts b/ts/node/sql.ts index 2bd7f2e73..5b1e27b4f 100644 --- a/ts/node/sql.ts +++ b/ts/node/sql.ts @@ -1471,6 +1471,7 @@ async function initializeSql({ vacuumDatabase(db); const msgCount = getMessageCount(); const convoCount = getConversationCount(); + console.info('total message count: ', msgCount); console.info('total conversation count: ', convoCount); } catch (error) { @@ -2234,7 +2235,7 @@ function getMessageBySenderAndTimestamp({ function filterAlreadyFetchedOpengroupMessage( msgDetails: Array<{ sender: string; serverTimestamp: number }> // MsgDuplicateSearchOpenGroup -) { +): Array<{ sender: string; serverTimestamp: number }> { return msgDetails.filter(msg => { const rows = assertGlobalInstance() .prepare( @@ -2248,7 +2249,7 @@ function filterAlreadyFetchedOpengroupMessage( }); if (rows.length) { console.info( - `filtering out already received message from ${msg.sender} at ${msg.serverTimestamp} ` + `filtering out already received sogs message from ${msg.sender} at ${msg.serverTimestamp} ` ); return false; } diff --git a/ts/receiver/contentMessage.ts b/ts/receiver/contentMessage.ts index 894149c1e..41c613c4f 100644 --- a/ts/receiver/contentMessage.ts +++ b/ts/receiver/contentMessage.ts @@ -27,15 +27,13 @@ export async function handleSwarmContentMessage(envelope: EnvelopePlus, messageH const plaintext = await decrypt(envelope, envelope.content); if (!plaintext) { - // window?.log?.warn('handleSwarmContentMessage: plaintext was falsey'); return; } else if (plaintext instanceof ArrayBuffer && plaintext.byteLength === 0) { return; } - perfStart(`innerHandleSwarmContentMessage-${envelope.id}`); + const sentAtTimestamp = _.toNumber(envelope.timestamp); - await innerHandleSwarmContentMessage(envelope, plaintext, messageHash); - perfEnd(`innerHandleSwarmContentMessage-${envelope.id}`, 'innerHandleSwarmContentMessage'); + await innerHandleSwarmContentMessage(envelope, sentAtTimestamp, plaintext, messageHash); } catch (e) { window?.log?.warn(e); } @@ -326,6 +324,7 @@ function shouldDropBlockedUserMessage(content: SignalService.Content): boolean { // tslint:disable-next-line: cyclomatic-complexity export async function innerHandleSwarmContentMessage( envelope: EnvelopePlus, + sentAtTimestamp: number, plaintext: ArrayBuffer, messageHash: string ): Promise { @@ -381,6 +380,7 @@ export async function innerHandleSwarmContentMessage( await handleSwarmDataMessage( envelope, + sentAtTimestamp, content.dataMessage as SignalService.DataMessage, messageHash, senderConversationModel @@ -514,7 +514,8 @@ async function handleTypingMessage( const started = action === SignalService.TypingMessage.Action.STARTED; if (conversation) { - await conversation.notifyTyping({ + // this does not commit, instead the caller should commit to trigger UI updates + await conversation.notifyTypingNoCommit({ isTyping: started, sender: source, }); diff --git a/ts/receiver/dataMessage.ts b/ts/receiver/dataMessage.ts index 69b821d9f..77cd18913 100644 --- a/ts/receiver/dataMessage.ts +++ b/ts/receiver/dataMessage.ts @@ -11,7 +11,6 @@ import { getConversationController } from '../session/conversations'; import { handleClosedGroupControlMessage } from './closedGroups'; import { getMessageBySenderAndSentAt } from '../../ts/data/data'; import { ConversationModel, ConversationTypeEnum } from '../models/conversation'; -import { toLogFormat } from '../types/attachments/Errors'; import { createSwarmMessageSentFromNotUs, @@ -20,6 +19,7 @@ import { import { MessageModel } from '../models/message'; import { isUsFromCache } from '../session/utils/User'; import { appendFetchAvatarAndProfileJob } from './userProfileImageUpdates'; +import { toLogFormat } from '../types/attachments/Errors'; function cleanAttachment(attachment: any) { return { @@ -154,6 +154,7 @@ async function cleanIncomingDataMessage( // tslint:disable-next-line: cyclomatic-complexity export async function handleSwarmDataMessage( envelope: EnvelopePlus, + sentAtTimestamp: number, rawDataMessage: SignalService.DataMessage, messageHash: string, senderConversationModel: ConversationModel @@ -221,8 +222,6 @@ export async function handleSwarmDataMessage( return removeFromCache(envelope); } - const sentAtTimestamp = _.toNumber(envelope.timestamp); - if (!convoIdToAddTheMessageTo) { window?.log?.error('We cannot handle a message without a conversationId'); confirm(); diff --git a/ts/receiver/opengroup.ts b/ts/receiver/opengroup.ts index 97ef0de17..d18de80e2 100644 --- a/ts/receiver/opengroup.ts +++ b/ts/receiver/opengroup.ts @@ -66,7 +66,7 @@ export async function handleOpenGroupV2Message( const commonAttributes = { serverTimestamp: sentTimestamp, serverId, conversationId }; const attributesForNotUs = { ...commonAttributes, sender }; - // those lines just create an empty message only in memory with some basic stuff set. + // those lines just create an empty message only in-memory with some basic stuff set. // the whole decoding of data is happening in handleMessageJob() const msgModel = isMe ? createPublicMessageSentFromUs(commonAttributes) diff --git a/ts/receiver/queuedJob.ts b/ts/receiver/queuedJob.ts index 7e3625d9e..fb2edf970 100644 --- a/ts/receiver/queuedJob.ts +++ b/ts/receiver/queuedJob.ts @@ -6,7 +6,7 @@ import _ from 'lodash'; import { getConversationController } from '../session/conversations'; import { ConversationModel, ConversationTypeEnum } from '../models/conversation'; import { MessageModel } from '../models/message'; -import { getMessageById, getMessageCountByType, getMessagesBySentAt } from '../../ts/data/data'; +import { getMessageCountByType, getMessagesBySentAt } from '../../ts/data/data'; import { SignalService } from '../protobuf'; import { UserUtils } from '../session/utils'; @@ -21,8 +21,12 @@ function contentTypeSupported(type: string): boolean { return Chrome.isImageTypeSupported(type) || Chrome.isVideoTypeSupported(type); } -// tslint:disable-next-line: cyclomatic-complexity +/** + * Note: this function does not trigger a write to the db nor trigger redux update. + * You have to call msg.commit() once you are done with the handling of this message + */ async function copyFromQuotedMessage( + // tslint:disable-next-line: cyclomatic-complexity msg: MessageModel, quote?: SignalService.DataMessage.IQuote | null ): Promise { @@ -41,7 +45,7 @@ async function copyFromQuotedMessage( const firstAttachment = attachments?.[0] || undefined; - const id: number = _.toNumber(quoteId); + const id = _.toNumber(quoteId); // We always look for the quote by sentAt timestamp, for opengroups, closed groups and session chats // this will return an array of sent message by id we have locally. @@ -56,7 +60,6 @@ async function copyFromQuotedMessage( window?.log?.warn(`We did not found quoted message ${id}.`); quoteLocal.referencedMessageNotFound = true; msg.set({ quote: quoteLocal }); - await msg.commit(); return; } @@ -72,7 +75,6 @@ async function copyFromQuotedMessage( !contentTypeSupported(firstAttachment.contentType) ) { msg.set({ quote: quoteLocal }); - await msg.commit(); return; } @@ -107,10 +109,11 @@ async function copyFromQuotedMessage( quoteLocal.attachments = [firstAttachment]; msg.set({ quote: quoteLocal }); - await msg.commit(); - return; } +/** + * Note: This does not trigger a redux update, nor write to the DB + */ function handleLinkPreviews(messageBody: string, messagePreview: any, message: MessageModel) { const urls = LinkPreviews.findLinks(messageBody); const incomingPreview = messagePreview || []; @@ -127,15 +130,15 @@ function handleLinkPreviews(messageBody: string, messagePreview: any, message: M message.set({ preview }); } -async function processProfileKey( +async function processProfileKeyNoCommit( conversation: ConversationModel, sendingDeviceConversation: ConversationModel, profileKeyBuffer?: Uint8Array ) { if (conversation.isPrivate()) { - await conversation.setProfileKey(profileKeyBuffer); + await conversation.setProfileKey(profileKeyBuffer, false); } else { - await sendingDeviceConversation.setProfileKey(profileKeyBuffer); + await sendingDeviceConversation.setProfileKey(profileKeyBuffer, false); } } @@ -150,22 +153,17 @@ function handleMentions( } } -function updateReadStatus(message: MessageModel, conversation: ConversationModel) { +function updateReadStatus(message: MessageModel) { if (message.isExpirationTimerUpdate()) { message.set({ unread: 0 }); - - // This is primarily to allow the conversation to mark all older - // messages as read, as is done when we receive a read sync for - // a message we already know about. - void conversation.onReadMessage(message, Date.now()); } } -async function handleSyncedReceipts(message: MessageModel, conversation: ConversationModel) { +function handleSyncedReceiptsNoCommit(message: MessageModel, conversation: ConversationModel) { // If the newly received message is from us, we assume that we've seen the messages up until that point const sentTimestamp = message.get('sent_at'); if (sentTimestamp) { - await conversation.markRead(sentTimestamp); + conversation.markRead(sentTimestamp); } } @@ -204,12 +202,14 @@ export function toRegularMessage(rawDataMessage: SignalService.DataMessage): Reg async function handleRegularMessage( conversation: ConversationModel, + sendingDeviceConversation: ConversationModel, message: MessageModel, rawDataMessage: RegularMessageType, source: string, messageHash: string -) { +): Promise { const type = message.get('type'); + // this does not trigger a UI update nor write to the db await copyFromQuotedMessage(message, rawDataMessage.quote); if (rawDataMessage.openGroupInvitation) { @@ -241,8 +241,8 @@ async function handleRegularMessage( handleMentions(message, conversation, ourPubKey); if (type === 'incoming') { - updateReadStatus(message, conversation); if (conversation.isPrivate()) { + updateReadStatus(message); const incomingMessageCount = await getMessageCountByType( conversation.id, MessageDirection.incoming @@ -266,12 +266,10 @@ async function handleRegularMessage( } // should only occur after isOutgoing request as it relies on didApproveMe being false. await conversation.setDidApproveMe(true); - // edge case end } - } - - if (type === 'outgoing') { - await handleSyncedReceipts(message, conversation); + } else if (type === 'outgoing') { + // we want to do this for all types of conversations, not just private chats + handleSyncedReceiptsNoCommit(message, conversation); if (conversation.isPrivate()) { await conversation.setIsApproved(true); @@ -286,34 +284,22 @@ async function handleRegularMessage( }); } - const sendingDeviceConversation = await getConversationController().getOrCreateAndWait( - source, - ConversationTypeEnum.PRIVATE - ); - - // Check if we need to update any profile names - // the only profile we don't update with what is coming here is ours, - // as our profile is shared accross our devices with a ConfigurationMessage - if (type === 'incoming' && rawDataMessage.profile) { - void appendFetchAvatarAndProfileJob( + if (rawDataMessage.profileKey) { + await processProfileKeyNoCommit( + conversation, sendingDeviceConversation, - rawDataMessage.profile, rawDataMessage.profileKey ); } - if (rawDataMessage.profileKey) { - await processProfileKey(conversation, sendingDeviceConversation, rawDataMessage.profileKey); - } - // we just received a message from that user so we reset the typing indicator for this convo - await conversation.notifyTyping({ + await conversation.notifyTypingNoCommit({ isTyping: false, sender: source, }); } -async function handleExpirationTimerUpdate( +async function handleExpirationTimerUpdateNoCommit( conversation: ConversationModel, message: MessageModel, source: string, @@ -328,7 +314,7 @@ async function handleExpirationTimerUpdate( }); conversation.set({ expireTimer }); - await conversation.updateExpireTimer(expireTimer, source, message.get('received_at')); + await conversation.updateExpireTimer(expireTimer, source, message.get('received_at'), {}, false); } export async function handleMessageJob( @@ -340,11 +326,15 @@ export async function handleMessageJob( messageHash: string ) { window?.log?.info( - `Starting handleSwarmDataMessage for message ${messageModel.idForLogging()}, ${messageModel.get( + `Starting handleMessageJob for message ${messageModel.idForLogging()}, ${messageModel.get( 'serverTimestamp' ) || messageModel.get('timestamp')} in conversation ${conversation.idForLogging()}` ); + const sendingDeviceConversation = await getConversationController().getOrCreateAndWait( + source, + ConversationTypeEnum.PRIVATE + ); try { messageModel.set({ flags: regularDataMessage.flags }); if (messageModel.isExpirationTimerUpdate()) { @@ -357,10 +347,12 @@ export async function handleMessageJob( ); return; } - await handleExpirationTimerUpdate(conversation, messageModel, source, expireTimer); + await handleExpirationTimerUpdateNoCommit(conversation, messageModel, source, expireTimer); } else { + // this does not commit to db nor UI unless we need to approve a convo await handleRegularMessage( conversation, + sendingDeviceConversation, messageModel, regularDataMessage, source, @@ -368,7 +360,7 @@ export async function handleMessageJob( ); } - // save the message model to the db and it save the messageId generated to our copy + // save the message model to the db and it save the messageId generated to our in-memory copy const id = await messageModel.commit(); messageModel.set({ id }); @@ -376,42 +368,53 @@ export async function handleMessageJob( // call it after we have an id for this message, because the jobs refer back // to their source message. - void queueAttachmentDownloads(messageModel, conversation); - const unreadCount = await conversation.getUnreadCount(); conversation.set({ unreadCount }); - // this is a throttled call and will only run once every 1 sec + // this is a throttled call and will only run once every 1 sec at most conversation.updateLastMessage(); await conversation.commit(); - try { - // We go to the database here because, between the message save above and - // the previous line's trigger() call, we might have marked all messages - // unread in the database. This message might already be read! - const fetched = await getMessageById(messageModel.get('id')); - - const previousUnread = messageModel.get('unread'); - - // Important to update message with latest read state from database - messageModel.merge(fetched); - - if (previousUnread !== messageModel.get('unread')) { - window?.log?.warn( - 'Caught race condition on new message read state! ' + 'Manually starting timers.' - ); - // We call markRead() even though the message is already - // marked read because we need to start expiration - // timers, etc. - await messageModel.markRead(Date.now()); - } - } catch (error) { - window?.log?.warn( - 'handleSwarmDataMessage: Message', - messageModel.idForLogging(), - 'was deleted' + void queueAttachmentDownloads(messageModel, conversation); + // Check if we need to update any profile names + // the only profile we don't update with what is coming here is ours, + // as our profile is shared accross our devices with a ConfigurationMessage + if (messageModel.isIncoming() && regularDataMessage.profile) { + void appendFetchAvatarAndProfileJob( + sendingDeviceConversation, + regularDataMessage.profile, + regularDataMessage.profileKey ); } + // even with all the warnings, I am very sus about if this is usefull or not + // try { + // // We go to the database here because, between the message save above and + // // the previous line's trigger() call, we might have marked all messages + // // unread in the database. This message might already be read! + // const fetched = await getMessageById(messageModel.get('id')); + + // const previousUnread = messageModel.get('unread'); + + // // Important to update message with latest read state from database + // messageModel.merge(fetched); + + // if (previousUnread !== messageModel.get('unread')) { + // window?.log?.warn( + // 'Caught race condition on new message read state! ' + 'Manually starting timers.' + // ); + // // We call markRead() even though the message is already + // // marked read because we need to start expiration + // // timers, etc. + // await messageModel.markRead(Date.now()); + // } + // } catch (error) { + // window?.log?.warn( + // 'handleMessageJob: Message', + // messageModel.idForLogging(), + // 'was deleted' + // ); + // } + if (messageModel.get('unread')) { conversation.throttledNotify(messageModel); } @@ -419,13 +422,6 @@ export async function handleMessageJob( confirm?.(); } catch (error) { const errorForLog = error && error.stack ? error.stack : error; - window?.log?.error( - 'handleSwarmDataMessage', - messageModel.idForLogging(), - 'error:', - errorForLog - ); - - throw error; + window?.log?.error('handleMessageJob', messageModel.idForLogging(), 'error:', errorForLog); } } diff --git a/ts/receiver/receiver.ts b/ts/receiver/receiver.ts index b233e1a5f..ce2b85edd 100644 --- a/ts/receiver/receiver.ts +++ b/ts/receiver/receiver.ts @@ -231,7 +231,9 @@ async function handleDecryptedEnvelope( messageHash: string ) { if (envelope.content) { - await innerHandleSwarmContentMessage(envelope, plaintext, messageHash); + const sentAtTimestamp = _.toNumber(envelope.timestamp); + + await innerHandleSwarmContentMessage(envelope, sentAtTimestamp, plaintext, messageHash); } else { await removeFromCache(envelope); }