From fc916ce94c8758dbd63c08f907a2651562b0a3d6 Mon Sep 17 00:00:00 2001 From: audric Date: Fri, 14 Jan 2022 14:56:55 +1100 Subject: [PATCH] Fix attachments duplication on message syncing with current device --- app/sql.js | 6 +- stylesheets/_modules.scss | 1 - ts/data/data.ts | 15 +++-- ts/models/conversation.ts | 9 ++- ts/receiver/contentMessage.ts | 4 +- ts/receiver/dataMessage.ts | 98 ++++++++++++++--------------- ts/receiver/receiver.ts | 23 ++++--- ts/session/sending/MessageSender.ts | 2 +- 8 files changed, 77 insertions(+), 81 deletions(-) diff --git a/app/sql.js b/app/sql.js index b45e9a9be..c5a9561b8 100644 --- a/app/sql.js +++ b/app/sql.js @@ -59,7 +59,7 @@ module.exports = { removeMessage, getUnreadByConversation, getUnreadCountByConversation, - getMessageBySender, + getMessageBySenderAndSentAt, getMessageBySenderAndServerTimestamp, getMessageBySenderAndTimestamp, getMessageIdsFromServerIds, @@ -2145,17 +2145,15 @@ function getMessageById(id) { return jsonToObject(row.json); } -function getMessageBySender({ source, sourceDevice, sentAt }) { +function getMessageBySenderAndSentAt({ source, sentAt }) { const rows = globalInstance .prepare( `SELECT json FROM ${MESSAGES_TABLE} WHERE source = $source AND - sourceDevice = $sourceDevice AND sent_at = $sent_at;` ) .all({ source, - sourceDevice, sent_at: sentAt, }); diff --git a/stylesheets/_modules.scss b/stylesheets/_modules.scss index e1ce2b3cb..d2f7a34f9 100644 --- a/stylesheets/_modules.scss +++ b/stylesheets/_modules.scss @@ -1359,7 +1359,6 @@ @include color-svg('../images/x-16.svg', $color-gray-60); } - // Module: Search Results .module-search-results { diff --git a/ts/data/data.ts b/ts/data/data.ts index 3b8b8c293..4b693b838 100644 --- a/ts/data/data.ts +++ b/ts/data/data.ts @@ -115,7 +115,7 @@ const channelsToMake = { removeAllMessagesInConversation, getMessageCount, - getMessageBySender, + getMessageBySenderAndSentAt, getMessageBySenderAndServerTimestamp, getMessageBySenderAndTimestamp, getMessageIdsFromServerIds, @@ -683,18 +683,15 @@ export async function getMessageById( return new MessageModel(message); } -export async function getMessageBySender({ +export async function getMessageBySenderAndSentAt({ source, - sourceDevice, sentAt, }: { source: string; - sourceDevice: number; sentAt: number; }): Promise { - const messages = await channels.getMessageBySender({ + const messages = await channels.getMessageBySenderAndSentAt({ source, - sourceDevice, sentAt, }); if (!messages || !messages.length) { @@ -854,11 +851,11 @@ export async function getUnprocessedCount(): Promise { return channels.getUnprocessedCount(); } -export async function getAllUnprocessed(): Promise { +export async function getAllUnprocessed(): Promise> { return channels.getAllUnprocessed(); } -export async function getUnprocessedById(id: string): Promise { +export async function getUnprocessedById(id: string): Promise { return channels.getUnprocessedById(id); } @@ -870,6 +867,8 @@ export type UnprocessedParameter = { attempts: number; messageHash: string; senderIdentity?: string; + decrypted?: string; // added once the envelopes's content is decrypted with updateCache + source?: string; // added once the envelopes's content is decrypted with updateCache }; export async function saveUnprocessed(data: UnprocessedParameter): Promise { diff --git a/ts/models/conversation.ts b/ts/models/conversation.ts index 51309cac9..f7903e665 100644 --- a/ts/models/conversation.ts +++ b/ts/models/conversation.ts @@ -715,18 +715,17 @@ export class ConversationModel extends Backbone.Model { const recipients = this.getRecipients(); const now = Date.now(); + const networkTimestamp = now - getLatestTimestampOffset(); window?.log?.info( 'Sending message to conversation', this.idForLogging(), - 'with timestamp', - now + 'with networkTimestamp: ', + networkTimestamp ); const editedQuote = _.isEmpty(quote) ? undefined : quote; - const diffTimestamp = Date.now() - getLatestTimestampOffset(); - const messageObject: MessageAttributesOptionals = { type: 'outgoing', body, @@ -734,7 +733,7 @@ export class ConversationModel extends Backbone.Model { quote: editedQuote, preview, attachments, - sent_at: diffTimestamp, + sent_at: networkTimestamp, received_at: now, expireTimer, recipients, diff --git a/ts/receiver/contentMessage.ts b/ts/receiver/contentMessage.ts index b16c9a120..61d425df7 100644 --- a/ts/receiver/contentMessage.ts +++ b/ts/receiver/contentMessage.ts @@ -25,7 +25,7 @@ import { } from '../interactions/conversations/unsendingInteractions'; import { SettingsKey } from '../data/settings-key'; -export async function handleContentMessage(envelope: EnvelopePlus, messageHash?: string) { +export async function handleContentMessage(envelope: EnvelopePlus, messageHash: string) { try { const plaintext = await decrypt(envelope, envelope.content); @@ -329,7 +329,7 @@ function shouldDropBlockedUserMessage(content: SignalService.Content): boolean { export async function innerHandleContentMessage( envelope: EnvelopePlus, plaintext: ArrayBuffer, - messageHash?: string + messageHash: string ): Promise { try { perfStart(`SignalService.Content.decode-${envelope.id}`); diff --git a/ts/receiver/dataMessage.ts b/ts/receiver/dataMessage.ts index eacbcc22f..36d26a156 100644 --- a/ts/receiver/dataMessage.ts +++ b/ts/receiver/dataMessage.ts @@ -12,7 +12,10 @@ import { getConversationController } from '../session/conversations'; import { handleClosedGroupControlMessage } from './closedGroups'; import { MessageModel } from '../models/message'; import { MessageModelType } from '../models/messageType'; -import { getMessageBySender, getMessageBySenderAndServerTimestamp } from '../../ts/data/data'; +import { + getMessageBySenderAndSentAt, + getMessageBySenderAndServerTimestamp, +} from '../../ts/data/data'; import { ConversationModel, ConversationTypeEnum } from '../models/conversation'; import { allowOnlyOneAtATime } from '../session/utils/Promise'; import { toHex } from '../session/utils/String'; @@ -272,7 +275,7 @@ function isBodyEmpty(body: string) { export async function handleDataMessage( envelope: EnvelopePlus, dataMessage: SignalService.IDataMessage, - messageHash?: string + messageHash: string ): Promise { // we handle group updates from our other devices in handleClosedGroupControlMessage() if (dataMessage.closedGroupControlMessage) { @@ -314,15 +317,10 @@ export async function handleDataMessage( return removeFromCache(envelope); } - const ev: any = {}; - if (isMe) { - // Data messages for medium groups don't arrive as sync messages. Instead, - // linked devices poll for group messages independently, thus they need - // to recognise some of those messages at their own. - ev.type = 'sent'; - } else { - ev.type = 'message'; - } + // Data messages for medium groups don't arrive as sync messages. Instead, + // linked devices poll for group messages independently, thus they need + // to recognise some of those messages at their own. + const messageEventType: 'sent' | 'message' = isMe ? 'sent' : 'message'; if (envelope.senderIdentity) { message.group = { @@ -330,19 +328,22 @@ export async function handleDataMessage( }; } - ev.confirm = () => removeFromCache(envelope); + const confirm = () => removeFromCache(envelope); - ev.data = { + const data: MessageCreationData = { source: senderPubKey, - destination: isMe ? message.syncTarget : undefined, + destination: isMe ? message.syncTarget : envelope.source, sourceDevice: 1, timestamp: _.toNumber(envelope.timestamp), receivedAt: envelope.receivedAt, message, messageHash, + isPublic: false, + serverId: null, + serverTimestamp: null, }; - await handleMessageEvent(ev); // dataMessage + await handleMessageEvent(messageEventType, data, confirm); } type MessageDuplicateSearchType = { @@ -354,8 +355,8 @@ type MessageDuplicateSearchType = { export type MessageId = { source: string; - serverId: number; - serverTimestamp: number; + serverId?: number | null; + serverTimestamp?: number | null; sourceDevice: number; timestamp: number; message: MessageDuplicateSearchType; @@ -364,7 +365,6 @@ const PUBLICCHAT_MIN_TIME_BETWEEN_DUPLICATE_MESSAGES = 10 * 1000; // 10s export async function isMessageDuplicate({ source, - sourceDevice, timestamp, message, serverTimestamp, @@ -372,6 +372,7 @@ export async function isMessageDuplicate({ // serverTimestamp is only used for opengroupv2 try { let result; + if (serverTimestamp) { // first try to find a duplicate with the same serverTimestamp from this sender @@ -386,9 +387,8 @@ export async function isMessageDuplicate({ // but we consider that a user sending two messages with the same serverTimestamp is unlikely return Boolean(result); } - result = await getMessageBySender({ + result = await getMessageBySenderAndSentAt({ source, - sourceDevice, sentAt: timestamp, }); @@ -442,21 +442,21 @@ async function handleProfileUpdate( } } -export interface MessageCreationData { +export type MessageCreationData = { timestamp: number; - isPublic: boolean; receivedAt: number; - sourceDevice: number; // always 1 isn't it? + sourceDevice: number; // always 1 for Session source: string; - serverId: number; message: any; - serverTimestamp: any; + isPublic: boolean; + serverId: number | null; + serverTimestamp: number | null; // Needed for synced outgoing messages - expirationStartTimestamp: any; // ??? + expirationStartTimestamp?: any; // ??? destination: string; - messageHash?: string; -} + messageHash: string; +}; export function initIncomingMessage(data: MessageCreationData): MessageModel { const { @@ -472,24 +472,24 @@ export function initIncomingMessage(data: MessageCreationData): MessageModel { } = data; const messageGroupId = message?.group?.id; - let groupId = messageGroupId && messageGroupId.length > 0 ? messageGroupId : null; - - if (groupId) { - groupId = PubKey.removeTextSecurePrefixIfNeeded(groupId); + const groupIdWithPrefix = messageGroupId && messageGroupId.length > 0 ? messageGroupId : null; + let groupId: string | undefined; + if (groupIdWithPrefix) { + groupId = PubKey.removeTextSecurePrefixIfNeeded(groupIdWithPrefix); } const messageData: any = { source, sourceDevice, - serverId, // + (not present below in `createSentMessage`) + serverId, sent_at: timestamp, serverTimestamp, received_at: receivedAt || Date.now(), conversationId: groupId ?? source, type: 'incoming', direction: 'incoming', // + - unread: 1, // + - isPublic, // + + unread: 1, + isPublic, messageHash: messageHash || null, }; @@ -519,17 +519,17 @@ function createSentMessage(data: MessageCreationData): MessageModel { }; const messageGroupId = message?.group?.id; - let groupId = messageGroupId && messageGroupId.length > 0 ? messageGroupId : null; - - if (groupId) { - groupId = PubKey.removeTextSecurePrefixIfNeeded(groupId); + const groupIdWithPrefix = messageGroupId && messageGroupId.length > 0 ? messageGroupId : null; + let groupId: string | undefined; + if (groupIdWithPrefix) { + groupId = PubKey.removeTextSecurePrefixIfNeeded(groupIdWithPrefix); } const messageData = { source: UserUtils.getOurPubKeyStrFromCache(), sourceDevice, - serverTimestamp, - serverId, + serverTimestamp: serverTimestamp || undefined, + serverId: serverId || undefined, sent_at: timestamp, received_at: isPublic ? receivedAt : now, isPublic, @@ -550,17 +550,13 @@ export function createMessage(data: MessageCreationData, isIncoming: boolean): M } } -export interface MessageEvent { - data: any; - type: string; - confirm: () => void; -} - // tslint:disable:cyclomatic-complexity max-func-body-length */ -export async function handleMessageEvent(event: MessageEvent): Promise { - const { data, confirm } = event; - - const isIncoming = event.type === 'message'; +async function handleMessageEvent( + messageEventType: 'sent' | 'message', + data: MessageCreationData, + confirm: () => void +): Promise { + const isIncoming = messageEventType === 'message'; if (!data || !data.message) { window?.log?.warn('Invalid data passed to handleMessageEvent.', event); diff --git a/ts/receiver/receiver.ts b/ts/receiver/receiver.ts index 758499aab..075d413fd 100644 --- a/ts/receiver/receiver.ts +++ b/ts/receiver/receiver.ts @@ -37,7 +37,7 @@ interface ReqOptions { const incomingMessagePromises: Array> = []; -async function handleEnvelope(envelope: EnvelopePlus, messageHash?: string) { +async function handleEnvelope(envelope: EnvelopePlus, messageHash: string) { if (envelope.content && envelope.content.length > 0) { return handleContentMessage(envelope, messageHash); } @@ -69,7 +69,7 @@ class EnvelopeQueue { const envelopeQueue = new EnvelopeQueue(); -function queueEnvelope(envelope: EnvelopePlus, messageHash?: string) { +function queueEnvelope(envelope: EnvelopePlus, messageHash: string) { const id = getEnvelopeId(envelope); // window?.log?.info('queueing envelope', id); @@ -201,9 +201,9 @@ async function queueCached(item: any) { if (decrypted) { const payloadPlaintext = StringUtils.encode(decrypted, 'base64'); - queueDecryptedEnvelope(envelope, payloadPlaintext); + queueDecryptedEnvelope(envelope, payloadPlaintext, envelope.messageHash); } else { - queueEnvelope(envelope); + queueEnvelope(envelope, envelope.messageHash); } } catch (error) { window?.log?.error( @@ -227,11 +227,11 @@ async function queueCached(item: any) { } } -function queueDecryptedEnvelope(envelope: any, plaintext: ArrayBuffer) { +function queueDecryptedEnvelope(envelope: any, plaintext: ArrayBuffer, messageHash: string) { const id = getEnvelopeId(envelope); window?.log?.info('queueing decrypted envelope', id); - const task = handleDecryptedEnvelope.bind(null, envelope, plaintext); + const task = handleDecryptedEnvelope.bind(null, envelope, plaintext, messageHash); const taskWithTimeout = createTaskWithTimeout(task, `queueEncryptedEnvelope ${id}`); try { envelopeQueue.add(taskWithTimeout); @@ -243,13 +243,17 @@ function queueDecryptedEnvelope(envelope: any, plaintext: ArrayBuffer) { } } -async function handleDecryptedEnvelope(envelope: EnvelopePlus, plaintext: ArrayBuffer) { +async function handleDecryptedEnvelope( + envelope: EnvelopePlus, + plaintext: ArrayBuffer, + messageHash: string +) { // if (this.stoppingProcessing) { // return Promise.resolve(); // } if (envelope.content) { - await innerHandleContentMessage(envelope, plaintext); + await innerHandleContentMessage(envelope, plaintext, messageHash); } else { await removeFromCache(envelope); } @@ -315,9 +319,10 @@ export async function handleOpenGroupV2Message( expirationStartTimestamp: undefined, source: sender, message: dataMessage, + messageHash: '', // we do not care of a hash for an opengroup message }; // WARNING this is important that the isMessageDuplicate is made in the conversation.queueJob - const isDuplicate = await isMessageDuplicate(messageCreationData); + const isDuplicate = await isMessageDuplicate({ ...messageCreationData }); if (isDuplicate) { window?.log?.info('Received duplicate message. Dropping it.'); diff --git a/ts/session/sending/MessageSender.ts b/ts/session/sending/MessageSender.ts index 840d6e043..00d45e987 100644 --- a/ts/session/sending/MessageSender.ts +++ b/ts/session/sending/MessageSender.ts @@ -96,7 +96,7 @@ export async function send( // and the isDuplicate messages relies on sent_at timestamp to be valid. const found = await getMessageById(message.identifier); - // make sure to not update the send timestamp if this a currently syncing message + // make sure to not update the sent timestamp if this a currently syncing message if (found && !found.get('sentSync')) { found.set({ sent_at: diffTimestamp }); await found.commit();