diff --git a/js/modules/loki_snode_api.js b/js/modules/loki_snode_api.js index 390825f4f..b9d4ffa32 100644 --- a/js/modules/loki_snode_api.js +++ b/js/modules/loki_snode_api.js @@ -408,11 +408,7 @@ class LokiSnodeAPI { }); } - // FIXME: need a lock because it is being called multiple times in parallel - async buildNewOnionPaths() { - // Note: this function may be called concurrently, so - // might consider blocking the other calls - + async buildNewOnionPathsWorker() { const _ = window.Lodash; log.info('LokiSnodeAPI::buildNewOnionPaths - building new onion paths'); @@ -490,6 +486,14 @@ class LokiSnodeAPI { log.info(`Built ${this.onionPaths.length} onion paths`, this.onionPaths); } + async buildNewOnionPaths() { + // this function may be called concurrently make sure we only have one inflight + return primitives.allowOnlyOneAtATime( + 'buildNewOnionPaths', + this.buildNewOnionPathsWorker + ); + } + async getRandomSnodeAddress() { // resolve random snode if (this.randomSnodePool.length === 0) { diff --git a/ts/receiver/receiver.ts b/ts/receiver/receiver.ts index 48bbd6908..4a4636bc7 100644 --- a/ts/receiver/receiver.ts +++ b/ts/receiver/receiver.ts @@ -650,9 +650,7 @@ export async function handleMessageEvent(event: any): Promise { confirm(); return; } - } - - if (source !== ourNumber) { + } else if (source !== ourNumber) { // Ignore auth from our devices conversationId = primarySource.key; } diff --git a/ts/session/sending/MessageQueue.ts b/ts/session/sending/MessageQueue.ts index b2c89d804..465dfb91a 100644 --- a/ts/session/sending/MessageQueue.ts +++ b/ts/session/sending/MessageQueue.ts @@ -8,6 +8,7 @@ import { ContentMessage, OpenGroupMessage, SessionRequestMessage, + SyncMessage, } from '../messages/outgoing'; import { PendingMessageCache } from './PendingMessageCache'; import { @@ -26,9 +27,9 @@ export class MessageQueue implements MessageQueueInterface { private readonly jobQueues: Map = new Map(); private readonly pendingMessageCache: PendingMessageCache; - constructor() { + constructor(cache?: PendingMessageCache) { this.events = new EventEmitter(); - this.pendingMessageCache = new PendingMessageCache(); + this.pendingMessageCache = cache ?? new PendingMessageCache(); void this.processAllPending(); } @@ -50,20 +51,20 @@ export class MessageQueue implements MessageQueueInterface { // Sync to our devices if syncable if (SyncMessageUtils.canSync(message)) { - const currentDevice = await UserUtil.getCurrentDevicePubKey(); - - if (currentDevice) { - const ourDevices = await MultiDeviceProtocol.getAllDevices( - currentDevice + const syncMessage = SyncMessageUtils.from(message); + if (!syncMessage) { + throw new Error( + 'MessageQueue internal error occured: failed to make sync message' ); + } - await this.sendSyncMessage(message, ourDevices); + await this.sendSyncMessage(syncMessage); - // Remove our devices from currentDevices - currentDevices = currentDevices.filter(device => - ourDevices.some(d => device.isEqual(d)) - ); - } + const ourDevices = await MultiDeviceProtocol.getOurDevices(); + // Remove our devices from currentDevices + currentDevices = currentDevices.filter( + device => !ourDevices.some(d => device.isEqual(d)) + ); } const promises = currentDevices.map(async device => { @@ -79,30 +80,42 @@ export class MessageQueue implements MessageQueueInterface { // Closed groups if (message instanceof ClosedGroupMessage) { // Get devices in closed group - const groupPubKey = PubKey.from(message.groupId); - if (!groupPubKey) { + const recipients = await GroupUtils.getGroupMembers(message.groupId); + if (recipients.length === 0) { return false; } - const recipients = await GroupUtils.getGroupMembers(groupPubKey); + // Send to all devices of members + await Promise.all( + recipients.map(async recipient => + this.sendUsingMultiDevice(recipient, message) + ) + ); - if (recipients.length) { - await this.sendMessageToDevices(recipients, message); - - return true; - } + return true; } // Open groups if (message instanceof OpenGroupMessage) { // No queue needed for Open Groups; send directly + const error = new Error('Failed to send message to open group.'); + + // This is absolutely yucky ... we need to make it not use Promise try { - await MessageSender.sendToOpenGroup(message); - this.events.emit('success', message); + const result = await MessageSender.sendToOpenGroup(message); + if (result) { + this.events.emit('success', message); + } else { + this.events.emit('fail', message, error); + } - return true; + return result; } catch (e) { - this.events.emit('fail', message, e); + console.warn( + `Failed to send message to open group: ${message.group.server}`, + e + ); + this.events.emit('fail', message, error); return false; } @@ -111,21 +124,22 @@ export class MessageQueue implements MessageQueueInterface { return false; } - public async sendSyncMessage(message: ContentMessage, sendTo: Array) { - // Sync with our devices - const promises = sendTo.map(async device => { - const syncMessage = SyncMessageUtils.from(message); - - return this.process(device, syncMessage); - }); + public async sendSyncMessage(message: SyncMessage | undefined): Promise { + if (!message) { + return; + } + const ourDevices = await MultiDeviceProtocol.getOurDevices(); + const promises = ourDevices.map(async device => + this.process(device, message) + ); return Promise.all(promises); } public async processPending(device: PubKey) { - const messages = this.pendingMessageCache.getForDevice(device); + const messages = await this.pendingMessageCache.getForDevice(device); - const isMediumGroup = GroupUtils.isMediumGroup(device); + const isMediumGroup = GroupUtils.isMediumGroup(device.key); const hasSession = await SessionProtocol.hasSession(device); if (!isMediumGroup && !hasSession) { @@ -143,23 +157,28 @@ export class MessageQueue implements MessageQueueInterface { await jobQueue.addWithId(messageId, async () => MessageSender.send(message) ); - void this.pendingMessageCache.remove(message); this.events.emit('success', message); } catch (e) { this.events.emit('fail', message, e); + } finally { + // Remove from the cache because retrying is done in the sender + void this.pendingMessageCache.remove(message); } } }); } private async processAllPending() { - const devices = this.pendingMessageCache.getDevices(); + const devices = await this.pendingMessageCache.getDevices(); const promises = devices.map(async device => this.processPending(device)); return Promise.all(promises); } - private async process(device: PubKey, message?: ContentMessage) { + private async process( + device: PubKey, + message?: ContentMessage + ): Promise { // Don't send to ourselves const currentDevice = await UserUtil.getCurrentDevicePubKey(); if (!message || (currentDevice && device.isEqual(currentDevice))) { @@ -173,7 +192,7 @@ export class MessageQueue implements MessageQueueInterface { } await this.pendingMessageCache.add(device, message); - await this.processPending(device); + void this.processPending(device); } private getJobQueue(device: PubKey): JobQueue { diff --git a/ts/session/sending/MessageQueueInterface.ts b/ts/session/sending/MessageQueueInterface.ts index c3ee606aa..5bed428ca 100644 --- a/ts/session/sending/MessageQueueInterface.ts +++ b/ts/session/sending/MessageQueueInterface.ts @@ -2,6 +2,7 @@ import { ClosedGroupMessage, ContentMessage, OpenGroupMessage, + SyncMessage, } from '../messages/outgoing'; import { RawMessage } from '../types/RawMessage'; import { TypedEventEmitter } from '../utils'; @@ -19,8 +20,5 @@ export interface MessageQueueInterface { sendUsingMultiDevice(user: PubKey, message: ContentMessage): void; send(device: PubKey, message: ContentMessage): void; sendToGroup(message: GroupMessageType): void; - sendSyncMessage( - message: ContentMessage, - sendTo: Array - ): Promise>; + sendSyncMessage(message: SyncMessage | undefined): Promise; } diff --git a/ts/session/sending/PendingMessageCache.ts b/ts/session/sending/PendingMessageCache.ts index 0a4037c1c..a26cea8c6 100644 --- a/ts/session/sending/PendingMessageCache.ts +++ b/ts/session/sending/PendingMessageCache.ts @@ -1,3 +1,4 @@ +import _ from 'lodash'; import { createOrUpdateItem, getItemById } from '../../../js/modules/data'; import { PartialRawMessage, RawMessage } from '../types/RawMessage'; import { ContentMessage } from '../messages/outgoing'; @@ -12,33 +13,25 @@ import { MessageUtils } from '../utils'; // memory and sync its state with the database on modification (add or remove). export class PendingMessageCache { - public readonly isReady: Promise; - private cache: Array; + protected loadPromise: Promise | undefined; + protected cache: Array = []; - constructor() { - // Load pending messages from the database - // You should await isReady on making a new PendingMessageCache - // if you'd like to have instant access to the cache - this.cache = []; - - this.isReady = new Promise(async resolve => { - await this.loadFromDB(); - resolve(true); - }); - } - - public getAllPending(): Array { + public async getAllPending(): Promise> { + await this.loadFromDBIfNeeded(); // Get all pending from cache, sorted with oldest first return [...this.cache].sort((a, b) => a.timestamp - b.timestamp); } - public getForDevice(device: PubKey): Array { - return this.getAllPending().filter(m => m.device === device.key); + public async getForDevice(device: PubKey): Promise> { + const pending = await this.getAllPending(); + return pending.filter(m => m.device === device.key); } - public getDevices(): Array { + public async getDevices(): Promise> { + await this.loadFromDBIfNeeded(); + // Gets all unique devices with pending messages - const pubkeyStrings = [...new Set(this.cache.map(m => m.device))]; + const pubkeyStrings = _.uniq(this.cache.map(m => m.device)); return pubkeyStrings.map(PubKey.from).filter((k): k is PubKey => !!k); } @@ -47,6 +40,7 @@ export class PendingMessageCache { device: PubKey, message: ContentMessage ): Promise { + await this.loadFromDBIfNeeded(); const rawMessage = MessageUtils.toRawMessage(device, message); // Does it exist in cache already? @@ -63,6 +57,7 @@ export class PendingMessageCache { public async remove( message: RawMessage ): Promise | undefined> { + await this.loadFromDBIfNeeded(); // Should only be called after message is processed // Return if message doesn't exist in cache @@ -72,7 +67,11 @@ export class PendingMessageCache { // Remove item from cache and sync with database const updatedCache = this.cache.filter( - m => m.identifier !== message.identifier + cached => + !( + cached.device === message.device && + cached.timestamp === message.timestamp + ) ); this.cache = updatedCache; await this.saveToDB(); @@ -93,12 +92,20 @@ export class PendingMessageCache { await this.saveToDB(); } - private async loadFromDB() { + protected async loadFromDBIfNeeded() { + if (!this.loadPromise) { + this.loadPromise = this.loadFromDB(); + } + + await this.loadPromise; + } + + protected async loadFromDB() { const messages = await this.getFromStorage(); this.cache = messages; } - private async getFromStorage(): Promise> { + protected async getFromStorage(): Promise> { const data = await getItemById('pendingMessages'); if (!data || !data.value) { return []; @@ -117,7 +124,7 @@ export class PendingMessageCache { }); } - private async saveToDB() { + protected async saveToDB() { // For each plainTextBuffer in cache, save in as a simple Array to avoid // Node issues with JSON stringifying Buffer without strict typing const encodedCache = [...this.cache].map(item => { diff --git a/ts/session/utils/Groups.ts b/ts/session/utils/Groups.ts index d73104b63..c3c4cf1b9 100644 --- a/ts/session/utils/Groups.ts +++ b/ts/session/utils/Groups.ts @@ -1,7 +1,11 @@ -import { PubKey } from '../types'; +import _ from 'lodash'; +import { PrimaryPubKey } from '../types'; +import { MultiDeviceProtocol } from '../protocols'; -export async function getGroupMembers(groupId: PubKey): Promise> { - const groupConversation = window.ConversationController.get(groupId.key); +export async function getGroupMembers( + groupId: string +): Promise> { + const groupConversation = window.ConversationController.get(groupId); const groupMembers = groupConversation ? groupConversation.attributes.members : undefined; @@ -10,11 +14,16 @@ export async function getGroupMembers(groupId: PubKey): Promise> { return []; } - return groupMembers.map((member: string) => new PubKey(member)); + const promises = (groupMembers as Array).map(async (member: string) => + MultiDeviceProtocol.getPrimaryDevice(member) + ); + const primaryDevices = await Promise.all(promises); + + return _.uniqWith(primaryDevices, (a, b) => a.isEqual(b)); } -export function isMediumGroup(groupId: PubKey): boolean { - const conversation = window.ConversationController.get(groupId.key); +export function isMediumGroup(groupId: string): boolean { + const conversation = window.ConversationController.get(groupId); if (!conversation) { return false; diff --git a/ts/session/utils/Promise.ts b/ts/session/utils/Promise.ts new file mode 100644 index 000000000..b106f6032 --- /dev/null +++ b/ts/session/utils/Promise.ts @@ -0,0 +1,118 @@ +type SimpleFunction = (arg: T) => void; +type Return = Promise | T; + +async function toPromise(value: Return): Promise { + return value instanceof Promise ? value : Promise.resolve(value); +} + +/** + * Create a promise which waits until `done` is called or until `timeout` period is reached. + * If `timeout` is reached then this will throw an Error. + * + * @param task The task to wait for. + * @param timeout The timeout period. + */ +export async function waitForTask( + task: (done: SimpleFunction) => Return, + timeout: number = 2000 +): Promise { + const timeoutPromise = new Promise((_, rej) => { + const wait = setTimeout(() => { + clearTimeout(wait); + rej(new Error('Task timed out.')); + }, timeout); + }); + + const taskPromise = new Promise(async (res, rej) => { + try { + await toPromise(task(res)); + } catch (e) { + rej(e); + } + }); + + return Promise.race([timeoutPromise, taskPromise]) as Promise; +} + +interface PollOptions { + timeout: number; + interval: number; +} + +/** + * Creates a promise which calls the `task` every `interval` until `done` is called or until `timeout` period is reached. + * If `timeout` is reached then this will throw an Error. + * + * @param check The check which runs every `interval` ms. + * @param options The polling options. + */ +export async function poll( + task: (done: SimpleFunction) => Return, + options: Partial = {} +): Promise { + const defaults: PollOptions = { + timeout: 2000, + interval: 1000, + }; + + const { timeout, interval } = { + ...defaults, + ...options, + }; + + const endTime = Date.now() + timeout; + let stop = false; + const finish = () => { + stop = true; + }; + + const _poll = async (resolve: any, reject: any) => { + if (stop) { + resolve(); + } else if (Date.now() >= endTime) { + finish(); + reject(new Error('Periodic check timeout')); + } else { + try { + await toPromise(task(finish)); + } catch (e) { + finish(); + reject(e); + return; + } + + setTimeout(() => { + void _poll(resolve, reject); + }, interval); + } + }; + + return new Promise((resolve, reject) => { + void _poll(resolve, reject); + }); +} + +/** + * Creates a promise which waits until `check` returns `true` or rejects if `timeout` preiod is reached. + * If `timeout` is reached then this will throw an Error. + * + * @param check The boolean check. + * @param timeout The time before an error is thrown. + */ +export async function waitUntil( + check: () => Return, + timeout: number = 2000 +) { + // This is causing unhandled promise rejection somewhere in MessageQueue tests + return poll( + async done => { + const result = await toPromise(check()); + if (result) { + done(); + } + }, + { + timeout, + } + ); +} diff --git a/ts/session/utils/SyncMessageUtils.ts b/ts/session/utils/SyncMessageUtils.ts index afab775b0..601d3cb8b 100644 --- a/ts/session/utils/SyncMessageUtils.ts +++ b/ts/session/utils/SyncMessageUtils.ts @@ -5,7 +5,9 @@ import { ContentMessage, SyncMessage } from '../messages/outgoing'; import { MultiDeviceProtocol } from '../protocols'; export function from(message: ContentMessage): SyncMessage | undefined { - // const { timestamp, identifier } = message; + if (message instanceof SyncMessage) { + return message; + } // Stubbed for now return undefined; diff --git a/ts/session/utils/index.ts b/ts/session/utils/index.ts index c619b8d2f..dde73e928 100644 --- a/ts/session/utils/index.ts +++ b/ts/session/utils/index.ts @@ -2,8 +2,15 @@ import * as MessageUtils from './Messages'; import * as GroupUtils from './Groups'; import * as SyncMessageUtils from './SyncMessageUtils'; import * as StringUtils from './String'; +import * as PromiseUtils from './Promise'; export * from './TypedEmitter'; export * from './JobQueue'; -export { MessageUtils, SyncMessageUtils, GroupUtils, StringUtils }; +export { + MessageUtils, + SyncMessageUtils, + GroupUtils, + StringUtils, + PromiseUtils, +}; diff --git a/ts/test/session/sending/MessageQueue_test.ts b/ts/test/session/sending/MessageQueue_test.ts index 746eb224f..c0d223ffc 100644 --- a/ts/test/session/sending/MessageQueue_test.ts +++ b/ts/test/session/sending/MessageQueue_test.ts @@ -1,96 +1,56 @@ -import { expect } from 'chai'; +import chai from 'chai'; import * as sinon from 'sinon'; -import * as _ from 'lodash'; -import { GroupUtils, SyncMessageUtils } from '../../../session/utils'; +import _ from 'lodash'; +import { + GroupUtils, + PromiseUtils, + SyncMessageUtils, +} from '../../../session/utils'; import { Stubs, TestUtils } from '../../../test/test-utils'; import { MessageQueue } from '../../../session/sending/MessageQueue'; import { - ChatMessage, ClosedGroupMessage, ContentMessage, OpenGroupMessage, } from '../../../session/messages/outgoing'; -import { PubKey, RawMessage } from '../../../session/types'; +import { PrimaryPubKey, PubKey, RawMessage } from '../../../session/types'; import { UserUtil } from '../../../util'; -import { MessageSender, PendingMessageCache } from '../../../session/sending'; -import { toRawMessage } from '../../../session/utils/Messages'; +import { MessageSender } from '../../../session/sending'; import { MultiDeviceProtocol, SessionProtocol, } from '../../../session/protocols'; +import { PendingMessageCacheStub } from '../../test-utils/stubs'; +import { describe } from 'mocha'; +import { TestSyncMessage } from '../../test-utils/stubs/messages/TestSyncMessage'; -// Equivalent to Data.StorageItem -interface StorageItem { - id: string; - value: any; -} - -// Helper function to force sequential on events checks -async function tick() { - return new Promise(resolve => { - // tslint:disable-next-line: no-string-based-set-timeout - setTimeout(resolve, 0); - }); -} +// tslint:disable-next-line: no-require-imports no-var-requires +const chaiAsPromised = require('chai-as-promised'); +chai.use(chaiAsPromised); + +const { expect } = chai; describe('MessageQueue', () => { // Initialize new stubbed cache - let data: StorageItem; const sandbox = sinon.createSandbox(); const ourDevice = TestUtils.generateFakePubKey(); const ourNumber = ourDevice.key; - const pairedDevices = TestUtils.generateFakePubKeys(2); // Initialize new stubbed queue + let pendingMessageCache: PendingMessageCacheStub; let messageQueueStub: MessageQueue; - // Spies - let sendMessageToDevicesSpy: sinon.SinonSpy< - [Array, ContentMessage], - Promise> - >; - let sendSyncMessageSpy: sinon.SinonSpy< - [ContentMessage, Array], - Promise> - >; - let sendToGroupSpy: sinon.SinonSpy< - [OpenGroupMessage | ClosedGroupMessage], - Promise - >; - // Message Sender Stubs let sendStub: sinon.SinonStub<[RawMessage, (number | undefined)?]>; - let sendToOpenGroupStub: sinon.SinonStub<[OpenGroupMessage]>; // Utils Stubs - let groupMembersStub: sinon.SinonStub; - let canSyncStub: sinon.SinonStub<[ContentMessage], boolean>; + let isMediumGroupStub: sinon.SinonStub<[string], boolean>; // Session Protocol Stubs let hasSessionStub: sinon.SinonStub<[PubKey]>; let sendSessionRequestIfNeededStub: sinon.SinonStub<[PubKey], Promise>; beforeEach(async () => { - // Stub out methods which touch the database - const storageID = 'pendingMessages'; - data = { - id: storageID, - value: '[]', - }; - - // Pending Message Cache Data Stubs - TestUtils.stubData('getItemById') - .withArgs('pendingMessages') - .resolves(data); - TestUtils.stubData('createOrUpdateItem').callsFake((item: StorageItem) => { - if (item.id === storageID) { - data = item; - } - }); - // Utils Stubs - canSyncStub = sandbox.stub(SyncMessageUtils, 'canSync'); - canSyncStub.returns(false); sandbox.stub(UserUtil, 'getCurrentDevicePubKey').resolves(ourNumber); - sandbox.stub(MultiDeviceProtocol, 'getAllDevices').resolves(pairedDevices); TestUtils.stubWindow('libsignal', { SignalProtocolAddress: sandbox.stub(), @@ -99,15 +59,11 @@ describe('MessageQueue', () => { // Message Sender Stubs sendStub = sandbox.stub(MessageSender, 'send').resolves(); - sendToOpenGroupStub = sandbox - .stub(MessageSender, 'sendToOpenGroup') - .resolves(true); // Group Utils Stubs - sandbox.stub(GroupUtils, 'isMediumGroup').returns(false); - groupMembersStub = sandbox - .stub(GroupUtils, 'getGroupMembers' as any) - .resolves(TestUtils.generateFakePubKeys(10)); + isMediumGroupStub = sandbox + .stub(GroupUtils, 'isMediumGroup') + .returns(false); // Session Protocol Stubs sandbox.stub(SessionProtocol, 'sendSessionRequest').resolves(); @@ -116,37 +72,9 @@ describe('MessageQueue', () => { .stub(SessionProtocol, 'sendSessionRequestIfNeeded') .resolves(); - // Pending Mesage Cache Stubs - const chatMessages = Array.from( - { length: 10 }, - TestUtils.generateChatMessage - ); - const rawMessage = toRawMessage( - TestUtils.generateFakePubKey(), - TestUtils.generateChatMessage() - ); - - sandbox.stub(PendingMessageCache.prototype, 'add').resolves(rawMessage); - sandbox.stub(PendingMessageCache.prototype, 'remove').resolves(); - sandbox - .stub(PendingMessageCache.prototype, 'getDevices') - .returns(TestUtils.generateFakePubKeys(10)); - sandbox - .stub(PendingMessageCache.prototype, 'getForDevice') - .returns( - chatMessages.map(m => toRawMessage(TestUtils.generateFakePubKey(), m)) - ); - - // Spies - sendSyncMessageSpy = sandbox.spy(MessageQueue.prototype, 'sendSyncMessage'); - sendMessageToDevicesSpy = sandbox.spy( - MessageQueue.prototype, - 'sendMessageToDevices' - ); - sendToGroupSpy = sandbox.spy(MessageQueue.prototype, 'sendToGroup'); - // Init Queue - messageQueueStub = new MessageQueue(); + pendingMessageCache = new PendingMessageCacheStub(); + messageQueueStub = new MessageQueue(pendingMessageCache); }); afterEach(() => { @@ -154,233 +82,314 @@ describe('MessageQueue', () => { sandbox.restore(); }); - describe('send', () => { - it('can send to a single device', async () => { - const device = TestUtils.generateFakePubKey(); - const message = TestUtils.generateChatMessage(); - - const promise = messageQueueStub.send(device, message); - await expect(promise).to.be.fulfilled; - }); - - it('can send sync message', async () => { - const devices = TestUtils.generateFakePubKeys(3); - const message = TestUtils.generateChatMessage(); - - const promise = messageQueueStub.sendSyncMessage(message, devices); - expect(promise).to.be.fulfilled; - }); - }); - describe('processPending', () => { it('will send session request message if no session', async () => { hasSessionStub.resolves(false); + isMediumGroupStub.returns(false); const device = TestUtils.generateFakePubKey(); - const promise = messageQueueStub.processPending(device); - await expect(promise).to.be.fulfilled; - expect(sendSessionRequestIfNeededStub.callCount).to.equal(1); + await messageQueueStub.processPending(device); + + const stubCallPromise = PromiseUtils.waitUntil( + () => sendSessionRequestIfNeededStub.callCount === 1 + ); + await expect(stubCallPromise).to.be.fulfilled; }); it('will send message if session exists', async () => { + hasSessionStub.resolves(true); + isMediumGroupStub.returns(false); + sendStub.resolves(); + const device = TestUtils.generateFakePubKey(); - const hasSession = await hasSessionStub(device); + await pendingMessageCache.add(device, TestUtils.generateChatMessage()); - const promise = messageQueueStub.processPending(device); - await expect(promise).to.be.fulfilled; + const successPromise = PromiseUtils.waitForTask(done => { + messageQueueStub.events.once('success', done); + }); - expect(hasSession).to.equal(true, 'session does not exist'); - expect(sendSessionRequestIfNeededStub.callCount).to.equal(0); + await messageQueueStub.processPending(device); + await expect(successPromise).to.be.fulfilled; + expect(sendSessionRequestIfNeededStub.called).to.equal( + false, + 'Session request triggered when we have a session.' + ); }); - }); - describe('sendUsingMultiDevice', () => { - it('can send using multidevice', async () => { + it('will send message if sending to medium group', async () => { + isMediumGroupStub.returns(true); + sendStub.resolves(); + const device = TestUtils.generateFakePubKey(); - const message = TestUtils.generateChatMessage(); + await pendingMessageCache.add(device, TestUtils.generateChatMessage()); - const promise = messageQueueStub.sendUsingMultiDevice(device, message); - await expect(promise).to.be.fulfilled; + const successPromise = PromiseUtils.waitForTask(done => { + messageQueueStub.events.once('success', done); + }); - // Ensure the arguments passed into sendMessageToDevices are correct - const previousArgs = sendMessageToDevicesSpy.lastCall.args as [ - Array, - ChatMessage - ]; + await messageQueueStub.processPending(device); + await expect(successPromise).to.be.fulfilled; + expect(sendSessionRequestIfNeededStub.called).to.equal( + false, + 'Session request triggered on medium group' + ); + }); - // Check that instances are equal - expect(previousArgs).to.have.length(2); + it('should remove message from cache', async () => { + hasSessionStub.resolves(true); + isMediumGroupStub.returns(false); + + const events = ['success', 'fail']; + for (const event of events) { + if (event === 'success') { + sendStub.resolves(); + } else { + sendStub.throws(new Error('fail')); + } + + const device = TestUtils.generateFakePubKey(); + await pendingMessageCache.add(device, TestUtils.generateChatMessage()); + + const initialMessages = await pendingMessageCache.getForDevice(device); + expect(initialMessages).to.have.length(1); + await messageQueueStub.processPending(device); + + const promise = PromiseUtils.waitUntil(async () => { + const messages = await pendingMessageCache.getForDevice(device); + return messages.length === 0; + }); + await expect(promise).to.be.fulfilled; + } + }).timeout(15000); - const argsPairedDevices = previousArgs[0]; - const argsChatMessage = previousArgs[1]; + describe('events', () => { + it('should send a success event if message was sent', async () => { + hasSessionStub.resolves(true); + isMediumGroupStub.returns(false); + sendStub.resolves(); - expect(argsChatMessage instanceof ChatMessage).to.equal( - true, - 'message passed into sendMessageToDevices was not a valid ChatMessage' - ); - expect(argsChatMessage.isEqual(message)).to.equal( - true, - 'message passed into sendMessageToDevices has been mutated' - ); + const device = TestUtils.generateFakePubKey(); + const message = TestUtils.generateChatMessage(); + await pendingMessageCache.add(device, message); - argsPairedDevices.forEach((argsPaired: PubKey, index: number) => { - expect(argsPaired instanceof PubKey).to.equal( - true, - 'a device passed into sendMessageToDevices was not a PubKey' - ); - expect(argsPaired.isEqual(pairedDevices[index])).to.equal( - true, - 'a device passed into sendMessageToDevices did not match MessageDeviceProtocol.getAllDevices' - ); + const eventPromise = PromiseUtils.waitForTask< + RawMessage | OpenGroupMessage + >(complete => { + messageQueueStub.events.once('success', complete); + }); + + await messageQueueStub.processPending(device); + await expect(eventPromise).to.be.fulfilled; + + const rawMessage = await eventPromise; + expect(rawMessage.identifier).to.equal(message.identifier); + }); + + it('should send a fail event if something went wrong while sending', async () => { + hasSessionStub.resolves(true); + isMediumGroupStub.returns(false); + sendStub.throws(new Error('failure')); + + const spy = sandbox.spy(); + messageQueueStub.events.on('fail', spy); + + const device = TestUtils.generateFakePubKey(); + const message = TestUtils.generateChatMessage(); + await pendingMessageCache.add(device, message); + + const eventPromise = PromiseUtils.waitForTask< + [RawMessage | OpenGroupMessage, Error] + >(complete => { + messageQueueStub.events.once('fail', (...args) => { + complete(args); + }); + }); + + await messageQueueStub.processPending(device); + await expect(eventPromise).to.be.fulfilled; + + const [rawMessage, error] = await eventPromise; + expect(rawMessage.identifier).to.equal(message.identifier); + expect(error.message).to.equal('failure'); }); }); }); + describe('sendUsingMultiDevice', () => { + it('should send the message to all the devices', async () => { + const devices = TestUtils.generateFakePubKeys(3); + sandbox.stub(MultiDeviceProtocol, 'getAllDevices').resolves(devices); + const stub = sandbox + .stub(messageQueueStub, 'sendMessageToDevices') + .resolves(); + + const message = TestUtils.generateChatMessage(); + await messageQueueStub.sendUsingMultiDevice(devices[0], message); + + const args = stub.lastCall.args as [Array, ContentMessage]; + expect(args[0]).to.have.same.members(devices); + expect(args[1]).to.equal(message); + }); + }); + describe('sendMessageToDevices', () => { it('can send to many devices', async () => { - const devices = TestUtils.generateFakePubKeys(10); + hasSessionStub.resolves(false); + + const devices = TestUtils.generateFakePubKeys(5); const message = TestUtils.generateChatMessage(); - const promise = messageQueueStub.sendMessageToDevices(devices, message); - await expect(promise).to.be.fulfilled; + await messageQueueStub.sendMessageToDevices(devices, message); + expect(pendingMessageCache.getCache()).to.have.length(devices.length); }); - it('can send sync message and confirm canSync is valid', async () => { - canSyncStub.returns(true); + it('should send sync message if possible', async () => { + hasSessionStub.returns(false); - const devices = TestUtils.generateFakePubKeys(3); - const message = TestUtils.generateChatMessage(); - const pairedDeviceKeys = pairedDevices.map(device => device.key); + sandbox.stub(SyncMessageUtils, 'canSync').returns(true); + + sandbox + .stub(SyncMessageUtils, 'from') + .returns(new TestSyncMessage({ timestamp: Date.now() })); - const promise = messageQueueStub.sendMessageToDevices(devices, message); - await expect(promise).to.be.fulfilled; + // This stub ensures that the message won't process + const sendSyncMessageStub = sandbox + .stub(messageQueueStub, 'sendSyncMessage') + .resolves(); - // Check sendSyncMessage parameters - const previousArgs = sendSyncMessageSpy.lastCall.args as [ - ChatMessage, - Array - ]; - expect(sendSyncMessageSpy.callCount).to.equal(1); + const ourDevices = [ourDevice, ...TestUtils.generateFakePubKeys(2)]; + sandbox + .stub(MultiDeviceProtocol, 'getAllDevices') + .callsFake(async user => { + if (ourDevice.isEqual(user)) { + return ourDevices; + } - // Check that instances are equal - expect(previousArgs).to.have.length(2); + return []; + }); - const argsChatMessage = previousArgs[0]; - const argsPairedKeys = [...previousArgs[1]].map(d => d.key); + const devices = [...ourDevices, ...TestUtils.generateFakePubKeys(3)]; + const message = TestUtils.generateChatMessage(); - expect(argsChatMessage instanceof ChatMessage).to.equal( + await messageQueueStub.sendMessageToDevices(devices, message); + expect(sendSyncMessageStub.called).to.equal( true, - 'message passed into sendMessageToDevices was not a valid ChatMessage' + 'sendSyncMessage was not called.' ); - expect(argsChatMessage.isEqual(message)).to.equal( - true, - 'message passed into sendMessageToDevices has been mutated' + expect( + pendingMessageCache.getCache().map(c => c.device) + ).to.not.have.members( + ourDevices.map(d => d.key), + 'Sending regular messages to our own device is not allowed.' ); - - // argsPairedKeys and pairedDeviceKeys should contain the same values - const keyArgsValid = _.isEmpty(_.xor(argsPairedKeys, pairedDeviceKeys)); - expect(keyArgsValid).to.equal( - true, - 'devices passed into sendSyncMessage were invalid' + expect(pendingMessageCache.getCache()).to.have.length( + devices.length - ourDevices.length, + 'Messages should not be sent to our devices.' ); }); }); - describe('sendToGroup', () => { - it('can send to closed group', async () => { - const message = TestUtils.generateClosedGroupMessage(); - const success = await messageQueueStub.sendToGroup(message); - expect(success).to.equal(true, 'sending to group failed'); - }); - - it('uses correct parameters for sendToGroup with ClosedGroupMessage', async () => { - const message = TestUtils.generateClosedGroupMessage(); - const success = await messageQueueStub.sendToGroup(message); - - expect(success).to.equal(true, 'sending to group failed'); - - // Check parameters - const previousArgs = sendMessageToDevicesSpy.lastCall.args as [ - Array, - ClosedGroupMessage - ]; - expect(previousArgs).to.have.length(2); - - const argsClosedGroupMessage = previousArgs[1]; - expect(argsClosedGroupMessage instanceof ClosedGroupMessage).to.equal( - true, - 'message passed into sendMessageToDevices was not a ClosedGroupMessage' - ); - }); + describe('sendSyncMessage', () => { + it('should send a message to all our devices', async () => { + hasSessionStub.resolves(false); - it("won't send to invalid groupId", async () => { - const message = TestUtils.generateClosedGroupMessage('invalid-group-id'); - const success = await messageQueueStub.sendToGroup(message); + const ourOtherDevices = TestUtils.generateFakePubKeys(2); + const ourDevices = [ourDevice, ...ourOtherDevices]; + sandbox.stub(MultiDeviceProtocol, 'getAllDevices').resolves(ourDevices); - // Ensure message parameter passed into sendToGroup is as expected - expect(success).to.equal( - false, - 'an invalid groupId was treated as valid' + await messageQueueStub.sendSyncMessage( + new TestSyncMessage({ timestamp: Date.now() }) ); - expect(sendToGroupSpy.callCount).to.equal(1); - const argsMessage = sendToGroupSpy.lastCall.args[0]; - expect(argsMessage instanceof ClosedGroupMessage).to.equal( - true, - 'message passed into sendToGroup was not a ClosedGroupMessage' + expect(pendingMessageCache.getCache()).to.have.length( + ourOtherDevices.length ); - expect(success).to.equal( - false, - 'invalid ClosedGroupMessage was propogated through sendToGroup' + expect(pendingMessageCache.getCache().map(c => c.device)).to.have.members( + ourOtherDevices.map(d => d.key) ); }); + }); - it('wont send message to empty closed group', async () => { - groupMembersStub.resolves(TestUtils.generateFakePubKeys(0)); - - const message = TestUtils.generateClosedGroupMessage(); - const response = await messageQueueStub.sendToGroup(message); + describe('sendToGroup', () => { + describe('closed groups', async () => { + it('can send to closed group', async () => { + const members = TestUtils.generateFakePubKeys(4).map( + p => new PrimaryPubKey(p.key) + ); + sandbox.stub(GroupUtils, 'getGroupMembers').resolves(members); - expect(response).to.equal( - false, - 'sendToGroup send a message to an empty group' - ); - }); + const sendUsingMultiDeviceStub = sandbox + .stub(messageQueueStub, 'sendUsingMultiDevice') + .resolves(); - it('can send to open group', async () => { - const message = TestUtils.generateOpenGroupMessage(); - const success = await messageQueueStub.sendToGroup(message); + const message = TestUtils.generateClosedGroupMessage(); + const success = await messageQueueStub.sendToGroup(message); + expect(success).to.equal(true, 'sending to group failed'); + expect(sendUsingMultiDeviceStub.callCount).to.equal(members.length); - expect(success).to.equal(true, 'sending to group failed'); - }); - }); + const arg = sendUsingMultiDeviceStub.getCall(0).args; + expect(arg[1] instanceof ClosedGroupMessage).to.equal( + true, + 'message sent to group member was not a ClosedGroupMessage' + ); + }); - describe('events', () => { - it('can send events on message sending success', async () => { - const successSpy = sandbox.spy(); - messageQueueStub.events.on('success', successSpy); + it('wont send message to empty closed group', async () => { + sandbox.stub(GroupUtils, 'getGroupMembers').resolves([]); + const sendUsingMultiDeviceStub = sandbox + .stub(messageQueueStub, 'sendUsingMultiDevice') + .resolves(); - const device = TestUtils.generateFakePubKey(); - const promise = messageQueueStub.processPending(device); - await expect(promise).to.be.fulfilled; + const message = TestUtils.generateClosedGroupMessage(); + const response = await messageQueueStub.sendToGroup(message); - await tick(); - expect(successSpy.callCount).to.equal(1); + expect(response).to.equal( + false, + 'sendToGroup sent a message to an empty group' + ); + expect(sendUsingMultiDeviceStub.callCount).to.equal(0); + }); }); - it('can send events on message sending failure', async () => { - sendStub.throws(new Error('Failed to send message.')); + describe('open groups', async () => { + let sendToOpenGroupStub: sinon.SinonStub< + [OpenGroupMessage], + Promise + >; + beforeEach(() => { + sendToOpenGroupStub = sandbox + .stub(MessageSender, 'sendToOpenGroup') + .resolves(true); + }); - const failureSpy = sandbox.spy(); - messageQueueStub.events.on('fail', failureSpy); + it('can send to open group', async () => { + const message = TestUtils.generateOpenGroupMessage(); + const success = await messageQueueStub.sendToGroup(message); + expect(sendToOpenGroupStub.callCount).to.equal(1); + expect(success).to.equal(true, 'Sending to open group failed'); + }); - const device = TestUtils.generateFakePubKey(); - const promise = messageQueueStub.processPending(device); - await expect(promise).to.be.fulfilled; + it('should emit a success event when send was successful', async () => { + const message = TestUtils.generateOpenGroupMessage(); + const eventPromise = PromiseUtils.waitForTask(complete => { + messageQueueStub.events.once('success', complete); + }, 2000); + + await messageQueueStub.sendToGroup(message); + await expect(eventPromise).to.be.fulfilled; + }); + + it('should emit a fail event if something went wrong', async () => { + sendToOpenGroupStub.resolves(false); + const message = TestUtils.generateOpenGroupMessage(); + const eventPromise = PromiseUtils.waitForTask(complete => { + messageQueueStub.events.once('fail', complete); + }, 2000); - await tick(); - expect(failureSpy.callCount).to.equal(1); + await messageQueueStub.sendToGroup(message); + await expect(eventPromise).to.be.fulfilled; + }); }); }); }); diff --git a/ts/test/session/sending/PendingMessageCache_test.ts b/ts/test/session/sending/PendingMessageCache_test.ts index 5ed1e211e..5f806b5dc 100644 --- a/ts/test/session/sending/PendingMessageCache_test.ts +++ b/ts/test/session/sending/PendingMessageCache_test.ts @@ -36,7 +36,6 @@ describe('PendingMessageCache', () => { }); pendingMessageCacheStub = new PendingMessageCache(); - await pendingMessageCacheStub.isReady; }); afterEach(() => { @@ -44,7 +43,7 @@ describe('PendingMessageCache', () => { }); it('can initialize cache', async () => { - const cache = pendingMessageCacheStub.getAllPending(); + const cache = await pendingMessageCacheStub.getAllPending(); // We expect the cache to initialise as an empty array expect(cache).to.be.instanceOf(Array); @@ -59,7 +58,7 @@ describe('PendingMessageCache', () => { await pendingMessageCacheStub.add(device, message); // Verify that the message is in the cache - const finalCache = pendingMessageCacheStub.getAllPending(); + const finalCache = await pendingMessageCacheStub.getAllPending(); expect(finalCache).to.have.length(1); @@ -68,6 +67,22 @@ describe('PendingMessageCache', () => { expect(addedMessage.timestamp).to.deep.equal(rawMessage.timestamp); }); + it('can add multiple messages belonging to the same user', async () => { + const device = TestUtils.generateFakePubKey(); + + await pendingMessageCacheStub.add(device, TestUtils.generateChatMessage()); + // We have to timeout here otherwise it's processed too fast and messages start having the same timestamp + await TestUtils.timeout(5); + await pendingMessageCacheStub.add(device, TestUtils.generateChatMessage()); + await TestUtils.timeout(5); + await pendingMessageCacheStub.add(device, TestUtils.generateChatMessage()); + + // Verify that the message is in the cache + const finalCache = await pendingMessageCacheStub.getAllPending(); + + expect(finalCache).to.have.length(3); + }); + it('can remove from cache', async () => { const device = TestUtils.generateFakePubKey(); const message = TestUtils.generateChatMessage(); @@ -75,18 +90,47 @@ describe('PendingMessageCache', () => { await pendingMessageCacheStub.add(device, message); - const initialCache = pendingMessageCacheStub.getAllPending(); + const initialCache = await pendingMessageCacheStub.getAllPending(); expect(initialCache).to.have.length(1); // Remove the message await pendingMessageCacheStub.remove(rawMessage); - const finalCache = pendingMessageCacheStub.getAllPending(); + const finalCache = await pendingMessageCacheStub.getAllPending(); // Verify that the message was removed expect(finalCache).to.have.length(0); }); + it('should only remove messages with different timestamp and device', async () => { + const device = TestUtils.generateFakePubKey(); + const message = TestUtils.generateChatMessage(); + const rawMessage = MessageUtils.toRawMessage(device, message); + + await pendingMessageCacheStub.add(device, message); + await TestUtils.timeout(5); + const one = await pendingMessageCacheStub.add( + device, + TestUtils.generateChatMessage(message.identifier) + ); + const two = await pendingMessageCacheStub.add( + TestUtils.generateFakePubKey(), + message + ); + + const initialCache = await pendingMessageCacheStub.getAllPending(); + expect(initialCache).to.have.length(3); + + // Remove the message + await pendingMessageCacheStub.remove(rawMessage); + + const finalCache = await pendingMessageCacheStub.getAllPending(); + + // Verify that the message was removed + expect(finalCache).to.have.length(2); + expect(finalCache).to.have.deep.members([one, two]); + }); + it('can get devices', async () => { const cacheItems = [ { @@ -103,16 +147,16 @@ describe('PendingMessageCache', () => { }, ]; - cacheItems.forEach(async item => { + for (const item of cacheItems) { await pendingMessageCacheStub.add(item.device, item.message); - }); + } - const cache = pendingMessageCacheStub.getAllPending(); + const cache = await pendingMessageCacheStub.getAllPending(); expect(cache).to.have.length(cacheItems.length); // Get list of devices const devicesKeys = cacheItems.map(item => item.device.key); - const pulledDevices = pendingMessageCacheStub.getDevices(); + const pulledDevices = await pendingMessageCacheStub.getDevices(); const pulledDevicesKeys = pulledDevices.map(d => d.key); // Verify that device list from cache is equivalent to devices added @@ -131,21 +175,21 @@ describe('PendingMessageCache', () => { }, ]; - cacheItems.forEach(async item => { + for (const item of cacheItems) { await pendingMessageCacheStub.add(item.device, item.message); - }); + } - const initialCache = pendingMessageCacheStub.getAllPending(); + const initialCache = await pendingMessageCacheStub.getAllPending(); expect(initialCache).to.have.length(cacheItems.length); // Get pending for each specific device - cacheItems.forEach(item => { - const pendingForDevice = pendingMessageCacheStub.getForDevice( + for (const item of cacheItems) { + const pendingForDevice = await pendingMessageCacheStub.getForDevice( item.device ); expect(pendingForDevice).to.have.length(1); expect(pendingForDevice[0].device).to.equal(item.device.key); - }); + } }); it('can find nothing when empty', async () => { @@ -164,7 +208,7 @@ describe('PendingMessageCache', () => { await pendingMessageCacheStub.add(device, message); - const finalCache = pendingMessageCacheStub.getAllPending(); + const finalCache = await pendingMessageCacheStub.getAllPending(); expect(finalCache).to.have.length(1); const foundMessage = pendingMessageCacheStub.find(rawMessage); @@ -188,17 +232,17 @@ describe('PendingMessageCache', () => { }, ]; - cacheItems.forEach(async item => { + for (const item of cacheItems) { await pendingMessageCacheStub.add(item.device, item.message); - }); + } - const initialCache = pendingMessageCacheStub.getAllPending(); + const initialCache = await pendingMessageCacheStub.getAllPending(); expect(initialCache).to.have.length(cacheItems.length); // Clear cache await pendingMessageCacheStub.clear(); - const finalCache = pendingMessageCacheStub.getAllPending(); + const finalCache = await pendingMessageCacheStub.getAllPending(); expect(finalCache).to.have.length(0); }); @@ -218,21 +262,20 @@ describe('PendingMessageCache', () => { }, ]; - cacheItems.forEach(async item => { + for (const item of cacheItems) { await pendingMessageCacheStub.add(item.device, item.message); - }); + } - const addedMessages = pendingMessageCacheStub.getAllPending(); + const addedMessages = await pendingMessageCacheStub.getAllPending(); expect(addedMessages).to.have.length(cacheItems.length); // Rebuild from DB const freshCache = new PendingMessageCache(); - await freshCache.isReady; // Verify messages - const rebuiltMessages = freshCache.getAllPending(); + const rebuiltMessages = await freshCache.getAllPending(); - rebuiltMessages.forEach((message, index) => { + for (const [index, message] of rebuiltMessages.entries()) { const addedMessage = addedMessages[index]; // Pull out plainTextBuffer for a separate check @@ -254,6 +297,6 @@ describe('PendingMessageCache', () => { true, 'cached messages were not rebuilt properly' ); - }); + } }); }); diff --git a/ts/test/test-utils/stubs/index.ts b/ts/test/test-utils/stubs/index.ts index 10ad19f0e..d287adc26 100644 --- a/ts/test/test-utils/stubs/index.ts +++ b/ts/test/test-utils/stubs/index.ts @@ -1 +1,2 @@ export * from './ciphers'; +export * from './sending'; diff --git a/ts/test/test-utils/stubs/messages/TestSyncMessage.ts b/ts/test/test-utils/stubs/messages/TestSyncMessage.ts new file mode 100644 index 000000000..c82eecfa4 --- /dev/null +++ b/ts/test/test-utils/stubs/messages/TestSyncMessage.ts @@ -0,0 +1,7 @@ +import { SyncMessage } from '../../../../session/messages/outgoing'; +import { SignalService } from '../../../../protobuf'; +export class TestSyncMessage extends SyncMessage { + protected syncProto(): SignalService.SyncMessage { + return SignalService.SyncMessage.create({}); + } +} diff --git a/ts/test/test-utils/stubs/sending/PendingMessageCacheStub.ts b/ts/test/test-utils/stubs/sending/PendingMessageCacheStub.ts new file mode 100644 index 000000000..fa1fc5129 --- /dev/null +++ b/ts/test/test-utils/stubs/sending/PendingMessageCacheStub.ts @@ -0,0 +1,22 @@ +import { PendingMessageCache } from '../../../../session/sending'; +import { RawMessage } from '../../../../session/types'; + +export class PendingMessageCacheStub extends PendingMessageCache { + public dbData: Array; + constructor(dbData: Array = []) { + super(); + this.dbData = dbData; + } + + public getCache(): Readonly> { + return this.cache; + } + + protected async getFromStorage() { + return this.dbData; + } + + protected async saveToDB() { + return; + } +} diff --git a/ts/test/test-utils/stubs/sending/index.ts b/ts/test/test-utils/stubs/sending/index.ts new file mode 100644 index 000000000..e9def4705 --- /dev/null +++ b/ts/test/test-utils/stubs/sending/index.ts @@ -0,0 +1 @@ +export * from './PendingMessageCacheStub'; diff --git a/ts/test/test-utils/utils/message.ts b/ts/test/test-utils/utils/message.ts index a1c2c7646..e2e71504a 100644 --- a/ts/test/test-utils/utils/message.ts +++ b/ts/test/test-utils/utils/message.ts @@ -7,10 +7,10 @@ import { v4 as uuid } from 'uuid'; import { OpenGroup } from '../../../session/types'; import { generateFakePubKey } from './pubkey'; -export function generateChatMessage(): ChatMessage { +export function generateChatMessage(identifier?: string): ChatMessage { return new ChatMessage({ body: 'Lorem ipsum dolor sit amet, consectetur adipiscing elit', - identifier: uuid(), + identifier: identifier ?? uuid(), timestamp: Date.now(), attachments: undefined, quote: undefined,