Fix attachments duplication on message syncing with current device

pull/2137/head
audric 3 years ago
parent e97ac5d7c7
commit fc916ce94c

@ -59,7 +59,7 @@ module.exports = {
removeMessage,
getUnreadByConversation,
getUnreadCountByConversation,
getMessageBySender,
getMessageBySenderAndSentAt,
getMessageBySenderAndServerTimestamp,
getMessageBySenderAndTimestamp,
getMessageIdsFromServerIds,
@ -2145,17 +2145,15 @@ function getMessageById(id) {
return jsonToObject(row.json);
}
function getMessageBySender({ source, sourceDevice, sentAt }) {
function getMessageBySenderAndSentAt({ source, sentAt }) {
const rows = globalInstance
.prepare(
`SELECT json FROM ${MESSAGES_TABLE} WHERE
source = $source AND
sourceDevice = $sourceDevice AND
sent_at = $sent_at;`
)
.all({
source,
sourceDevice,
sent_at: sentAt,
});

@ -1359,7 +1359,6 @@
@include color-svg('../images/x-16.svg', $color-gray-60);
}
// Module: Search Results
.module-search-results {

@ -115,7 +115,7 @@ const channelsToMake = {
removeAllMessagesInConversation,
getMessageCount,
getMessageBySender,
getMessageBySenderAndSentAt,
getMessageBySenderAndServerTimestamp,
getMessageBySenderAndTimestamp,
getMessageIdsFromServerIds,
@ -683,18 +683,15 @@ export async function getMessageById(
return new MessageModel(message);
}
export async function getMessageBySender({
export async function getMessageBySenderAndSentAt({
source,
sourceDevice,
sentAt,
}: {
source: string;
sourceDevice: number;
sentAt: number;
}): Promise<MessageModel | null> {
const messages = await channels.getMessageBySender({
const messages = await channels.getMessageBySenderAndSentAt({
source,
sourceDevice,
sentAt,
});
if (!messages || !messages.length) {
@ -854,11 +851,11 @@ export async function getUnprocessedCount(): Promise<number> {
return channels.getUnprocessedCount();
}
export async function getAllUnprocessed(): Promise<any> {
export async function getAllUnprocessed(): Promise<Array<UnprocessedParameter>> {
return channels.getAllUnprocessed();
}
export async function getUnprocessedById(id: string): Promise<any> {
export async function getUnprocessedById(id: string): Promise<UnprocessedParameter | undefined> {
return channels.getUnprocessedById(id);
}
@ -870,6 +867,8 @@ export type UnprocessedParameter = {
attempts: number;
messageHash: string;
senderIdentity?: string;
decrypted?: string; // added once the envelopes's content is decrypted with updateCache
source?: string; // added once the envelopes's content is decrypted with updateCache
};
export async function saveUnprocessed(data: UnprocessedParameter): Promise<string> {

@ -715,18 +715,17 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
const recipients = this.getRecipients();
const now = Date.now();
const networkTimestamp = now - getLatestTimestampOffset();
window?.log?.info(
'Sending message to conversation',
this.idForLogging(),
'with timestamp',
now
'with networkTimestamp: ',
networkTimestamp
);
const editedQuote = _.isEmpty(quote) ? undefined : quote;
const diffTimestamp = Date.now() - getLatestTimestampOffset();
const messageObject: MessageAttributesOptionals = {
type: 'outgoing',
body,
@ -734,7 +733,7 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
quote: editedQuote,
preview,
attachments,
sent_at: diffTimestamp,
sent_at: networkTimestamp,
received_at: now,
expireTimer,
recipients,

@ -25,7 +25,7 @@ import {
} from '../interactions/conversations/unsendingInteractions';
import { SettingsKey } from '../data/settings-key';
export async function handleContentMessage(envelope: EnvelopePlus, messageHash?: string) {
export async function handleContentMessage(envelope: EnvelopePlus, messageHash: string) {
try {
const plaintext = await decrypt(envelope, envelope.content);
@ -329,7 +329,7 @@ function shouldDropBlockedUserMessage(content: SignalService.Content): boolean {
export async function innerHandleContentMessage(
envelope: EnvelopePlus,
plaintext: ArrayBuffer,
messageHash?: string
messageHash: string
): Promise<void> {
try {
perfStart(`SignalService.Content.decode-${envelope.id}`);

@ -12,7 +12,10 @@ import { getConversationController } from '../session/conversations';
import { handleClosedGroupControlMessage } from './closedGroups';
import { MessageModel } from '../models/message';
import { MessageModelType } from '../models/messageType';
import { getMessageBySender, getMessageBySenderAndServerTimestamp } from '../../ts/data/data';
import {
getMessageBySenderAndSentAt,
getMessageBySenderAndServerTimestamp,
} from '../../ts/data/data';
import { ConversationModel, ConversationTypeEnum } from '../models/conversation';
import { allowOnlyOneAtATime } from '../session/utils/Promise';
import { toHex } from '../session/utils/String';
@ -272,7 +275,7 @@ function isBodyEmpty(body: string) {
export async function handleDataMessage(
envelope: EnvelopePlus,
dataMessage: SignalService.IDataMessage,
messageHash?: string
messageHash: string
): Promise<void> {
// we handle group updates from our other devices in handleClosedGroupControlMessage()
if (dataMessage.closedGroupControlMessage) {
@ -314,15 +317,10 @@ export async function handleDataMessage(
return removeFromCache(envelope);
}
const ev: any = {};
if (isMe) {
// Data messages for medium groups don't arrive as sync messages. Instead,
// linked devices poll for group messages independently, thus they need
// to recognise some of those messages at their own.
ev.type = 'sent';
} else {
ev.type = 'message';
}
// Data messages for medium groups don't arrive as sync messages. Instead,
// linked devices poll for group messages independently, thus they need
// to recognise some of those messages at their own.
const messageEventType: 'sent' | 'message' = isMe ? 'sent' : 'message';
if (envelope.senderIdentity) {
message.group = {
@ -330,19 +328,22 @@ export async function handleDataMessage(
};
}
ev.confirm = () => removeFromCache(envelope);
const confirm = () => removeFromCache(envelope);
ev.data = {
const data: MessageCreationData = {
source: senderPubKey,
destination: isMe ? message.syncTarget : undefined,
destination: isMe ? message.syncTarget : envelope.source,
sourceDevice: 1,
timestamp: _.toNumber(envelope.timestamp),
receivedAt: envelope.receivedAt,
message,
messageHash,
isPublic: false,
serverId: null,
serverTimestamp: null,
};
await handleMessageEvent(ev); // dataMessage
await handleMessageEvent(messageEventType, data, confirm);
}
type MessageDuplicateSearchType = {
@ -354,8 +355,8 @@ type MessageDuplicateSearchType = {
export type MessageId = {
source: string;
serverId: number;
serverTimestamp: number;
serverId?: number | null;
serverTimestamp?: number | null;
sourceDevice: number;
timestamp: number;
message: MessageDuplicateSearchType;
@ -364,7 +365,6 @@ const PUBLICCHAT_MIN_TIME_BETWEEN_DUPLICATE_MESSAGES = 10 * 1000; // 10s
export async function isMessageDuplicate({
source,
sourceDevice,
timestamp,
message,
serverTimestamp,
@ -372,6 +372,7 @@ export async function isMessageDuplicate({
// serverTimestamp is only used for opengroupv2
try {
let result;
if (serverTimestamp) {
// first try to find a duplicate with the same serverTimestamp from this sender
@ -386,9 +387,8 @@ export async function isMessageDuplicate({
// but we consider that a user sending two messages with the same serverTimestamp is unlikely
return Boolean(result);
}
result = await getMessageBySender({
result = await getMessageBySenderAndSentAt({
source,
sourceDevice,
sentAt: timestamp,
});
@ -442,21 +442,21 @@ async function handleProfileUpdate(
}
}
export interface MessageCreationData {
export type MessageCreationData = {
timestamp: number;
isPublic: boolean;
receivedAt: number;
sourceDevice: number; // always 1 isn't it?
sourceDevice: number; // always 1 for Session
source: string;
serverId: number;
message: any;
serverTimestamp: any;
isPublic: boolean;
serverId: number | null;
serverTimestamp: number | null;
// Needed for synced outgoing messages
expirationStartTimestamp: any; // ???
expirationStartTimestamp?: any; // ???
destination: string;
messageHash?: string;
}
messageHash: string;
};
export function initIncomingMessage(data: MessageCreationData): MessageModel {
const {
@ -472,24 +472,24 @@ export function initIncomingMessage(data: MessageCreationData): MessageModel {
} = data;
const messageGroupId = message?.group?.id;
let groupId = messageGroupId && messageGroupId.length > 0 ? messageGroupId : null;
if (groupId) {
groupId = PubKey.removeTextSecurePrefixIfNeeded(groupId);
const groupIdWithPrefix = messageGroupId && messageGroupId.length > 0 ? messageGroupId : null;
let groupId: string | undefined;
if (groupIdWithPrefix) {
groupId = PubKey.removeTextSecurePrefixIfNeeded(groupIdWithPrefix);
}
const messageData: any = {
source,
sourceDevice,
serverId, // + (not present below in `createSentMessage`)
serverId,
sent_at: timestamp,
serverTimestamp,
received_at: receivedAt || Date.now(),
conversationId: groupId ?? source,
type: 'incoming',
direction: 'incoming', // +
unread: 1, // +
isPublic, // +
unread: 1,
isPublic,
messageHash: messageHash || null,
};
@ -519,17 +519,17 @@ function createSentMessage(data: MessageCreationData): MessageModel {
};
const messageGroupId = message?.group?.id;
let groupId = messageGroupId && messageGroupId.length > 0 ? messageGroupId : null;
if (groupId) {
groupId = PubKey.removeTextSecurePrefixIfNeeded(groupId);
const groupIdWithPrefix = messageGroupId && messageGroupId.length > 0 ? messageGroupId : null;
let groupId: string | undefined;
if (groupIdWithPrefix) {
groupId = PubKey.removeTextSecurePrefixIfNeeded(groupIdWithPrefix);
}
const messageData = {
source: UserUtils.getOurPubKeyStrFromCache(),
sourceDevice,
serverTimestamp,
serverId,
serverTimestamp: serverTimestamp || undefined,
serverId: serverId || undefined,
sent_at: timestamp,
received_at: isPublic ? receivedAt : now,
isPublic,
@ -550,17 +550,13 @@ export function createMessage(data: MessageCreationData, isIncoming: boolean): M
}
}
export interface MessageEvent {
data: any;
type: string;
confirm: () => void;
}
// tslint:disable:cyclomatic-complexity max-func-body-length */
export async function handleMessageEvent(event: MessageEvent): Promise<void> {
const { data, confirm } = event;
const isIncoming = event.type === 'message';
async function handleMessageEvent(
messageEventType: 'sent' | 'message',
data: MessageCreationData,
confirm: () => void
): Promise<void> {
const isIncoming = messageEventType === 'message';
if (!data || !data.message) {
window?.log?.warn('Invalid data passed to handleMessageEvent.', event);

@ -37,7 +37,7 @@ interface ReqOptions {
const incomingMessagePromises: Array<Promise<any>> = [];
async function handleEnvelope(envelope: EnvelopePlus, messageHash?: string) {
async function handleEnvelope(envelope: EnvelopePlus, messageHash: string) {
if (envelope.content && envelope.content.length > 0) {
return handleContentMessage(envelope, messageHash);
}
@ -69,7 +69,7 @@ class EnvelopeQueue {
const envelopeQueue = new EnvelopeQueue();
function queueEnvelope(envelope: EnvelopePlus, messageHash?: string) {
function queueEnvelope(envelope: EnvelopePlus, messageHash: string) {
const id = getEnvelopeId(envelope);
// window?.log?.info('queueing envelope', id);
@ -201,9 +201,9 @@ async function queueCached(item: any) {
if (decrypted) {
const payloadPlaintext = StringUtils.encode(decrypted, 'base64');
queueDecryptedEnvelope(envelope, payloadPlaintext);
queueDecryptedEnvelope(envelope, payloadPlaintext, envelope.messageHash);
} else {
queueEnvelope(envelope);
queueEnvelope(envelope, envelope.messageHash);
}
} catch (error) {
window?.log?.error(
@ -227,11 +227,11 @@ async function queueCached(item: any) {
}
}
function queueDecryptedEnvelope(envelope: any, plaintext: ArrayBuffer) {
function queueDecryptedEnvelope(envelope: any, plaintext: ArrayBuffer, messageHash: string) {
const id = getEnvelopeId(envelope);
window?.log?.info('queueing decrypted envelope', id);
const task = handleDecryptedEnvelope.bind(null, envelope, plaintext);
const task = handleDecryptedEnvelope.bind(null, envelope, plaintext, messageHash);
const taskWithTimeout = createTaskWithTimeout(task, `queueEncryptedEnvelope ${id}`);
try {
envelopeQueue.add(taskWithTimeout);
@ -243,13 +243,17 @@ function queueDecryptedEnvelope(envelope: any, plaintext: ArrayBuffer) {
}
}
async function handleDecryptedEnvelope(envelope: EnvelopePlus, plaintext: ArrayBuffer) {
async function handleDecryptedEnvelope(
envelope: EnvelopePlus,
plaintext: ArrayBuffer,
messageHash: string
) {
// if (this.stoppingProcessing) {
// return Promise.resolve();
// }
if (envelope.content) {
await innerHandleContentMessage(envelope, plaintext);
await innerHandleContentMessage(envelope, plaintext, messageHash);
} else {
await removeFromCache(envelope);
}
@ -315,9 +319,10 @@ export async function handleOpenGroupV2Message(
expirationStartTimestamp: undefined,
source: sender,
message: dataMessage,
messageHash: '', // we do not care of a hash for an opengroup message
};
// WARNING this is important that the isMessageDuplicate is made in the conversation.queueJob
const isDuplicate = await isMessageDuplicate(messageCreationData);
const isDuplicate = await isMessageDuplicate({ ...messageCreationData });
if (isDuplicate) {
window?.log?.info('Received duplicate message. Dropping it.');

@ -96,7 +96,7 @@ export async function send(
// and the isDuplicate messages relies on sent_at timestamp to be valid.
const found = await getMessageById(message.identifier);
// make sure to not update the send timestamp if this a currently syncing message
// make sure to not update the sent timestamp if this a currently syncing message
if (found && !found.get('sentSync')) {
found.set({ sent_at: diffTimestamp });
await found.commit();

Loading…
Cancel
Save