From 722f240f3d01e9f9282bbfc383d569ecbe7c6489 Mon Sep 17 00:00:00 2001 From: Audric Ackermann Date: Mon, 7 Jun 2021 15:54:44 +1000 Subject: [PATCH] fix deduplication based on serverTimestamp --- app/sql.js | 15 +++++++ ts/components/conversation/Message.tsx | 2 - ts/data/data.ts | 19 +++++++++ ts/receiver/dataMessage.ts | 56 +++++++++++++------------- ts/receiver/receiver.ts | 52 ++++++++++++------------ 5 files changed, 89 insertions(+), 55 deletions(-) diff --git a/app/sql.js b/app/sql.js index e18322a3d..e5d9d2f07 100644 --- a/app/sql.js +++ b/app/sql.js @@ -73,6 +73,7 @@ module.exports = { getUnreadCountByConversation, getMessageBySender, getMessageBySenderAndServerId, + getMessageBySenderAndServerTimestamp, getMessageIdsFromServerIds, getMessageById, getAllMessages, @@ -2047,6 +2048,20 @@ async function getMessageBySenderAndServerId({ source, serverId }) { return map(rows, row => jsonToObject(row.json)); } +async function getMessageBySenderAndServerTimestamp({ source, serverTimestamp }) { + const rows = await db.all( + `SELECT json FROM ${MESSAGES_TABLE} WHERE + source = $source AND + serverTimestamp = $serverTimestamp;`, + { + $source: source, + $serverTimestamp: serverTimestamp, + } + ); + + return map(rows, row => jsonToObject(row.json)); +} + async function getUnreadByConversation(conversationId) { const rows = await db.all( `SELECT json FROM ${MESSAGES_TABLE} WHERE diff --git a/ts/components/conversation/Message.tsx b/ts/components/conversation/Message.tsx index cb8b7ab52..d3f181ca7 100644 --- a/ts/components/conversation/Message.tsx +++ b/ts/components/conversation/Message.tsx @@ -70,8 +70,6 @@ import { InView } from 'react-intersection-observer'; import { useTheme, withTheme } from 'styled-components'; import { MessageMetadata } from './message/MessageMetadata'; import { PubKey } from '../../session/types'; -import { ToastUtils, UserUtils } from '../../session/utils'; -import { ConversationController } from '../../session/conversations'; import { MessageRegularProps } from '../../models/messageType'; import { useEncryptedFileFetch } from '../../hooks/useEncryptedFileFetch'; import { addSenderAsModerator, removeSenderFromModerator } from '../../interactions/message'; diff --git a/ts/data/data.ts b/ts/data/data.ts index 275926369..6f806e3d6 100644 --- a/ts/data/data.ts +++ b/ts/data/data.ts @@ -119,6 +119,7 @@ const channelsToMake = { getMessageBySender, getMessageBySenderAndServerId, + getMessageBySenderAndServerTimestamp, getMessageIdsFromServerIds, getMessageById, getAllMessages, @@ -761,6 +762,24 @@ export async function getMessageBySenderAndServerId({ return new MessageModel(messages[0]); } +export async function getMessageBySenderAndServerTimestamp({ + source, + serverTimestamp, +}: { + source: string; + serverTimestamp: number; +}): Promise { + const messages = await channels.getMessageBySenderAndServerTimestamp({ + source, + serverTimestamp, + }); + if (!messages || !messages.length) { + return null; + } + + return new MessageModel(messages[0]); +} + export async function getUnreadByConversation(conversationId: string): Promise { const messages = await channels.getUnreadByConversation(conversationId); return new MessageCollection(messages); diff --git a/ts/receiver/dataMessage.ts b/ts/receiver/dataMessage.ts index 0c05ba87b..92eee5fb6 100644 --- a/ts/receiver/dataMessage.ts +++ b/ts/receiver/dataMessage.ts @@ -13,7 +13,11 @@ import { ConversationController } from '../session/conversations'; import { handleClosedGroupControlMessage } from './closedGroups'; import { MessageModel } from '../models/message'; import { MessageModelType } from '../models/messageType'; -import { getMessageBySender, getMessageBySenderAndServerId } from '../../ts/data/data'; +import { + getMessageBySender, + getMessageBySenderAndServerId, + getMessageBySenderAndServerTimestamp, +} from '../../ts/data/data'; import { ConversationModel, ConversationTypeEnum } from '../models/conversation'; import { DeliveryReceiptMessage } from '../session/messages/outgoing/controlMessage/receipt/DeliveryReceiptMessage'; import { allowOnlyOneAtATime } from '../session/utils/Promise'; @@ -359,6 +363,7 @@ type MessageDuplicateSearchType = { export type MessageId = { source: string; serverId: number; + serverTimestamp: number; sourceDevice: number; timestamp: number; message: MessageDuplicateSearchType; @@ -371,16 +376,33 @@ export async function isMessageDuplicate({ timestamp, message, serverId, + serverTimestamp, }: MessageId) { const { Errors } = window.Signal.Types; // serverId is only used for opengroupv2 try { let result; - if (serverId) { - result = await getMessageBySenderAndServerId({ - source, - serverId, - }); + if (serverId || serverTimestamp) { + // first try to find a duplicate serverId from this sender + if (serverId) { + result = await getMessageBySenderAndServerId({ + source, + serverId, + }); + } + // if no result, try to find a duplicate with the same serverTimestamp from this sender + if (!result && serverTimestamp) { + result = await getMessageBySenderAndServerTimestamp({ + source, + serverTimestamp, + }); + } + // if we have a result, it means a specific user sent two messages either with the same + // serverId or the same serverTimestamp. + // no need to do anything else, those messages must be the same + // Note: this test is not based on which conversation the user sent the message + // but we consider that a user sending two messages with the same serverTimestamp is unlikely + return Boolean(result); } else { result = await getMessageBySender({ source, @@ -392,9 +414,6 @@ export async function isMessageDuplicate({ return false; } const filteredResult = [result].filter((m: any) => m.attributes.body === message.body); - if (serverId) { - return filteredResult.some(m => isDuplicateServerId(m, { ...message, serverId }, source)); - } return filteredResult.some(m => isDuplicate(m, message, source)); } catch (error) { window?.log?.error('isMessageDuplicate error:', Errors.toLogFormat(error)); @@ -402,25 +421,6 @@ export async function isMessageDuplicate({ } } -/** - * This function is to be used to check for duplicates for open group v2 messages. - * It just check that the sender and the serverId of a received and an already saved messages are the same - */ -export const isDuplicateServerId = ( - m: MessageModel, - testedMessage: MessageDuplicateSearchType, - source: string -) => { - // The username in this case is the users pubKey - const sameUsername = m.attributes.source === source; - // testedMessage.id is needed as long as we support opengroupv1 - const sameServerId = - m.attributes.serverId !== undefined && - (testedMessage.serverId || testedMessage.id) === m.attributes.serverId; - - return sameUsername && sameServerId; -}; - export const isDuplicate = ( m: MessageModel, testedMessage: MessageDuplicateSearchType, diff --git a/ts/receiver/receiver.ts b/ts/receiver/receiver.ts index a67ce37f1..e5c0919d8 100644 --- a/ts/receiver/receiver.ts +++ b/ts/receiver/receiver.ts @@ -340,35 +340,10 @@ export async function handleOpenGroupV2Message( window?.log?.error('Received a message for an unknown convo. Skipping'); return; } - const isMe = UserUtils.isUsFromCache(sender); - // for an opengroupv2 incoming message the serverTimestamp and the timestamp - const messageCreationData: MessageCreationData = { - isPublic: true, - sourceDevice: 1, - serverId, - serverTimestamp: sentTimestamp, - receivedAt: Date.now(), - destination: conversationId, - timestamp: sentTimestamp, - unidentifiedStatus: undefined, - expirationStartTimestamp: undefined, - source: sender, - message: dataMessage, - }; - - if (await isMessageDuplicate(messageCreationData)) { - window?.log?.info('Received duplicate message. Dropping it.'); - return; - } - - // this line just create an empty message with some basic stuff set. - // the whole decoding of data is happening in handleMessageJob() - const msg = createMessage(messageCreationData, !isMe); // if the message is `sent` (from secondary device) we have to set the sender manually... (at least for now) // source = source || msg.get('source'); - const ourNumber = UserUtils.getOurPubKeyStrFromCache(); const conversation = await ConversationController.getInstance().getOrCreateAndWait( conversationId, ConversationTypeEnum.GROUP @@ -380,6 +355,33 @@ export async function handleOpenGroupV2Message( } conversation.queueJob(async () => { + const isMe = UserUtils.isUsFromCache(sender); + // for an opengroupv2 incoming message the serverTimestamp and the timestamp + const messageCreationData: MessageCreationData = { + isPublic: true, + sourceDevice: 1, + serverId, + serverTimestamp: sentTimestamp, + receivedAt: Date.now(), + destination: conversationId, + timestamp: sentTimestamp, + unidentifiedStatus: undefined, + expirationStartTimestamp: undefined, + source: sender, + message: dataMessage, + }; + // WARNING this is very important that the isMessageDuplicate is made in the conversation.queueJob + const isDuplicate = await isMessageDuplicate(messageCreationData); + if (isDuplicate) { + window?.log?.info('Received duplicate message. Dropping it.'); + return; + } + + // this line just create an empty message with some basic stuff set. + // the whole decoding of data is happening in handleMessageJob() + const msg = createMessage(messageCreationData, !isMe); + const ourNumber = UserUtils.getOurPubKeyStrFromCache(); + await handleMessageJob(msg, conversation, decoded?.dataMessage, ourNumber, noop, sender); }); }