From 5afbd9c19e86891645fd5658016f26e5d897905b Mon Sep 17 00:00:00 2001 From: Audric Ackermann Date: Tue, 25 Jan 2022 13:46:46 +1100 Subject: [PATCH] cleanup creation of opengroup message on sync --- ts/data/data.ts | 22 ++- ts/models/conversation.ts | 169 ++++++++++-------- ts/models/message.ts | 3 +- ts/models/messageFactory.ts | 159 ++++++++++++++++ ts/receiver/callMessage.ts | 5 +- ts/receiver/closedGroups.ts | 23 +-- ts/receiver/contentMessage.ts | 7 +- ts/receiver/dataMessage.ts | 72 ++++---- ts/receiver/errors.ts | 53 ------ ts/receiver/opengroup.ts | 88 +++++++++ ts/receiver/queuedJob.ts | 14 +- ts/receiver/receiver.ts | 108 +---------- .../opengroupV2/OpenGroupServerPoller.ts | 2 +- ts/session/apis/snode_api/SNodeAPI.ts | 4 + ts/session/group/closed-group.ts | 25 +-- ts/session/sending/MessageSender.ts | 25 ++- ts/session/utils/calling/CallManager.ts | 30 ++-- 17 files changed, 453 insertions(+), 356 deletions(-) create mode 100644 ts/models/messageFactory.ts delete mode 100644 ts/receiver/errors.ts create mode 100644 ts/receiver/opengroup.ts diff --git a/ts/data/data.ts b/ts/data/data.ts index b640ef71e..86c88e143 100644 --- a/ts/data/data.ts +++ b/ts/data/data.ts @@ -11,7 +11,7 @@ import { ConversationTypeEnum, } from '../models/conversation'; import { MessageCollection, MessageModel } from '../models/message'; -import { MessageAttributes, MessageDirection } from '../models/messageType'; +import { MessageAttributes } from '../models/messageType'; import { HexKeyPair } from '../receiver/keypairs'; import { getConversationController } from '../session/conversations'; import { getSodium } from '../session/crypto'; @@ -1037,13 +1037,17 @@ export async function fillWithTestData(convs: number, msgs: number) { for (let msgsAddedCount = 0; msgsAddedCount < msgs; msgsAddedCount++) { // tslint:disable: insecure-random const convoToChoose = newConvos[Math.floor(Math.random() * newConvos.length)]; - await convoToChoose.addSingleMessage({ - source: convoToChoose.id, - type: MessageDirection.outgoing, - conversationId: convoToChoose.id, - body: `spongebob ${new Date().toString()}`, - // tslint:disable: insecure-random - direction: Math.random() > 0.5 ? 'outgoing' : 'incoming', - }); + const direction = Math.random() > 0.5 ? 'outgoing' : 'incoming'; + const body = `spongebob ${new Date().toString()}`; + if (direction === 'outgoing') { + await convoToChoose.addSingleOutgoingMessage({ + body, + }); + } else { + await convoToChoose.addSingleIncomingMessage({ + source: convoToChoose.id, + body, + }); + } } } diff --git a/ts/models/conversation.ts b/ts/models/conversation.ts index e3f821e0d..fec679bc2 100644 --- a/ts/models/conversation.ts +++ b/ts/models/conversation.ts @@ -9,7 +9,7 @@ import { BlockedNumberController } from '../util'; import { leaveClosedGroup } from '../session/group/closed-group'; import { SignalService } from '../protobuf'; import { MessageModel } from './message'; -import { MessageAttributesOptionals, MessageModelType } from './messageType'; +import { MessageAttributesOptionals } from './messageType'; import autoBind from 'auto-bind'; import { getLastMessagesByConversation, @@ -47,7 +47,7 @@ import { ed25519Str } from '../session/onions/onionPath'; import { getDecryptedMediaUrl } from '../session/crypto/DecryptedAttachmentsManager'; import { IMAGE_JPEG } from '../types/MIME'; import { forceSyncConfigurationNowIfNeeded } from '../session/utils/syncUtils'; -import { getLatestTimestampOffset } from '../session/apis/snode_api/SNodeAPI'; +import { getNowWithNetworkOffset } from '../session/apis/snode_api/SNodeAPI'; import { createLastMessageUpdate } from '../types/Conversation'; import { ReplyingToMessageProps, @@ -701,10 +701,7 @@ export class ConversationModel extends Backbone.Model { public async sendMessage(msg: SendMessageType) { const { attachments, body, groupInvitation, preview, quote } = msg; this.clearTypingTimers(); - - const destination = this.id; const expireTimer = this.get('expireTimer'); - const networkTimestamp = getNowWithNetworkOffset(); window?.log?.info( @@ -714,34 +711,16 @@ export class ConversationModel extends Backbone.Model { networkTimestamp ); - const editedQuote = _.isEmpty(quote) ? undefined : quote; - - const messageObject: MessageAttributesOptionals = { - type: 'outgoing', + const messageModel = await this.addSingleOutgoingMessage({ body, - conversationId: destination, - quote: editedQuote, + quote: _.isEmpty(quote) ? undefined : quote, preview, attachments, sent_at: networkTimestamp, - received_at: networkTimestamp, expireTimer, - isDeleted: false, - source: UserUtils.getOurPubKeyStrFromCache(), - }; - - if (this.isPublic()) { - // set the serverTimestamp only if this conversation is a public one. - messageObject.serverTimestamp = Date.now(); - } - - const attributes: MessageAttributesOptionals = { - ...messageObject, + serverTimestamp: this.isPublic() ? Date.now() : undefined, groupInvitation, - conversationId: this.id, - }; - - const messageModel = await this.addSingleMessage(attributes); + }); // We're offline! if (!window.textsecure.messaging) { @@ -827,15 +806,7 @@ export class ConversationModel extends Backbone.Model { this.set({ expireTimer }); - const messageAttributes = { - // Even though this isn't reflected to the user, we want to place the last seen - // indicator above it. We set it to 'unread' to trigger that placement. - unread: isOutgoing ? 0 : 1, - conversationId: this.id, - source, - // No type; 'incoming' messages are specially treated by conversation.markRead() - sent_at: timestamp, - received_at: timestamp, + const commonAttributes = { flags: SignalService.DataMessage.Flags.EXPIRATION_TIMER_UPDATE, expirationTimerUpdate: { expireTimer, @@ -843,11 +814,27 @@ export class ConversationModel extends Backbone.Model { fromSync: options.fromSync, }, expireTimer: 0, - type: isOutgoing ? 'outgoing' : ('incoming' as MessageModelType), - destination: this.id, - recipients: isOutgoing ? this.getRecipients() : undefined, }; - const message = await this.addSingleMessage(messageAttributes); + + let message: MessageModel | undefined; + + if (isOutgoing) { + message = await this.addSingleOutgoingMessage({ + ...commonAttributes, + unread: 0, + sent_at: timestamp, + }); + } else { + message = await this.addSingleIncomingMessage({ + ...commonAttributes, + // Even though this isn't reflected to the user, we want to place the last seen + // indicator above it. We set it to 'unread' to trigger that placement. + unread: 1, + source, + sent_at: timestamp, + received_at: timestamp, + }); + } // tell the UI this conversation was updated await this.commit(); @@ -899,40 +886,39 @@ export class ConversationModel extends Backbone.Model { perfEnd(`conversationCommit-${this.attributes.id}`, 'conversationCommit'); } - public async addSingleMessage(messageAttributes: MessageAttributesOptionals, setToExpire = true) { - const model = new MessageModel(messageAttributes); - - const isMe = messageAttributes.source === UserUtils.getOurPubKeyStrFromCache(); - - if ( - isMe && - window.lokiFeatureFlags.useMessageRequests && - window.inboxStore?.getState().userConfig.messageRequests - ) { - await this.setIsApproved(true); - } - - // no need to trigger a UI update now, we trigger a messagesAdded just below - const messageId = await model.commit(false); - model.set({ id: messageId }); - - if (setToExpire) { - await model.setToExpire(); - } - window.inboxStore?.dispatch( - conversationActions.messagesAdded([ - { - conversationKey: this.id, - messageModelProps: model.getMessageModelProps(), - }, - ]) + public async addSingleOutgoingMessage( + messageAttributes: Omit< + MessageAttributesOptionals, + 'conversationId' | 'source' | 'type' | 'direction' | 'received_at' + >, + setToExpire = true + ) { + return this.addSingleMessage( + { + ...messageAttributes, + conversationId: this.id, + source: UserUtils.getOurPubKeyStrFromCache(), + type: 'outgoing', + direction: 'outgoing', + received_at: messageAttributes.sent_at, // make sure to set an received_at timestamp for an outgoing message, so the order are right. + }, + setToExpire ); - const unreadCount = await this.getUnreadCount(); - this.set({ unreadCount }); - this.updateLastMessage(); + } - await this.commit(); - return model; + public async addSingleIncomingMessage( + messageAttributes: Omit, + setToExpire = true + ) { + return this.addSingleMessage( + { + ...messageAttributes, + conversationId: this.id, + type: 'incoming', + direction: 'outgoing', + }, + setToExpire + ); } public async leaveClosedGroup() { @@ -1484,6 +1470,45 @@ export class ConversationModel extends Backbone.Model { } } + private async addSingleMessage( + messageAttributes: MessageAttributesOptionals, + setToExpire = true + ) { + const model = new MessageModel(messageAttributes); + + const isMe = messageAttributes.source === UserUtils.getOurPubKeyStrFromCache(); + + if ( + isMe && + window.lokiFeatureFlags.useMessageRequests && + window.inboxStore?.getState().userConfig.messageRequests + ) { + await this.setIsApproved(true); + } + + // no need to trigger a UI update now, we trigger a messagesAdded just below + const messageId = await model.commit(false); + model.set({ id: messageId }); + + if (setToExpire) { + await model.setToExpire(); + } + window.inboxStore?.dispatch( + conversationActions.messagesAdded([ + { + conversationKey: this.id, + messageModelProps: model.getMessageModelProps(), + }, + ]) + ); + const unreadCount = await this.getUnreadCount(); + this.set({ unreadCount }); + this.updateLastMessage(); + + await this.commit(); + return model; + } + private async clearContactTypingTimer(_sender: string) { if (!!this.typingTimer) { global.clearTimeout(this.typingTimer); diff --git a/ts/models/message.ts b/ts/models/message.ts index b24b598ff..c926d78fd 100644 --- a/ts/models/message.ts +++ b/ts/models/message.ts @@ -707,11 +707,10 @@ export class MessageModel extends Backbone.Model { // TODO: In the future it might be best if we cache the upload results if possible. // This way we don't upload duplicated data. - const attachmentsWithData = await Promise.all( + const finalAttachments = await Promise.all( (this.get('attachments') || []).map(loadAttachmentData) ); const body = this.get('body'); - const finalAttachments = attachmentsWithData; const quoteWithData = await loadQuoteData(this.get('quote')); const previewWithData = await loadPreviewData(this.get('preview')); diff --git a/ts/models/messageFactory.ts b/ts/models/messageFactory.ts new file mode 100644 index 000000000..a23110770 --- /dev/null +++ b/ts/models/messageFactory.ts @@ -0,0 +1,159 @@ +import { UserUtils } from '../session/utils'; +import { MessageModel } from './message'; +import { MessageAttributesOptionals, MessageModelType } from './messageType'; + +export type MessageCreationData = { + timestamp: number; + receivedAt: number; + source: string; + isPublic: boolean; + serverId: number | null; + serverTimestamp: number | null; + groupId: string | null; + + expirationStartTimestamp?: number; + destination: string; + messageHash: string; +}; + +function initIncomingMessage(data: MessageCreationData): MessageModel { + const { + timestamp, + isPublic, + receivedAt, + source, + serverId, + serverTimestamp, + messageHash, + groupId, + } = data; + + const messageData: MessageAttributesOptionals = { + source, + serverId: serverId || undefined, + sent_at: timestamp, + serverTimestamp: serverTimestamp || undefined, + received_at: receivedAt || Date.now(), + conversationId: groupId ?? source, + type: 'incoming', + direction: 'incoming', + unread: 1, + isPublic, + messageHash: messageHash || undefined, + }; + + return new MessageModel(messageData); +} + +/** + * This function can be called for either a sync message or a message synced through an opengroup poll. + * This does not save it to the db, just in memory + */ +function createMessageSentFromOurself({ + timestamp, + serverTimestamp, + serverId, + isPublic, + receivedAt, + expirationStartTimestamp, + destination, + groupId, + messageHash, +}: { + timestamp: number; + receivedAt: number; + isPublic: boolean; + serverId: number | null; + serverTimestamp: number | null; + groupId: string | null; + expirationStartTimestamp: number | null; + destination: string; + messageHash: string; +}): MessageModel { + // Omit< + // MessageAttributesOptionals, + // 'conversationId' | 'source' | 'type' | 'direction' | 'received_at' + // > + const now = Date.now(); + + const messageData: MessageAttributesOptionals = { + source: UserUtils.getOurPubKeyStrFromCache(), + type: 'outgoing' as MessageModelType, + serverTimestamp: serverTimestamp || undefined, + serverId: serverId || undefined, + sent_at: timestamp, + received_at: isPublic ? receivedAt : now, + isPublic, + conversationId: groupId ?? destination, + messageHash, + unread: 0, + sent_to: [], + sent: true, + expirationStartTimestamp: Math.min(expirationStartTimestamp || data.timestamp || now, now), + }; + + return new MessageModel(messageData); +} + +/** + * This function is only called when we get a message from ourself from an opengroup polling event + */ +export function createPublicMessageSentFromUs({ + serverTimestamp, + serverId, + conversationId, +}: { + serverId: number; + serverTimestamp: number; + conversationId: string; +}): MessageModel { + const messageData: MessageAttributesOptionals = { + source: UserUtils.getOurPubKeyStrFromCache(), + type: 'outgoing' as MessageModelType, + serverTimestamp: serverTimestamp || undefined, + serverId: serverId || undefined, + sent_at: serverTimestamp, + received_at: serverTimestamp, + isPublic: true, + conversationId, + messageHash: '', // we do not care of a messageHash for an opengroup message. we have serverId for that + unread: 0, + sent_to: [], + sent: true, + expirationStartTimestamp: undefined, + }; + + return new MessageModel(messageData); +} + +/** + * This function is only called by the Receiver when we get a message + * from someone else than ourself from an opengroup polling event + */ +export function createPublicMessageSentFromNotUs({ + serverTimestamp, + serverId, + conversationId, + sender, +}: { + serverId: number; + sender: string; + serverTimestamp: number; + conversationId: string; +}): MessageModel { + const messageData: MessageAttributesOptionals = { + source: sender, + conversationId, + type: 'incoming' as MessageModelType, + serverTimestamp: serverTimestamp, + sent_at: serverTimestamp, + received_at: serverTimestamp, + serverId, + isPublic: true, + messageHash: '', // we do not care of a messageHash for an opengroup message. we have serverId for that + unread: 1, + expirationStartTimestamp: undefined, + }; + + return new MessageModel(messageData); +} diff --git a/ts/receiver/callMessage.ts b/ts/receiver/callMessage.ts index a365b1dd4..2ca0a3a28 100644 --- a/ts/receiver/callMessage.ts +++ b/ts/receiver/callMessage.ts @@ -1,10 +1,10 @@ import _ from 'lodash'; import { SignalService } from '../protobuf'; import { TTL_DEFAULT } from '../session/constants'; -import { SNodeAPI } from '../session/apis/snode_api'; import { CallManager, UserUtils } from '../session/utils'; import { removeFromCache } from './cache'; import { EnvelopePlus } from './types'; +import { getNowWithNetworkOffset } from '../session/apis/snode_api/SNodeAPI'; export async function handleCallMessage( envelope: EnvelopePlus, @@ -12,7 +12,6 @@ export async function handleCallMessage( ) { const sender = envelope.senderIdentity || envelope.source; - const currentOffset = SNodeAPI.getLatestTimestampOffset(); const sentTimestamp = _.toNumber(envelope.timestamp); const { type } = callMessage; @@ -50,7 +49,7 @@ export async function handleCallMessage( } if (type === SignalService.CallMessage.Type.OFFER) { - if (Math.max(sentTimestamp - (Date.now() - currentOffset)) > TTL_DEFAULT.CALL_MESSAGE) { + if (Math.max(sentTimestamp - getNowWithNetworkOffset()) > TTL_DEFAULT.CALL_MESSAGE) { window?.log?.info('Dropping incoming OFFER callMessage sent a while ago: ', sentTimestamp); await removeFromCache(envelope); diff --git a/ts/receiver/closedGroups.ts b/ts/receiver/closedGroups.ts index e44be49e6..ab5fa3591 100644 --- a/ts/receiver/closedGroups.ts +++ b/ts/receiver/closedGroups.ts @@ -233,12 +233,12 @@ export async function handleNewClosedGroup( await removeFromCache(envelope); return; } - const maybeConvo = getConversationController().get(groupId); + const groupConvo = getConversationController().get(groupId); const expireTimer = groupUpdate.expireTimer; - if (maybeConvo) { + if (groupConvo) { // if we did not left this group, just add the keypair we got if not already there - if (!maybeConvo.get('isKickedFromGroup') && !maybeConvo.get('left')) { + if (!groupConvo.get('isKickedFromGroup') && !groupConvo.get('left')) { const ecKeyPairAlreadyExistingConvo = new ECKeyPair( // tslint:disable: no-non-null-assertion encryptionKeyPair!.publicKey, @@ -249,7 +249,7 @@ export async function handleNewClosedGroup( ecKeyPairAlreadyExistingConvo.toHexKeyPair() ); - await maybeConvo.updateExpireTimer(expireTimer, sender, Date.now()); + await groupConvo.updateExpireTimer(expireTimer, sender, Date.now()); if (isKeyPairAlreadyHere) { window.log.info('Dropping already saved keypair for group', groupId); @@ -266,15 +266,18 @@ export async function handleNewClosedGroup( } // convo exists and we left or got kicked, enable typing and continue processing // Enable typing: - maybeConvo.set('isKickedFromGroup', false); - maybeConvo.set('left', false); - maybeConvo.set('lastJoinedTimestamp', _.toNumber(envelope.timestamp)); - // we just got readded. Consider the zombie list to have been cleared - maybeConvo.set('zombies', []); + groupConvo.set({ + left: false, + isKickedFromGroup: false, + lastJoinedTimestamp: _.toNumber(envelope.timestamp), + // we just got readded. Consider the zombie list to have been cleared + + zombies: [], + }); } const convo = - maybeConvo || + groupConvo || (await getConversationController().getOrCreateAndWait(groupId, ConversationTypeEnum.GROUP)); // ***** Creating a new group ***** window?.log?.info('Received a new ClosedGroup of id:', groupId); diff --git a/ts/receiver/contentMessage.ts b/ts/receiver/contentMessage.ts index 73b2a9c2d..326ca27b5 100644 --- a/ts/receiver/contentMessage.ts +++ b/ts/receiver/contentMessage.ts @@ -591,14 +591,9 @@ export async function handleDataExtractionNotification( if (timestamp) { const envelopeTimestamp = Lodash.toNumber(timestamp); const referencedAttachmentTimestamp = Lodash.toNumber(referencedAttachment); - const now = Date.now(); - await convo.addSingleMessage({ - conversationId: convo.get('id'), - source, - type: 'outgoing', // mark it as outgoing just so it appears below our sent attachment + await convo.addSingleOutgoingMessage({ sent_at: envelopeTimestamp, - received_at: now, dataExtractionNotification: { type, referencedAttachmentTimestamp, // currently unused diff --git a/ts/receiver/dataMessage.ts b/ts/receiver/dataMessage.ts index d9f5b20f2..4adf8f4a0 100644 --- a/ts/receiver/dataMessage.ts +++ b/ts/receiver/dataMessage.ts @@ -11,7 +11,7 @@ import { StringUtils, UserUtils } from '../session/utils'; import { getConversationController } from '../session/conversations'; import { handleClosedGroupControlMessage } from './closedGroups'; import { MessageModel } from '../models/message'; -import { MessageModelType } from '../models/messageType'; +import { MessageAttributesOptionals, MessageModelType } from '../models/messageType'; import { getMessageBySenderAndSentAt, getMessageBySenderAndServerTimestamp, @@ -318,7 +318,6 @@ export async function handleDataMessage( // Data messages for medium groups don't arrive as sync messages. Instead, // linked devices poll for group messages independently, thus they need // to recognise some of those messages at their own. - const messageEventType: 'sent' | 'message' = isMe ? 'sent' : 'message'; if (envelope.senderIdentity) { message.group = { @@ -326,12 +325,6 @@ export async function handleDataMessage( }; } - let groupId: string | null = null; - if (message.group?.id?.length) { - // remove the prefix from the source object so this is correct for all other - groupId = PubKey.removeTextSecurePrefixIfNeeded(toHex(message.group?.id)); - } - const confirm = () => removeFromCache(envelope); const data: MessageCreationData = { @@ -343,10 +336,12 @@ export async function handleDataMessage( isPublic: false, serverId: null, serverTimestamp: null, - groupId, + groupId: message.group?.id?.length + ? PubKey.removeTextSecurePrefixIfNeeded(toHex(message.group?.id)) + : null, }; - await handleMessageEvent(messageEventType, data, message, confirm); + await handleMessageEvent(!isMe, data, message, confirm); } export type MessageId = { @@ -387,6 +382,31 @@ export async function isMessageDuplicate({ source, timestamp, serverTimestamp }: } } +export async function isOpengroupMessageDuplicate({ + sender, + serverTimestamp, +}: { + sender: string; + serverTimestamp: number; +}) { + // serverTimestamp is only used for opengroupv2 + try { + const result = await getMessageBySenderAndServerTimestamp({ + source: sender, + serverTimestamp, + }); + + // if we have a result, it means a specific user sent two messages either with the same serverTimestamp. + // no need to do anything else, those messages must be the same + // Note: this test is not based on which conversation the user sent the message + // but we consider that a user sending two messages with the same serverTimestamp is unlikely + return Boolean(result); + } catch (error) { + window?.log?.error('isOpengroupMessageDuplicate error:', toLogFormat(error)); + return false; + } +} + async function handleProfileUpdate( profileKeyBuffer: Uint8Array, convoId: string, @@ -418,14 +438,12 @@ export type MessageCreationData = { serverId: number | null; serverTimestamp: number | null; groupId: string | null; - - // Needed for synced outgoing messages - expirationStartTimestamp?: any; // ??? + expirationStartTimestamp?: number; destination: string; messageHash: string; }; -export function initIncomingMessage(data: MessageCreationData): MessageModel { +function initIncomingMessage(data: MessageCreationData): MessageModel { const { timestamp, isPublic, @@ -437,18 +455,18 @@ export function initIncomingMessage(data: MessageCreationData): MessageModel { groupId, } = data; - const messageData: any = { + const messageData: MessageAttributesOptionals = { source, - serverId, + serverId: serverId || undefined, sent_at: timestamp, - serverTimestamp, + serverTimestamp: serverTimestamp || undefined, received_at: receivedAt || Date.now(), conversationId: groupId ?? source, type: 'incoming', - direction: 'incoming', // + + direction: 'incoming', unread: 1, isPublic, - messageHash: messageHash || null, + messageHash: messageHash || undefined, }; return new MessageModel(messageData); @@ -475,7 +493,7 @@ function createSentMessage(data: MessageCreationData): MessageModel { expirationStartTimestamp: Math.min(expirationStartTimestamp || data.timestamp || now, now), }; - const messageData = { + const messageData: MessageAttributesOptionals = { source: UserUtils.getOurPubKeyStrFromCache(), serverTimestamp: serverTimestamp || undefined, serverId: serverId || undefined, @@ -501,13 +519,11 @@ export function createMessage(data: MessageCreationData, isIncoming: boolean): M // tslint:disable:cyclomatic-complexity max-func-body-length */ async function handleMessageEvent( - messageEventType: 'sent' | 'message', + isIncoming: boolean, messageCreationData: MessageCreationData, rawDataMessage: SignalService.DataMessage, confirm: () => void ): Promise { - const isIncoming = messageEventType === 'message'; - if (!messageCreationData || !rawDataMessage) { window?.log?.warn('Invalid data passed to handleMessageEvent.', event); confirm(); @@ -576,14 +592,6 @@ async function handleMessageEvent( confirm(); return; } - await handleMessageJob( - msg, - conversation, - rawDataMessage, - ourNumber, - confirm, - source, - messageHash - ); + await handleMessageJob(msg, conversation, rawDataMessage, confirm, source, messageHash); }); } diff --git a/ts/receiver/errors.ts b/ts/receiver/errors.ts deleted file mode 100644 index 7ac5941ad..000000000 --- a/ts/receiver/errors.ts +++ /dev/null @@ -1,53 +0,0 @@ -import { initIncomingMessage } from './dataMessage'; -import { toNumber } from 'lodash'; -import { getConversationController } from '../session/conversations'; -import { ConversationTypeEnum } from '../models/conversation'; -import { toLogFormat } from '../types/attachments/Errors'; -import { messagesAdded } from '../state/ducks/conversations'; - -export async function onError(ev: any) { - const { error } = ev; - window?.log?.error('background onError:', toLogFormat(error)); - - if (ev.proto) { - const envelope = ev.proto; - - const message = initIncomingMessage(envelope); - - await message.saveErrors(error || new Error('Error was null')); - const id = message.get('conversationId'); - const conversation = await getConversationController().getOrCreateAndWait( - id, - ConversationTypeEnum.PRIVATE - ); - // force conversation unread count to be > 0 so it is highlighted - conversation.set({ - active_at: Date.now(), - unreadCount: toNumber(conversation.get('unreadCount')) + 1, - }); - - const conversationActiveAt = conversation.get('active_at'); - const messageTimestamp = message.get('timestamp') || 0; - if (!conversationActiveAt || messageTimestamp > conversationActiveAt) { - conversation.set({ active_at: message.get('sent_at') }); - } - - conversation.updateLastMessage(); - await conversation.notify(message); - window.inboxStore?.dispatch( - messagesAdded([ - { - conversationKey: conversation.id, - messageModelProps: message.getMessageModelProps(), - }, - ]) - ); - - if (ev.confirm) { - ev.confirm(); - } - await conversation.commit(); - } - - throw error; -} diff --git a/ts/receiver/opengroup.ts b/ts/receiver/opengroup.ts new file mode 100644 index 000000000..3cf69399f --- /dev/null +++ b/ts/receiver/opengroup.ts @@ -0,0 +1,88 @@ +import { noop } from 'lodash'; +import { ConversationTypeEnum } from '../models/conversation'; +import { + createPublicMessageSentFromNotUs, + createPublicMessageSentFromUs, +} from '../models/messageFactory'; +import { SignalService } from '../protobuf'; +import { OpenGroupRequestCommonType } from '../session/apis/open_group_api/opengroupV2/ApiUtil'; +import { OpenGroupMessageV2 } from '../session/apis/open_group_api/opengroupV2/OpenGroupMessageV2'; +import { getOpenGroupV2ConversationId } from '../session/apis/open_group_api/utils/OpenGroupUtils'; +import { getConversationController } from '../session/conversations'; +import { removeMessagePadding } from '../session/crypto/BufferPadding'; +import { UserUtils } from '../session/utils'; +import { fromBase64ToArray } from '../session/utils/String'; +import { isOpengroupMessageDuplicate } from './dataMessage'; +import { handleMessageJob } from './queuedJob'; + +export async function handleOpenGroupV2Message( + message: OpenGroupMessageV2, + roomInfos: OpenGroupRequestCommonType +) { + const { base64EncodedData, sentTimestamp, sender, serverId } = message; + const { serverUrl, roomId } = roomInfos; + if (!base64EncodedData || !sentTimestamp || !sender || !serverId) { + window?.log?.warn('Invalid data passed to handleOpenGroupV2Message.', message); + return; + } + + // Note: opengroup messages are not padded + const dataUint = new Uint8Array(removeMessagePadding(fromBase64ToArray(base64EncodedData))); + + const decoded = SignalService.Content.decode(dataUint); + + const conversationId = getOpenGroupV2ConversationId(serverUrl, roomId); + if (!conversationId) { + window?.log?.error('We cannot handle a message without a conversationId'); + return; + } + const idataMessage = decoded?.dataMessage; + if (!idataMessage) { + window?.log?.error('Invalid decoded opengroup message: no dataMessage'); + return; + } + + if (!getConversationController().get(conversationId)) { + window?.log?.error('Received a message for an unknown convo. Skipping'); + return; + } + + const conversation = await getConversationController().getOrCreateAndWait( + conversationId, + ConversationTypeEnum.GROUP + ); + + if (!conversation) { + window?.log?.warn('Skipping handleJob for unknown convo: ', conversationId); + return; + } + + void conversation.queueJob(async () => { + const isMe = UserUtils.isUsFromCache(sender); + + const commonAttributes = { serverTimestamp: sentTimestamp, serverId, conversationId }; + const attributesForNotUs = { ...commonAttributes, sender }; + // those lines just create an empty message with some basic stuff set. + // the whole decoding of data is happening in handleMessageJob() + const msgModel = isMe + ? createPublicMessageSentFromUs(commonAttributes) + : createPublicMessageSentFromNotUs(attributesForNotUs); + + // WARNING this is important that the isOpengroupMessageDuplicate is made INSIDE the conversation.queueJob call + const isDuplicate = await isOpengroupMessageDuplicate(attributesForNotUs); + + if (isDuplicate) { + window?.log?.info('Received duplicate opengroup message. Dropping it.'); + return; + } + + await handleMessageJob( + msgModel, + conversation, + decoded?.dataMessage as SignalService.DataMessage, + noop, + sender, + '' + ); + }); +} diff --git a/ts/receiver/queuedJob.ts b/ts/receiver/queuedJob.ts index 950b4a749..6d2fcdc3e 100644 --- a/ts/receiver/queuedJob.ts +++ b/ts/receiver/queuedJob.ts @@ -10,6 +10,7 @@ import { getMessageById, getMessagesBySentAt } from '../../ts/data/data'; import { MessageModelPropsWithoutConvoProps, messagesAdded } from '../state/ducks/conversations'; import { updateProfileOneAtATime } from './dataMessage'; import { SignalService } from '../protobuf'; +import { UserUtils } from '../session/utils'; function contentTypeSupported(type: string): boolean { const Chrome = window.Signal.Util.GoogleChrome; @@ -182,7 +183,6 @@ async function handleRegularMessage( message: MessageModel, rawDataMessage: SignalService.DataMessage, source: string, - ourNumber: string, messageHash: string ) { const type = message.get('type'); @@ -215,7 +215,7 @@ async function handleRegularMessage( // Expire timer updates are now explicit. // We don't handle an expire timer from a incoming message except if it is an ExpireTimerUpdate message. - const ourPubKey = PubKey.cast(ourNumber); + const ourPubKey = UserUtils.getOurPubKeyFromCache(); handleMentions(message, conversation, ourPubKey); @@ -295,7 +295,6 @@ export async function handleMessageJob( messageModel: MessageModel, conversation: ConversationModel, rawDataMessage: SignalService.DataMessage, - ourNumber: string, confirm: () => void, source: string, messageHash: string @@ -320,14 +319,7 @@ export async function handleMessageJob( } await handleExpirationTimerUpdate(conversation, messageModel, source, expireTimer); } else { - await handleRegularMessage( - conversation, - messageModel, - rawDataMessage, - source, - ourNumber, - messageHash - ); + await handleRegularMessage(conversation, messageModel, rawDataMessage, source, messageHash); } const id = await messageModel.commit(); diff --git a/ts/receiver/receiver.ts b/ts/receiver/receiver.ts index 56c413c13..47bdf51cc 100644 --- a/ts/receiver/receiver.ts +++ b/ts/receiver/receiver.ts @@ -3,29 +3,15 @@ export { downloadAttachment } from './attachments'; import { v4 as uuidv4 } from 'uuid'; import { addToCache, getAllFromCache, getAllFromCacheForSource, removeFromCache } from './cache'; -import { processMessage } from '../session/apis/snode_api/swarmPolling'; -import { onError } from './errors'; // innerHandleContentMessage is only needed because of code duplication in handleDecryptedEnvelope... import { handleContentMessage, innerHandleContentMessage } from './contentMessage'; -import _, { noop } from 'lodash'; - -export { processMessage }; - -import { createMessage, isMessageDuplicate, MessageCreationData } from './dataMessage'; +import _ from 'lodash'; import { getEnvelopeId } from './common'; import { StringUtils, UserUtils } from '../session/utils'; import { SignalService } from '../protobuf'; -import { getConversationController } from '../session/conversations'; import { removeUnprocessed } from '../data/data'; -import { ConversationTypeEnum } from '../models/conversation'; -import { getOpenGroupV2ConversationId } from '../session/apis/open_group_api/utils/OpenGroupUtils'; -import { OpenGroupMessageV2 } from '../session/apis/open_group_api/opengroupV2/OpenGroupMessageV2'; -import { OpenGroupRequestCommonType } from '../session/apis/open_group_api/opengroupV2/ApiUtil'; -import { handleMessageJob } from './queuedJob'; -import { fromBase64ToArray } from '../session/utils/String'; -import { removeMessagePadding } from '../session/crypto/BufferPadding'; import { createTaskWithTimeout } from '../session/utils/TaskWithTimeout'; import { perfEnd, perfStart } from '../session/utils/Performance'; @@ -146,16 +132,12 @@ async function handleRequestDetail( } } -export function handleRequest(body: any, options: ReqOptions, messageHash: string): void { +export function handleRequest(plaintext: any, options: ReqOptions, messageHash: string): void { // tslint:disable-next-line no-promise-as-boolean const lastPromise = _.last(incomingMessagePromises) || Promise.resolve(); - const plaintext = body; - const promise = handleRequestDetail(plaintext, options, lastPromise, messageHash).catch(e => { window?.log?.error('Error handling incoming message:', e && e.stack ? e.stack : e); - - void onError(e); }); incomingMessagePromises.push(promise); @@ -258,89 +240,3 @@ async function handleDecryptedEnvelope( await removeFromCache(envelope); } } - -export async function handleOpenGroupV2Message( - message: OpenGroupMessageV2, - roomInfos: OpenGroupRequestCommonType -) { - const { base64EncodedData, sentTimestamp, sender, serverId } = message; - const { serverUrl, roomId } = roomInfos; - if (!base64EncodedData || !sentTimestamp || !sender || !serverId) { - window?.log?.warn('Invalid data passed to handleOpenGroupV2Message.', message); - return; - } - - // Note: opengroup messages are not padded - const dataUint = new Uint8Array(removeMessagePadding(fromBase64ToArray(base64EncodedData))); - - const decoded = SignalService.Content.decode(dataUint); - - const conversationId = getOpenGroupV2ConversationId(serverUrl, roomId); - if (!conversationId) { - window?.log?.error('We cannot handle a message without a conversationId'); - return; - } - const idataMessage = decoded?.dataMessage; - if (!idataMessage) { - window?.log?.error('Invalid decoded opengroup message: no dataMessage'); - return; - } - - if (!getConversationController().get(conversationId)) { - window?.log?.error('Received a message for an unknown convo. Skipping'); - return; - } - - // if the message is `sent` (from secondary device) we have to set the sender manually... (at least for now) - // source = source || msg.get('source'); - - const conversation = await getConversationController().getOrCreateAndWait( - conversationId, - ConversationTypeEnum.GROUP - ); - - if (!conversation) { - window?.log?.warn('Skipping handleJob for unknown convo: ', conversationId); - return; - } - - void conversation.queueJob(async () => { - const isMe = UserUtils.isUsFromCache(sender); - // for an opengroupv2 incoming message the serverTimestamp and the timestamp - const messageCreationData: MessageCreationData = { - isPublic: true, - serverId, - serverTimestamp: sentTimestamp, - receivedAt: Date.now(), - destination: conversationId, - timestamp: sentTimestamp, - expirationStartTimestamp: undefined, - source: sender, - groupId: null, - messageHash: '', // we do not care of a hash for an opengroup message - }; - - // WARNING this is important that the isMessageDuplicate is made in the conversation.queueJob - const isDuplicate = await isMessageDuplicate({ ...messageCreationData }); - - if (isDuplicate) { - window?.log?.info('Received duplicate message. Dropping it.'); - return; - } - - // this line just create an empty message with some basic stuff set. - // the whole decoding of data is happening in handleMessageJob() - const msg = createMessage(messageCreationData, !isMe); - const ourNumber = UserUtils.getOurPubKeyStrFromCache(); - - await handleMessageJob( - msg, - conversation, - decoded?.dataMessage as SignalService.DataMessage, - ourNumber, - noop, - sender, - '' - ); - }); -} diff --git a/ts/session/apis/open_group_api/opengroupV2/OpenGroupServerPoller.ts b/ts/session/apis/open_group_api/opengroupV2/OpenGroupServerPoller.ts index 9d293a194..14e438d33 100644 --- a/ts/session/apis/open_group_api/opengroupV2/OpenGroupServerPoller.ts +++ b/ts/session/apis/open_group_api/opengroupV2/OpenGroupServerPoller.ts @@ -16,12 +16,12 @@ import { ConversationModel } from '../../../../models/conversation'; import { getMessageIdsFromServerIds, removeMessage } from '../../../../data/data'; import { getV2OpenGroupRoom, saveV2OpenGroupRoom } from '../../../../data/opengroups'; import { OpenGroupMessageV2 } from './OpenGroupMessageV2'; -import { handleOpenGroupV2Message } from '../../../../receiver/receiver'; import autoBind from 'auto-bind'; import { sha256 } from '../../../crypto'; import { DURATION } from '../../../constants'; import { processNewAttachment } from '../../../../types/MessageAttachment'; import { MIME } from '../../../../types'; +import { handleOpenGroupV2Message } from '../../../../receiver/opengroup'; const pollForEverythingInterval = DURATION.SECONDS * 10; const pollForRoomAvatarInterval = DURATION.DAYS * 1; diff --git a/ts/session/apis/snode_api/SNodeAPI.ts b/ts/session/apis/snode_api/SNodeAPI.ts index 17d54a4f7..6828b16e3 100644 --- a/ts/session/apis/snode_api/SNodeAPI.ts +++ b/ts/session/apis/snode_api/SNodeAPI.ts @@ -53,6 +53,10 @@ export function getLatestTimestampOffset() { return latestTimestampOffset; } +export function getNowWithNetworkOffset() { + return Date.now() - getLatestTimestampOffset(); +} + export type SendParams = { pubKey: string; ttl: string; diff --git a/ts/session/group/closed-group.ts b/ts/session/group/closed-group.ts index 8f9a85680..ca72614aa 100644 --- a/ts/session/group/closed-group.ts +++ b/ts/session/group/closed-group.ts @@ -28,7 +28,7 @@ import { ClosedGroupNameChangeMessage } from '../messages/outgoing/controlMessag import { ClosedGroupNewMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupNewMessage'; import { ClosedGroupRemovedMembersMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupRemovedMembersMessage'; import { getSwarmPollingInstance } from '../apis/snode_api'; -import { getLatestTimestampOffset } from '../apis/snode_api/SNodeAPI'; +import { getNowWithNetworkOffset } from '../apis/snode_api/SNodeAPI'; export type GroupInfo = { id: string; @@ -156,18 +156,10 @@ export async function addUpdateMessage( groupUpdate.kicked = diff.kickedMembers; } - const now = Date.now(); - const unread = type === 'incoming'; - const source = UserUtils.getOurPubKeyStrFromCache(); - - const message = await convo.addSingleMessage({ - conversationId: convo.get('id'), - source, - type, + const message = await convo.addSingleOutgoingMessage({ sent_at: sentAt, - received_at: now, group_update: groupUpdate, unread: unread ? 1 : 0, expireTimer: 0, @@ -278,7 +270,6 @@ export async function leaveClosedGroup(groupId: string) { const ourNumber = UserUtils.getOurPubKeyFromCache(); const isCurrentUserAdmin = convo.get('groupAdmins')?.includes(ourNumber.key); - const now = Date.now(); let members: Array = []; let admins: Array = []; @@ -299,20 +290,16 @@ export async function leaveClosedGroup(groupId: string) { await convo.commit(); const source = UserUtils.getOurPubKeyStrFromCache(); - const diffTimestamp = Date.now() - getLatestTimestampOffset(); + const networkTimestamp = getNowWithNetworkOffset(); - const dbMessage = await convo.addSingleMessage({ + const dbMessage = await convo.addSingleOutgoingMessage({ group_update: { left: [source] }, - conversationId: groupId, - source, - type: 'outgoing', - sent_at: diffTimestamp, - received_at: now, + sent_at: networkTimestamp, expireTimer: 0, }); // Send the update to the group const ourLeavingMessage = new ClosedGroupMemberLeftMessage({ - timestamp: Date.now(), + timestamp: networkTimestamp, groupId, identifier: dbMessage.id as string, }); diff --git a/ts/session/sending/MessageSender.ts b/ts/session/sending/MessageSender.ts index 0f45623e4..fa0819186 100644 --- a/ts/session/sending/MessageSender.ts +++ b/ts/session/sending/MessageSender.ts @@ -13,12 +13,11 @@ import { fromUInt8ArrayToBase64 } from '../utils/String'; import { OpenGroupVisibleMessage } from '../messages/outgoing/visibleMessage/OpenGroupVisibleMessage'; import { addMessagePadding } from '../crypto/BufferPadding'; import _ from 'lodash'; -import { storeOnNode } from '../apis/snode_api/SNodeAPI'; +import { getNowWithNetworkOffset, storeOnNode } from '../apis/snode_api/SNodeAPI'; import { getSwarmFor } from '../apis/snode_api/snodePool'; import { firstTrue } from '../utils/Promise'; import { MessageSender } from '.'; import { getMessageById } from '../../../ts/data/data'; -import { SNodeAPI } from '../apis/snode_api'; import { getConversationController } from '../conversations'; import { ed25519Str } from '../onions/onionPath'; @@ -27,7 +26,7 @@ const DEFAULT_CONNECTIONS = 1; // ================ SNODE STORE ================ function overwriteOutgoingTimestampWithNetworkTimestamp(message: RawMessage) { - const diffTimestamp = Date.now() - SNodeAPI.getLatestTimestampOffset(); + const networkTimestamp = getNowWithNetworkOffset(); const { plainTextBuffer } = message; const contentDecoded = SignalService.Content.decode(plainTextBuffer); @@ -37,23 +36,23 @@ function overwriteOutgoingTimestampWithNetworkTimestamp(message: RawMessage) { if (dataMessage.syncTarget) { return { overRiddenTimestampBuffer: plainTextBuffer, - diffTimestamp: _.toNumber(dataMessage.timestamp), + networkTimestamp: _.toNumber(dataMessage.timestamp), }; } - dataMessage.timestamp = diffTimestamp; + dataMessage.timestamp = networkTimestamp; } if ( dataExtractionNotification && dataExtractionNotification.timestamp && dataExtractionNotification.timestamp > 0 ) { - dataExtractionNotification.timestamp = diffTimestamp; + dataExtractionNotification.timestamp = networkTimestamp; } if (typingMessage && typingMessage.timestamp && typingMessage.timestamp > 0) { - typingMessage.timestamp = diffTimestamp; + typingMessage.timestamp = networkTimestamp; } const overRiddenTimestampBuffer = SignalService.Content.encode(contentDecoded).finish(); - return { overRiddenTimestampBuffer, diffTimestamp }; + return { overRiddenTimestampBuffer, networkTimestamp }; } export function getMinRetryTimeout() { @@ -79,7 +78,7 @@ export async function send( const { overRiddenTimestampBuffer, - diffTimestamp, + networkTimestamp, } = overwriteOutgoingTimestampWithNetworkTimestamp(message); const { envelopeType, cipherText } = await MessageEncrypter.encrypt( @@ -88,7 +87,7 @@ export async function send( encryption ); - const envelope = await buildEnvelope(envelopeType, device.key, diffTimestamp, cipherText); + const envelope = await buildEnvelope(envelopeType, device.key, networkTimestamp, cipherText); const data = wrapEnvelope(envelope); // make sure to update the local sent_at timestamp, because sometimes, we will get the just pushed message in the receiver side @@ -98,18 +97,18 @@ export async function send( // make sure to not update the sent timestamp if this a currently syncing message if (found && !found.get('sentSync')) { - found.set({ sent_at: diffTimestamp }); + found.set({ sent_at: networkTimestamp }); await found.commit(); } await MessageSender.TEST_sendMessageToSnode( device.key, data, ttl, - diffTimestamp, + networkTimestamp, isSyncMessage, message.identifier ); - return { wrappedEnvelope: data, effectiveTimestamp: diffTimestamp }; + return { wrappedEnvelope: data, effectiveTimestamp: networkTimestamp }; }, { retries: Math.max(attempts - 1, 0), diff --git a/ts/session/utils/calling/CallManager.ts b/ts/session/utils/calling/CallManager.ts index 272f097b3..378ea8c19 100644 --- a/ts/session/utils/calling/CallManager.ts +++ b/ts/session/utils/calling/CallManager.ts @@ -1,6 +1,5 @@ import _ from 'lodash'; import { MessageUtils, ToastUtils, UserUtils } from '../'; -import { MessageModelType } from '../../../models/messageType'; import { SignalService } from '../../../protobuf'; import { openConversationWithMessages } from '../../../state/ducks/conversations'; import { @@ -26,6 +25,7 @@ import { DURATION } from '../../constants'; import { hasConversationOutgoingMessage } from '../../../data/data'; import { getCallMediaPermissionsSettings } from '../../../components/settings/SessionSettings'; import { PnServer } from '../../apis/push_notification_api'; +import { getNowWithNetworkOffset } from '../../apis/snode_api/SNodeAPI'; // tslint:disable: function-name @@ -491,14 +491,10 @@ export async function USER_callRecipient(recipient: string) { window.log.info('Sending preOffer message to ', ed25519Str(recipient)); const calledConvo = getConversationController().get(recipient); - calledConvo.set('active_at', Date.now()); // addSingleMessage does the commit for us on the convo + calledConvo.set('active_at', Date.now()); // addSingleOutgoingMessage does the commit for us on the convo - await calledConvo?.addSingleMessage({ - conversationId: calledConvo.id, - source: UserUtils.getOurPubKeyStrFromCache(), - type: 'outgoing', + await calledConvo?.addSingleOutgoingMessage({ sent_at: now, - received_at: now, expireTimer: 0, callNotificationType: 'started-call', unread: 0, @@ -820,15 +816,13 @@ export async function USER_acceptIncomingCallRequest(fromSender: string) { await peerConnection.addIceCandidate(candicate); } } - const now = Date.now(); + const networkTimestamp = getNowWithNetworkOffset(); const callerConvo = getConversationController().get(fromSender); - callerConvo.set('active_at', now); - await callerConvo?.addSingleMessage({ - conversationId: callerConvo.id, + callerConvo.set('active_at', networkTimestamp); + await callerConvo?.addSingleIncomingMessage({ source: UserUtils.getOurPubKeyStrFromCache(), - type: 'incoming', - sent_at: now, - received_at: now, + sent_at: networkTimestamp, + received_at: networkTimestamp, expireTimer: 0, callNotificationType: 'answered-a-call', unread: 0, @@ -1134,15 +1128,13 @@ async function addMissedCallMessage(callerPubkey: string, sentAt: number) { const incomingCallConversation = getConversationController().get(callerPubkey); if (incomingCallConversation.isActive()) { - incomingCallConversation.set('active_at', Date.now()); + incomingCallConversation.set('active_at', getNowWithNetworkOffset()); } - await incomingCallConversation?.addSingleMessage({ - conversationId: callerPubkey, + await incomingCallConversation?.addSingleIncomingMessage({ source: callerPubkey, - type: 'incoming' as MessageModelType, sent_at: sentAt, - received_at: Date.now(), + received_at: getNowWithNetworkOffset(), expireTimer: 0, callNotificationType: 'missed-call', unread: 1,