From b3a86922403c0cf865b3d8466b63dcad30ee2f15 Mon Sep 17 00:00:00 2001 From: Audric Ackermann Date: Tue, 25 Jan 2022 16:59:05 +1100 Subject: [PATCH] cleanup incoming message creation for displaying messageresult --- ts/models/message.ts | 2 +- ts/models/messageFactory.ts | 179 +++---- ts/receiver/contentMessage.ts | 47 +- ts/receiver/dataMessage.ts | 455 ++++++------------ ts/receiver/opengroup.ts | 4 +- ts/receiver/queuedJob.ts | 69 ++- ts/receiver/receiver.ts | 26 +- .../outgoing/visibleMessage/VisibleMessage.ts | 2 +- 8 files changed, 330 insertions(+), 454 deletions(-) diff --git a/ts/models/message.ts b/ts/models/message.ts index c926d78fd..d75bd52c3 100644 --- a/ts/models/message.ts +++ b/ts/models/message.ts @@ -960,7 +960,7 @@ export class MessageModel extends Backbone.Model { // if this message needs to be synced if ( - (dataMessage.body && dataMessage.body.length) || + dataMessage.body?.length || dataMessage.attachments.length || dataMessage.flags === SignalService.DataMessage.Flags.EXPIRATION_TIMER_UPDATE ) { diff --git a/ts/models/messageFactory.ts b/ts/models/messageFactory.ts index a23110770..5a68fdd29 100644 --- a/ts/models/messageFactory.ts +++ b/ts/models/messageFactory.ts @@ -2,103 +2,67 @@ import { UserUtils } from '../session/utils'; import { MessageModel } from './message'; import { MessageAttributesOptionals, MessageModelType } from './messageType'; -export type MessageCreationData = { - timestamp: number; - receivedAt: number; - source: string; - isPublic: boolean; - serverId: number | null; - serverTimestamp: number | null; - groupId: string | null; - - expirationStartTimestamp?: number; - destination: string; +function getSharedAttributesForSwarmMessage({ + conversationId, + messageHash, + sentAt, +}: { + conversationId: string; messageHash: string; -}; - -function initIncomingMessage(data: MessageCreationData): MessageModel { - const { - timestamp, - isPublic, - receivedAt, - source, - serverId, - serverTimestamp, + sentAt: number; +}) { + const now = Date.now(); + return { + sent_at: sentAt, + received_at: now, + conversationId, messageHash, - groupId, - } = data; + }; +} +/** + * This function is only called when we get a message from ourself from a swarm polling event. + * + * NOTE: conversationId has to be the conversation in which this message should be added. So + * either syncTarget, groupId or envelope.source or senderIdentity + */ +export function createSwarmMessageSentFromUs(args: { + messageHash: string; + sentAt: number; + conversationId: string; +}): MessageModel { + // for messages we did send, we mark it as read and start the expiration timer const messageData: MessageAttributesOptionals = { - source, - serverId: serverId || undefined, - sent_at: timestamp, - serverTimestamp: serverTimestamp || undefined, - received_at: receivedAt || Date.now(), - conversationId: groupId ?? source, - type: 'incoming', - direction: 'incoming', - unread: 1, - isPublic, - messageHash: messageHash || undefined, + ...getSharedAttributesForSwarmMessage(args), + ...getSharedAttributesForOutgoingMessage(), + expirationStartTimestamp: Math.min(args.sentAt, Date.now()), }; return new MessageModel(messageData); } /** - * This function can be called for either a sync message or a message synced through an opengroup poll. - * This does not save it to the db, just in memory + * This function is only called by the Receiver when we get a message + * from someone else than ourself from a swarm polling event + * NOTE: conversationId has to be the conversation in which this message should be added. So + * either syncTarget, groupId or envelope.source or senderIdentity */ -function createMessageSentFromOurself({ - timestamp, - serverTimestamp, - serverId, - isPublic, - receivedAt, - expirationStartTimestamp, - destination, - groupId, - messageHash, -}: { - timestamp: number; - receivedAt: number; - isPublic: boolean; - serverId: number | null; - serverTimestamp: number | null; - groupId: string | null; - expirationStartTimestamp: number | null; - destination: string; +export function createSwarmMessageSentFromNotUs(args: { messageHash: string; + sentAt: number; + sender: string; + conversationId: string; }): MessageModel { - // Omit< - // MessageAttributesOptionals, - // 'conversationId' | 'source' | 'type' | 'direction' | 'received_at' - // > - const now = Date.now(); - const messageData: MessageAttributesOptionals = { - source: UserUtils.getOurPubKeyStrFromCache(), - type: 'outgoing' as MessageModelType, - serverTimestamp: serverTimestamp || undefined, - serverId: serverId || undefined, - sent_at: timestamp, - received_at: isPublic ? receivedAt : now, - isPublic, - conversationId: groupId ?? destination, - messageHash, - unread: 0, - sent_to: [], - sent: true, - expirationStartTimestamp: Math.min(expirationStartTimestamp || data.timestamp || now, now), + ...getSharedAttributesForSwarmMessage(args), + ...getSharedAttributesForIncomingMessage(), + source: args.sender, }; return new MessageModel(messageData); } -/** - * This function is only called when we get a message from ourself from an opengroup polling event - */ -export function createPublicMessageSentFromUs({ +function getSharedAttributesForPublicMessage({ serverTimestamp, serverId, conversationId, @@ -106,10 +70,8 @@ export function createPublicMessageSentFromUs({ serverId: number; serverTimestamp: number; conversationId: string; -}): MessageModel { - const messageData: MessageAttributesOptionals = { - source: UserUtils.getOurPubKeyStrFromCache(), - type: 'outgoing' as MessageModelType, +}) { + return { serverTimestamp: serverTimestamp || undefined, serverId: serverId || undefined, sent_at: serverTimestamp, @@ -117,10 +79,40 @@ export function createPublicMessageSentFromUs({ isPublic: true, conversationId, messageHash: '', // we do not care of a messageHash for an opengroup message. we have serverId for that + expirationStartTimestamp: undefined, + }; +} + +function getSharedAttributesForOutgoingMessage() { + return { + source: UserUtils.getOurPubKeyStrFromCache(), unread: 0, sent_to: [], sent: true, - expirationStartTimestamp: undefined, + type: 'outgoing' as MessageModelType, + direction: 'outgoing' as MessageModelType, + }; +} + +function getSharedAttributesForIncomingMessage() { + return { + unread: 1, + type: 'incoming' as MessageModelType, + direction: 'incoming' as MessageModelType, + }; +} + +/** + * This function is only called when we get a message from ourself from an opengroup polling event + */ +export function createPublicMessageSentFromUs(args: { + serverId: number; + serverTimestamp: number; + conversationId: string; +}): MessageModel { + const messageData: MessageAttributesOptionals = { + ...getSharedAttributesForPublicMessage(args), + ...getSharedAttributesForOutgoingMessage(), }; return new MessageModel(messageData); @@ -130,29 +122,16 @@ export function createPublicMessageSentFromUs({ * This function is only called by the Receiver when we get a message * from someone else than ourself from an opengroup polling event */ -export function createPublicMessageSentFromNotUs({ - serverTimestamp, - serverId, - conversationId, - sender, -}: { +export function createPublicMessageSentFromNotUs(args: { serverId: number; sender: string; serverTimestamp: number; conversationId: string; }): MessageModel { const messageData: MessageAttributesOptionals = { - source: sender, - conversationId, - type: 'incoming' as MessageModelType, - serverTimestamp: serverTimestamp, - sent_at: serverTimestamp, - received_at: serverTimestamp, - serverId, - isPublic: true, - messageHash: '', // we do not care of a messageHash for an opengroup message. we have serverId for that - unread: 1, - expirationStartTimestamp: undefined, + ...getSharedAttributesForPublicMessage(args), + ...getSharedAttributesForIncomingMessage(), + source: args.sender, }; return new MessageModel(messageData); diff --git a/ts/receiver/contentMessage.ts b/ts/receiver/contentMessage.ts index 326ca27b5..d69929c42 100644 --- a/ts/receiver/contentMessage.ts +++ b/ts/receiver/contentMessage.ts @@ -1,5 +1,5 @@ import { EnvelopePlus } from './types'; -import { handleDataMessage } from './dataMessage'; +import { handleSwarmDataMessage } from './dataMessage'; import { removeFromCache, updateCache } from './cache'; import { SignalService } from '../protobuf'; @@ -25,20 +25,20 @@ import { } from '../interactions/conversations/unsendingInteractions'; import { SettingsKey } from '../data/settings-key'; -export async function handleContentMessage(envelope: EnvelopePlus, messageHash: string) { +export async function handleSwarmContentMessage(envelope: EnvelopePlus, messageHash: string) { try { const plaintext = await decrypt(envelope, envelope.content); if (!plaintext) { - // window?.log?.warn('handleContentMessage: plaintext was falsey'); + // window?.log?.warn('handleSwarmContentMessage: plaintext was falsey'); return; } else if (plaintext instanceof ArrayBuffer && plaintext.byteLength === 0) { return; } - perfStart(`innerHandleContentMessage-${envelope.id}`); + perfStart(`innerHandleSwarmContentMessage-${envelope.id}`); - await innerHandleContentMessage(envelope, plaintext, messageHash); - perfEnd(`innerHandleContentMessage-${envelope.id}`, 'innerHandleContentMessage'); + await innerHandleSwarmContentMessage(envelope, plaintext, messageHash); + perfEnd(`innerHandleSwarmContentMessage-${envelope.id}`, 'innerHandleSwarmContentMessage'); } catch (e) { window?.log?.warn(e); } @@ -326,7 +326,7 @@ function shouldDropBlockedUserMessage(content: SignalService.Content): boolean { return !isControlDataMessageOnly; } -export async function innerHandleContentMessage( +export async function innerHandleSwarmContentMessage( envelope: EnvelopePlus, plaintext: ArrayBuffer, messageHash: string @@ -350,22 +350,43 @@ export async function innerHandleContentMessage( } } - await getConversationController().getOrCreateAndWait( - envelope.source, + // if this is a direct message, envelope.senderIdentity is undefined + // if this is a closed group message, envelope.senderIdentity is the sender's pubkey and envelope.source is the closed group's pubkey + const isPrivateConversationMessage = !envelope.senderIdentity; + + /** + * For a closed group message, this holds the conversation with that specific user outside of the closed group. + * For a private conversation message, this is just the conversation with that user + */ + const senderConversationModel = await getConversationController().getOrCreateAndWait( + isPrivateConversationMessage ? envelope.source : envelope.senderIdentity, ConversationTypeEnum.PRIVATE ); + /** + * For a closed group message, this holds the closed group's conversation. + * For a private conversation message, this is just the conversation with that user + */ + if (!isPrivateConversationMessage) { + // this is a closed group message, we have a second conversation to make sure exists + await getConversationController().getOrCreateAndWait( + envelope.source, + ConversationTypeEnum.GROUP + ); + } + if (content.dataMessage) { if (content.dataMessage.profileKey && content.dataMessage.profileKey.length === 0) { content.dataMessage.profileKey = null; } - perfStart(`handleDataMessage-${envelope.id}`); - await handleDataMessage( + perfStart(`handleSwarmDataMessage-${envelope.id}`); + await handleSwarmDataMessage( envelope, content.dataMessage as SignalService.DataMessage, - messageHash + messageHash, + senderConversationModel ); - perfEnd(`handleDataMessage-${envelope.id}`, 'handleDataMessage'); + perfEnd(`handleSwarmDataMessage-${envelope.id}`, 'handleSwarmDataMessage'); return; } diff --git a/ts/receiver/dataMessage.ts b/ts/receiver/dataMessage.ts index 4adf8f4a0..02a0b64aa 100644 --- a/ts/receiver/dataMessage.ts +++ b/ts/receiver/dataMessage.ts @@ -4,14 +4,12 @@ import { EnvelopePlus } from './types'; import { getEnvelopeId } from './common'; import { PubKey } from '../session/types'; -import { handleMessageJob } from './queuedJob'; +import { handleMessageJob, toRegularMessage } from './queuedJob'; import { downloadAttachment } from './attachments'; import _ from 'lodash'; import { StringUtils, UserUtils } from '../session/utils'; import { getConversationController } from '../session/conversations'; import { handleClosedGroupControlMessage } from './closedGroups'; -import { MessageModel } from '../models/message'; -import { MessageAttributesOptionals, MessageModelType } from '../models/messageType'; import { getMessageBySenderAndSentAt, getMessageBySenderAndServerTimestamp, @@ -23,6 +21,12 @@ import { toLogFormat } from '../types/attachments/Errors'; import { processNewAttachment } from '../types/MessageAttachment'; import { MIME } from '../types'; import { autoScaleForIncomingAvatar } from '../util/attachmentsUtil'; +import { + createSwarmMessageSentFromNotUs, + createSwarmMessageSentFromUs, +} from '../models/messageFactory'; +import { MessageModel } from '../models/message'; +import { isUsFromCache } from '../session/utils/User'; export async function updateProfileOneAtATime( conversation: ConversationModel, @@ -125,7 +129,7 @@ function cleanAttachment(attachment: any) { }; } -function cleanAttachments(decrypted: any) { +function cleanAttachments(decrypted: SignalService.DataMessage) { const { quote, group } = decrypted; // Here we go from binary to string/base64 in all AttachmentPointer digest/key fields @@ -170,9 +174,28 @@ function cleanAttachments(decrypted: any) { } } -export async function processDecrypted( +export function isMessageEmpty(message: SignalService.DataMessage) { + const { flags, body, attachments, group, quote, preview, openGroupInvitation } = message; + + return ( + !flags && + // FIXME remove this hack to drop auto friend requests messages in a few weeks 15/07/2020 + isBodyEmpty(body) && + _.isEmpty(attachments) && + _.isEmpty(group) && + _.isEmpty(quote) && + _.isEmpty(preview) && + _.isEmpty(openGroupInvitation) + ); +} + +function isBodyEmpty(body: string) { + return _.isEmpty(body); +} + +async function cleanIncomingDataMessage( envelope: EnvelopePlus, - decrypted: SignalService.IDataMessage + rawDataMessage: SignalService.DataMessage ) { /* tslint:disable:no-bitwise */ const FLAGS = SignalService.DataMessage.Flags; @@ -182,45 +205,20 @@ export async function processDecrypted( // Note that messages may (generally) only perform one action and we ignore remaining // fields after the first action. - if (decrypted.flags == null) { - decrypted.flags = 0; + if (rawDataMessage.flags == null) { + rawDataMessage.flags = 0; } - if (decrypted.expireTimer == null) { - decrypted.expireTimer = 0; + if (rawDataMessage.expireTimer == null) { + rawDataMessage.expireTimer = 0; } - if (decrypted.flags & FLAGS.EXPIRATION_TIMER_UPDATE) { - decrypted.body = ''; - decrypted.attachments = []; - } else if (decrypted.flags !== 0) { + if (rawDataMessage.flags & FLAGS.EXPIRATION_TIMER_UPDATE) { + rawDataMessage.body = ''; + rawDataMessage.attachments = []; + } else if (rawDataMessage.flags !== 0) { throw new Error('Unknown flags in message'); } - if (decrypted.group) { - switch (decrypted.group.type) { - case SignalService.GroupContext.Type.UPDATE: - decrypted.body = ''; - decrypted.attachments = []; - break; - case SignalService.GroupContext.Type.QUIT: - decrypted.body = ''; - decrypted.attachments = []; - break; - case SignalService.GroupContext.Type.DELIVER: - decrypted.group.name = null; - decrypted.group.members = []; - decrypted.group.avatar = null; - break; - case SignalService.GroupContext.Type.REQUEST_INFO: - decrypted.body = ''; - decrypted.attachments = []; - break; - default: - await removeFromCache(envelope); - throw new Error('Unknown group message type'); - } - } - - const attachmentCount = decrypted?.attachments?.length || 0; + const attachmentCount = rawDataMessage?.attachments?.length || 0; const ATTACHMENT_MAX = 32; if (attachmentCount > ATTACHMENT_MAX) { await removeFromCache(envelope); @@ -228,35 +226,14 @@ export async function processDecrypted( `Too many attachments: ${attachmentCount} included in one message, max is ${ATTACHMENT_MAX}` ); } - - cleanAttachments(decrypted); + cleanAttachments(rawDataMessage); // if the decrypted dataMessage timestamp is not set, copy the one from the envelope - if (!_.toNumber(decrypted?.timestamp)) { - decrypted.timestamp = envelope.timestamp; + if (!_.isFinite(rawDataMessage?.timestamp)) { + rawDataMessage.timestamp = envelope.timestamp; } - return decrypted as SignalService.DataMessage; - /* tslint:disable:no-bitwise */ -} - -export function isMessageEmpty(message: SignalService.DataMessage) { - const { flags, body, attachments, group, quote, preview, openGroupInvitation } = message; - - return ( - !flags && - // FIXME remove this hack to drop auto friend requests messages in a few weeks 15/07/2020 - isBodyEmpty(body) && - _.isEmpty(attachments) && - _.isEmpty(group) && - _.isEmpty(quote) && - _.isEmpty(preview) && - _.isEmpty(openGroupInvitation) - ); -} - -function isBodyEmpty(body: string) { - return _.isEmpty(body); + return rawDataMessage; } /** @@ -270,114 +247,124 @@ function isBodyEmpty(body: string) { * * envelope.source is our pubkey (our other device has the same pubkey as us) * * dataMessage.syncTarget is either the group public key OR the private conversation this message is about. */ -export async function handleDataMessage( +// tslint:disable-next-line: cyclomatic-complexity +export async function handleSwarmDataMessage( envelope: EnvelopePlus, rawDataMessage: SignalService.DataMessage, - messageHash: string + messageHash: string, + senderConversationModel: ConversationModel ): Promise { + const cleanDataMessage = await cleanIncomingDataMessage(envelope, rawDataMessage); // we handle group updates from our other devices in handleClosedGroupControlMessage() - if (rawDataMessage.closedGroupControlMessage) { + if (cleanDataMessage.closedGroupControlMessage) { await handleClosedGroupControlMessage( envelope, - rawDataMessage.closedGroupControlMessage as SignalService.DataMessage.ClosedGroupControlMessage + cleanDataMessage.closedGroupControlMessage as SignalService.DataMessage.ClosedGroupControlMessage ); return; } - const message = await processDecrypted(envelope, rawDataMessage); - const source = rawDataMessage.syncTarget || envelope.source; - const senderPubKey = envelope.senderIdentity || envelope.source; - const isMe = UserUtils.isUsFromCache(senderPubKey); - const isSyncMessage = Boolean(rawDataMessage.syncTarget?.length); - - window?.log?.info(`Handle dataMessage from ${source} `); - - if (isSyncMessage && !isMe) { + /** + * This is a mess, but + * + * 1. if syncTarget is set and this is a synced message, syncTarget holds the conversationId in which this message is addressed. This syncTarget can be a private conversation pubkey or a closed group pubkey + * + * 2. for a closed group message, envelope.senderIdentity is the pubkey of the sender and envelope.source is the pubkey of the closed group. + * + * 3. for a private conversation message, envelope.senderIdentity and envelope.source are probably the pubkey of the sender. + */ + const isSyncedMessage = Boolean(cleanDataMessage.syncTarget?.length); + // no need to remove prefix here, as senderIdentity set => envelope.source is not used (and this is the one having the prefix when this is an opengroup) + const convoIdOfSender = envelope.senderIdentity || envelope.source; + const isMe = UserUtils.isUsFromCache(convoIdOfSender); + + if (isSyncedMessage && !isMe) { window?.log?.warn('Got a sync message from someone else than me. Dropping it.'); return removeFromCache(envelope); - } else if (isSyncMessage && rawDataMessage.syncTarget) { - // override the envelope source - envelope.source = rawDataMessage.syncTarget; + } else if (isSyncedMessage) { + // we should create the synTarget convo but I have no idea how to know if this is a private or closed group convo? } + const convoIdToAddTheMessageTo = PubKey.removeTextSecurePrefixIfNeeded( + isSyncedMessage ? cleanDataMessage.syncTarget : envelope.source + ); - const senderConversation = await getConversationController().getOrCreateAndWait( - senderPubKey, - ConversationTypeEnum.PRIVATE + const convoToAddMessageTo = await getConversationController().getOrCreateAndWait( + convoIdToAddTheMessageTo, + envelope.senderIdentity ? ConversationTypeEnum.GROUP : ConversationTypeEnum.PRIVATE ); + window?.log?.info( + `Handle dataMessage about convo ${convoIdToAddTheMessageTo} from user: ${convoIdOfSender}` + ); + // remove the prefix from the source object so this is correct for all other + // Check if we need to update any profile names - if (!isMe && senderConversation && message.profile) { + if ( + !isMe && + senderConversationModel && + cleanDataMessage.profile && + cleanDataMessage.profileKey?.length + ) { // do not await this - void updateProfileOneAtATime(senderConversation, message.profile, message.profileKey); + void updateProfileOneAtATime( + senderConversationModel, + cleanDataMessage.profile, + cleanDataMessage.profileKey + ); } - if (isMessageEmpty(message)) { + if (isMessageEmpty(cleanDataMessage)) { window?.log?.warn(`Message ${getEnvelopeId(envelope)} ignored; it was empty`); return removeFromCache(envelope); } - // Data messages for medium groups don't arrive as sync messages. Instead, - // linked devices poll for group messages independently, thus they need - // to recognise some of those messages at their own. + const sentAtTimestamp = _.toNumber(envelope.timestamp); - if (envelope.senderIdentity) { - message.group = { - id: envelope.source as any, // FIXME Uint8Array vs string - }; + if (!convoIdToAddTheMessageTo) { + window?.log?.error('We cannot handle a message without a conversationId'); + confirm(); + return; } - const confirm = () => removeFromCache(envelope); + const msgModel = + isSyncedMessage || (envelope.senderIdentity && isUsFromCache(envelope.senderIdentity)) + ? createSwarmMessageSentFromUs({ + conversationId: convoIdToAddTheMessageTo, + messageHash, + sentAt: sentAtTimestamp, + }) + : createSwarmMessageSentFromNotUs({ + conversationId: convoIdToAddTheMessageTo, + messageHash, + sender: senderConversationModel.id, + sentAt: sentAtTimestamp, + }); - const data: MessageCreationData = { - source: senderPubKey, - destination: isMe ? message.syncTarget : envelope.source, - timestamp: _.toNumber(envelope.timestamp), - receivedAt: envelope.receivedAt, + await handleSwarmMessage( + msgModel, messageHash, - isPublic: false, - serverId: null, - serverTimestamp: null, - groupId: message.group?.id?.length - ? PubKey.removeTextSecurePrefixIfNeeded(toHex(message.group?.id)) - : null, - }; - - await handleMessageEvent(!isMe, data, message, confirm); + sentAtTimestamp, + cleanDataMessage, + convoToAddMessageTo, + () => removeFromCache(envelope) + ); } -export type MessageId = { +export async function isSwarmMessageDuplicate({ + source, + sentAt, +}: { source: string; - serverId?: number | null; - serverTimestamp?: number | null; - timestamp: number; -}; - -export async function isMessageDuplicate({ source, timestamp, serverTimestamp }: MessageId) { - // serverTimestamp is only used for opengroupv2 + sentAt: number; +}) { try { - let result; - - if (serverTimestamp) { - // first try to find a duplicate with the same serverTimestamp from this sender - - result = await getMessageBySenderAndServerTimestamp({ - source, - serverTimestamp, - }); - - // if we have a result, it means a specific user sent two messages either with the same serverTimestamp. - // no need to do anything else, those messages must be the same - // Note: this test is not based on which conversation the user sent the message - // but we consider that a user sending two messages with the same serverTimestamp is unlikely - return Boolean(result); - } - result = await getMessageBySenderAndSentAt({ + const result = await getMessageBySenderAndSentAt({ source, - sentAt: timestamp, + sentAt, }); return Boolean(result); } catch (error) { - window?.log?.error('isMessageDuplicate error:', toLogFormat(error)); + window?.log?.error('isSwarmMessageDuplicate error:', toLogFormat(error)); return false; } } @@ -407,191 +394,39 @@ export async function isOpengroupMessageDuplicate({ } } -async function handleProfileUpdate( - profileKeyBuffer: Uint8Array, - convoId: string, - isIncoming: boolean -) { - if (!isIncoming) { - // We update our own profileKey if it's different from what we have - const ourNumber = UserUtils.getOurPubKeyStrFromCache(); - const me = getConversationController().getOrCreate(ourNumber, ConversationTypeEnum.PRIVATE); - - // Will do the save for us if needed - await me.setProfileKey(profileKeyBuffer); - } else { - const senderConvo = await getConversationController().getOrCreateAndWait( - convoId, - ConversationTypeEnum.PRIVATE - ); - - // Will do the save for us - await senderConvo.setProfileKey(profileKeyBuffer); - } -} - -export type MessageCreationData = { - timestamp: number; - receivedAt: number; - source: string; - isPublic: boolean; - serverId: number | null; - serverTimestamp: number | null; - groupId: string | null; - expirationStartTimestamp?: number; - destination: string; - messageHash: string; -}; - -function initIncomingMessage(data: MessageCreationData): MessageModel { - const { - timestamp, - isPublic, - receivedAt, - source, - serverId, - serverTimestamp, - messageHash, - groupId, - } = data; - - const messageData: MessageAttributesOptionals = { - source, - serverId: serverId || undefined, - sent_at: timestamp, - serverTimestamp: serverTimestamp || undefined, - received_at: receivedAt || Date.now(), - conversationId: groupId ?? source, - type: 'incoming', - direction: 'incoming', - unread: 1, - isPublic, - messageHash: messageHash || undefined, - }; - - return new MessageModel(messageData); -} - -function createSentMessage(data: MessageCreationData): MessageModel { - const now = Date.now(); - - const { - timestamp, - serverTimestamp, - serverId, - isPublic, - receivedAt, - expirationStartTimestamp, - destination, - groupId, - messageHash, - } = data; - - const sentSpecificFields = { - sent_to: [], - sent: true, - expirationStartTimestamp: Math.min(expirationStartTimestamp || data.timestamp || now, now), - }; - - const messageData: MessageAttributesOptionals = { - source: UserUtils.getOurPubKeyStrFromCache(), - serverTimestamp: serverTimestamp || undefined, - serverId: serverId || undefined, - sent_at: timestamp, - received_at: isPublic ? receivedAt : now, - isPublic, - conversationId: groupId ?? destination, - type: 'outgoing' as MessageModelType, - messageHash, - ...sentSpecificFields, - }; - - return new MessageModel(messageData); -} - -export function createMessage(data: MessageCreationData, isIncoming: boolean): MessageModel { - if (isIncoming) { - return initIncomingMessage(data); - } else { - return createSentMessage(data); - } -} - // tslint:disable:cyclomatic-complexity max-func-body-length */ -async function handleMessageEvent( - isIncoming: boolean, - messageCreationData: MessageCreationData, +async function handleSwarmMessage( + msgModel: MessageModel, + messageHash: string, + sentAt: number, rawDataMessage: SignalService.DataMessage, + convoToAddMessageTo: ConversationModel, confirm: () => void ): Promise { - if (!messageCreationData || !rawDataMessage) { - window?.log?.warn('Invalid data passed to handleMessageEvent.', event); - confirm(); - return; - } - - const { destination, messageHash } = messageCreationData; - - let { source } = messageCreationData; - - const isGroupMessage = Boolean(rawDataMessage.group); - - const type = isGroupMessage ? ConversationTypeEnum.GROUP : ConversationTypeEnum.PRIVATE; - - let conversationId = isIncoming ? source : destination || source; // for synced message - if (!conversationId) { - window?.log?.error('We cannot handle a message without a conversationId'); - confirm(); - return; - } - if (rawDataMessage.profileKey?.length) { - await handleProfileUpdate(rawDataMessage.profileKey, conversationId, isIncoming); - } - - const msg = createMessage(messageCreationData, isIncoming); - - // if the message is `sent` (from secondary device) we have to set the sender manually... (at least for now) - source = source || msg.get('source'); - - // Conversation Id is: - // - primarySource if it is an incoming DM message, - // - destination if it is an outgoing message, - // - group.id if it is a group message - if (isGroupMessage) { - // remove the prefix from the source object so this is correct for all other - (rawDataMessage as any).group.id = PubKey.removeTextSecurePrefixIfNeeded( - (rawDataMessage as any).group.id - ); - - conversationId = (rawDataMessage as any).group.id; - } - - if (!conversationId) { - window?.log?.warn('Invalid conversation id for incoming message', conversationId); - } - const ourNumber = UserUtils.getOurPubKeyStrFromCache(); - - // ========================================= - - if (!rawDataMessage.group && source !== ourNumber) { - // Ignore auth from our devices - conversationId = source; - } - - const conversation = await getConversationController().getOrCreateAndWait(conversationId, type); - - if (!conversation) { - window?.log?.warn('Skipping handleJob for unknown convo: ', conversationId); + if (!rawDataMessage || !msgModel) { + window?.log?.warn('Invalid data passed to handleSwarmMessage.'); confirm(); return; } - void conversation.queueJob(async () => { - if (await isMessageDuplicate(messageCreationData)) { + void convoToAddMessageTo.queueJob(async () => { + // this call has to be made inside the queueJob! + const isDuplicate = await isSwarmMessageDuplicate({ + source: msgModel.get('source'), + sentAt, + }); + if (isDuplicate) { window?.log?.info('Received duplicate message. Dropping it.'); confirm(); return; } - await handleMessageJob(msg, conversation, rawDataMessage, confirm, source, messageHash); + await handleMessageJob( + msgModel, + convoToAddMessageTo, + toRegularMessage(rawDataMessage), + confirm, + msgModel.get('source'), + messageHash + ); }); } diff --git a/ts/receiver/opengroup.ts b/ts/receiver/opengroup.ts index 3cf69399f..2575989ee 100644 --- a/ts/receiver/opengroup.ts +++ b/ts/receiver/opengroup.ts @@ -13,7 +13,7 @@ import { removeMessagePadding } from '../session/crypto/BufferPadding'; import { UserUtils } from '../session/utils'; import { fromBase64ToArray } from '../session/utils/String'; import { isOpengroupMessageDuplicate } from './dataMessage'; -import { handleMessageJob } from './queuedJob'; +import { handleMessageJob, toRegularMessage } from './queuedJob'; export async function handleOpenGroupV2Message( message: OpenGroupMessageV2, @@ -79,7 +79,7 @@ export async function handleOpenGroupV2Message( await handleMessageJob( msgModel, conversation, - decoded?.dataMessage as SignalService.DataMessage, + toRegularMessage(decoded?.dataMessage as SignalService.DataMessage), noop, sender, '' diff --git a/ts/receiver/queuedJob.ts b/ts/receiver/queuedJob.ts index 6d2fcdc3e..873b33a62 100644 --- a/ts/receiver/queuedJob.ts +++ b/ts/receiver/queuedJob.ts @@ -168,9 +168,6 @@ function updateReadStatus(message: MessageModel, conversation: ConversationModel } async function handleSyncedReceipts(message: MessageModel, conversation: ConversationModel) { - console.warn('handleSyncedReceipts', message); - debugger; - // If the newly received message is from us, we assume that we've seen the messages up until that point const sentTimestamp = message.get('sent_at'); if (sentTimestamp) { @@ -178,10 +175,43 @@ async function handleSyncedReceipts(message: MessageModel, conversation: Convers } } +export type RegularMessageType = Pick< + SignalService.DataMessage, + | 'attachments' + | 'body' + | 'flags' + | 'openGroupInvitation' + | 'quote' + | 'preview' + | 'profile' + | 'profileKey' + | 'expireTimer' +> & { isRegularMessage: true }; + +/** + * This function is just used to make sure we do not forward things we shouldn't in the incoming message pipeline + */ +export function toRegularMessage(rawDataMessage: SignalService.DataMessage): RegularMessageType { + return { + ..._.pick(rawDataMessage, [ + 'attachments', + 'preview', + 'body', + 'flags', + 'profileKey', + 'openGroupInvitation', + 'quote', + 'profile', + 'expireTimer', + ]), + isRegularMessage: true, + }; +} + async function handleRegularMessage( conversation: ConversationModel, message: MessageModel, - rawDataMessage: SignalService.DataMessage, + rawDataMessage: RegularMessageType, source: string, messageHash: string ) { @@ -285,7 +315,7 @@ async function handleExpirationTimerUpdate( window?.log?.info("Update conversation 'expireTimer'", { id: conversation.idForLogging(), expireTimer, - source: 'handleDataMessage', + source: 'handleSwarmDataMessage', }); await conversation.updateExpireTimer(expireTimer, source, message.get('received_at')); @@ -294,21 +324,21 @@ async function handleExpirationTimerUpdate( export async function handleMessageJob( messageModel: MessageModel, conversation: ConversationModel, - rawDataMessage: SignalService.DataMessage, + regularDataMessage: RegularMessageType, confirm: () => void, source: string, messageHash: string ) { window?.log?.info( - `Starting handleDataMessage for message ${messageModel.idForLogging()}, ${messageModel.get( + `Starting handleSwarmDataMessage for message ${messageModel.idForLogging()}, ${messageModel.get( 'serverTimestamp' ) || messageModel.get('timestamp')} in conversation ${conversation.idForLogging()}` ); try { - messageModel.set({ flags: rawDataMessage.flags }); + messageModel.set({ flags: regularDataMessage.flags }); if (messageModel.isExpirationTimerUpdate()) { - const { expireTimer } = rawDataMessage; + const { expireTimer } = regularDataMessage; const oldValue = conversation.get('expireTimer'); if (expireTimer === oldValue) { confirm?.(); @@ -319,7 +349,13 @@ export async function handleMessageJob( } await handleExpirationTimerUpdate(conversation, messageModel, source, expireTimer); } else { - await handleRegularMessage(conversation, messageModel, rawDataMessage, source, messageHash); + await handleRegularMessage( + conversation, + messageModel, + regularDataMessage, + source, + messageHash + ); } const id = await messageModel.commit(); @@ -359,7 +395,11 @@ export async function handleMessageJob( await messageModel.markRead(Date.now()); } } catch (error) { - window?.log?.warn('handleDataMessage: Message', messageModel.idForLogging(), 'was deleted'); + window?.log?.warn( + 'handleSwarmDataMessage: Message', + messageModel.idForLogging(), + 'was deleted' + ); } // this updates the redux store. @@ -380,7 +420,12 @@ export async function handleMessageJob( } } catch (error) { const errorForLog = error && error.stack ? error.stack : error; - window?.log?.error('handleDataMessage', messageModel.idForLogging(), 'error:', errorForLog); + window?.log?.error( + 'handleSwarmDataMessage', + messageModel.idForLogging(), + 'error:', + errorForLog + ); throw error; } diff --git a/ts/receiver/receiver.ts b/ts/receiver/receiver.ts index 47bdf51cc..5e4270eea 100644 --- a/ts/receiver/receiver.ts +++ b/ts/receiver/receiver.ts @@ -4,8 +4,8 @@ import { v4 as uuidv4 } from 'uuid'; import { addToCache, getAllFromCache, getAllFromCacheForSource, removeFromCache } from './cache'; -// innerHandleContentMessage is only needed because of code duplication in handleDecryptedEnvelope... -import { handleContentMessage, innerHandleContentMessage } from './contentMessage'; +// innerHandleSwarmContentMessage is only needed because of code duplication in handleDecryptedEnvelope... +import { handleSwarmContentMessage, innerHandleSwarmContentMessage } from './contentMessage'; import _ from 'lodash'; import { getEnvelopeId } from './common'; @@ -23,9 +23,9 @@ interface ReqOptions { const incomingMessagePromises: Array> = []; -async function handleEnvelope(envelope: EnvelopePlus, messageHash: string) { +async function handleSwarmEnvelope(envelope: EnvelopePlus, messageHash: string) { if (envelope.content && envelope.content.length > 0) { - return handleContentMessage(envelope, messageHash); + return handleSwarmContentMessage(envelope, messageHash); } await removeFromCache(envelope); @@ -55,18 +55,18 @@ class EnvelopeQueue { const envelopeQueue = new EnvelopeQueue(); -function queueEnvelope(envelope: EnvelopePlus, messageHash: string) { +function queueSwarmEnvelope(envelope: EnvelopePlus, messageHash: string) { const id = getEnvelopeId(envelope); // window?.log?.info('queueing envelope', id); - const task = handleEnvelope.bind(null, envelope, messageHash); - const taskWithTimeout = createTaskWithTimeout(task, `queueEnvelope ${id}`); + const task = handleSwarmEnvelope.bind(null, envelope, messageHash); + const taskWithTimeout = createTaskWithTimeout(task, `queueSwarmEnvelope ${id}`); try { envelopeQueue.add(taskWithTimeout); } catch (error) { window?.log?.error( - 'queueEnvelope error handling envelope', + 'queueSwarmEnvelope error handling envelope', id, ':', error && error.stack ? error.stack : error @@ -123,7 +123,7 @@ async function handleRequestDetail( await lastPromise; - queueEnvelope(envelope, messageHash); + queueSwarmEnvelope(envelope, messageHash); } catch (error) { window?.log?.error( 'handleRequest error trying to add message to cache:', @@ -185,7 +185,7 @@ async function queueCached(item: any) { queueDecryptedEnvelope(envelope, payloadPlaintext, envelope.messageHash); } else { - queueEnvelope(envelope, envelope.messageHash); + queueSwarmEnvelope(envelope, envelope.messageHash); } } catch (error) { window?.log?.error( @@ -230,12 +230,8 @@ async function handleDecryptedEnvelope( plaintext: ArrayBuffer, messageHash: string ) { - // if (this.stoppingProcessing) { - // return Promise.resolve(); - // } - if (envelope.content) { - await innerHandleContentMessage(envelope, plaintext, messageHash); + await innerHandleSwarmContentMessage(envelope, plaintext, messageHash); } else { await removeFromCache(envelope); } diff --git a/ts/session/messages/outgoing/visibleMessage/VisibleMessage.ts b/ts/session/messages/outgoing/visibleMessage/VisibleMessage.ts index fa1cce4f4..49a12a529 100644 --- a/ts/session/messages/outgoing/visibleMessage/VisibleMessage.ts +++ b/ts/session/messages/outgoing/visibleMessage/VisibleMessage.ts @@ -66,7 +66,7 @@ export interface VisibleMessageParams extends MessageParams { expireTimer?: number; lokiProfile?: LokiProfile; preview?: Array; - syncTarget?: string; // null means it is not a synced message + syncTarget?: string; // undefined means it is not a synced message } export class VisibleMessage extends DataMessage {