cleanup creation of opengroup message on sync

pull/2142/head
Audric Ackermann 3 years ago
parent 5e314e4dcc
commit 5afbd9c19e
No known key found for this signature in database
GPG Key ID: 999F434D76324AD4

@ -11,7 +11,7 @@ import {
ConversationTypeEnum,
} from '../models/conversation';
import { MessageCollection, MessageModel } from '../models/message';
import { MessageAttributes, MessageDirection } from '../models/messageType';
import { MessageAttributes } from '../models/messageType';
import { HexKeyPair } from '../receiver/keypairs';
import { getConversationController } from '../session/conversations';
import { getSodium } from '../session/crypto';
@ -1037,13 +1037,17 @@ export async function fillWithTestData(convs: number, msgs: number) {
for (let msgsAddedCount = 0; msgsAddedCount < msgs; msgsAddedCount++) {
// tslint:disable: insecure-random
const convoToChoose = newConvos[Math.floor(Math.random() * newConvos.length)];
await convoToChoose.addSingleMessage({
source: convoToChoose.id,
type: MessageDirection.outgoing,
conversationId: convoToChoose.id,
body: `spongebob ${new Date().toString()}`,
// tslint:disable: insecure-random
direction: Math.random() > 0.5 ? 'outgoing' : 'incoming',
});
const direction = Math.random() > 0.5 ? 'outgoing' : 'incoming';
const body = `spongebob ${new Date().toString()}`;
if (direction === 'outgoing') {
await convoToChoose.addSingleOutgoingMessage({
body,
});
} else {
await convoToChoose.addSingleIncomingMessage({
source: convoToChoose.id,
body,
});
}
}
}

