fix: already read DaR messages start with right time left on receive

pull/2940/head
Audric Ackermann 1 year ago
parent fd4aedb54b
commit 98fd834367

@ -1028,7 +1028,7 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
const expireUpdate = await DisappearingMessages.checkForExpireUpdateInContentMessage(
content,
conversation,
true
null
);
const syncMessage = buildSyncMessage(
@ -1130,15 +1130,16 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
true
);
}
this.set({
expirationStartTimestamp: DisappearingMessages.setExpirationStartTimestamp(
expirationMode,
readAt,
'markMessageReadNoCommit',
this.get('id')
),
});
if (!this.getExpirationStartTimestamp()) {
this.set({
expirationStartTimestamp: DisappearingMessages.setExpirationStartTimestamp(
expirationMode,
readAt,
'markMessageReadNoCommit',
this.get('id')
),
});
}
}
}

@ -54,13 +54,14 @@ export function createSwarmMessageSentFromNotUs(args: {
sender: string;
conversationId: string;
}): MessageModel {
const messageData: MessageAttributesOptionals = {
const messageAttributes: MessageAttributesOptionals = {
...getSharedAttributesForSwarmMessage(args),
...getSharedAttributesForIncomingMessage(),
source: args.sender,
};
return new MessageModel(messageData);
markAttributesAsReadIfNeeded(messageAttributes);
return new MessageModel(messageAttributes);
}
function getSharedAttributesForPublicMessage({
@ -98,7 +99,7 @@ function getSharedAttributesForOutgoingMessage() {
function getSharedAttributesForIncomingMessage() {
return {
unread: READ_MESSAGE_STATE.unread,
unread: READ_MESSAGE_STATE.unread, // default to unread, but markAttributesAsReadIfNeeded will override it if needed
type: 'incoming' as MessageModelType,
direction: 'incoming' as MessageModelType,
};

@ -38,7 +38,11 @@ import { getAllCachedECKeyPair, sentAtMoreRecentThanWrapper } from './closedGrou
import { ConfigMessageHandler } from './configMessage';
import { ECKeyPair } from './keypairs';
export async function handleSwarmContentMessage(envelope: EnvelopePlus, messageHash: string) {
export async function handleSwarmContentMessage(
envelope: EnvelopePlus,
messageHash: string,
messageExpirationFromRetrieve: number | null
) {
try {
const plaintext = await decrypt(envelope);
@ -51,7 +55,13 @@ export async function handleSwarmContentMessage(envelope: EnvelopePlus, messageH
const sentAtTimestamp = toNumber(envelope.timestamp);
// swarm messages already comes with a timestamp in milliseconds, so this sentAtTimestamp is correct.
// the sogs messages do not come as milliseconds but just seconds, so we override it
await innerHandleSwarmContentMessage(envelope, sentAtTimestamp, plaintext, messageHash);
await innerHandleSwarmContentMessage({
envelope,
sentAtTimestamp,
plaintext,
messageHash,
messageExpirationFromRetrieve,
});
} catch (e) {
window?.log?.warn(e.message);
}
@ -386,12 +396,19 @@ function shouldDropBlockedUserMessage(
return !isControlDataMessageOnly;
}
export async function innerHandleSwarmContentMessage(
envelope: EnvelopePlus,
sentAtTimestamp: number,
plaintext: ArrayBuffer,
messageHash: string
): Promise<void> {
export async function innerHandleSwarmContentMessage({
envelope,
messageHash,
plaintext,
sentAtTimestamp,
messageExpirationFromRetrieve,
}: {
envelope: EnvelopePlus;
sentAtTimestamp: number;
plaintext: ArrayBuffer;
messageHash: string;
messageExpirationFromRetrieve: number | null;
}): Promise<void> {
try {
perfStart(`SignalService.Content.decode-${envelope.id}`);
window.log.info('innerHandleSwarmContentMessage');
@ -472,7 +489,8 @@ export async function innerHandleSwarmContentMessage(
const expireUpdate = await DisappearingMessages.checkForExpireUpdateInContentMessage(
content,
conversationModelForUIUpdate
conversationModelForUIUpdate,
messageExpirationFromRetrieve
);
// TODO legacy messages support will be removed in a future release
if (expireUpdate?.isDisappearingMessagesV2Released) {
@ -488,14 +506,14 @@ export async function innerHandleSwarmContentMessage(
}
perfStart(`handleSwarmDataMessage-${envelope.id}`);
await handleSwarmDataMessage(
await handleSwarmDataMessage({
envelope,
sentAtTimestamp,
content.dataMessage as SignalService.DataMessage,
rawDataMessage: content.dataMessage as SignalService.DataMessage,
messageHash,
senderConversationModel,
expireUpdate
);
expireUpdate,
});
perfEnd(`handleSwarmDataMessage-${envelope.id}`, 'handleSwarmDataMessage');
return;
}

@ -153,14 +153,21 @@ export function cleanIncomingDataMessage(
* * dataMessage.syncTarget is either the group public key OR the private conversation this message is about.
*/
export async function handleSwarmDataMessage(
envelope: EnvelopePlus,
sentAtTimestamp: number,
rawDataMessage: SignalService.DataMessage,
messageHash: string,
senderConversationModel: ConversationModel,
expireUpdate?: DisappearingMessageUpdate
): Promise<void> {
export async function handleSwarmDataMessage({
envelope,
messageHash,
rawDataMessage,
senderConversationModel,
sentAtTimestamp,
expireUpdate,
}: {
envelope: EnvelopePlus;
sentAtTimestamp: number;
rawDataMessage: SignalService.DataMessage;
messageHash: string;
senderConversationModel: ConversationModel;
expireUpdate?: DisappearingMessageUpdate;
}): Promise<void> {
window.log.info('handleSwarmDataMessage');
const cleanDataMessage = cleanIncomingDataMessage(rawDataMessage, envelope);

@ -392,7 +392,7 @@ export async function handleMessageJob(
if (
conversation &&
messageModel.getExpireTimer() > 0 &&
Boolean(messageModel.getExpirationStartTimestamp()) === false
!messageModel.getExpirationStartTimestamp()
) {
const expirationMode = DisappearingMessages.changeToDisappearingConversationMode(
conversation,

@ -1,6 +1,6 @@
/* eslint-disable more/no-then */
import { v4 as uuidv4 } from 'uuid';
import _ from 'lodash';
import { v4 as uuidv4 } from 'uuid';
import { EnvelopePlus } from './types';
@ -21,9 +21,13 @@ export { downloadAttachment } from './attachments';
const incomingMessagePromises: Array<Promise<any>> = [];
async function handleSwarmEnvelope(envelope: EnvelopePlus, messageHash: string) {
async function handleSwarmEnvelope(
envelope: EnvelopePlus,
messageHash: string,
messageExpiration: number | null
) {
if (envelope.content && envelope.content.length > 0) {
return handleSwarmContentMessage(envelope, messageHash);
return handleSwarmContentMessage(envelope, messageHash, messageExpiration);
}
await removeFromCache(envelope);
@ -53,9 +57,13 @@ class EnvelopeQueue {
const envelopeQueue = new EnvelopeQueue();
function queueSwarmEnvelope(envelope: EnvelopePlus, messageHash: string) {
function queueSwarmEnvelope(
envelope: EnvelopePlus,
messageHash: string,
messageExpiration: number | null
) {
const id = getEnvelopeId(envelope);
const task = handleSwarmEnvelope.bind(null, envelope, messageHash);
const task = handleSwarmEnvelope.bind(null, envelope, messageHash, messageExpiration);
const taskWithTimeout = createTaskWithTimeout(task, `queueSwarmEnvelope ${id}`);
try {
@ -74,7 +82,8 @@ async function handleRequestDetail(
plaintext: Uint8Array,
inConversation: string | null,
lastPromise: Promise<any>,
messageHash: string
messageHash: string,
messageExpiration: number
): Promise<void> {
const envelope: any = SignalService.Envelope.decode(plaintext);
@ -115,7 +124,7 @@ async function handleRequestDetail(
// To ensure that we queue in the same order we receive messages
await lastPromise;
queueSwarmEnvelope(envelope, messageHash);
queueSwarmEnvelope(envelope, messageHash, messageExpiration);
} catch (error) {
window?.log?.error(
'handleRequest error trying to add message to cache:',
@ -131,15 +140,20 @@ async function handleRequestDetail(
export function handleRequest(
plaintext: Uint8Array,
inConversation: string | null,
messageHash: string
messageHash: string,
messageExpiration: number
): void {
const lastPromise = _.last(incomingMessagePromises) || Promise.resolve();
const promise = handleRequestDetail(plaintext, inConversation, lastPromise, messageHash).catch(
e => {
window?.log?.error('Error handling incoming message:', e && e.stack ? e.stack : e);
}
);
const promise = handleRequestDetail(
plaintext,
inConversation,
lastPromise,
messageHash,
messageExpiration
).catch(e => {
window?.log?.error('Error handling incoming message:', e && e.stack ? e.stack : e);
});
incomingMessagePromises.push(promise);
}
@ -182,10 +196,11 @@ async function queueCached(item: UnprocessedParameter) {
if (decrypted) {
const payloadPlaintext = StringUtils.encode(decrypted, 'base64');
queueDecryptedEnvelope(envelope, payloadPlaintext, envelope.messageHash);
// TODO we don't store the expiration in the cache, but we want to get rid of the cache at some point
queueDecryptedEnvelope(envelope, payloadPlaintext, envelope.messageHash, null);
} else {
queueSwarmEnvelope(envelope, envelope.messageHash);
// TODO we don't store the expiration in the cache, but we want to get rid of the cache at some point
queueSwarmEnvelope(envelope, envelope.messageHash, null);
}
} catch (error) {
window?.log?.error(
@ -208,11 +223,22 @@ async function queueCached(item: UnprocessedParameter) {
}
}
function queueDecryptedEnvelope(envelope: any, plaintext: ArrayBuffer, messageHash: string) {
function queueDecryptedEnvelope(
envelope: any,
plaintext: ArrayBuffer,
messageHash: string,
messageExpiration: number | null
) {
const id = getEnvelopeId(envelope);
window?.log?.info('queueing decrypted envelope', id);
const task = handleDecryptedEnvelope.bind(null, envelope, plaintext, messageHash);
const task = handleDecryptedEnvelope.bind(
null,
envelope,
plaintext,
messageHash,
messageExpiration
);
const taskWithTimeout = createTaskWithTimeout(task, `queueEncryptedEnvelope ${id}`);
try {
envelopeQueue.add(taskWithTimeout);
@ -227,12 +253,19 @@ function queueDecryptedEnvelope(envelope: any, plaintext: ArrayBuffer, messageHa
async function handleDecryptedEnvelope(
envelope: EnvelopePlus,
plaintext: ArrayBuffer,
messageHash: string
messageHash: string,
messageExpirationFromRetrieve: number | null
) {
if (envelope.content) {
const sentAtTimestamp = _.toNumber(envelope.timestamp);
await innerHandleSwarmContentMessage(envelope, sentAtTimestamp, plaintext, messageHash);
await innerHandleSwarmContentMessage({
envelope,
sentAtTimestamp,
plaintext,
messageHash,
messageExpirationFromRetrieve,
});
} else {
await removeFromCache(envelope);
}

@ -474,12 +474,13 @@ async function handleInboxOutboxMessages(
window.log.warn('tryMatchBlindWithStandardKey could not veriyfy');
}
await innerHandleSwarmContentMessage(
builtEnvelope,
postedAtInMs,
builtEnvelope.content,
''
);
await innerHandleSwarmContentMessage({
envelope: builtEnvelope,
sentAtTimestamp: postedAtInMs,
plaintext: builtEnvelope.content,
messageHash: '',
messageExpirationFromRetrieve: null, // sogs message do not expire
});
}
} catch (e) {
window.log.warn('handleOutboxMessages failed with:', e.message);

@ -130,7 +130,6 @@ async function retrieveNextMessages(
4000,
associatedWith
);
if (!results || !results.length) {
window?.log?.warn(
`_retrieveNextMessages - sessionRpc could not talk to ${targetNode.ip}:${targetNode.port}`

@ -338,7 +338,12 @@ export class SwarmPolling {
return;
}
Receiver.handleRequest(content.body, isGroup ? polledPubkey : null, content.messageHash);
Receiver.handleRequest(
content.body,
isGroup ? polledPubkey : null,
content.messageHash,
m.expiration
);
});
}
}

@ -1,13 +1,13 @@
import crypto from 'crypto';
import libsodiumwrappers from 'libsodium-wrappers-sumo';
import * as MessageEncrypter from './MessageEncrypter';
import * as DecryptedAttachmentsManager from './DecryptedAttachmentsManager';
import * as MessageEncrypter from './MessageEncrypter';
import { toHex } from '../utils/String';
import { ECKeyPair } from '../../receiver/keypairs';
import { toHex } from '../utils/String';
export { MessageEncrypter, DecryptedAttachmentsManager };
export { DecryptedAttachmentsManager, MessageEncrypter };
export type LibSodiumWrappers = typeof libsodiumwrappers;
@ -67,8 +67,6 @@ export async function generateGroupV3Keypair() {
preprendedPubkey.set(publicKey, 1);
preprendedPubkey[0] = 3;
// console.warn(`generateGroupV3Keypair: pubkey${toHex(preprendedPubkey)}`);
return { pubkey: toHex(preprendedPubkey), privateKey: toHex(ed25519KeyPair.privateKey) };
}

@ -4,6 +4,7 @@ import { initWallClockListener } from '../../util/wallClockListener';
import { Data } from '../../data/data';
import { ConversationModel } from '../../models/conversation';
import { READ_MESSAGE_STATE } from '../../models/conversationAttributes';
import { MessageModel } from '../../models/message';
import { SignalService } from '../../protobuf';
import { ReleasedFeatures } from '../../util/releaseFeature';
@ -105,8 +106,9 @@ async function checkExpiringMessages() {
if (!expiresAt || !isNumber(expiresAt)) {
return;
}
window.log.info('next message expires', new Date(expiresAt).toISOString());
window.log.info('next message expires in ', (expiresAt - Date.now()) / 1000);
const ms = expiresAt - Date.now();
window.log.info(`message expires in ${ms}ms, or ${ms / 1000}s, or ${ms / (3600 * 1000)}h`);
let wait = expiresAt - Date.now();
@ -279,7 +281,7 @@ function changeToDisappearingConversationMode(
async function checkForExpireUpdateInContentMessage(
content: SignalService.Content,
convoToUpdate: ConversationModel,
_isOutgoing?: boolean
messageExpirationFromRetrieve: number | null
): Promise<DisappearingMessageUpdate | undefined> {
const dataMessage = content.dataMessage as SignalService.DataMessage;
// We will only support legacy disappearing messages for a short period before disappearing messages v2 is unlocked
@ -314,6 +316,7 @@ async function checkForExpireUpdateInContentMessage(
isLegacyConversationSettingMessage,
isLegacyDataMessage,
isDisappearingMessagesV2Released,
messageExpirationFromRetrieve,
};
// NOTE some platforms do not include the diappearing message values in the Data Message for sent messages so we have to trust the conversation settings until v2 is released
@ -415,30 +418,54 @@ function getMessageReadyToDisappear(
return messageModel;
}
const { expirationType, expirationTimer: expireTimer } = expireUpdate;
const {
expirationType,
expirationTimer: expireTimer,
messageExpirationFromRetrieve,
} = expireUpdate;
/**
* This is quite tricky, but when we receive a message from the network, it might be a disappearing after read one, which was already read by another device.
* If that's the case, we need to not only mark the message as read, but also mark it as read at the right time.
* So that a message read 20h ago, and expiring 24h after read, has only 4h to live on this device too.
*
* A message is marked as read when created, if the convo volatile update reports that it should have been read (check `markAttributesAsReadIfNeeded()` if needed).
* That means that here, if we have a message
* - read,
* - incoming,
* - and disappearing after read,
* we have to force its expirationStartTimestamp and expire_at fields so they are in sync with our other devices.
*/
messageModel.set({
expirationType,
expireTimer,
});
if (
conversationModel.isPrivate() &&
messageModel.isIncoming() &&
expirationType === 'deleteAfterRead' &&
expireTimer > 0 &&
messageModel.get('unread') === READ_MESSAGE_STATE.read &&
messageExpirationFromRetrieve &&
messageExpirationFromRetrieve > 0
) {
const expirationStartTimestamp = messageExpirationFromRetrieve - expireTimer * 1000;
const expires_at = messageExpirationFromRetrieve;
// TODO a message might be added even when it expired, but the period cleaning of expired message will pick it up and remove it soon enough
window.log.debug(
`incoming DaR message already read by another device, forcing readAt ${(Date.now() -
expirationStartTimestamp) /
1000}s ago, so with ${(expires_at - Date.now()) / 1000}s left`
);
messageModel.set({
expirationStartTimestamp,
expires_at,
});
}
// This message is an ExpirationTimerUpdate
if (messageFlags === SignalService.DataMessage.Flags.EXPIRATION_TIMER_UPDATE) {
const previousExpirationMode = conversationModel.getExpirationMode();
const previousExpirationTimer = conversationModel.getExpireTimer();
const shouldUsePreviousExpiration =
expirationType === 'unknown' &&
previousExpirationMode !== 'off' &&
previousExpirationMode !== 'legacy' &&
messageFlags === SignalService.DataMessage.Flags.EXPIRATION_TIMER_UPDATE;
if (shouldUsePreviousExpiration) {
messageModel.set({
expirationType: previousExpirationMode,
expireTimer: previousExpirationTimer,
});
}
const expirationTimerUpdate = {
expirationType,
expireTimer,

@ -32,4 +32,5 @@ export type DisappearingMessageUpdate = {
isLegacyConversationSettingMessage?: boolean;
isLegacyDataMessage?: boolean;
isDisappearingMessagesV2Released?: boolean;
messageExpirationFromRetrieve: number | null;
};

@ -185,7 +185,7 @@ async function send(
expireTimer
);
const canBeDeleteAfterRead = convo && !convo.isMe() && convo.isPrivate();
const canBeDeleteAfterRead = !convo.isMe() && convo.isPrivate();
// TODO legacy messages support will be removed in a future release
if (

@ -1429,9 +1429,11 @@ async function addIceCandidateToExistingPeerConnection(callMessage: SignalServic
if (peerConnection) {
for (let index = 0; index < callMessage.sdps.length; index++) {
const sdp = callMessage.sdps[index];
const sdpMLineIndex = callMessage.sdpMLineIndexes[index];
const sdpMid = callMessage.sdpMids[index];
const candicate = new RTCIceCandidate({ sdpMid, sdpMLineIndex, candidate: sdp });
try {
// eslint-disable-next-line no-await-in-loop
await peerConnection.addIceCandidate(candicate);

@ -1,8 +1,8 @@
/* eslint-disable no-console */
import storage from 'redux-persist/lib/storage';
import { createLogger } from 'redux-logger';
import { configureStore } from '@reduxjs/toolkit';
import { createLogger } from 'redux-logger';
import { persistReducer } from 'redux-persist';
@ -16,7 +16,6 @@ const directConsole = {
group: console.group,
groupEnd: console.groupEnd,
warn: console.warn,
error: console.error,
};

@ -349,7 +349,7 @@ describe('DisappearingMessage', () => {
const expireUpdate = await DisappearingMessages.checkForExpireUpdateInContentMessage(
visibleMessage.contentProto(),
convoToUpdate,
true
null
);
expect(expireUpdate?.expirationType, 'expirationType should be unknown').to.equal('unknown');
@ -376,7 +376,7 @@ describe('DisappearingMessage', () => {
const expireUpdate = await DisappearingMessages.checkForExpireUpdateInContentMessage(
disappearingMessage.contentProto(),
convoToUpdate,
true
null
);
expect(expireUpdate?.expirationType, 'expirationType should be deleteAfterRead').to.equal(
@ -410,7 +410,7 @@ describe('DisappearingMessage', () => {
const expireUpdate = await DisappearingMessages.checkForExpireUpdateInContentMessage(
expirationTimerUpdateMessage.contentProto(),
convoToUpdate,
true
null
);
expect(expireUpdate?.expirationType, 'expirationType should be deleteAfterSend').to.equal(
@ -444,7 +444,7 @@ describe('DisappearingMessage', () => {
const expireUpdate = await DisappearingMessages.checkForExpireUpdateInContentMessage(
expirationTimerUpdateMessage.contentProto(),
convoToUpdate,
true
null
);
expect(expireUpdate?.expirationType, 'expirationType should be deleteAfterSend').to.equal(

Loading…
Cancel
Save