cleanup incoming message creation for displaying messageresult

pull/2142/head
Audric Ackermann 3 years ago
parent 76bfd5992c
commit b3a8692240
No known key found for this signature in database
GPG Key ID: 999F434D76324AD4

@ -960,7 +960,7 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
// if this message needs to be synced
if (
(dataMessage.body && dataMessage.body.length) ||
dataMessage.body?.length ||
dataMessage.attachments.length ||
dataMessage.flags === SignalService.DataMessage.Flags.EXPIRATION_TIMER_UPDATE
) {

@ -2,103 +2,67 @@ import { UserUtils } from '../session/utils';
import { MessageModel } from './message';
import { MessageAttributesOptionals, MessageModelType } from './messageType';
export type MessageCreationData = {
timestamp: number;
receivedAt: number;
source: string;
isPublic: boolean;
serverId: number | null;
serverTimestamp: number | null;
groupId: string | null;
expirationStartTimestamp?: number;
destination: string;
function getSharedAttributesForSwarmMessage({
conversationId,
messageHash,
sentAt,
}: {
conversationId: string;
messageHash: string;
};
function initIncomingMessage(data: MessageCreationData): MessageModel {
const {
timestamp,
isPublic,
receivedAt,
source,
serverId,
serverTimestamp,
sentAt: number;
}) {
const now = Date.now();
return {
sent_at: sentAt,
received_at: now,
conversationId,
messageHash,
groupId,
} = data;
};
}
/**
* This function is only called when we get a message from ourself from a swarm polling event.
*
* NOTE: conversationId has to be the conversation in which this message should be added. So
* either syncTarget, groupId or envelope.source or senderIdentity
*/
export function createSwarmMessageSentFromUs(args: {
messageHash: string;
sentAt: number;
conversationId: string;
}): MessageModel {
// for messages we did send, we mark it as read and start the expiration timer
const messageData: MessageAttributesOptionals = {
source,
serverId: serverId || undefined,
sent_at: timestamp,
serverTimestamp: serverTimestamp || undefined,
received_at: receivedAt || Date.now(),
conversationId: groupId ?? source,
type: 'incoming',
direction: 'incoming',
unread: 1,
isPublic,
messageHash: messageHash || undefined,
...getSharedAttributesForSwarmMessage(args),
...getSharedAttributesForOutgoingMessage(),
expirationStartTimestamp: Math.min(args.sentAt, Date.now()),
};
return new MessageModel(messageData);
}
/**
* This function can be called for either a sync message or a message synced through an opengroup poll.
* This does not save it to the db, just in memory
* This function is only called by the Receiver when we get a message
* from someone else than ourself from a swarm polling event
* NOTE: conversationId has to be the conversation in which this message should be added. So
* either syncTarget, groupId or envelope.source or senderIdentity
*/
function createMessageSentFromOurself({
timestamp,
serverTimestamp,
serverId,
isPublic,
receivedAt,
expirationStartTimestamp,
destination,
groupId,
messageHash,
}: {
timestamp: number;
receivedAt: number;
isPublic: boolean;
serverId: number | null;
serverTimestamp: number | null;
groupId: string | null;
expirationStartTimestamp: number | null;
destination: string;
export function createSwarmMessageSentFromNotUs(args: {
messageHash: string;
sentAt: number;
sender: string;
conversationId: string;
}): MessageModel {
// Omit<
// MessageAttributesOptionals,
// 'conversationId' | 'source' | 'type' | 'direction' | 'received_at'
// >
const now = Date.now();
const messageData: MessageAttributesOptionals = {
source: UserUtils.getOurPubKeyStrFromCache(),
type: 'outgoing' as MessageModelType,
serverTimestamp: serverTimestamp || undefined,
serverId: serverId || undefined,
sent_at: timestamp,
received_at: isPublic ? receivedAt : now,
isPublic,
conversationId: groupId ?? destination,
messageHash,
unread: 0,
sent_to: [],
sent: true,
expirationStartTimestamp: Math.min(expirationStartTimestamp || data.timestamp || now, now),
...getSharedAttributesForSwarmMessage(args),
...getSharedAttributesForIncomingMessage(),
source: args.sender,
};
return new MessageModel(messageData);
}
/**
* This function is only called when we get a message from ourself from an opengroup polling event
*/
export function createPublicMessageSentFromUs({
function getSharedAttributesForPublicMessage({
serverTimestamp,
serverId,
conversationId,
@ -106,10 +70,8 @@ export function createPublicMessageSentFromUs({
serverId: number;
serverTimestamp: number;
conversationId: string;
}): MessageModel {
const messageData: MessageAttributesOptionals = {
source: UserUtils.getOurPubKeyStrFromCache(),
type: 'outgoing' as MessageModelType,
}) {
return {
serverTimestamp: serverTimestamp || undefined,
serverId: serverId || undefined,
sent_at: serverTimestamp,
@ -117,10 +79,40 @@ export function createPublicMessageSentFromUs({
isPublic: true,
conversationId,
messageHash: '', // we do not care of a messageHash for an opengroup message. we have serverId for that
expirationStartTimestamp: undefined,
};
}
function getSharedAttributesForOutgoingMessage() {
return {
source: UserUtils.getOurPubKeyStrFromCache(),
unread: 0,
sent_to: [],
sent: true,
expirationStartTimestamp: undefined,
type: 'outgoing' as MessageModelType,
direction: 'outgoing' as MessageModelType,
};
}
function getSharedAttributesForIncomingMessage() {
return {
unread: 1,
type: 'incoming' as MessageModelType,
direction: 'incoming' as MessageModelType,
};
}
/**
* This function is only called when we get a message from ourself from an opengroup polling event
*/
export function createPublicMessageSentFromUs(args: {
serverId: number;
serverTimestamp: number;
conversationId: string;
}): MessageModel {
const messageData: MessageAttributesOptionals = {
...getSharedAttributesForPublicMessage(args),
...getSharedAttributesForOutgoingMessage(),
};
return new MessageModel(messageData);
@ -130,29 +122,16 @@ export function createPublicMessageSentFromUs({
* This function is only called by the Receiver when we get a message
* from someone else than ourself from an opengroup polling event
*/
export function createPublicMessageSentFromNotUs({
serverTimestamp,
serverId,
conversationId,
sender,
}: {
export function createPublicMessageSentFromNotUs(args: {
serverId: number;
sender: string;
serverTimestamp: number;
conversationId: string;
}): MessageModel {
const messageData: MessageAttributesOptionals = {
source: sender,
conversationId,
type: 'incoming' as MessageModelType,
serverTimestamp: serverTimestamp,
sent_at: serverTimestamp,
received_at: serverTimestamp,
serverId,
isPublic: true,
messageHash: '', // we do not care of a messageHash for an opengroup message. we have serverId for that
unread: 1,
expirationStartTimestamp: undefined,
...getSharedAttributesForPublicMessage(args),
...getSharedAttributesForIncomingMessage(),
source: args.sender,
};
return new MessageModel(messageData);

@ -1,5 +1,5 @@
import { EnvelopePlus } from './types';
import { handleDataMessage } from './dataMessage';
import { handleSwarmDataMessage } from './dataMessage';
import { removeFromCache, updateCache } from './cache';
import { SignalService } from '../protobuf';
@ -25,20 +25,20 @@ import {
} from '../interactions/conversations/unsendingInteractions';
import { SettingsKey } from '../data/settings-key';
export async function handleContentMessage(envelope: EnvelopePlus, messageHash: string) {
export async function handleSwarmContentMessage(envelope: EnvelopePlus, messageHash: string) {
try {
const plaintext = await decrypt(envelope, envelope.content);
if (!plaintext) {
// window?.log?.warn('handleContentMessage: plaintext was falsey');
// window?.log?.warn('handleSwarmContentMessage: plaintext was falsey');
return;
} else if (plaintext instanceof ArrayBuffer && plaintext.byteLength === 0) {
return;
}
perfStart(`innerHandleContentMessage-${envelope.id}`);
perfStart(`innerHandleSwarmContentMessage-${envelope.id}`);
await innerHandleContentMessage(envelope, plaintext, messageHash);
perfEnd(`innerHandleContentMessage-${envelope.id}`, 'innerHandleContentMessage');
await innerHandleSwarmContentMessage(envelope, plaintext, messageHash);
perfEnd(`innerHandleSwarmContentMessage-${envelope.id}`, 'innerHandleSwarmContentMessage');
} catch (e) {
window?.log?.warn(e);
}
@ -326,7 +326,7 @@ function shouldDropBlockedUserMessage(content: SignalService.Content): boolean {
return !isControlDataMessageOnly;
}
export async function innerHandleContentMessage(
export async function innerHandleSwarmContentMessage(
envelope: EnvelopePlus,
plaintext: ArrayBuffer,
messageHash: string
@ -350,22 +350,43 @@ export async function innerHandleContentMessage(
}
}
await getConversationController().getOrCreateAndWait(
envelope.source,
// if this is a direct message, envelope.senderIdentity is undefined
// if this is a closed group message, envelope.senderIdentity is the sender's pubkey and envelope.source is the closed group's pubkey
const isPrivateConversationMessage = !envelope.senderIdentity;
/**
* For a closed group message, this holds the conversation with that specific user outside of the closed group.
* For a private conversation message, this is just the conversation with that user
*/
const senderConversationModel = await getConversationController().getOrCreateAndWait(
isPrivateConversationMessage ? envelope.source : envelope.senderIdentity,
ConversationTypeEnum.PRIVATE
);
/**
* For a closed group message, this holds the closed group's conversation.
* For a private conversation message, this is just the conversation with that user
*/
if (!isPrivateConversationMessage) {
// this is a closed group message, we have a second conversation to make sure exists
await getConversationController().getOrCreateAndWait(
envelope.source,
ConversationTypeEnum.GROUP
);
}
if (content.dataMessage) {
if (content.dataMessage.profileKey && content.dataMessage.profileKey.length === 0) {
content.dataMessage.profileKey = null;
}
perfStart(`handleDataMessage-${envelope.id}`);
await handleDataMessage(
perfStart(`handleSwarmDataMessage-${envelope.id}`);
await handleSwarmDataMessage(
envelope,
content.dataMessage as SignalService.DataMessage,
messageHash
messageHash,
senderConversationModel
);
perfEnd(`handleDataMessage-${envelope.id}`, 'handleDataMessage');
perfEnd(`handleSwarmDataMessage-${envelope.id}`, 'handleSwarmDataMessage');
return;
}

@ -4,14 +4,12 @@ import { EnvelopePlus } from './types';
import { getEnvelopeId } from './common';
import { PubKey } from '../session/types';
import { handleMessageJob } from './queuedJob';
import { handleMessageJob, toRegularMessage } from './queuedJob';
import { downloadAttachment } from './attachments';
import _ from 'lodash';
import { StringUtils, UserUtils } from '../session/utils';
import { getConversationController } from '../session/conversations';
import { handleClosedGroupControlMessage } from './closedGroups';
import { MessageModel } from '../models/message';
import { MessageAttributesOptionals, MessageModelType } from '../models/messageType';
import {
getMessageBySenderAndSentAt,
getMessageBySenderAndServerTimestamp,
@ -23,6 +21,12 @@ import { toLogFormat } from '../types/attachments/Errors';
import { processNewAttachment } from '../types/MessageAttachment';
import { MIME } from '../types';
import { autoScaleForIncomingAvatar } from '../util/attachmentsUtil';
import {
createSwarmMessageSentFromNotUs,
createSwarmMessageSentFromUs,
} from '../models/messageFactory';
import { MessageModel } from '../models/message';
import { isUsFromCache } from '../session/utils/User';
export async function updateProfileOneAtATime(
conversation: ConversationModel,
@ -125,7 +129,7 @@ function cleanAttachment(attachment: any) {
};
}
function cleanAttachments(decrypted: any) {
function cleanAttachments(decrypted: SignalService.DataMessage) {
const { quote, group } = decrypted;
// Here we go from binary to string/base64 in all AttachmentPointer digest/key fields
@ -170,9 +174,28 @@ function cleanAttachments(decrypted: any) {
}
}
export async function processDecrypted(
export function isMessageEmpty(message: SignalService.DataMessage) {
const { flags, body, attachments, group, quote, preview, openGroupInvitation } = message;
return (
!flags &&
// FIXME remove this hack to drop auto friend requests messages in a few weeks 15/07/2020
isBodyEmpty(body) &&
_.isEmpty(attachments) &&
_.isEmpty(group) &&
_.isEmpty(quote) &&
_.isEmpty(preview) &&
_.isEmpty(openGroupInvitation)
);
}
function isBodyEmpty(body: string) {
return _.isEmpty(body);
}
async function cleanIncomingDataMessage(
envelope: EnvelopePlus,
decrypted: SignalService.IDataMessage
rawDataMessage: SignalService.DataMessage
) {
/* tslint:disable:no-bitwise */
const FLAGS = SignalService.DataMessage.Flags;
@ -182,45 +205,20 @@ export async function processDecrypted(
// Note that messages may (generally) only perform one action and we ignore remaining
// fields after the first action.
if (decrypted.flags == null) {
decrypted.flags = 0;
if (rawDataMessage.flags == null) {
rawDataMessage.flags = 0;
}
if (decrypted.expireTimer == null) {
decrypted.expireTimer = 0;
if (rawDataMessage.expireTimer == null) {
rawDataMessage.expireTimer = 0;
}
if (decrypted.flags & FLAGS.EXPIRATION_TIMER_UPDATE) {
decrypted.body = '';
decrypted.attachments = [];
} else if (decrypted.flags !== 0) {
if (rawDataMessage.flags & FLAGS.EXPIRATION_TIMER_UPDATE) {
rawDataMessage.body = '';
rawDataMessage.attachments = [];
} else if (rawDataMessage.flags !== 0) {
throw new Error('Unknown flags in message');
}
if (decrypted.group) {
switch (decrypted.group.type) {
case SignalService.GroupContext.Type.UPDATE:
decrypted.body = '';
decrypted.attachments = [];
break;
case SignalService.GroupContext.Type.QUIT:
decrypted.body = '';
decrypted.attachments = [];
break;
case SignalService.GroupContext.Type.DELIVER:
decrypted.group.name = null;
decrypted.group.members = [];
decrypted.group.avatar = null;
break;
case SignalService.GroupContext.Type.REQUEST_INFO:
decrypted.body = '';
decrypted.attachments = [];
break;
default:
await removeFromCache(envelope);
throw new Error('Unknown group message type');
}
}
const attachmentCount = decrypted?.attachments?.length || 0;
const attachmentCount = rawDataMessage?.attachments?.length || 0;
const ATTACHMENT_MAX = 32;
if (attachmentCount > ATTACHMENT_MAX) {
await removeFromCache(envelope);
@ -228,35 +226,14 @@ export async function processDecrypted(
`Too many attachments: ${attachmentCount} included in one message, max is ${ATTACHMENT_MAX}`
);
}
cleanAttachments(decrypted);
cleanAttachments(rawDataMessage);
// if the decrypted dataMessage timestamp is not set, copy the one from the envelope
if (!_.toNumber(decrypted?.timestamp)) {
decrypted.timestamp = envelope.timestamp;
if (!_.isFinite(rawDataMessage?.timestamp)) {
rawDataMessage.timestamp = envelope.timestamp;
}
return decrypted as SignalService.DataMessage;
/* tslint:disable:no-bitwise */
}
export function isMessageEmpty(message: SignalService.DataMessage) {
const { flags, body, attachments, group, quote, preview, openGroupInvitation } = message;
return (
!flags &&
// FIXME remove this hack to drop auto friend requests messages in a few weeks 15/07/2020
isBodyEmpty(body) &&
_.isEmpty(attachments) &&
_.isEmpty(group) &&
_.isEmpty(quote) &&
_.isEmpty(preview) &&
_.isEmpty(openGroupInvitation)
);
}
function isBodyEmpty(body: string) {
return _.isEmpty(body);
return rawDataMessage;
}
/**
@ -270,114 +247,124 @@ function isBodyEmpty(body: string) {
* * envelope.source is our pubkey (our other device has the same pubkey as us)
* * dataMessage.syncTarget is either the group public key OR the private conversation this message is about.
*/
export async function handleDataMessage(
// tslint:disable-next-line: cyclomatic-complexity
export async function handleSwarmDataMessage(
envelope: EnvelopePlus,
rawDataMessage: SignalService.DataMessage,
messageHash: string
messageHash: string,
senderConversationModel: ConversationModel
): Promise<void> {
const cleanDataMessage = await cleanIncomingDataMessage(envelope, rawDataMessage);
// we handle group updates from our other devices in handleClosedGroupControlMessage()
if (rawDataMessage.closedGroupControlMessage) {
if (cleanDataMessage.closedGroupControlMessage) {
await handleClosedGroupControlMessage(
envelope,
rawDataMessage.closedGroupControlMessage as SignalService.DataMessage.ClosedGroupControlMessage
cleanDataMessage.closedGroupControlMessage as SignalService.DataMessage.ClosedGroupControlMessage
);
return;
}
const message = await processDecrypted(envelope, rawDataMessage);
const source = rawDataMessage.syncTarget || envelope.source;
const senderPubKey = envelope.senderIdentity || envelope.source;
const isMe = UserUtils.isUsFromCache(senderPubKey);
const isSyncMessage = Boolean(rawDataMessage.syncTarget?.length);
window?.log?.info(`Handle dataMessage from ${source} `);
if (isSyncMessage && !isMe) {
/**
* This is a mess, but
*
* 1. if syncTarget is set and this is a synced message, syncTarget holds the conversationId in which this message is addressed. This syncTarget can be a private conversation pubkey or a closed group pubkey
*
* 2. for a closed group message, envelope.senderIdentity is the pubkey of the sender and envelope.source is the pubkey of the closed group.
*
* 3. for a private conversation message, envelope.senderIdentity and envelope.source are probably the pubkey of the sender.
*/
const isSyncedMessage = Boolean(cleanDataMessage.syncTarget?.length);
// no need to remove prefix here, as senderIdentity set => envelope.source is not used (and this is the one having the prefix when this is an opengroup)
const convoIdOfSender = envelope.senderIdentity || envelope.source;
const isMe = UserUtils.isUsFromCache(convoIdOfSender);
if (isSyncedMessage && !isMe) {
window?.log?.warn('Got a sync message from someone else than me. Dropping it.');
return removeFromCache(envelope);
} else if (isSyncMessage && rawDataMessage.syncTarget) {
// override the envelope source
envelope.source = rawDataMessage.syncTarget;
} else if (isSyncedMessage) {
// we should create the synTarget convo but I have no idea how to know if this is a private or closed group convo?
}
const convoIdToAddTheMessageTo = PubKey.removeTextSecurePrefixIfNeeded(
isSyncedMessage ? cleanDataMessage.syncTarget : envelope.source
);
const senderConversation = await getConversationController().getOrCreateAndWait(
senderPubKey,
ConversationTypeEnum.PRIVATE
const convoToAddMessageTo = await getConversationController().getOrCreateAndWait(
convoIdToAddTheMessageTo,
envelope.senderIdentity ? ConversationTypeEnum.GROUP : ConversationTypeEnum.PRIVATE
);
window?.log?.info(
`Handle dataMessage about convo ${convoIdToAddTheMessageTo} from user: ${convoIdOfSender}`
);
// remove the prefix from the source object so this is correct for all other
// Check if we need to update any profile names
if (!isMe && senderConversation && message.profile) {
if (
!isMe &&
senderConversationModel &&
cleanDataMessage.profile &&
cleanDataMessage.profileKey?.length
) {
// do not await this
void updateProfileOneAtATime(senderConversation, message.profile, message.profileKey);
void updateProfileOneAtATime(
senderConversationModel,
cleanDataMessage.profile,
cleanDataMessage.profileKey
);
}
if (isMessageEmpty(message)) {
if (isMessageEmpty(cleanDataMessage)) {
window?.log?.warn(`Message ${getEnvelopeId(envelope)} ignored; it was empty`);
return removeFromCache(envelope);
}
// Data messages for medium groups don't arrive as sync messages. Instead,
// linked devices poll for group messages independently, thus they need
// to recognise some of those messages at their own.
const sentAtTimestamp = _.toNumber(envelope.timestamp);
if (envelope.senderIdentity) {
message.group = {
id: envelope.source as any, // FIXME Uint8Array vs string
};
if (!convoIdToAddTheMessageTo) {
window?.log?.error('We cannot handle a message without a conversationId');
confirm();
return;
}
const confirm = () => removeFromCache(envelope);
const msgModel =
isSyncedMessage || (envelope.senderIdentity && isUsFromCache(envelope.senderIdentity))
? createSwarmMessageSentFromUs({
conversationId: convoIdToAddTheMessageTo,
messageHash,
sentAt: sentAtTimestamp,
})
: createSwarmMessageSentFromNotUs({
conversationId: convoIdToAddTheMessageTo,
messageHash,
sender: senderConversationModel.id,
sentAt: sentAtTimestamp,
});
const data: MessageCreationData = {
source: senderPubKey,
destination: isMe ? message.syncTarget : envelope.source,
timestamp: _.toNumber(envelope.timestamp),
receivedAt: envelope.receivedAt,
await handleSwarmMessage(
msgModel,
messageHash,
isPublic: false,
serverId: null,
serverTimestamp: null,
groupId: message.group?.id?.length
? PubKey.removeTextSecurePrefixIfNeeded(toHex(message.group?.id))
: null,
};
await handleMessageEvent(!isMe, data, message, confirm);
sentAtTimestamp,
cleanDataMessage,
convoToAddMessageTo,
() => removeFromCache(envelope)
);
}
export type MessageId = {
export async function isSwarmMessageDuplicate({
source,
sentAt,
}: {
source: string;
serverId?: number | null;
serverTimestamp?: number | null;
timestamp: number;
};
export async function isMessageDuplicate({ source, timestamp, serverTimestamp }: MessageId) {
// serverTimestamp is only used for opengroupv2
sentAt: number;
}) {
try {
let result;
if (serverTimestamp) {
// first try to find a duplicate with the same serverTimestamp from this sender
result = await getMessageBySenderAndServerTimestamp({
source,
serverTimestamp,
});
// if we have a result, it means a specific user sent two messages either with the same serverTimestamp.
// no need to do anything else, those messages must be the same
// Note: this test is not based on which conversation the user sent the message
// but we consider that a user sending two messages with the same serverTimestamp is unlikely
return Boolean(result);
}
result = await getMessageBySenderAndSentAt({
const result = await getMessageBySenderAndSentAt({
source,
sentAt: timestamp,
sentAt,
});
return Boolean(result);
} catch (error) {
window?.log?.error('isMessageDuplicate error:', toLogFormat(error));
window?.log?.error('isSwarmMessageDuplicate error:', toLogFormat(error));
return false;
}
}
@ -407,191 +394,39 @@ export async function isOpengroupMessageDuplicate({
}
}
async function handleProfileUpdate(
profileKeyBuffer: Uint8Array,
convoId: string,
isIncoming: boolean
) {
if (!isIncoming) {
// We update our own profileKey if it's different from what we have
const ourNumber = UserUtils.getOurPubKeyStrFromCache();
const me = getConversationController().getOrCreate(ourNumber, ConversationTypeEnum.PRIVATE);
// Will do the save for us if needed
await me.setProfileKey(profileKeyBuffer);
} else {
const senderConvo = await getConversationController().getOrCreateAndWait(
convoId,
ConversationTypeEnum.PRIVATE
);
// Will do the save for us
await senderConvo.setProfileKey(profileKeyBuffer);
}
}
export type MessageCreationData = {
timestamp: number;
receivedAt: number;
source: string;
isPublic: boolean;
serverId: number | null;
serverTimestamp: number | null;
groupId: string | null;
expirationStartTimestamp?: number;
destination: string;
messageHash: string;
};
function initIncomingMessage(data: MessageCreationData): MessageModel {
const {
timestamp,
isPublic,
receivedAt,
source,
serverId,
serverTimestamp,
messageHash,
groupId,
} = data;
const messageData: MessageAttributesOptionals = {
source,
serverId: serverId || undefined,
sent_at: timestamp,
serverTimestamp: serverTimestamp || undefined,
received_at: receivedAt || Date.now(),
conversationId: groupId ?? source,
type: 'incoming',
direction: 'incoming',
unread: 1,
isPublic,
messageHash: messageHash || undefined,
};
return new MessageModel(messageData);
}
function createSentMessage(data: MessageCreationData): MessageModel {
const now = Date.now();
const {
timestamp,
serverTimestamp,
serverId,
isPublic,
receivedAt,
expirationStartTimestamp,
destination,
groupId,
messageHash,
} = data;
const sentSpecificFields = {
sent_to: [],
sent: true,
expirationStartTimestamp: Math.min(expirationStartTimestamp || data.timestamp || now, now),
};
const messageData: MessageAttributesOptionals = {
source: UserUtils.getOurPubKeyStrFromCache(),
serverTimestamp: serverTimestamp || undefined,
serverId: serverId || undefined,
sent_at: timestamp,
received_at: isPublic ? receivedAt : now,
isPublic,
conversationId: groupId ?? destination,
type: 'outgoing' as MessageModelType,
messageHash,
...sentSpecificFields,
};
return new MessageModel(messageData);
}
export function createMessage(data: MessageCreationData, isIncoming: boolean): MessageModel {
if (isIncoming) {
return initIncomingMessage(data);
} else {
return createSentMessage(data);
}
}
// tslint:disable:cyclomatic-complexity max-func-body-length */
async function handleMessageEvent(
isIncoming: boolean,
messageCreationData: MessageCreationData,
async function handleSwarmMessage(
msgModel: MessageModel,
messageHash: string,
sentAt: number,
rawDataMessage: SignalService.DataMessage,
convoToAddMessageTo: ConversationModel,
confirm: () => void
): Promise<void> {
if (!messageCreationData || !rawDataMessage) {
window?.log?.warn('Invalid data passed to handleMessageEvent.', event);
confirm();
return;
}
const { destination, messageHash } = messageCreationData;
let { source } = messageCreationData;
const isGroupMessage = Boolean(rawDataMessage.group);
const type = isGroupMessage ? ConversationTypeEnum.GROUP : ConversationTypeEnum.PRIVATE;
let conversationId = isIncoming ? source : destination || source; // for synced message
if (!conversationId) {
window?.log?.error('We cannot handle a message without a conversationId');
confirm();
return;
}
if (rawDataMessage.profileKey?.length) {
await handleProfileUpdate(rawDataMessage.profileKey, conversationId, isIncoming);
}
const msg = createMessage(messageCreationData, isIncoming);
// if the message is `sent` (from secondary device) we have to set the sender manually... (at least for now)
source = source || msg.get('source');
// Conversation Id is:
// - primarySource if it is an incoming DM message,
// - destination if it is an outgoing message,
// - group.id if it is a group message
if (isGroupMessage) {
// remove the prefix from the source object so this is correct for all other
(rawDataMessage as any).group.id = PubKey.removeTextSecurePrefixIfNeeded(
(rawDataMessage as any).group.id
);
conversationId = (rawDataMessage as any).group.id;
}
if (!conversationId) {
window?.log?.warn('Invalid conversation id for incoming message', conversationId);
}
const ourNumber = UserUtils.getOurPubKeyStrFromCache();
// =========================================
if (!rawDataMessage.group && source !== ourNumber) {
// Ignore auth from our devices
conversationId = source;
}
const conversation = await getConversationController().getOrCreateAndWait(conversationId, type);
if (!conversation) {
window?.log?.warn('Skipping handleJob for unknown convo: ', conversationId);
if (!rawDataMessage || !msgModel) {
window?.log?.warn('Invalid data passed to handleSwarmMessage.');
confirm();
return;
}
void conversation.queueJob(async () => {
if (await isMessageDuplicate(messageCreationData)) {
void convoToAddMessageTo.queueJob(async () => {
// this call has to be made inside the queueJob!
const isDuplicate = await isSwarmMessageDuplicate({
source: msgModel.get('source'),
sentAt,
});
if (isDuplicate) {
window?.log?.info('Received duplicate message. Dropping it.');
confirm();
return;
}
await handleMessageJob(msg, conversation, rawDataMessage, confirm, source, messageHash);
await handleMessageJob(
msgModel,
convoToAddMessageTo,
toRegularMessage(rawDataMessage),
confirm,
msgModel.get('source'),
messageHash
);
});
}

@ -13,7 +13,7 @@ import { removeMessagePadding } from '../session/crypto/BufferPadding';
import { UserUtils } from '../session/utils';
import { fromBase64ToArray } from '../session/utils/String';
import { isOpengroupMessageDuplicate } from './dataMessage';
import { handleMessageJob } from './queuedJob';
import { handleMessageJob, toRegularMessage } from './queuedJob';
export async function handleOpenGroupV2Message(
message: OpenGroupMessageV2,
@ -79,7 +79,7 @@ export async function handleOpenGroupV2Message(
await handleMessageJob(
msgModel,
conversation,
decoded?.dataMessage as SignalService.DataMessage,
toRegularMessage(decoded?.dataMessage as SignalService.DataMessage),
noop,
sender,
''

@ -168,9 +168,6 @@ function updateReadStatus(message: MessageModel, conversation: ConversationModel
}
async function handleSyncedReceipts(message: MessageModel, conversation: ConversationModel) {
console.warn('handleSyncedReceipts', message);
debugger;
// If the newly received message is from us, we assume that we've seen the messages up until that point
const sentTimestamp = message.get('sent_at');
if (sentTimestamp) {
@ -178,10 +175,43 @@ async function handleSyncedReceipts(message: MessageModel, conversation: Convers
}
}
export type RegularMessageType = Pick<
SignalService.DataMessage,
| 'attachments'
| 'body'
| 'flags'
| 'openGroupInvitation'
| 'quote'
| 'preview'
| 'profile'
| 'profileKey'
| 'expireTimer'
> & { isRegularMessage: true };
/**
* This function is just used to make sure we do not forward things we shouldn't in the incoming message pipeline
*/
export function toRegularMessage(rawDataMessage: SignalService.DataMessage): RegularMessageType {
return {
..._.pick(rawDataMessage, [
'attachments',
'preview',
'body',
'flags',
'profileKey',
'openGroupInvitation',
'quote',
'profile',
'expireTimer',
]),
isRegularMessage: true,
};
}
async function handleRegularMessage(
conversation: ConversationModel,
message: MessageModel,
rawDataMessage: SignalService.DataMessage,
rawDataMessage: RegularMessageType,
source: string,
messageHash: string
) {
@ -285,7 +315,7 @@ async function handleExpirationTimerUpdate(
window?.log?.info("Update conversation 'expireTimer'", {
id: conversation.idForLogging(),
expireTimer,
source: 'handleDataMessage',
source: 'handleSwarmDataMessage',
});
await conversation.updateExpireTimer(expireTimer, source, message.get('received_at'));
@ -294,21 +324,21 @@ async function handleExpirationTimerUpdate(
export async function handleMessageJob(
messageModel: MessageModel,
conversation: ConversationModel,
rawDataMessage: SignalService.DataMessage,
regularDataMessage: RegularMessageType,
confirm: () => void,
source: string,
messageHash: string
) {
window?.log?.info(
`Starting handleDataMessage for message ${messageModel.idForLogging()}, ${messageModel.get(
`Starting handleSwarmDataMessage for message ${messageModel.idForLogging()}, ${messageModel.get(
'serverTimestamp'
) || messageModel.get('timestamp')} in conversation ${conversation.idForLogging()}`
);
try {
messageModel.set({ flags: rawDataMessage.flags });
messageModel.set({ flags: regularDataMessage.flags });
if (messageModel.isExpirationTimerUpdate()) {
const { expireTimer } = rawDataMessage;
const { expireTimer } = regularDataMessage;
const oldValue = conversation.get('expireTimer');
if (expireTimer === oldValue) {
confirm?.();
@ -319,7 +349,13 @@ export async function handleMessageJob(
}
await handleExpirationTimerUpdate(conversation, messageModel, source, expireTimer);
} else {
await handleRegularMessage(conversation, messageModel, rawDataMessage, source, messageHash);
await handleRegularMessage(
conversation,
messageModel,
regularDataMessage,
source,
messageHash
);
}
const id = await messageModel.commit();
@ -359,7 +395,11 @@ export async function handleMessageJob(
await messageModel.markRead(Date.now());
}
} catch (error) {
window?.log?.warn('handleDataMessage: Message', messageModel.idForLogging(), 'was deleted');
window?.log?.warn(
'handleSwarmDataMessage: Message',
messageModel.idForLogging(),
'was deleted'
);
}
// this updates the redux store.
@ -380,7 +420,12 @@ export async function handleMessageJob(
}
} catch (error) {
const errorForLog = error && error.stack ? error.stack : error;
window?.log?.error('handleDataMessage', messageModel.idForLogging(), 'error:', errorForLog);
window?.log?.error(
'handleSwarmDataMessage',
messageModel.idForLogging(),
'error:',
errorForLog
);
throw error;
}

@ -4,8 +4,8 @@ import { v4 as uuidv4 } from 'uuid';
import { addToCache, getAllFromCache, getAllFromCacheForSource, removeFromCache } from './cache';
// innerHandleContentMessage is only needed because of code duplication in handleDecryptedEnvelope...
import { handleContentMessage, innerHandleContentMessage } from './contentMessage';
// innerHandleSwarmContentMessage is only needed because of code duplication in handleDecryptedEnvelope...
import { handleSwarmContentMessage, innerHandleSwarmContentMessage } from './contentMessage';
import _ from 'lodash';
import { getEnvelopeId } from './common';
@ -23,9 +23,9 @@ interface ReqOptions {
const incomingMessagePromises: Array<Promise<any>> = [];
async function handleEnvelope(envelope: EnvelopePlus, messageHash: string) {
async function handleSwarmEnvelope(envelope: EnvelopePlus, messageHash: string) {
if (envelope.content && envelope.content.length > 0) {
return handleContentMessage(envelope, messageHash);
return handleSwarmContentMessage(envelope, messageHash);
}
await removeFromCache(envelope);
@ -55,18 +55,18 @@ class EnvelopeQueue {
const envelopeQueue = new EnvelopeQueue();
function queueEnvelope(envelope: EnvelopePlus, messageHash: string) {
function queueSwarmEnvelope(envelope: EnvelopePlus, messageHash: string) {
const id = getEnvelopeId(envelope);
// window?.log?.info('queueing envelope', id);
const task = handleEnvelope.bind(null, envelope, messageHash);
const taskWithTimeout = createTaskWithTimeout(task, `queueEnvelope ${id}`);
const task = handleSwarmEnvelope.bind(null, envelope, messageHash);
const taskWithTimeout = createTaskWithTimeout(task, `queueSwarmEnvelope ${id}`);
try {
envelopeQueue.add(taskWithTimeout);
} catch (error) {
window?.log?.error(
'queueEnvelope error handling envelope',
'queueSwarmEnvelope error handling envelope',
id,
':',
error && error.stack ? error.stack : error
@ -123,7 +123,7 @@ async function handleRequestDetail(
await lastPromise;
queueEnvelope(envelope, messageHash);
queueSwarmEnvelope(envelope, messageHash);
} catch (error) {
window?.log?.error(
'handleRequest error trying to add message to cache:',
@ -185,7 +185,7 @@ async function queueCached(item: any) {
queueDecryptedEnvelope(envelope, payloadPlaintext, envelope.messageHash);
} else {
queueEnvelope(envelope, envelope.messageHash);
queueSwarmEnvelope(envelope, envelope.messageHash);
}
} catch (error) {
window?.log?.error(
@ -230,12 +230,8 @@ async function handleDecryptedEnvelope(
plaintext: ArrayBuffer,
messageHash: string
) {
// if (this.stoppingProcessing) {
// return Promise.resolve();
// }
if (envelope.content) {
await innerHandleContentMessage(envelope, plaintext, messageHash);
await innerHandleSwarmContentMessage(envelope, plaintext, messageHash);
} else {
await removeFromCache(envelope);
}

@ -66,7 +66,7 @@ export interface VisibleMessageParams extends MessageParams {
expireTimer?: number;
lokiProfile?: LokiProfile;
preview?: Array<PreviewWithAttachmentUrl>;
syncTarget?: string; // null means it is not a synced message
syncTarget?: string; // undefined means it is not a synced message
}
export class VisibleMessage extends DataMessage {

Loading…
Cancel
Save