@ -9,7 +9,7 @@ import { BlockedNumberController } from '../util';
import { leaveClosedGroup } from '../session/group/closed-group';
import { SignalService } from '../protobuf';
import { MessageModel } from './message';
import { MessageAttributesOptionals, MessageModelType } from './messageType';
import { MessageAttributesOptionals } from './messageType';
import autoBind from 'auto-bind';
import {
getLastMessagesByConversation,
@ -47,7 +47,7 @@ import { ed25519Str } from '../session/onions/onionPath';
import { getDecryptedMediaUrl } from '../session/crypto/DecryptedAttachmentsManager';
import { IMAGE_JPEG } from '../types/MIME';
import { forceSyncConfigurationNowIfNeeded } from '../session/utils/syncUtils';
import { getLatestTimestampOffset } from '../session/apis/snode_api/SNodeAPI';
import { getNowWithNetworkOffset } from '../session/apis/snode_api/SNodeAPI';
import { createLastMessageUpdate } from '../types/Conversation';
import {
ReplyingToMessageProps,
@ -701,10 +701,7 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
public async sendMessage(msg: SendMessageType) {
const { attachments, body, groupInvitation, preview, quote } = msg;
this.clearTypingTimers();
const destination = this.id;
const expireTimer = this.get('expireTimer');
const networkTimestamp = getNowWithNetworkOffset();
window?.log?.info(
@ -714,34 +711,16 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
networkTimestamp
);
const editedQuote = _.isEmpty(quote) ? undefined : quote;
const messageObject: MessageAttributesOptionals = {
type: 'outgoing',
const messageModel = await this.addSingleOutgoingMessage({
body,
conversationId: destination,
quote: editedQuote,
quote: _.isEmpty(quote) ? undefined : quote,
preview,
attachments,
sent_at: networkTimestamp,
received_at: networkTimestamp,
expireTimer,
isDeleted: false,
source: UserUtils.getOurPubKeyStrFromCache(),
};
if (this.isPublic()) {
// set the serverTimestamp only if this conversation is a public one.
messageObject.serverTimestamp = Date.now();
}
const attributes: MessageAttributesOptionals = {
...messageObject,
serverTimestamp: this.isPublic() ? Date.now() : undefined,
groupInvitation,
conversationId: this.id,
};
const messageModel = await this.addSingleMessage(attributes);
});
// We're offline!
if (!window.textsecure.messaging) {
@ -827,15 +806,7 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
this.set({ expireTimer });
const messageAttributes = {
// Even though this isn't reflected to the user, we want to place the last seen
// indicator above it. We set it to 'unread' to trigger that placement.
unread: isOutgoing ? 0 : 1,
conversationId: this.id,
source,
// No type; 'incoming' messages are specially treated by conversation.markRead()
sent_at: timestamp,
received_at: timestamp,
const commonAttributes = {
flags: SignalService.DataMessage.Flags.EXPIRATION_TIMER_UPDATE,
expirationTimerUpdate: {
expireTimer,
@ -843,11 +814,27 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
fromSync: options.fromSync,
},
expireTimer: 0,
type: isOutgoing ? 'outgoing' : ('incoming' as MessageModelType),
destination: this.id,
recipients: isOutgoing ? this.getRecipients() : undefined,
};
const message = await this.addSingleMessage(messageAttributes);
let message: MessageModel | undefined;
if (isOutgoing) {
message = await this.addSingleOutgoingMessage({
...commonAttributes,
unread: 0,
sent_at: timestamp,
});
} else {
message = await this.addSingleIncomingMessage({
...commonAttributes,
// Even though this isn't reflected to the user, we want to place the last seen
// indicator above it. We set it to 'unread' to trigger that placement.
unread: 1,
source,
sent_at: timestamp,
received_at: timestamp,
});
}
// tell the UI this conversation was updated
await this.commit();
@ -899,40 +886,39 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
perfEnd(`conversationCommit-${this.attributes.id}`, 'conversationCommit');
}
public async addSingleMessage(messageAttributes: MessageAttributesOptionals, setToExpire = true) {
const model = new MessageModel(messageAttributes);
const isMe = messageAttributes.source === UserUtils.getOurPubKeyStrFromCache();
if (
isMe &&
window.lokiFeatureFlags.useMessageRequests &&
window.inboxStore?.getState().userConfig.messageRequests
) {
await this.setIsApproved(true);
}
// no need to trigger a UI update now, we trigger a messagesAdded just below
const messageId = await model.commit(false);
model.set({ id: messageId });
if (setToExpire) {
await model.setToExpire();
}
window.inboxStore?.dispatch(
conversationActions.messagesAdded([
{
conversationKey: this.id,
messageModelProps: model.getMessageModelProps(),
},
])
public async addSingleOutgoingMessage(
messageAttributes: Omit<
MessageAttributesOptionals,
'conversationId' | 'source' | 'type' | 'direction' | 'received_at'
>,
setToExpire = true
) {
return this.addSingleMessage(
{
...messageAttributes,
conversationId: this.id,
source: UserUtils.getOurPubKeyStrFromCache(),
type: 'outgoing',
direction: 'outgoing',
received_at: messageAttributes.sent_at, // make sure to set an received_at timestamp for an outgoing message, so the order are right.
},
setToExpire
);
const unreadCount = await this.getUnreadCount();
this.set({ unreadCount });
this.updateLastMessage();
}
await this.commit();
return model;
public async addSingleIncomingMessage(
messageAttributes: Omit<MessageAttributesOptionals, 'conversationId' | 'type' | 'direction'>,
setToExpire = true
) {
return this.addSingleMessage(
{
...messageAttributes,
conversationId: this.id,
type: 'incoming',
direction: 'outgoing',
},
setToExpire
);
}
public async leaveClosedGroup() {
@ -1484,6 +1470,45 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
}
}
private async addSingleMessage(
messageAttributes: MessageAttributesOptionals,
setToExpire = true
) {
const model = new MessageModel(messageAttributes);
const isMe = messageAttributes.source === UserUtils.getOurPubKeyStrFromCache();
if (
isMe &&
window.lokiFeatureFlags.useMessageRequests &&
window.inboxStore?.getState().userConfig.messageRequests
) {
await this.setIsApproved(true);
}
// no need to trigger a UI update now, we trigger a messagesAdded just below
const messageId = await model.commit(false);
model.set({ id: messageId });
if (setToExpire) {
await model.setToExpire();
}
window.inboxStore?.dispatch(
conversationActions.messagesAdded([
{
conversationKey: this.id,
messageModelProps: model.getMessageModelProps(),
},
])
);
const unreadCount = await this.getUnreadCount();
this.set({ unreadCount });
this.updateLastMessage();
await this.commit();
return model;
}
private async clearContactTypingTimer(_sender: string) {
if (!!this.typingTimer) {
global.clearTimeout(this.typingTimer);

@ -707,11 +707,10 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
// TODO: In the future it might be best if we cache the upload results if possible.
// This way we don't upload duplicated data.
const attachmentsWithData = await Promise.all(
const finalAttachments = await Promise.all(
(this.get('attachments') || []).map(loadAttachmentData)
);
const body = this.get('body');
const finalAttachments = attachmentsWithData;
const quoteWithData = await loadQuoteData(this.get('quote'));
const previewWithData = await loadPreviewData(this.get('preview'));

@ -0,0 +1,159 @@
import { UserUtils } from '../session/utils';
import { MessageModel } from './message';
import { MessageAttributesOptionals, MessageModelType } from './messageType';
export type MessageCreationData = {
timestamp: number;
receivedAt: number;
source: string;
isPublic: boolean;
serverId: number | null;
serverTimestamp: number | null;
groupId: string | null;
expirationStartTimestamp?: number;
destination: string;
messageHash: string;
};
function initIncomingMessage(data: MessageCreationData): MessageModel {
const {
timestamp,
isPublic,
receivedAt,
source,
serverId,
serverTimestamp,
messageHash,
groupId,
} = data;
const messageData: MessageAttributesOptionals = {
source,
serverId: serverId || undefined,
sent_at: timestamp,
serverTimestamp: serverTimestamp || undefined,
received_at: receivedAt || Date.now(),
conversationId: groupId ?? source,
type: 'incoming',
direction: 'incoming',
unread: 1,
isPublic,
messageHash: messageHash || undefined,
};
return new MessageModel(messageData);
}
/**
* This function can be called for either a sync message or a message synced through an opengroup poll.
* This does not save it to the db, just in memory
*/
function createMessageSentFromOurself({
timestamp,
serverTimestamp,
serverId,
isPublic,
receivedAt,
expirationStartTimestamp,
destination,
groupId,
messageHash,
}: {
timestamp: number;
receivedAt: number;
isPublic: boolean;
serverId: number | null;
serverTimestamp: number | null;
groupId: string | null;
expirationStartTimestamp: number | null;
destination: string;
messageHash: string;
}): MessageModel {
// Omit<
// MessageAttributesOptionals,
// 'conversationId' | 'source' | 'type' | 'direction' | 'received_at'
// >
const now = Date.now();
const messageData: MessageAttributesOptionals = {
source: UserUtils.getOurPubKeyStrFromCache(),
type: 'outgoing' as MessageModelType,
serverTimestamp: serverTimestamp || undefined,
serverId: serverId || undefined,
sent_at: timestamp,
received_at: isPublic ? receivedAt : now,
isPublic,
conversationId: groupId ?? destination,
messageHash,
unread: 0,
sent_to: [],
sent: true,
expirationStartTimestamp: Math.min(expirationStartTimestamp || data.timestamp || now, now),
};
return new MessageModel(messageData);
}
/**
* This function is only called when we get a message from ourself from an opengroup polling event
*/
export function createPublicMessageSentFromUs({
serverTimestamp,
serverId,
conversationId,
}: {
serverId: number;
serverTimestamp: number;
conversationId: string;
}): MessageModel {
const messageData: MessageAttributesOptionals = {
source: UserUtils.getOurPubKeyStrFromCache(),
type: 'outgoing' as MessageModelType,
serverTimestamp: serverTimestamp || undefined,
serverId: serverId || undefined,
sent_at: serverTimestamp,
received_at: serverTimestamp,
isPublic: true,
conversationId,
messageHash: '', // we do not care of a messageHash for an opengroup message. we have serverId for that
unread: 0,
sent_to: [],
sent: true,
expirationStartTimestamp: undefined,
};
return new MessageModel(messageData);
}
/**
* This function is only called by the Receiver when we get a message
* from someone else than ourself from an opengroup polling event
*/
export function createPublicMessageSentFromNotUs({
serverTimestamp,
serverId,
conversationId,
sender,
}: {
serverId: number;
sender: string;
serverTimestamp: number;
conversationId: string;
}): MessageModel {
const messageData: MessageAttributesOptionals = {
source: sender,
conversationId,
type: 'incoming' as MessageModelType,
serverTimestamp: serverTimestamp,
sent_at: serverTimestamp,
received_at: serverTimestamp,
serverId,
isPublic: true,
messageHash: '', // we do not care of a messageHash for an opengroup message. we have serverId for that
unread: 1,
expirationStartTimestamp: undefined,
};
return new MessageModel(messageData);
}

@ -1,10 +1,10 @@
import _ from 'lodash';
import { SignalService } from '../protobuf';
import { TTL_DEFAULT } from '../session/constants';
import { SNodeAPI } from '../session/apis/snode_api';
import { CallManager, UserUtils } from '../session/utils';
import { removeFromCache } from './cache';
import { EnvelopePlus } from './types';
import { getNowWithNetworkOffset } from '../session/apis/snode_api/SNodeAPI';
export async function handleCallMessage(
envelope: EnvelopePlus,
@ -12,7 +12,6 @@ export async function handleCallMessage(
) {
const sender = envelope.senderIdentity || envelope.source;
const currentOffset = SNodeAPI.getLatestTimestampOffset();
const sentTimestamp = _.toNumber(envelope.timestamp);
const { type } = callMessage;
@ -50,7 +49,7 @@ export async function handleCallMessage(
}
if (type === SignalService.CallMessage.Type.OFFER) {
if (Math.max(sentTimestamp - (Date.now() - currentOffset)) > TTL_DEFAULT.CALL_MESSAGE) {
if (Math.max(sentTimestamp - getNowWithNetworkOffset()) > TTL_DEFAULT.CALL_MESSAGE) {
window?.log?.info('Dropping incoming OFFER callMessage sent a while ago: ', sentTimestamp);
await removeFromCache(envelope);

@ -233,12 +233,12 @@ export async function handleNewClosedGroup(
await removeFromCache(envelope);
return;
}
const maybeConvo = getConversationController().get(groupId);
const groupConvo = getConversationController().get(groupId);
const expireTimer = groupUpdate.expireTimer;
if (maybeConvo) {
if (groupConvo) {
// if we did not left this group, just add the keypair we got if not already there
if (!maybeConvo.get('isKickedFromGroup') && !maybeConvo.get('left')) {
if (!groupConvo.get('isKickedFromGroup') && !groupConvo.get('left')) {
const ecKeyPairAlreadyExistingConvo = new ECKeyPair(
// tslint:disable: no-non-null-assertion
encryptionKeyPair!.publicKey,
@ -249,7 +249,7 @@ export async function handleNewClosedGroup(
ecKeyPairAlreadyExistingConvo.toHexKeyPair()
);
await maybeConvo.updateExpireTimer(expireTimer, sender, Date.now());
await groupConvo.updateExpireTimer(expireTimer, sender, Date.now());
if (isKeyPairAlreadyHere) {
window.log.info('Dropping already saved keypair for group', groupId);
@ -266,15 +266,18 @@ export async function handleNewClosedGroup(
}
// convo exists and we left or got kicked, enable typing and continue processing
// Enable typing:
maybeConvo.set('isKickedFromGroup', false);
maybeConvo.set('left', false);
maybeConvo.set('lastJoinedTimestamp', _.toNumber(envelope.timestamp));
// we just got readded. Consider the zombie list to have been cleared
maybeConvo.set('zombies', []);
groupConvo.set({
left: false,
isKickedFromGroup: false,
lastJoinedTimestamp: _.toNumber(envelope.timestamp),
// we just got readded. Consider the zombie list to have been cleared
zombies: [],
});
}
const convo =
maybeConvo ||
groupConvo ||
(await getConversationController().getOrCreateAndWait(groupId, ConversationTypeEnum.GROUP));
// ***** Creating a new group *****
window?.log?.info('Received a new ClosedGroup of id:', groupId);

@ -591,14 +591,9 @@ export async function handleDataExtractionNotification(
if (timestamp) {
const envelopeTimestamp = Lodash.toNumber(timestamp);
const referencedAttachmentTimestamp = Lodash.toNumber(referencedAttachment);
const now = Date.now();
await convo.addSingleMessage({
conversationId: convo.get('id'),
source,
type: 'outgoing', // mark it as outgoing just so it appears below our sent attachment
await convo.addSingleOutgoingMessage({
sent_at: envelopeTimestamp,
received_at: now,
dataExtractionNotification: {
type,
referencedAttachmentTimestamp, // currently unused

@ -11,7 +11,7 @@ import { StringUtils, UserUtils } from '../session/utils';
import { getConversationController } from '../session/conversations';
import { handleClosedGroupControlMessage } from './closedGroups';
import { MessageModel } from '../models/message';
import { MessageModelType } from '../models/messageType';
import { MessageAttributesOptionals, MessageModelType } from '../models/messageType';
import {
getMessageBySenderAndSentAt,
getMessageBySenderAndServerTimestamp,
@ -318,7 +318,6 @@ export async function handleDataMessage(
// Data messages for medium groups don't arrive as sync messages. Instead,
// linked devices poll for group messages independently, thus they need
// to recognise some of those messages at their own.
const messageEventType: 'sent' | 'message' = isMe ? 'sent' : 'message';
if (envelope.senderIdentity) {
message.group = {
@ -326,12 +325,6 @@ export async function handleDataMessage(
};
}
let groupId: string | null = null;
if (message.group?.id?.length) {
// remove the prefix from the source object so this is correct for all other
groupId = PubKey.removeTextSecurePrefixIfNeeded(toHex(message.group?.id));
}
const confirm = () => removeFromCache(envelope);
const data: MessageCreationData = {
@ -343,10 +336,12 @@ export async function handleDataMessage(
isPublic: false,
serverId: null,
serverTimestamp: null,
groupId,
groupId: message.group?.id?.length
? PubKey.removeTextSecurePrefixIfNeeded(toHex(message.group?.id))
: null,
};
await handleMessageEvent(messageEventType, data, message, confirm);
await handleMessageEvent(!isMe, data, message, confirm);
}
export type MessageId = {
@ -387,6 +382,31 @@ export async function isMessageDuplicate({ source, timestamp, serverTimestamp }:
}
}
export async function isOpengroupMessageDuplicate({
sender,
serverTimestamp,
}: {
sender: string;
serverTimestamp: number;
}) {
// serverTimestamp is only used for opengroupv2
try {
const result = await getMessageBySenderAndServerTimestamp({
source: sender,
serverTimestamp,
});
// if we have a result, it means a specific user sent two messages either with the same serverTimestamp.
// no need to do anything else, those messages must be the same
// Note: this test is not based on which conversation the user sent the message
// but we consider that a user sending two messages with the same serverTimestamp is unlikely
return Boolean(result);
} catch (error) {
window?.log?.error('isOpengroupMessageDuplicate error:', toLogFormat(error));
return false;
}
}
async function handleProfileUpdate(
profileKeyBuffer: Uint8Array,
convoId: string,
@ -418,14 +438,12 @@ export type MessageCreationData = {
serverId: number | null;
serverTimestamp: number | null;
groupId: string | null;
// Needed for synced outgoing messages
expirationStartTimestamp?: any; // ???
expirationStartTimestamp?: number;
destination: string;
messageHash: string;
};
export function initIncomingMessage(data: MessageCreationData): MessageModel {
function initIncomingMessage(data: MessageCreationData): MessageModel {
const {
timestamp,
isPublic,
@ -437,18 +455,18 @@ export function initIncomingMessage(data: MessageCreationData): MessageModel {
groupId,
} = data;
const messageData: any = {
const messageData: MessageAttributesOptionals = {
source,
serverId,
serverId: serverId || undefined,
sent_at: timestamp,
serverTimestamp,
serverTimestamp: serverTimestamp || undefined,
received_at: receivedAt || Date.now(),
conversationId: groupId ?? source,
type: 'incoming',
direction: 'incoming', // +
direction: 'incoming',
unread: 1,
isPublic,
messageHash: messageHash || null,
messageHash: messageHash || undefined,
};
return new MessageModel(messageData);
@ -475,7 +493,7 @@ function createSentMessage(data: MessageCreationData): MessageModel {
expirationStartTimestamp: Math.min(expirationStartTimestamp || data.timestamp || now, now),
};
const messageData = {
const messageData: MessageAttributesOptionals = {
source: UserUtils.getOurPubKeyStrFromCache(),
serverTimestamp: serverTimestamp || undefined,
serverId: serverId || undefined,
@ -501,13 +519,11 @@ export function createMessage(data: MessageCreationData, isIncoming: boolean): M
// tslint:disable:cyclomatic-complexity max-func-body-length */
async function handleMessageEvent(
messageEventType: 'sent' | 'message',
isIncoming: boolean,
messageCreationData: MessageCreationData,
rawDataMessage: SignalService.DataMessage,
confirm: () => void
): Promise<void> {
const isIncoming = messageEventType === 'message';
if (!messageCreationData || !rawDataMessage) {
window?.log?.warn('Invalid data passed to handleMessageEvent.', event);
confirm();
@ -576,14 +592,6 @@ async function handleMessageEvent(
confirm();
return;
}
await handleMessageJob(
msg,
conversation,
rawDataMessage,
ourNumber,
confirm,
source,
messageHash
);
await handleMessageJob(msg, conversation, rawDataMessage, confirm, source, messageHash);
});
}

@ -1,53 +0,0 @@
import { initIncomingMessage } from './dataMessage';
import { toNumber } from 'lodash';
import { getConversationController } from '../session/conversations';
import { ConversationTypeEnum } from '../models/conversation';
import { toLogFormat } from '../types/attachments/Errors';
import { messagesAdded } from '../state/ducks/conversations';
export async function onError(ev: any) {
const { error } = ev;
window?.log?.error('background onError:', toLogFormat(error));
if (ev.proto) {
const envelope = ev.proto;
const message = initIncomingMessage(envelope);
await message.saveErrors(error || new Error('Error was null'));
const id = message.get('conversationId');
const conversation = await getConversationController().getOrCreateAndWait(
id,
ConversationTypeEnum.PRIVATE
);
// force conversation unread count to be > 0 so it is highlighted
conversation.set({
active_at: Date.now(),
unreadCount: toNumber(conversation.get('unreadCount')) + 1,
});
const conversationActiveAt = conversation.get('active_at');
const messageTimestamp = message.get('timestamp') || 0;
if (!conversationActiveAt || messageTimestamp > conversationActiveAt) {
conversation.set({ active_at: message.get('sent_at') });
}
conversation.updateLastMessage();
await conversation.notify(message);
window.inboxStore?.dispatch(
messagesAdded([
{
conversationKey: conversation.id,
messageModelProps: message.getMessageModelProps(),
},
])
);
if (ev.confirm) {
ev.confirm();
}
await conversation.commit();
}
throw error;
}

@ -0,0 +1,88 @@
import { noop } from 'lodash';
import { ConversationTypeEnum } from '../models/conversation';
import {
createPublicMessageSentFromNotUs,
createPublicMessageSentFromUs,
} from '../models/messageFactory';
import { SignalService } from '../protobuf';
import { OpenGroupRequestCommonType } from '../session/apis/open_group_api/opengroupV2/ApiUtil';
import { OpenGroupMessageV2 } from '../session/apis/open_group_api/opengroupV2/OpenGroupMessageV2';
import { getOpenGroupV2ConversationId } from '../session/apis/open_group_api/utils/OpenGroupUtils';
import { getConversationController } from '../session/conversations';
import { removeMessagePadding } from '../session/crypto/BufferPadding';
import { UserUtils } from '../session/utils';
import { fromBase64ToArray } from '../session/utils/String';
import { isOpengroupMessageDuplicate } from './dataMessage';
import { handleMessageJob } from './queuedJob';
export async function handleOpenGroupV2Message(
message: OpenGroupMessageV2,
roomInfos: OpenGroupRequestCommonType
) {
const { base64EncodedData, sentTimestamp, sender, serverId } = message;
const { serverUrl, roomId } = roomInfos;
if (!base64EncodedData || !sentTimestamp || !sender || !serverId) {
window?.log?.warn('Invalid data passed to handleOpenGroupV2Message.', message);
return;
}
// Note: opengroup messages are not padded
const dataUint = new Uint8Array(removeMessagePadding(fromBase64ToArray(base64EncodedData)));
const decoded = SignalService.Content.decode(dataUint);
const conversationId = getOpenGroupV2ConversationId(serverUrl, roomId);
if (!conversationId) {
window?.log?.error('We cannot handle a message without a conversationId');
return;
}
const idataMessage = decoded?.dataMessage;
if (!idataMessage) {
window?.log?.error('Invalid decoded opengroup message: no dataMessage');
return;
}
if (!getConversationController().get(conversationId)) {
window?.log?.error('Received a message for an unknown convo. Skipping');
return;
}
const conversation = await getConversationController().getOrCreateAndWait(
conversationId,
ConversationTypeEnum.GROUP
);
if (!conversation) {
window?.log?.warn('Skipping handleJob for unknown convo: ', conversationId);
return;
}
void conversation.queueJob(async () => {
const isMe = UserUtils.isUsFromCache(sender);
const commonAttributes = { serverTimestamp: sentTimestamp, serverId, conversationId };
const attributesForNotUs = { ...commonAttributes, sender };
// those lines just create an empty message with some basic stuff set.
// the whole decoding of data is happening in handleMessageJob()
const msgModel = isMe
? createPublicMessageSentFromUs(commonAttributes)
: createPublicMessageSentFromNotUs(attributesForNotUs);
// WARNING this is important that the isOpengroupMessageDuplicate is made INSIDE the conversation.queueJob call
const isDuplicate = await isOpengroupMessageDuplicate(attributesForNotUs);
if (isDuplicate) {
window?.log?.info('Received duplicate opengroup message. Dropping it.');
return;
}
await handleMessageJob(
msgModel,
conversation,
decoded?.dataMessage as SignalService.DataMessage,
noop,
sender,
''
);
});
}

@ -10,6 +10,7 @@ import { getMessageById, getMessagesBySentAt } from '../../ts/data/data';
import { MessageModelPropsWithoutConvoProps, messagesAdded } from '../state/ducks/conversations';
import { updateProfileOneAtATime } from './dataMessage';
import { SignalService } from '../protobuf';
import { UserUtils } from '../session/utils';
function contentTypeSupported(type: string): boolean {
const Chrome = window.Signal.Util.GoogleChrome;
@ -182,7 +183,6 @@ async function handleRegularMessage(
message: MessageModel,
rawDataMessage: SignalService.DataMessage,
source: string,
ourNumber: string,
messageHash: string
) {
const type = message.get('type');
@ -215,7 +215,7 @@ async function handleRegularMessage(
// Expire timer updates are now explicit.
// We don't handle an expire timer from a incoming message except if it is an ExpireTimerUpdate message.
const ourPubKey = PubKey.cast(ourNumber);
const ourPubKey = UserUtils.getOurPubKeyFromCache();
handleMentions(message, conversation, ourPubKey);
@ -295,7 +295,6 @@ export async function handleMessageJob(
messageModel: MessageModel,
conversation: ConversationModel,
rawDataMessage: SignalService.DataMessage,
ourNumber: string,
confirm: () => void,
source: string,
messageHash: string
@ -320,14 +319,7 @@ export async function handleMessageJob(
}
await handleExpirationTimerUpdate(conversation, messageModel, source, expireTimer);
} else {
await handleRegularMessage(
conversation,
messageModel,
rawDataMessage,
source,
ourNumber,
messageHash
);
await handleRegularMessage(conversation, messageModel, rawDataMessage, source, messageHash);
}
const id = await messageModel.commit();

@ -3,29 +3,15 @@ export { downloadAttachment } from './attachments';
import { v4 as uuidv4 } from 'uuid';
import { addToCache, getAllFromCache, getAllFromCacheForSource, removeFromCache } from './cache';
import { processMessage } from '../session/apis/snode_api/swarmPolling';
import { onError } from './errors';
// innerHandleContentMessage is only needed because of code duplication in handleDecryptedEnvelope...
import { handleContentMessage, innerHandleContentMessage } from './contentMessage';
import _, { noop } from 'lodash';
export { processMessage };
import { createMessage, isMessageDuplicate, MessageCreationData } from './dataMessage';
import _ from 'lodash';
import { getEnvelopeId } from './common';
import { StringUtils, UserUtils } from '../session/utils';
import { SignalService } from '../protobuf';
import { getConversationController } from '../session/conversations';
import { removeUnprocessed } from '../data/data';
import { ConversationTypeEnum } from '../models/conversation';
import { getOpenGroupV2ConversationId } from '../session/apis/open_group_api/utils/OpenGroupUtils';
import { OpenGroupMessageV2 } from '../session/apis/open_group_api/opengroupV2/OpenGroupMessageV2';
import { OpenGroupRequestCommonType } from '../session/apis/open_group_api/opengroupV2/ApiUtil';
import { handleMessageJob } from './queuedJob';
import { fromBase64ToArray } from '../session/utils/String';
import { removeMessagePadding } from '../session/crypto/BufferPadding';
import { createTaskWithTimeout } from '../session/utils/TaskWithTimeout';
import { perfEnd, perfStart } from '../session/utils/Performance';
@ -146,16 +132,12 @@ async function handleRequestDetail(
}
}
export function handleRequest(body: any, options: ReqOptions, messageHash: string): void {
export function handleRequest(plaintext: any, options: ReqOptions, messageHash: string): void {
// tslint:disable-next-line no-promise-as-boolean
const lastPromise = _.last(incomingMessagePromises) || Promise.resolve();
const plaintext = body;
const promise = handleRequestDetail(plaintext, options, lastPromise, messageHash).catch(e => {
window?.log?.error('Error handling incoming message:', e && e.stack ? e.stack : e);
void onError(e);
});
incomingMessagePromises.push(promise);
@ -258,89 +240,3 @@ async function handleDecryptedEnvelope(
await removeFromCache(envelope);
}
}
export async function handleOpenGroupV2Message(
message: OpenGroupMessageV2,
roomInfos: OpenGroupRequestCommonType
) {
const { base64EncodedData, sentTimestamp, sender, serverId } = message;
const { serverUrl, roomId } = roomInfos;
if (!base64EncodedData || !sentTimestamp || !sender || !serverId) {
window?.log?.warn('Invalid data passed to handleOpenGroupV2Message.', message);
return;
}
// Note: opengroup messages are not padded
const dataUint = new Uint8Array(removeMessagePadding(fromBase64ToArray(base64EncodedData)));
const decoded = SignalService.Content.decode(dataUint);
const conversationId = getOpenGroupV2ConversationId(serverUrl, roomId);
if (!conversationId) {
window?.log?.error('We cannot handle a message without a conversationId');
return;
}
const idataMessage = decoded?.dataMessage;
if (!idataMessage) {
window?.log?.error('Invalid decoded opengroup message: no dataMessage');
return;
}
if (!getConversationController().get(conversationId)) {
window?.log?.error('Received a message for an unknown convo. Skipping');
return;
}
// if the message is `sent` (from secondary device) we have to set the sender manually... (at least for now)
// source = source || msg.get('source');
const conversation = await getConversationController().getOrCreateAndWait(
conversationId,
ConversationTypeEnum.GROUP
);
if (!conversation) {
window?.log?.warn('Skipping handleJob for unknown convo: ', conversationId);
return;
}
void conversation.queueJob(async () => {
const isMe = UserUtils.isUsFromCache(sender);
// for an opengroupv2 incoming message the serverTimestamp and the timestamp
const messageCreationData: MessageCreationData = {
isPublic: true,
serverId,
serverTimestamp: sentTimestamp,
receivedAt: Date.now(),
destination: conversationId,
timestamp: sentTimestamp,
expirationStartTimestamp: undefined,
source: sender,
groupId: null,
messageHash: '', // we do not care of a hash for an opengroup message
};
// WARNING this is important that the isMessageDuplicate is made in the conversation.queueJob
const isDuplicate = await isMessageDuplicate({ ...messageCreationData });
if (isDuplicate) {
window?.log?.info('Received duplicate message. Dropping it.');
return;
}
// this line just create an empty message with some basic stuff set.
// the whole decoding of data is happening in handleMessageJob()
const msg = createMessage(messageCreationData, !isMe);
const ourNumber = UserUtils.getOurPubKeyStrFromCache();
await handleMessageJob(
msg,
conversation,
decoded?.dataMessage as SignalService.DataMessage,
ourNumber,
noop,
sender,
''
);
});
}

@ -16,12 +16,12 @@ import { ConversationModel } from '../../../../models/conversation';
import { getMessageIdsFromServerIds, removeMessage } from '../../../../data/data';
import { getV2OpenGroupRoom, saveV2OpenGroupRoom } from '../../../../data/opengroups';
import { OpenGroupMessageV2 } from './OpenGroupMessageV2';
import { handleOpenGroupV2Message } from '../../../../receiver/receiver';
import autoBind from 'auto-bind';
import { sha256 } from '../../../crypto';
import { DURATION } from '../../../constants';
import { processNewAttachment } from '../../../../types/MessageAttachment';
import { MIME } from '../../../../types';
import { handleOpenGroupV2Message } from '../../../../receiver/opengroup';
const pollForEverythingInterval = DURATION.SECONDS * 10;
const pollForRoomAvatarInterval = DURATION.DAYS * 1;

@ -53,6 +53,10 @@ export function getLatestTimestampOffset() {
return latestTimestampOffset;
}
export function getNowWithNetworkOffset() {
return Date.now() - getLatestTimestampOffset();
}
export type SendParams = {
pubKey: string;
ttl: string;

@ -28,7 +28,7 @@ import { ClosedGroupNameChangeMessage } from '../messages/outgoing/controlMessag
import { ClosedGroupNewMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupNewMessage';
import { ClosedGroupRemovedMembersMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupRemovedMembersMessage';
import { getSwarmPollingInstance } from '../apis/snode_api';
import { getLatestTimestampOffset } from '../apis/snode_api/SNodeAPI';
import { getNowWithNetworkOffset } from '../apis/snode_api/SNodeAPI';
export type GroupInfo = {
id: string;
@ -156,18 +156,10 @@ export async function addUpdateMessage(
groupUpdate.kicked = diff.kickedMembers;
}
const now = Date.now();
const unread = type === 'incoming';
const source = UserUtils.getOurPubKeyStrFromCache();
const message = await convo.addSingleMessage({
conversationId: convo.get('id'),
source,
type,
const message = await convo.addSingleOutgoingMessage({
sent_at: sentAt,
received_at: now,
group_update: groupUpdate,
unread: unread ? 1 : 0,
expireTimer: 0,
@ -278,7 +270,6 @@ export async function leaveClosedGroup(groupId: string) {
const ourNumber = UserUtils.getOurPubKeyFromCache();
const isCurrentUserAdmin = convo.get('groupAdmins')?.includes(ourNumber.key);
const now = Date.now();
let members: Array<string> = [];
let admins: Array<string> = [];
@ -299,20 +290,16 @@ export async function leaveClosedGroup(groupId: string) {
await convo.commit();
const source = UserUtils.getOurPubKeyStrFromCache();
const diffTimestamp = Date.now() - getLatestTimestampOffset();
const networkTimestamp = getNowWithNetworkOffset();
const dbMessage = await convo.addSingleMessage({
const dbMessage = await convo.addSingleOutgoingMessage({
group_update: { left: [source] },
conversationId: groupId,
source,
type: 'outgoing',
sent_at: diffTimestamp,
received_at: now,
sent_at: networkTimestamp,
expireTimer: 0,
});
// Send the update to the group
const ourLeavingMessage = new ClosedGroupMemberLeftMessage({
timestamp: Date.now(),
timestamp: networkTimestamp,
groupId,
identifier: dbMessage.id as string,
});

@ -13,12 +13,11 @@ import { fromUInt8ArrayToBase64 } from '../utils/String';
import { OpenGroupVisibleMessage } from '../messages/outgoing/visibleMessage/OpenGroupVisibleMessage';
import { addMessagePadding } from '../crypto/BufferPadding';
import _ from 'lodash';
import { storeOnNode } from '../apis/snode_api/SNodeAPI';
import { getNowWithNetworkOffset, storeOnNode } from '../apis/snode_api/SNodeAPI';
import { getSwarmFor } from '../apis/snode_api/snodePool';
import { firstTrue } from '../utils/Promise';
import { MessageSender } from '.';
import { getMessageById } from '../../../ts/data/data';
import { SNodeAPI } from '../apis/snode_api';
import { getConversationController } from '../conversations';
import { ed25519Str } from '../onions/onionPath';
@ -27,7 +26,7 @@ const DEFAULT_CONNECTIONS = 1;
// ================ SNODE STORE ================
function overwriteOutgoingTimestampWithNetworkTimestamp(message: RawMessage) {
const diffTimestamp = Date.now() - SNodeAPI.getLatestTimestampOffset();
const networkTimestamp = getNowWithNetworkOffset();
const { plainTextBuffer } = message;
const contentDecoded = SignalService.Content.decode(plainTextBuffer);
@ -37,23 +36,23 @@ function overwriteOutgoingTimestampWithNetworkTimestamp(message: RawMessage) {
if (dataMessage.syncTarget) {
return {
overRiddenTimestampBuffer: plainTextBuffer,
diffTimestamp: _.toNumber(dataMessage.timestamp),
networkTimestamp: _.toNumber(dataMessage.timestamp),
};
}
dataMessage.timestamp = diffTimestamp;
dataMessage.timestamp = networkTimestamp;
}
if (
dataExtractionNotification &&
dataExtractionNotification.timestamp &&
dataExtractionNotification.timestamp > 0
) {
dataExtractionNotification.timestamp = diffTimestamp;
dataExtractionNotification.timestamp = networkTimestamp;
}
if (typingMessage && typingMessage.timestamp && typingMessage.timestamp > 0) {
typingMessage.timestamp = diffTimestamp;
typingMessage.timestamp = networkTimestamp;
}
const overRiddenTimestampBuffer = SignalService.Content.encode(contentDecoded).finish();
return { overRiddenTimestampBuffer, diffTimestamp };
return { overRiddenTimestampBuffer, networkTimestamp };
}
export function getMinRetryTimeout() {
@ -79,7 +78,7 @@ export async function send(
const {
overRiddenTimestampBuffer,
diffTimestamp,
networkTimestamp,
} = overwriteOutgoingTimestampWithNetworkTimestamp(message);
const { envelopeType, cipherText } = await MessageEncrypter.encrypt(
@ -88,7 +87,7 @@ export async function send(
encryption
);
const envelope = await buildEnvelope(envelopeType, device.key, diffTimestamp, cipherText);
const envelope = await buildEnvelope(envelopeType, device.key, networkTimestamp, cipherText);
const data = wrapEnvelope(envelope);
// make sure to update the local sent_at timestamp, because sometimes, we will get the just pushed message in the receiver side
@ -98,18 +97,18 @@ export async function send(
// make sure to not update the sent timestamp if this a currently syncing message
if (found && !found.get('sentSync')) {
found.set({ sent_at: diffTimestamp });
found.set({ sent_at: networkTimestamp });
await found.commit();
}
await MessageSender.TEST_sendMessageToSnode(
device.key,
data,
ttl,
diffTimestamp,
networkTimestamp,
isSyncMessage,
message.identifier
);
return { wrappedEnvelope: data, effectiveTimestamp: diffTimestamp };
return { wrappedEnvelope: data, effectiveTimestamp: networkTimestamp };
},
{
retries: Math.max(attempts - 1, 0),

@ -1,6 +1,5 @@
import _ from 'lodash';
import { MessageUtils, ToastUtils, UserUtils } from '../';
import { MessageModelType } from '../../../models/messageType';
import { SignalService } from '../../../protobuf';
import { openConversationWithMessages } from '../../../state/ducks/conversations';
import {
@ -26,6 +25,7 @@ import { DURATION } from '../../constants';
import { hasConversationOutgoingMessage } from '../../../data/data';
import { getCallMediaPermissionsSettings } from '../../../components/settings/SessionSettings';
import { PnServer } from '../../apis/push_notification_api';
import { getNowWithNetworkOffset } from '../../apis/snode_api/SNodeAPI';
// tslint:disable: function-name
@ -491,14 +491,10 @@ export async function USER_callRecipient(recipient: string) {
window.log.info('Sending preOffer message to ', ed25519Str(recipient));
const calledConvo = getConversationController().get(recipient);
calledConvo.set('active_at', Date.now()); // addSingleMessage does the commit for us on the convo
calledConvo.set('active_at', Date.now()); // addSingleOutgoingMessage does the commit for us on the convo
await calledConvo?.addSingleMessage({
conversationId: calledConvo.id,
source: UserUtils.getOurPubKeyStrFromCache(),
type: 'outgoing',
await calledConvo?.addSingleOutgoingMessage({
sent_at: now,
received_at: now,
expireTimer: 0,
callNotificationType: 'started-call',
unread: 0,
@ -820,15 +816,13 @@ export async function USER_acceptIncomingCallRequest(fromSender: string) {
await peerConnection.addIceCandidate(candicate);
}
}
const now = Date.now();
const networkTimestamp = getNowWithNetworkOffset();
const callerConvo = getConversationController().get(fromSender);
callerConvo.set('active_at', now);
await callerConvo?.addSingleMessage({
conversationId: callerConvo.id,
callerConvo.set('active_at', networkTimestamp);
await callerConvo?.addSingleIncomingMessage({
source: UserUtils.getOurPubKeyStrFromCache(),
type: 'incoming',
sent_at: now,
received_at: now,
sent_at: networkTimestamp,
received_at: networkTimestamp,
expireTimer: 0,
callNotificationType: 'answered-a-call',
unread: 0,
@ -1134,15 +1128,13 @@ async function addMissedCallMessage(callerPubkey: string, sentAt: number) {
const incomingCallConversation = getConversationController().get(callerPubkey);
if (incomingCallConversation.isActive()) {
incomingCallConversation.set('active_at', Date.now());
incomingCallConversation.set('active_at', getNowWithNetworkOffset());
}
await incomingCallConversation?.addSingleMessage({
conversationId: callerPubkey,
await incomingCallConversation?.addSingleIncomingMessage({
source: callerPubkey,
type: 'incoming' as MessageModelType,
sent_at: sentAt,
received_at: Date.now(),
received_at: getNowWithNetworkOffset(),
expireTimer: 0,
callNotificationType: 'missed-call',
unread: 1,

Loading…
Cancel
Save