fix deduplication based on serverTimestamp

pull/1683/head
Audric Ackermann 4 years ago
parent bf9a3e6fad
commit 722f240f3d
No known key found for this signature in database
GPG Key ID: 999F434D76324AD4

@ -73,6 +73,7 @@ module.exports = {
getUnreadCountByConversation,
getMessageBySender,
getMessageBySenderAndServerId,
getMessageBySenderAndServerTimestamp,
getMessageIdsFromServerIds,
getMessageById,
getAllMessages,
@ -2047,6 +2048,20 @@ async function getMessageBySenderAndServerId({ source, serverId }) {
return map(rows, row => jsonToObject(row.json));
}
async function getMessageBySenderAndServerTimestamp({ source, serverTimestamp }) {
const rows = await db.all(
`SELECT json FROM ${MESSAGES_TABLE} WHERE
source = $source AND
serverTimestamp = $serverTimestamp;`,
{
$source: source,
$serverTimestamp: serverTimestamp,
}
);
return map(rows, row => jsonToObject(row.json));
}
async function getUnreadByConversation(conversationId) {
const rows = await db.all(
`SELECT json FROM ${MESSAGES_TABLE} WHERE

@ -70,8 +70,6 @@ import { InView } from 'react-intersection-observer';
import { useTheme, withTheme } from 'styled-components';
import { MessageMetadata } from './message/MessageMetadata';
import { PubKey } from '../../session/types';
import { ToastUtils, UserUtils } from '../../session/utils';
import { ConversationController } from '../../session/conversations';
import { MessageRegularProps } from '../../models/messageType';
import { useEncryptedFileFetch } from '../../hooks/useEncryptedFileFetch';
import { addSenderAsModerator, removeSenderFromModerator } from '../../interactions/message';

@ -119,6 +119,7 @@ const channelsToMake = {
getMessageBySender,
getMessageBySenderAndServerId,
getMessageBySenderAndServerTimestamp,
getMessageIdsFromServerIds,
getMessageById,
getAllMessages,
@ -761,6 +762,24 @@ export async function getMessageBySenderAndServerId({
return new MessageModel(messages[0]);
}
export async function getMessageBySenderAndServerTimestamp({
source,
serverTimestamp,
}: {
source: string;
serverTimestamp: number;
}): Promise<MessageModel | null> {
const messages = await channels.getMessageBySenderAndServerTimestamp({
source,
serverTimestamp,
});
if (!messages || !messages.length) {
return null;
}
return new MessageModel(messages[0]);
}
export async function getUnreadByConversation(conversationId: string): Promise<MessageCollection> {
const messages = await channels.getUnreadByConversation(conversationId);
return new MessageCollection(messages);

@ -13,7 +13,11 @@ import { ConversationController } from '../session/conversations';
import { handleClosedGroupControlMessage } from './closedGroups';
import { MessageModel } from '../models/message';
import { MessageModelType } from '../models/messageType';
import { getMessageBySender, getMessageBySenderAndServerId } from '../../ts/data/data';
import {
getMessageBySender,
getMessageBySenderAndServerId,
getMessageBySenderAndServerTimestamp,
} from '../../ts/data/data';
import { ConversationModel, ConversationTypeEnum } from '../models/conversation';
import { DeliveryReceiptMessage } from '../session/messages/outgoing/controlMessage/receipt/DeliveryReceiptMessage';
import { allowOnlyOneAtATime } from '../session/utils/Promise';
@ -359,6 +363,7 @@ type MessageDuplicateSearchType = {
export type MessageId = {
source: string;
serverId: number;
serverTimestamp: number;
sourceDevice: number;
timestamp: number;
message: MessageDuplicateSearchType;
@ -371,16 +376,33 @@ export async function isMessageDuplicate({
timestamp,
message,
serverId,
serverTimestamp,
}: MessageId) {
const { Errors } = window.Signal.Types;
// serverId is only used for opengroupv2
try {
let result;
if (serverId) {
result = await getMessageBySenderAndServerId({
source,
serverId,
});
if (serverId || serverTimestamp) {
// first try to find a duplicate serverId from this sender
if (serverId) {
result = await getMessageBySenderAndServerId({
source,
serverId,
});
}
// if no result, try to find a duplicate with the same serverTimestamp from this sender
if (!result && serverTimestamp) {
result = await getMessageBySenderAndServerTimestamp({
source,
serverTimestamp,
});
}
// if we have a result, it means a specific user sent two messages either with the same
// serverId or the same serverTimestamp.
// no need to do anything else, those messages must be the same
// Note: this test is not based on which conversation the user sent the message
// but we consider that a user sending two messages with the same serverTimestamp is unlikely
return Boolean(result);
} else {
result = await getMessageBySender({
source,
@ -392,9 +414,6 @@ export async function isMessageDuplicate({
return false;
}
const filteredResult = [result].filter((m: any) => m.attributes.body === message.body);
if (serverId) {
return filteredResult.some(m => isDuplicateServerId(m, { ...message, serverId }, source));
}
return filteredResult.some(m => isDuplicate(m, message, source));
} catch (error) {
window?.log?.error('isMessageDuplicate error:', Errors.toLogFormat(error));
@ -402,25 +421,6 @@ export async function isMessageDuplicate({
}
}
/**
* This function is to be used to check for duplicates for open group v2 messages.
* It just check that the sender and the serverId of a received and an already saved messages are the same
*/
export const isDuplicateServerId = (
m: MessageModel,
testedMessage: MessageDuplicateSearchType,
source: string
) => {
// The username in this case is the users pubKey
const sameUsername = m.attributes.source === source;
// testedMessage.id is needed as long as we support opengroupv1
const sameServerId =
m.attributes.serverId !== undefined &&
(testedMessage.serverId || testedMessage.id) === m.attributes.serverId;
return sameUsername && sameServerId;
};
export const isDuplicate = (
m: MessageModel,
testedMessage: MessageDuplicateSearchType,

@ -340,35 +340,10 @@ export async function handleOpenGroupV2Message(
window?.log?.error('Received a message for an unknown convo. Skipping');
return;
}
const isMe = UserUtils.isUsFromCache(sender);
// for an opengroupv2 incoming message the serverTimestamp and the timestamp
const messageCreationData: MessageCreationData = {
isPublic: true,
sourceDevice: 1,
serverId,
serverTimestamp: sentTimestamp,
receivedAt: Date.now(),
destination: conversationId,
timestamp: sentTimestamp,
unidentifiedStatus: undefined,
expirationStartTimestamp: undefined,
source: sender,
message: dataMessage,
};
if (await isMessageDuplicate(messageCreationData)) {
window?.log?.info('Received duplicate message. Dropping it.');
return;
}
// this line just create an empty message with some basic stuff set.
// the whole decoding of data is happening in handleMessageJob()
const msg = createMessage(messageCreationData, !isMe);
// if the message is `sent` (from secondary device) we have to set the sender manually... (at least for now)
// source = source || msg.get('source');
const ourNumber = UserUtils.getOurPubKeyStrFromCache();
const conversation = await ConversationController.getInstance().getOrCreateAndWait(
conversationId,
ConversationTypeEnum.GROUP
@ -380,6 +355,33 @@ export async function handleOpenGroupV2Message(
}
conversation.queueJob(async () => {
const isMe = UserUtils.isUsFromCache(sender);
// for an opengroupv2 incoming message the serverTimestamp and the timestamp
const messageCreationData: MessageCreationData = {
isPublic: true,
sourceDevice: 1,
serverId,
serverTimestamp: sentTimestamp,
receivedAt: Date.now(),
destination: conversationId,
timestamp: sentTimestamp,
unidentifiedStatus: undefined,
expirationStartTimestamp: undefined,
source: sender,
message: dataMessage,
};
// WARNING this is very important that the isMessageDuplicate is made in the conversation.queueJob
const isDuplicate = await isMessageDuplicate(messageCreationData);
if (isDuplicate) {
window?.log?.info('Received duplicate message. Dropping it.');
return;
}
// this line just create an empty message with some basic stuff set.
// the whole decoding of data is happening in handleMessageJob()
const msg = createMessage(messageCreationData, !isMe);
const ourNumber = UserUtils.getOurPubKeyStrFromCache();
await handleMessageJob(msg, conversation, decoded?.dataMessage, ourNumber, noop, sender);
});
}

Loading…
Cancel
Save