From a972c328c737c0f61ca41790c55e7a6417a992ad Mon Sep 17 00:00:00 2001 From: Mikunj Date: Tue, 30 Jun 2020 13:29:21 +1000 Subject: [PATCH] Change how sync messages are handled --- js/models/conversations.js | 38 ++++++++-- ts/session/instance.ts | 4 +- .../content/receipt/ReceiptMessage.ts | 2 +- .../outgoing/content/sync/SentSyncMessage.ts | 6 +- .../outgoing/content/sync/SyncReadMessage.ts | 9 ++- ts/session/sending/MessageQueue.ts | 76 ++++++++----------- ts/session/sending/MessageQueueInterface.ts | 2 +- ts/session/utils/SyncMessageUtils.ts | 63 +++++++++++++-- ts/test/session/sending/MessageQueue_test.ts | 76 ++++++++----------- 9 files changed, 164 insertions(+), 112 deletions(-) diff --git a/js/models/conversations.js b/js/models/conversations.js index 92f2ac674..7b9908439 100644 --- a/js/models/conversations.js +++ b/js/models/conversations.js @@ -1856,10 +1856,7 @@ const groupUpdateMessage = new libsession.Messages.Outgoing.ClosedGroupUpdateMessage( updateParams ); - libsession - .getMessageQueue() - .sendToGroup(groupUpdateMessage) - .catch(log.error); + await this.sendClosedGroupMessage(groupUpdateMessage); }, sendGroupInfo(recipient) { @@ -1927,12 +1924,43 @@ quitGroup ); - await libsession.getMessageQueue().sendToGroup(quitGroupMessage); + await this.sendClosedGroupMessage(quitGroupMessage); this.updateTextInputState(); } }, + async sendClosedGroupMessage(message) { + const { ClosedGroupMessage, ClosedGroupChatMessage } = libsession.Messages.Outgoing; + if ( + !(message instanceof ClosedGroupMessage) + ) { + throw new Error('Invalid closed group message.'); + } + + // Sync messages for Chat Messages need to be constructed after confirming send was successful. + if ( + message instanceof ClosedGroupChatMessage + ) { + throw new Error( + 'ClosedGroupChatMessage should be constructed manually and sent' + ); + } + + try { + await libsession.getMessageQueue().sendToGroup(message); + + const syncMessage = libsession.Utils.SyncMessageUtils.fromClosedGroupMessage( + message + ); + if (syncMessage) { + await libsession.getMessageQueue().sendSyncMessage(syncMessage); + } + } catch (e) { + window.log.error(e); + } + }, + async markRead(newestUnreadDate, providedOptions) { const options = providedOptions || {}; _.defaults(options, { sendReadReceipts: true }); diff --git a/ts/session/instance.ts b/ts/session/instance.ts index 385ead7c8..15f35652b 100644 --- a/ts/session/instance.ts +++ b/ts/session/instance.ts @@ -1,8 +1,8 @@ -import { MessageQueue } from './sending/'; +import { MessageQueue, MessageQueueInterface } from './sending/'; let messageQueue: MessageQueue; -function getMessageQueue() { +function getMessageQueue(): MessageQueueInterface { if (!messageQueue) { messageQueue = new MessageQueue(); } diff --git a/ts/session/messages/outgoing/content/receipt/ReceiptMessage.ts b/ts/session/messages/outgoing/content/receipt/ReceiptMessage.ts index e6f7b34ab..ccef56c38 100644 --- a/ts/session/messages/outgoing/content/receipt/ReceiptMessage.ts +++ b/ts/session/messages/outgoing/content/receipt/ReceiptMessage.ts @@ -6,7 +6,7 @@ interface ReceiptMessageParams extends MessageParams { timestamps: Array; } export abstract class ReceiptMessage extends ContentMessage { - private readonly timestamps: Array; + public readonly timestamps: Array; constructor({ timestamp, identifier, timestamps }: ReceiptMessageParams) { super({ timestamp, identifier }); diff --git a/ts/session/messages/outgoing/content/sync/SentSyncMessage.ts b/ts/session/messages/outgoing/content/sync/SentSyncMessage.ts index 3c7ad0685..a595b4ed7 100644 --- a/ts/session/messages/outgoing/content/sync/SentSyncMessage.ts +++ b/ts/session/messages/outgoing/content/sync/SentSyncMessage.ts @@ -4,15 +4,15 @@ import { MessageParams } from '../../Message'; import { PubKey } from '../../../../types'; interface SentSyncMessageParams extends MessageParams { - dataMessage: SignalService.DataMessage; + dataMessage: SignalService.IDataMessage; expirationStartTimestamp?: number; sentTo?: Array; unidentifiedDeliveries?: Array; destination?: PubKey; } -export abstract class SentSyncMessage extends SyncMessage { - public readonly dataMessage: SignalService.DataMessage; +export class SentSyncMessage extends SyncMessage { + public readonly dataMessage: SignalService.IDataMessage; public readonly expirationStartTimestamp?: number; public readonly sentTo?: Array; public readonly unidentifiedDeliveries?: Array; diff --git a/ts/session/messages/outgoing/content/sync/SyncReadMessage.ts b/ts/session/messages/outgoing/content/sync/SyncReadMessage.ts index 9ef669188..062b306aa 100644 --- a/ts/session/messages/outgoing/content/sync/SyncReadMessage.ts +++ b/ts/session/messages/outgoing/content/sync/SyncReadMessage.ts @@ -1,13 +1,14 @@ +import _ from 'lodash'; import { SyncMessage } from './SyncMessage'; import { SignalService } from '../../../../../protobuf'; import { MessageParams } from '../../Message'; interface SyncReadMessageParams extends MessageParams { - readMessages: any; + readMessages: Array<{ sender: string; timestamp: number }>; } -export abstract class SyncReadMessage extends SyncMessage { - public readonly readMessages: any; +export class SyncReadMessage extends SyncMessage { + public readonly readMessages: Array<{ sender: string; timestamp: number }>; constructor(params: SyncReadMessageParams) { super({ timestamp: params.timestamp, identifier: params.identifier }); @@ -19,7 +20,7 @@ export abstract class SyncReadMessage extends SyncMessage { syncMessage.read = []; for (const read of this.readMessages) { const readMessage = new SignalService.SyncMessage.Read(); - read.timestamp = readMessage.timestamp; + read.timestamp = _.toNumber(readMessage.timestamp); read.sender = readMessage.sender; syncMessage.read.push(readMessage); } diff --git a/ts/session/sending/MessageQueue.ts b/ts/session/sending/MessageQueue.ts index cbb2e817e..f8c089d14 100644 --- a/ts/session/sending/MessageQueue.ts +++ b/ts/session/sending/MessageQueue.ts @@ -13,12 +13,7 @@ import { TypingMessage, } from '../messages/outgoing'; import { PendingMessageCache } from './PendingMessageCache'; -import { - GroupUtils, - JobQueue, - SyncMessageUtils, - TypedEventEmitter, -} from '../utils'; +import { GroupUtils, JobQueue, TypedEventEmitter } from '../utils'; import { PubKey } from '../types'; import { MessageSender } from '.'; import { MultiDeviceProtocol, SessionProtocol } from '../protocols'; @@ -39,44 +34,20 @@ export class MessageQueue implements MessageQueueInterface { user: PubKey, message: ContentMessage ): Promise { - const userDevices = await MultiDeviceProtocol.getAllDevices(user.key); + if (message instanceof SyncMessage) { + return this.sendSyncMessage(message); + } + const userDevices = await MultiDeviceProtocol.getAllDevices(user.key); await this.sendMessageToDevices(userDevices, message); } public async send(device: PubKey, message: ContentMessage): Promise { - await this.sendMessageToDevices([device], message); - } - - public async sendMessageToDevices( - devices: Array, - message: ContentMessage - ) { - let currentDevices = [...devices]; - - // Sync to our devices if syncable - if (SyncMessageUtils.canSync(message)) { - const syncMessage = SyncMessageUtils.from(message); - if (!syncMessage) { - throw new Error( - 'MessageQueue internal error occured: failed to make sync message' - ); - } - - await this.sendSyncMessage(syncMessage); - - const ourDevices = await MultiDeviceProtocol.getOurDevices(); - // Remove our devices from currentDevices - currentDevices = currentDevices.filter( - device => !ourDevices.some(d => device.isEqual(d)) - ); + if (message instanceof SyncMessage) { + return this.sendSyncMessage(message); } - const promises = currentDevices.map(async device => { - await this.process(device, message); - }); - - return Promise.all(promises); + await this.sendMessageToDevices([device], message); } public async sendToGroup( @@ -120,7 +91,16 @@ export class MessageQueue implements MessageQueueInterface { } // Get devices in group - const recipients = await GroupUtils.getGroupMembers(groupId); + let recipients = await GroupUtils.getGroupMembers(groupId); + + // Don't send to our own device as they'll likely be synced across. + const ourKey = await UserUtil.getCurrentDevicePubKey(); + if (!ourKey) { + throw new Error('Cannot get current user public key'); + } + const ourPrimary = await MultiDeviceProtocol.getPrimaryDevice(ourKey); + recipients = recipients.filter(member => !ourPrimary.isEqual(member)); + if (recipients.length === 0) { return; } @@ -133,16 +113,15 @@ export class MessageQueue implements MessageQueueInterface { ); } - public async sendSyncMessage(message: SyncMessage | undefined): Promise { + public async sendSyncMessage( + message: SyncMessage | undefined + ): Promise { if (!message) { return; } const ourDevices = await MultiDeviceProtocol.getOurDevices(); - const promises = ourDevices.map(async device => - this.process(device, message) - ); - return Promise.all(promises); + await this.sendMessageToDevices(ourDevices, message); } public async processPending(device: PubKey) { @@ -179,6 +158,17 @@ export class MessageQueue implements MessageQueueInterface { }); } + public async sendMessageToDevices( + devices: Array, + message: ContentMessage + ) { + const promises = devices.map(async device => { + await this.process(device, message); + }); + + return Promise.all(promises); + } + private async processAllPending() { const devices = await this.pendingMessageCache.getDevices(); const promises = devices.map(async device => this.processPending(device)); diff --git a/ts/session/sending/MessageQueueInterface.ts b/ts/session/sending/MessageQueueInterface.ts index 2cd58354e..3a66616c5 100644 --- a/ts/session/sending/MessageQueueInterface.ts +++ b/ts/session/sending/MessageQueueInterface.ts @@ -20,5 +20,5 @@ export interface MessageQueueInterface { sendUsingMultiDevice(user: PubKey, message: ContentMessage): Promise; send(device: PubKey, message: ContentMessage): Promise; sendToGroup(message: GroupMessageType): Promise; - sendSyncMessage(message: SyncMessage | undefined): Promise; + sendSyncMessage(message: SyncMessage | undefined): Promise; } diff --git a/ts/session/utils/SyncMessageUtils.ts b/ts/session/utils/SyncMessageUtils.ts index 1aa0a2c28..ced128015 100644 --- a/ts/session/utils/SyncMessageUtils.ts +++ b/ts/session/utils/SyncMessageUtils.ts @@ -1,25 +1,72 @@ import * as _ from 'lodash'; import { UserUtil } from '../../util/'; import { getAllConversations } from '../../../js/modules/data'; -import { ContentMessage, SyncMessage } from '../messages/outgoing'; +import { + ClosedGroupChatMessage, + ClosedGroupMessage, + ClosedGroupRequestInfoMessage, + ContentMessage, + ReadReceiptMessage, + SentSyncMessage, + SyncMessage, + SyncReadMessage, +} from '../messages/outgoing'; import { MultiDeviceProtocol } from '../protocols'; import ByteBuffer from 'bytebuffer'; +import { PubKey } from '../types'; +import { SignalService } from '../../protobuf'; -export function from(message: ContentMessage): SyncMessage | undefined { +export function from( + message: ContentMessage, + destination: string | PubKey +): SyncMessage | undefined { if (message instanceof SyncMessage) { return message; } - // Stubbed for now + if (message instanceof ClosedGroupMessage) { + return fromClosedGroupMessage(message); + } + + if (message instanceof ReadReceiptMessage) { + const pubKey = PubKey.cast(destination); + const read = message.timestamps.map(timestamp => ({ + sender: pubKey.key, + timestamp, + })); + + return new SyncReadMessage({ + timestamp: Date.now(), + readMessages: read, + }); + } + return undefined; } -export function canSync(message: ContentMessage): boolean { - // This function should be agnostic to the device; it shouldn't need - // to know about the recipient +export function fromClosedGroupMessage( + message: ClosedGroupMessage +): SyncMessage | undefined { + // Sync messages for ClosedGroupChatMessage need to be built manually + // This is because it needs the `expireStartTimestamp` field. + if ( + message instanceof ClosedGroupRequestInfoMessage || + message instanceof ClosedGroupChatMessage + ) { + return undefined; + } + + const pubKey = PubKey.cast(message.groupId); + const content = SignalService.Content.decode(message.plainTextBuffer()); + if (!content.dataMessage) { + return undefined; + } - // Stubbed for now - return Boolean(from(message)); + return new SentSyncMessage({ + timestamp: message.timestamp, + destination: pubKey, + dataMessage: content.dataMessage, + }); } export async function getSyncContacts(): Promise | undefined> { diff --git a/ts/test/session/sending/MessageQueue_test.ts b/ts/test/session/sending/MessageQueue_test.ts index 3f1a4cb72..7c7241f9f 100644 --- a/ts/test/session/sending/MessageQueue_test.ts +++ b/ts/test/session/sending/MessageQueue_test.ts @@ -231,6 +231,18 @@ describe('MessageQueue', () => { expect(args[0]).to.have.same.members(devices); expect(args[1]).to.equal(message); }); + + it('should send sync message if it was passed in', async () => { + const devices = TestUtils.generateFakePubKeys(3); + sandbox.stub(MultiDeviceProtocol, 'getAllDevices').resolves(devices); + const stub = sandbox.stub(messageQueueStub, 'sendSyncMessage').resolves(); + + const message = new TestSyncMessage({ timestamp: Date.now() }); + await messageQueueStub.sendUsingMultiDevice(devices[0], message); + + const args = stub.lastCall.args as [ContentMessage]; + expect(args[0]).to.equal(message); + }); }); describe('sendMessageToDevices', () => { @@ -243,51 +255,6 @@ describe('MessageQueue', () => { await messageQueueStub.sendMessageToDevices(devices, message); expect(pendingMessageCache.getCache()).to.have.length(devices.length); }); - - it('should send sync message if possible', async () => { - hasSessionStub.returns(false); - - sandbox.stub(SyncMessageUtils, 'canSync').returns(true); - - sandbox - .stub(SyncMessageUtils, 'from') - .returns(new TestSyncMessage({ timestamp: Date.now() })); - - // This stub ensures that the message won't process - const sendSyncMessageStub = sandbox - .stub(messageQueueStub, 'sendSyncMessage') - .resolves(); - - const ourDevices = [ourDevice, ...TestUtils.generateFakePubKeys(2)]; - sandbox - .stub(MultiDeviceProtocol, 'getAllDevices') - .callsFake(async user => { - if (ourDevice.isEqual(user)) { - return ourDevices; - } - - return []; - }); - - const devices = [...ourDevices, ...TestUtils.generateFakePubKeys(3)]; - const message = TestUtils.generateChatMessage(); - - await messageQueueStub.sendMessageToDevices(devices, message); - expect(sendSyncMessageStub.called).to.equal( - true, - 'sendSyncMessage was not called.' - ); - expect( - pendingMessageCache.getCache().map(c => c.device) - ).to.not.have.members( - ourDevices.map(d => d.key), - 'Sending regular messages to our own device is not allowed.' - ); - expect(pendingMessageCache.getCache()).to.have.length( - devices.length - ourDevices.length, - 'Messages should not be sent to our devices.' - ); - }); }); describe('sendSyncMessage', () => { @@ -320,6 +287,12 @@ describe('MessageQueue', () => { }); describe('closed groups', async () => { + beforeEach(() => { + sandbox + .stub(MultiDeviceProtocol, 'getPrimaryDevice') + .resolves(new PrimaryPubKey(ourNumber)); + }); + it('can send to closed group', async () => { const members = TestUtils.generateFakePubKeys(4).map( p => new PrimaryPubKey(p.key) @@ -351,6 +324,19 @@ describe('MessageQueue', () => { await messageQueueStub.sendToGroup(message); expect(sendUsingMultiDeviceStub.callCount).to.equal(0); }); + + it('wont send message to our device', async () => { + sandbox + .stub(GroupUtils, 'getGroupMembers') + .resolves([new PrimaryPubKey(ourNumber)]); + const sendUsingMultiDeviceStub = sandbox + .stub(messageQueueStub, 'sendUsingMultiDevice') + .resolves(); + + const message = TestUtils.generateClosedGroupMessage(); + await messageQueueStub.sendToGroup(message); + expect(sendUsingMultiDeviceStub.callCount).to.equal(0); + }); }); describe('open groups', async () => {