From e2d5d9e793718686a3652ce975a0cd6e676bc67e Mon Sep 17 00:00:00 2001 From: Ryan Tharp <neuro@interx.net> Date: Wed, 10 Jun 2020 15:21:28 -0700 Subject: [PATCH 1/7] put lock around buildNewOnionPaths since it's called multiple times --- js/modules/loki_snode_api.js | 122 +++++++++++++++++------------------ 1 file changed, 61 insertions(+), 61 deletions(-) diff --git a/js/modules/loki_snode_api.js b/js/modules/loki_snode_api.js index de0c8bf24..f8a1cd03e 100644 --- a/js/modules/loki_snode_api.js +++ b/js/modules/loki_snode_api.js @@ -416,87 +416,87 @@ class LokiSnodeAPI { }); } - // FIXME: need a lock because it is being called multiple times in parallel async buildNewOnionPaths() { - // Note: this function may be called concurrently, so - // might consider blocking the other calls + // Note: this function may be called concurrently - const _ = window.Lodash; - - log.info('LokiSnodeAPI::buildNewOnionPaths - building new onion paths'); + return primitives.allowOnlyOneAtATime('buildNewOnionPaths', async () => { + const _ = window.Lodash; - const allNodes = await this.getRandomSnodePool(); + log.info('LokiSnodeAPI::buildNewOnionPaths - building new onion paths'); - if (this.guardNodes.length === 0) { - // Not cached, load from DB - const nodes = await window.libloki.storage.getGuardNodes(); + const allNodes = await this.getRandomSnodePool(); - if (nodes.length === 0) { - log.warn( - 'LokiSnodeAPI::buildNewOnionPaths - no guard nodes in DB. Will be selecting new guards nodes...' - ); - } else { - // We only store the nodes' keys, need to find full entries: - const edKeys = nodes.map(x => x.ed25519PubKey); - this.guardNodes = allNodes.filter( - x => edKeys.indexOf(x.pubkey_ed25519) !== -1 - ); + if (this.guardNodes.length === 0) { + // Not cached, load from DB + const nodes = await window.libloki.storage.getGuardNodes(); - if (this.guardNodes.length < edKeys.length) { + if (nodes.length === 0) { log.warn( - `LokiSnodeAPI::buildNewOnionPaths - could not find some guard nodes: ${ - this.guardNodes.length - }/${edKeys.length} left` + 'LokiSnodeAPI::buildNewOnionPaths - no guard nodes in DB. Will be selecting new guards nodes...' + ); + } else { + // We only store the nodes' keys, need to find full entries: + const edKeys = nodes.map(x => x.ed25519PubKey); + this.guardNodes = allNodes.filter( + x => edKeys.indexOf(x.pubkey_ed25519) !== -1 ); + + if (this.guardNodes.length < edKeys.length) { + log.warn( + `LokiSnodeAPI::buildNewOnionPaths - could not find some guard nodes: ${ + this.guardNodes.length + }/${edKeys.length} left` + ); + } } - } - // If guard nodes is still empty (the old nodes are now invalid), select new ones: - if (this.guardNodes.length === 0) { - this.guardNodes = await this.selectGuardNodes(); + // If guard nodes is still empty (the old nodes are now invalid), select new ones: + if (this.guardNodes.length === 0) { + this.guardNodes = await this.selectGuardNodes(); + } } - } - // TODO: select one guard node and 2 other nodes randomly - let otherNodes = _.difference(allNodes, this.guardNodes); + // TODO: select one guard node and 2 other nodes randomly + let otherNodes = _.difference(allNodes, this.guardNodes); - if (otherNodes.length < 2) { - log.warn( - 'LokiSnodeAPI::buildNewOnionPaths - Too few nodes to build an onion path! Refreshing pool and retrying' - ); - await this.refreshRandomPool(); - await this.buildNewOnionPaths(); - return; - } + if (otherNodes.length < 2) { + log.warn( + 'LokiSnodeAPI::buildNewOnionPaths - Too few nodes to build an onion path! Refreshing pool and retrying' + ); + await this.refreshRandomPool(); + await this.buildNewOnionPaths(); + return; + } - otherNodes = _.shuffle(otherNodes); - const guards = _.shuffle(this.guardNodes); + otherNodes = _.shuffle(otherNodes); + const guards = _.shuffle(this.guardNodes); - // Create path for every guard node: - const nodesNeededPerPaths = window.lokiFeatureFlags.onionRequestHops - 1; + // Create path for every guard node: + const nodesNeededPerPaths = window.lokiFeatureFlags.onionRequestHops - 1; - // Each path needs X (nodesNeededPerPaths) nodes in addition to the guard node: - const maxPath = Math.floor( - Math.min( - guards.length, - nodesNeededPerPaths - ? otherNodes.length / nodesNeededPerPaths - : otherNodes.length - ) - ); + // Each path needs X (nodesNeededPerPaths) nodes in addition to the guard node: + const maxPath = Math.floor( + Math.min( + guards.length, + nodesNeededPerPaths + ? otherNodes.length / nodesNeededPerPaths + : otherNodes.length + ) + ); - // TODO: might want to keep some of the existing paths - this.onionPaths = []; + // TODO: might want to keep some of the existing paths + this.onionPaths = []; - for (let i = 0; i < maxPath; i += 1) { - const path = [guards[i]]; - for (let j = 0; j < nodesNeededPerPaths; j += 1) { - path.push(otherNodes[i * nodesNeededPerPaths + j]); + for (let i = 0; i < maxPath; i += 1) { + const path = [guards[i]]; + for (let j = 0; j < nodesNeededPerPaths; j += 1) { + path.push(otherNodes[i * nodesNeededPerPaths + j]); + } + this.onionPaths.push({ path, bad: false }); } - this.onionPaths.push({ path, bad: false }); - } - log.info(`Built ${this.onionPaths.length} onion paths`, this.onionPaths); + log.info(`Built ${this.onionPaths.length} onion paths`, this.onionPaths); + }); } async getRandomSnodeAddress() { From a905703cb4be3139fca83c9e4d0cde05b090277b Mon Sep 17 00:00:00 2001 From: Ryan Tharp <neuro@interx.net> Date: Thu, 18 Jun 2020 14:55:47 -0700 Subject: [PATCH 2/7] move internal buildNewOnionPaths function into buildNewOnionPathsWorker per review --- js/modules/loki_snode_api.js | 127 ++++++++++++++++++----------------- 1 file changed, 64 insertions(+), 63 deletions(-) diff --git a/js/modules/loki_snode_api.js b/js/modules/loki_snode_api.js index ebdc681ba..6d04da9c8 100644 --- a/js/modules/loki_snode_api.js +++ b/js/modules/loki_snode_api.js @@ -408,88 +408,89 @@ class LokiSnodeAPI { }); } - async buildNewOnionPaths() { - // Note: this function may be called concurrently + async buildNewOnionPathsWorker() { + const _ = window.Lodash; - return primitives.allowOnlyOneAtATime('buildNewOnionPaths', async () => { - const _ = window.Lodash; + log.info('LokiSnodeAPI::buildNewOnionPaths - building new onion paths'); - log.info('LokiSnodeAPI::buildNewOnionPaths - building new onion paths'); + const allNodes = await this.getRandomSnodePool(); - const allNodes = await this.getRandomSnodePool(); + if (this.guardNodes.length === 0) { + // Not cached, load from DB + const nodes = await window.libloki.storage.getGuardNodes(); - if (this.guardNodes.length === 0) { - // Not cached, load from DB - const nodes = await window.libloki.storage.getGuardNodes(); + if (nodes.length === 0) { + log.warn( + 'LokiSnodeAPI::buildNewOnionPaths - no guard nodes in DB. Will be selecting new guards nodes...' + ); + } else { + // We only store the nodes' keys, need to find full entries: + const edKeys = nodes.map(x => x.ed25519PubKey); + this.guardNodes = allNodes.filter( + x => edKeys.indexOf(x.pubkey_ed25519) !== -1 + ); - if (nodes.length === 0) { + if (this.guardNodes.length < edKeys.length) { log.warn( - 'LokiSnodeAPI::buildNewOnionPaths - no guard nodes in DB. Will be selecting new guards nodes...' - ); - } else { - // We only store the nodes' keys, need to find full entries: - const edKeys = nodes.map(x => x.ed25519PubKey); - this.guardNodes = allNodes.filter( - x => edKeys.indexOf(x.pubkey_ed25519) !== -1 + `LokiSnodeAPI::buildNewOnionPaths - could not find some guard nodes: ${ + this.guardNodes.length + }/${edKeys.length} left` ); - - if (this.guardNodes.length < edKeys.length) { - log.warn( - `LokiSnodeAPI::buildNewOnionPaths - could not find some guard nodes: ${ - this.guardNodes.length - }/${edKeys.length} left` - ); - } } + } - // If guard nodes is still empty (the old nodes are now invalid), select new ones: - if (this.guardNodes.length < MIN_GUARD_COUNT) { - // TODO: don't throw away potentially good guard nodes - this.guardNodes = await this.selectGuardNodes(); - } + // If guard nodes is still empty (the old nodes are now invalid), select new ones: + if (this.guardNodes.length < MIN_GUARD_COUNT) { + // TODO: don't throw away potentially good guard nodes + this.guardNodes = await this.selectGuardNodes(); } + } - // TODO: select one guard node and 2 other nodes randomly - let otherNodes = _.difference(allNodes, this.guardNodes); + // TODO: select one guard node and 2 other nodes randomly + let otherNodes = _.difference(allNodes, this.guardNodes); - if (otherNodes.length < 2) { - log.warn( - 'LokiSnodeAPI::buildNewOnionPaths - Too few nodes to build an onion path! Refreshing pool and retrying' - ); - await this.refreshRandomPool(); - await this.buildNewOnionPaths(); - return; - } + if (otherNodes.length < 2) { + log.warn( + 'LokiSnodeAPI::buildNewOnionPaths - Too few nodes to build an onion path! Refreshing pool and retrying' + ); + await this.refreshRandomPool(); + await this.buildNewOnionPaths(); + return; + } - otherNodes = _.shuffle(otherNodes); - const guards = _.shuffle(this.guardNodes); + otherNodes = _.shuffle(otherNodes); + const guards = _.shuffle(this.guardNodes); - // Create path for every guard node: - const nodesNeededPerPaths = window.lokiFeatureFlags.onionRequestHops - 1; + // Create path for every guard node: + const nodesNeededPerPaths = window.lokiFeatureFlags.onionRequestHops - 1; - // Each path needs X (nodesNeededPerPaths) nodes in addition to the guard node: - const maxPath = Math.floor( - Math.min( - guards.length, - nodesNeededPerPaths - ? otherNodes.length / nodesNeededPerPaths - : otherNodes.length - ) - ); + // Each path needs X (nodesNeededPerPaths) nodes in addition to the guard node: + const maxPath = Math.floor( + Math.min( + guards.length, + nodesNeededPerPaths + ? otherNodes.length / nodesNeededPerPaths + : otherNodes.length + ) + ); - // TODO: might want to keep some of the existing paths - this.onionPaths = []; + // TODO: might want to keep some of the existing paths + this.onionPaths = []; - for (let i = 0; i < maxPath; i += 1) { - const path = [guards[i]]; - for (let j = 0; j < nodesNeededPerPaths; j += 1) { - path.push(otherNodes[i * nodesNeededPerPaths + j]); - } - this.onionPaths.push({ path, bad: false }); + for (let i = 0; i < maxPath; i += 1) { + const path = [guards[i]]; + for (let j = 0; j < nodesNeededPerPaths; j += 1) { + path.push(otherNodes[i * nodesNeededPerPaths + j]); } + this.onionPaths.push({ path, bad: false }); + } - log.info(`Built ${this.onionPaths.length} onion paths`, this.onionPaths); - }); + log.info(`Built ${this.onionPaths.length} onion paths`, this.onionPaths); + } + + async buildNewOnionPaths() { + // this function may be called concurrently make sure we only have one inflight + return primitives.allowOnlyOneAtATime('buildNewOnionPaths', this.buildNewOnionPathsWorker); } async getRandomSnodeAddress() { From ef76972ccb07c3991c8433fb60c143d380782411 Mon Sep 17 00:00:00 2001 From: Mikunj <mikunj@live.com.au> Date: Fri, 19 Jun 2020 09:31:01 +1000 Subject: [PATCH 3/7] Allow passing a cache to the queue --- ts/session/sending/MessageQueue.ts | 10 ++++++---- ts/session/sending/PendingMessageCache.ts | 6 +++--- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/ts/session/sending/MessageQueue.ts b/ts/session/sending/MessageQueue.ts index b2c89d804..9df2584cc 100644 --- a/ts/session/sending/MessageQueue.ts +++ b/ts/session/sending/MessageQueue.ts @@ -26,9 +26,9 @@ export class MessageQueue implements MessageQueueInterface { private readonly jobQueues: Map<PubKey, JobQueue> = new Map(); private readonly pendingMessageCache: PendingMessageCache; - constructor() { + constructor(cache?: PendingMessageCache) { this.events = new EventEmitter(); - this.pendingMessageCache = new PendingMessageCache(); + this.pendingMessageCache = cache ?? new PendingMessageCache(); void this.processAllPending(); } @@ -143,10 +143,12 @@ export class MessageQueue implements MessageQueueInterface { await jobQueue.addWithId(messageId, async () => MessageSender.send(message) ); - void this.pendingMessageCache.remove(message); this.events.emit('success', message); } catch (e) { this.events.emit('fail', message, e); + } finally { + // Remove from the cache because retrying is done in the sender + void this.pendingMessageCache.remove(message); } } }); @@ -173,7 +175,7 @@ export class MessageQueue implements MessageQueueInterface { } await this.pendingMessageCache.add(device, message); - await this.processPending(device); + void this.processPending(device); } private getJobQueue(device: PubKey): JobQueue { diff --git a/ts/session/sending/PendingMessageCache.ts b/ts/session/sending/PendingMessageCache.ts index 0a4037c1c..3ee7005f0 100644 --- a/ts/session/sending/PendingMessageCache.ts +++ b/ts/session/sending/PendingMessageCache.ts @@ -93,12 +93,12 @@ export class PendingMessageCache { await this.saveToDB(); } - private async loadFromDB() { + protected async loadFromDB() { const messages = await this.getFromStorage(); this.cache = messages; } - private async getFromStorage(): Promise<Array<RawMessage>> { + protected async getFromStorage(): Promise<Array<RawMessage>> { const data = await getItemById('pendingMessages'); if (!data || !data.value) { return []; @@ -117,7 +117,7 @@ export class PendingMessageCache { }); } - private async saveToDB() { + protected async saveToDB() { // For each plainTextBuffer in cache, save in as a simple Array<number> to avoid // Node issues with JSON stringifying Buffer without strict typing const encodedCache = [...this.cache].map(item => { From d862269f8d32d9d94848b91100b7eb6ba5e74d1b Mon Sep 17 00:00:00 2001 From: Mikunj <mikunj@live.com.au> Date: Fri, 19 Jun 2020 10:33:02 +1000 Subject: [PATCH 4/7] Update tests --- ts/session/sending/MessageQueue.ts | 82 +-- ts/session/sending/MessageQueueInterface.ts | 6 +- ts/session/sending/PendingMessageCache.ts | 43 +- ts/session/utils/Groups.ts | 21 +- ts/session/utils/SyncMessageUtils.ts | 4 +- ts/test/session/sending/MessageQueue_test.ts | 509 +++++++++--------- .../sending/PendingMessageCache_test.ts | 98 +++- ts/test/test-utils/stubs/index.ts | 1 + .../stubs/messages/TestSyncMessage.ts | 7 + .../stubs/sending/PendingMessageCacheStub.ts | 22 + ts/test/test-utils/stubs/sending/index.ts | 1 + ts/test/test-utils/testUtils.ts | 84 ++- 12 files changed, 523 insertions(+), 355 deletions(-) create mode 100644 ts/test/test-utils/stubs/messages/TestSyncMessage.ts create mode 100644 ts/test/test-utils/stubs/sending/PendingMessageCacheStub.ts create mode 100644 ts/test/test-utils/stubs/sending/index.ts diff --git a/ts/session/sending/MessageQueue.ts b/ts/session/sending/MessageQueue.ts index 9df2584cc..d5e62286b 100644 --- a/ts/session/sending/MessageQueue.ts +++ b/ts/session/sending/MessageQueue.ts @@ -8,6 +8,7 @@ import { ContentMessage, OpenGroupMessage, SessionRequestMessage, + SyncMessage, } from '../messages/outgoing'; import { PendingMessageCache } from './PendingMessageCache'; import { @@ -20,6 +21,7 @@ import { PubKey } from '../types'; import { MessageSender } from '.'; import { MultiDeviceProtocol, SessionProtocol } from '../protocols'; import { UserUtil } from '../../util'; +import promise from 'redux-promise-middleware'; export class MessageQueue implements MessageQueueInterface { public readonly events: TypedEventEmitter<MessageQueueInterfaceEvents>; @@ -50,20 +52,18 @@ export class MessageQueue implements MessageQueueInterface { // Sync to our devices if syncable if (SyncMessageUtils.canSync(message)) { - const currentDevice = await UserUtil.getCurrentDevicePubKey(); - - if (currentDevice) { - const ourDevices = await MultiDeviceProtocol.getAllDevices( - currentDevice + const syncMessage = SyncMessageUtils.from(message); + if (!syncMessage) { + throw new Error( + 'MessageQueue internal error occured: failed to make sync message' ); + } - await this.sendSyncMessage(message, ourDevices); + await this.sendSyncMessage(syncMessage); - // Remove our devices from currentDevices - currentDevices = currentDevices.filter(device => - ourDevices.some(d => device.isEqual(d)) - ); - } + const ourDevices = await MultiDeviceProtocol.getOurDevices(); + // Remove our devices from currentDevices + currentDevices = currentDevices.filter(device => !ourDevices.some(d => device.isEqual(d))); } const promises = currentDevices.map(async device => { @@ -79,30 +79,42 @@ export class MessageQueue implements MessageQueueInterface { // Closed groups if (message instanceof ClosedGroupMessage) { // Get devices in closed group - const groupPubKey = PubKey.from(message.groupId); - if (!groupPubKey) { + const recipients = await GroupUtils.getGroupMembers(message.groupId); + if (recipients.length === 0) { return false; } - const recipients = await GroupUtils.getGroupMembers(groupPubKey); - - if (recipients.length) { - await this.sendMessageToDevices(recipients, message); + // Send to all devices of members + await Promise.all( + recipients.map(async recipient => + this.sendUsingMultiDevice(recipient, message) + ) + ); - return true; - } + return true; } // Open groups if (message instanceof OpenGroupMessage) { // No queue needed for Open Groups; send directly + const error = new Error('Failed to send message to open group.'); + + // This is absolutely yucky ... we need to make it not use Promise<boolean> try { - await MessageSender.sendToOpenGroup(message); - this.events.emit('success', message); + const result = await MessageSender.sendToOpenGroup(message); + if (result) { + this.events.emit('success', message); + } else { + this.events.emit('fail', message, error); + } - return true; + return result; } catch (e) { - this.events.emit('fail', message, e); + console.warn( + `Failed to send message to open group: ${message.group.server}`, + e + ); + this.events.emit('fail', message, error); return false; } @@ -111,21 +123,20 @@ export class MessageQueue implements MessageQueueInterface { return false; } - public async sendSyncMessage(message: ContentMessage, sendTo: Array<PubKey>) { - // Sync with our devices - const promises = sendTo.map(async device => { - const syncMessage = SyncMessageUtils.from(message); - - return this.process(device, syncMessage); - }); + public async sendSyncMessage(message: SyncMessage | undefined): Promise<any> { + if (!message) { + return; + } + const ourDevices = await MultiDeviceProtocol.getOurDevices(); + const promises = ourDevices.map(async device => this.process(device, message)); return Promise.all(promises); } public async processPending(device: PubKey) { - const messages = this.pendingMessageCache.getForDevice(device); + const messages = await this.pendingMessageCache.getForDevice(device); - const isMediumGroup = GroupUtils.isMediumGroup(device); + const isMediumGroup = GroupUtils.isMediumGroup(device.key); const hasSession = await SessionProtocol.hasSession(device); if (!isMediumGroup && !hasSession) { @@ -155,13 +166,16 @@ export class MessageQueue implements MessageQueueInterface { } private async processAllPending() { - const devices = this.pendingMessageCache.getDevices(); + const devices = await this.pendingMessageCache.getDevices(); const promises = devices.map(async device => this.processPending(device)); return Promise.all(promises); } - private async process(device: PubKey, message?: ContentMessage) { + private async process( + device: PubKey, + message?: ContentMessage + ): Promise<void> { // Don't send to ourselves const currentDevice = await UserUtil.getCurrentDevicePubKey(); if (!message || (currentDevice && device.isEqual(currentDevice))) { diff --git a/ts/session/sending/MessageQueueInterface.ts b/ts/session/sending/MessageQueueInterface.ts index c3ee606aa..5bed428ca 100644 --- a/ts/session/sending/MessageQueueInterface.ts +++ b/ts/session/sending/MessageQueueInterface.ts @@ -2,6 +2,7 @@ import { ClosedGroupMessage, ContentMessage, OpenGroupMessage, + SyncMessage, } from '../messages/outgoing'; import { RawMessage } from '../types/RawMessage'; import { TypedEventEmitter } from '../utils'; @@ -19,8 +20,5 @@ export interface MessageQueueInterface { sendUsingMultiDevice(user: PubKey, message: ContentMessage): void; send(device: PubKey, message: ContentMessage): void; sendToGroup(message: GroupMessageType): void; - sendSyncMessage( - message: ContentMessage, - sendTo: Array<PubKey> - ): Promise<Array<void>>; + sendSyncMessage(message: SyncMessage | undefined): Promise<any>; } diff --git a/ts/session/sending/PendingMessageCache.ts b/ts/session/sending/PendingMessageCache.ts index 3ee7005f0..fcc571cc6 100644 --- a/ts/session/sending/PendingMessageCache.ts +++ b/ts/session/sending/PendingMessageCache.ts @@ -1,3 +1,4 @@ +import _ from 'lodash'; import { createOrUpdateItem, getItemById } from '../../../js/modules/data'; import { PartialRawMessage, RawMessage } from '../types/RawMessage'; import { ContentMessage } from '../messages/outgoing'; @@ -12,33 +13,25 @@ import { MessageUtils } from '../utils'; // memory and sync its state with the database on modification (add or remove). export class PendingMessageCache { - public readonly isReady: Promise<boolean>; - private cache: Array<RawMessage>; + protected loadPromise: Promise<void> | undefined; + protected cache: Array<RawMessage> = []; - constructor() { - // Load pending messages from the database - // You should await isReady on making a new PendingMessageCache - // if you'd like to have instant access to the cache - this.cache = []; - - this.isReady = new Promise(async resolve => { - await this.loadFromDB(); - resolve(true); - }); - } - - public getAllPending(): Array<RawMessage> { + public async getAllPending(): Promise<Array<RawMessage>> { + await this.loadFromDBIfNeeded(); // Get all pending from cache, sorted with oldest first return [...this.cache].sort((a, b) => a.timestamp - b.timestamp); } - public getForDevice(device: PubKey): Array<RawMessage> { - return this.getAllPending().filter(m => m.device === device.key); + public async getForDevice(device: PubKey): Promise<Array<RawMessage>> { + const pending = await this.getAllPending(); + return pending.filter(m => m.device === device.key); } - public getDevices(): Array<PubKey> { + public async getDevices(): Promise<Array<PubKey>> { + await this.loadFromDBIfNeeded(); + // Gets all unique devices with pending messages - const pubkeyStrings = [...new Set(this.cache.map(m => m.device))]; + const pubkeyStrings = _.uniq(this.cache.map(m => m.device)); return pubkeyStrings.map(PubKey.from).filter((k): k is PubKey => !!k); } @@ -47,6 +40,7 @@ export class PendingMessageCache { device: PubKey, message: ContentMessage ): Promise<RawMessage> { + await this.loadFromDBIfNeeded(); const rawMessage = MessageUtils.toRawMessage(device, message); // Does it exist in cache already? @@ -63,6 +57,7 @@ export class PendingMessageCache { public async remove( message: RawMessage ): Promise<Array<RawMessage> | undefined> { + await this.loadFromDBIfNeeded(); // Should only be called after message is processed // Return if message doesn't exist in cache @@ -72,7 +67,7 @@ export class PendingMessageCache { // Remove item from cache and sync with database const updatedCache = this.cache.filter( - m => m.identifier !== message.identifier + cached => !(cached.device === message.device && cached.timestamp === message.timestamp) ); this.cache = updatedCache; await this.saveToDB(); @@ -93,6 +88,14 @@ export class PendingMessageCache { await this.saveToDB(); } + protected async loadFromDBIfNeeded() { + if (!this.loadPromise) { + this.loadPromise = this.loadFromDB(); + } + + await this.loadPromise; + } + protected async loadFromDB() { const messages = await this.getFromStorage(); this.cache = messages; diff --git a/ts/session/utils/Groups.ts b/ts/session/utils/Groups.ts index d73104b63..c3c4cf1b9 100644 --- a/ts/session/utils/Groups.ts +++ b/ts/session/utils/Groups.ts @@ -1,7 +1,11 @@ -import { PubKey } from '../types'; +import _ from 'lodash'; +import { PrimaryPubKey } from '../types'; +import { MultiDeviceProtocol } from '../protocols'; -export async function getGroupMembers(groupId: PubKey): Promise<Array<PubKey>> { - const groupConversation = window.ConversationController.get(groupId.key); +export async function getGroupMembers( + groupId: string +): Promise<Array<PrimaryPubKey>> { + const groupConversation = window.ConversationController.get(groupId); const groupMembers = groupConversation ? groupConversation.attributes.members : undefined; @@ -10,11 +14,16 @@ export async function getGroupMembers(groupId: PubKey): Promise<Array<PubKey>> { return []; } - return groupMembers.map((member: string) => new PubKey(member)); + const promises = (groupMembers as Array<string>).map(async (member: string) => + MultiDeviceProtocol.getPrimaryDevice(member) + ); + const primaryDevices = await Promise.all(promises); + + return _.uniqWith(primaryDevices, (a, b) => a.isEqual(b)); } -export function isMediumGroup(groupId: PubKey): boolean { - const conversation = window.ConversationController.get(groupId.key); +export function isMediumGroup(groupId: string): boolean { + const conversation = window.ConversationController.get(groupId); if (!conversation) { return false; diff --git a/ts/session/utils/SyncMessageUtils.ts b/ts/session/utils/SyncMessageUtils.ts index afab775b0..601d3cb8b 100644 --- a/ts/session/utils/SyncMessageUtils.ts +++ b/ts/session/utils/SyncMessageUtils.ts @@ -5,7 +5,9 @@ import { ContentMessage, SyncMessage } from '../messages/outgoing'; import { MultiDeviceProtocol } from '../protocols'; export function from(message: ContentMessage): SyncMessage | undefined { - // const { timestamp, identifier } = message; + if (message instanceof SyncMessage) { + return message; + } // Stubbed for now return undefined; diff --git a/ts/test/session/sending/MessageQueue_test.ts b/ts/test/session/sending/MessageQueue_test.ts index 746eb224f..739bae2cf 100644 --- a/ts/test/session/sending/MessageQueue_test.ts +++ b/ts/test/session/sending/MessageQueue_test.ts @@ -1,96 +1,52 @@ -import { expect } from 'chai'; +import chai from 'chai'; import * as sinon from 'sinon'; import * as _ from 'lodash'; import { GroupUtils, SyncMessageUtils } from '../../../session/utils'; import { Stubs, TestUtils } from '../../../test/test-utils'; import { MessageQueue } from '../../../session/sending/MessageQueue'; import { - ChatMessage, ClosedGroupMessage, ContentMessage, OpenGroupMessage, } from '../../../session/messages/outgoing'; -import { PubKey, RawMessage } from '../../../session/types'; +import { PrimaryPubKey, PubKey, RawMessage } from '../../../session/types'; import { UserUtil } from '../../../util'; -import { MessageSender, PendingMessageCache } from '../../../session/sending'; -import { toRawMessage } from '../../../session/utils/Messages'; +import { MessageSender } from '../../../session/sending'; import { MultiDeviceProtocol, SessionProtocol, } from '../../../session/protocols'; +import { PendingMessageCacheStub } from '../../test-utils/stubs'; +import { describe } from 'mocha'; +import { TestSyncMessage } from '../../test-utils/stubs/messages/TestSyncMessage'; -// Equivalent to Data.StorageItem -interface StorageItem { - id: string; - value: any; -} - -// Helper function to force sequential on events checks -async function tick() { - return new Promise(resolve => { - // tslint:disable-next-line: no-string-based-set-timeout - setTimeout(resolve, 0); - }); -} +// tslint:disable-next-line: no-require-imports no-var-requires +const chaiAsPromised = require('chai-as-promised'); +chai.use(chaiAsPromised); + +const { expect } = chai; describe('MessageQueue', () => { // Initialize new stubbed cache - let data: StorageItem; const sandbox = sinon.createSandbox(); const ourDevice = TestUtils.generateFakePubKey(); const ourNumber = ourDevice.key; - const pairedDevices = TestUtils.generateFakePubKeys(2); // Initialize new stubbed queue + let pendingMessageCache: PendingMessageCacheStub; let messageQueueStub: MessageQueue; - // Spies - let sendMessageToDevicesSpy: sinon.SinonSpy< - [Array<PubKey>, ContentMessage], - Promise<Array<void>> - >; - let sendSyncMessageSpy: sinon.SinonSpy< - [ContentMessage, Array<PubKey>], - Promise<Array<void>> - >; - let sendToGroupSpy: sinon.SinonSpy< - [OpenGroupMessage | ClosedGroupMessage], - Promise<boolean> - >; - // Message Sender Stubs let sendStub: sinon.SinonStub<[RawMessage, (number | undefined)?]>; - let sendToOpenGroupStub: sinon.SinonStub<[OpenGroupMessage]>; // Utils Stubs - let groupMembersStub: sinon.SinonStub; - let canSyncStub: sinon.SinonStub<[ContentMessage], boolean>; + let isMediumGroupStub: sinon.SinonStub<[string], boolean>; // Session Protocol Stubs let hasSessionStub: sinon.SinonStub<[PubKey]>; let sendSessionRequestIfNeededStub: sinon.SinonStub<[PubKey], Promise<void>>; beforeEach(async () => { - // Stub out methods which touch the database - const storageID = 'pendingMessages'; - data = { - id: storageID, - value: '[]', - }; - - // Pending Message Cache Data Stubs - TestUtils.stubData('getItemById') - .withArgs('pendingMessages') - .resolves(data); - TestUtils.stubData('createOrUpdateItem').callsFake((item: StorageItem) => { - if (item.id === storageID) { - data = item; - } - }); - // Utils Stubs - canSyncStub = sandbox.stub(SyncMessageUtils, 'canSync'); - canSyncStub.returns(false); sandbox.stub(UserUtil, 'getCurrentDevicePubKey').resolves(ourNumber); - sandbox.stub(MultiDeviceProtocol, 'getAllDevices').resolves(pairedDevices); TestUtils.stubWindow('libsignal', { SignalProtocolAddress: sandbox.stub(), @@ -99,15 +55,11 @@ describe('MessageQueue', () => { // Message Sender Stubs sendStub = sandbox.stub(MessageSender, 'send').resolves(); - sendToOpenGroupStub = sandbox - .stub(MessageSender, 'sendToOpenGroup') - .resolves(true); // Group Utils Stubs - sandbox.stub(GroupUtils, 'isMediumGroup').returns(false); - groupMembersStub = sandbox - .stub(GroupUtils, 'getGroupMembers' as any) - .resolves(TestUtils.generateFakePubKeys(10)); + isMediumGroupStub = sandbox + .stub(GroupUtils, 'isMediumGroup') + .returns(false); // Session Protocol Stubs sandbox.stub(SessionProtocol, 'sendSessionRequest').resolves(); @@ -116,37 +68,9 @@ describe('MessageQueue', () => { .stub(SessionProtocol, 'sendSessionRequestIfNeeded') .resolves(); - // Pending Mesage Cache Stubs - const chatMessages = Array.from( - { length: 10 }, - TestUtils.generateChatMessage - ); - const rawMessage = toRawMessage( - TestUtils.generateFakePubKey(), - TestUtils.generateChatMessage() - ); - - sandbox.stub(PendingMessageCache.prototype, 'add').resolves(rawMessage); - sandbox.stub(PendingMessageCache.prototype, 'remove').resolves(); - sandbox - .stub(PendingMessageCache.prototype, 'getDevices') - .returns(TestUtils.generateFakePubKeys(10)); - sandbox - .stub(PendingMessageCache.prototype, 'getForDevice') - .returns( - chatMessages.map(m => toRawMessage(TestUtils.generateFakePubKey(), m)) - ); - - // Spies - sendSyncMessageSpy = sandbox.spy(MessageQueue.prototype, 'sendSyncMessage'); - sendMessageToDevicesSpy = sandbox.spy( - MessageQueue.prototype, - 'sendMessageToDevices' - ); - sendToGroupSpy = sandbox.spy(MessageQueue.prototype, 'sendToGroup'); - // Init Queue - messageQueueStub = new MessageQueue(); + pendingMessageCache = new PendingMessageCacheStub(); + messageQueueStub = new MessageQueue(pendingMessageCache); }); afterEach(() => { @@ -154,233 +78,300 @@ describe('MessageQueue', () => { sandbox.restore(); }); - describe('send', () => { - it('can send to a single device', async () => { - const device = TestUtils.generateFakePubKey(); - const message = TestUtils.generateChatMessage(); - - const promise = messageQueueStub.send(device, message); - await expect(promise).to.be.fulfilled; - }); - - it('can send sync message', async () => { - const devices = TestUtils.generateFakePubKeys(3); - const message = TestUtils.generateChatMessage(); - - const promise = messageQueueStub.sendSyncMessage(message, devices); - expect(promise).to.be.fulfilled; - }); - }); - describe('processPending', () => { it('will send session request message if no session', async () => { hasSessionStub.resolves(false); + isMediumGroupStub.resolves(false); const device = TestUtils.generateFakePubKey(); - const promise = messageQueueStub.processPending(device); - await expect(promise).to.be.fulfilled; - expect(sendSessionRequestIfNeededStub.callCount).to.equal(1); + const stubCallPromise = TestUtils.waitUntil(() => sendSessionRequestIfNeededStub.callCount === 1); + + await messageQueueStub.processPending(device); + expect(stubCallPromise).to.be.fulfilled; }); it('will send message if session exists', async () => { + hasSessionStub.resolves(true); + isMediumGroupStub.resolves(false); + sendStub.resolves(); + const device = TestUtils.generateFakePubKey(); - const hasSession = await hasSessionStub(device); + await pendingMessageCache.add(device, TestUtils.generateChatMessage()); - const promise = messageQueueStub.processPending(device); - await expect(promise).to.be.fulfilled; + const successPromise = TestUtils.waitForTask(done => { + messageQueueStub.events.once('success', done); + }); - expect(hasSession).to.equal(true, 'session does not exist'); - expect(sendSessionRequestIfNeededStub.callCount).to.equal(0); + await messageQueueStub.processPending(device); + await expect(successPromise).to.be.fulfilled; + expect(sendSessionRequestIfNeededStub.called).to.equal( + false, + 'Session request triggered when we have a session.' + ); }); - }); - describe('sendUsingMultiDevice', () => { - it('can send using multidevice', async () => { + it('will send message if sending to medium group', async () => { + isMediumGroupStub.resolves(true); + sendStub.resolves(); + const device = TestUtils.generateFakePubKey(); - const message = TestUtils.generateChatMessage(); + await pendingMessageCache.add(device, TestUtils.generateChatMessage()); - const promise = messageQueueStub.sendUsingMultiDevice(device, message); - await expect(promise).to.be.fulfilled; + const successPromise = TestUtils.waitForTask(done => { + messageQueueStub.events.once('success', done); + }); - // Ensure the arguments passed into sendMessageToDevices are correct - const previousArgs = sendMessageToDevicesSpy.lastCall.args as [ - Array<PubKey>, - ChatMessage - ]; + await messageQueueStub.processPending(device); + await expect(successPromise).to.be.fulfilled; + expect(sendSessionRequestIfNeededStub.called).to.equal( + false, + 'Session request triggered on medium group' + ); + }); - // Check that instances are equal - expect(previousArgs).to.have.length(2); + it('should remove message from cache', async () => { + hasSessionStub.resolves(true); + isMediumGroupStub.resolves(false); + + const events = ['success', 'fail']; + for (const event of events) { + if (event === 'success') { + sendStub.resolves(); + } else { + sendStub.throws(new Error('fail')); + } + + const device = TestUtils.generateFakePubKey(); + await pendingMessageCache.add(device, TestUtils.generateChatMessage()); + + const initialMessages = await pendingMessageCache.getForDevice(device); + expect(initialMessages).to.have.length(1); + await messageQueueStub.processPending(device); + + const promise = TestUtils.waitUntil(async () => { + const messages = await pendingMessageCache.getForDevice(device); + return messages.length === 0; + }); + expect(promise).to.be.fulfilled; + } + }); - const argsPairedDevices = previousArgs[0]; - const argsChatMessage = previousArgs[1]; + describe('events', () => { + it('should send a success event if message was sent', async () => { + hasSessionStub.resolves(true); + isMediumGroupStub.resolves(false); + sendStub.resolves(); - expect(argsChatMessage instanceof ChatMessage).to.equal( - true, - 'message passed into sendMessageToDevices was not a valid ChatMessage' - ); - expect(argsChatMessage.isEqual(message)).to.equal( - true, - 'message passed into sendMessageToDevices has been mutated' - ); + const device = TestUtils.generateFakePubKey(); + const message = TestUtils.generateChatMessage(); + await pendingMessageCache.add(device, message); - argsPairedDevices.forEach((argsPaired: PubKey, index: number) => { - expect(argsPaired instanceof PubKey).to.equal( - true, - 'a device passed into sendMessageToDevices was not a PubKey' - ); - expect(argsPaired.isEqual(pairedDevices[index])).to.equal( - true, - 'a device passed into sendMessageToDevices did not match MessageDeviceProtocol.getAllDevices' - ); + const eventPromise = TestUtils.waitForTask<RawMessage | OpenGroupMessage>(complete => { + messageQueueStub.events.once('success', complete); + }); + + await messageQueueStub.processPending(device); + await expect(eventPromise).to.be.fulfilled; + + const rawMessage = await eventPromise; + expect(rawMessage.identifier).to.equal(message.identifier); + }); + + it('should send a fail event if something went wrong while sending', async () => { + hasSessionStub.resolves(true); + isMediumGroupStub.resolves(false); + sendStub.throws(new Error('failure')); + + const spy = sandbox.spy(); + messageQueueStub.events.on('fail', spy); + + const device = TestUtils.generateFakePubKey(); + const message = TestUtils.generateChatMessage(); + await pendingMessageCache.add(device, message); + + const eventPromise = TestUtils.waitForTask<[RawMessage | OpenGroupMessage, Error]>(complete => { + messageQueueStub.events.once('fail', (...args) => { + complete(args); + }); + }); + + await messageQueueStub.processPending(device); + await expect(eventPromise).to.be.fulfilled; + + const [rawMessage, error] = await eventPromise; + expect(rawMessage.identifier).to.equal(message.identifier); + expect(error.message).to.equal('failure'); }); }); }); + describe('sendUsingMultiDevice', () => { + it('should send the message to all the devices', async () => { + const devices = TestUtils.generateFakePubKeys(3); + sandbox.stub(MultiDeviceProtocol, 'getAllDevices').resolves(devices); + const stub = sandbox + .stub(messageQueueStub, 'sendMessageToDevices') + .resolves(); + + const message = TestUtils.generateChatMessage(); + await messageQueueStub.sendUsingMultiDevice(devices[0], message); + + const args = stub.lastCall.args as [Array<PubKey>, ContentMessage]; + expect(args[0]).to.have.same.members(devices); + expect(args[1]).to.equal(message); + }); + }); + describe('sendMessageToDevices', () => { it('can send to many devices', async () => { - const devices = TestUtils.generateFakePubKeys(10); + hasSessionStub.resolves(false); + + const devices = TestUtils.generateFakePubKeys(5); const message = TestUtils.generateChatMessage(); - const promise = messageQueueStub.sendMessageToDevices(devices, message); + await messageQueueStub.sendMessageToDevices(devices, message); + const promise = TestUtils.waitUntil(() => pendingMessageCache.getCache().length === devices.length); await expect(promise).to.be.fulfilled; }); - it('can send sync message and confirm canSync is valid', async () => { - canSyncStub.returns(true); + it('should send sync message if possible', async () => { + hasSessionStub.returns(false); - const devices = TestUtils.generateFakePubKeys(3); - const message = TestUtils.generateChatMessage(); - const pairedDeviceKeys = pairedDevices.map(device => device.key); + sandbox.stub(SyncMessageUtils, 'canSync').returns(true); - const promise = messageQueueStub.sendMessageToDevices(devices, message); - await expect(promise).to.be.fulfilled; + sandbox + .stub(SyncMessageUtils, 'from') + .returns(new TestSyncMessage({ timestamp: Date.now() })); - // Check sendSyncMessage parameters - const previousArgs = sendSyncMessageSpy.lastCall.args as [ - ChatMessage, - Array<PubKey> - ]; - expect(sendSyncMessageSpy.callCount).to.equal(1); + // This stub ensures that the message won't process + const sendSyncMessageStub = sandbox + .stub(messageQueueStub, 'sendSyncMessage') + .resolves(); - // Check that instances are equal - expect(previousArgs).to.have.length(2); + const ourDevices = [ourDevice, ...TestUtils.generateFakePubKeys(2)]; + sandbox + .stub(MultiDeviceProtocol, 'getAllDevices') + .callsFake(async user => { + if (ourDevice.isEqual(user)) { + return ourDevices; + } - const argsChatMessage = previousArgs[0]; - const argsPairedKeys = [...previousArgs[1]].map(d => d.key); + return []; + }); - expect(argsChatMessage instanceof ChatMessage).to.equal( - true, - 'message passed into sendMessageToDevices was not a valid ChatMessage' - ); - expect(argsChatMessage.isEqual(message)).to.equal( - true, - 'message passed into sendMessageToDevices has been mutated' - ); + const devices = [...ourDevices, ...TestUtils.generateFakePubKeys(3)]; + const message = TestUtils.generateChatMessage(); - // argsPairedKeys and pairedDeviceKeys should contain the same values - const keyArgsValid = _.isEmpty(_.xor(argsPairedKeys, pairedDeviceKeys)); - expect(keyArgsValid).to.equal( + await messageQueueStub.sendMessageToDevices(devices, message); + expect(sendSyncMessageStub.called).to.equal( true, - 'devices passed into sendSyncMessage were invalid' + 'sendSyncMessage was not called.' + ); + expect(pendingMessageCache.getCache().map(c => c.device)).to.not.have.members(ourDevices.map(d => d.key), 'Sending regular messages to our own device is not allowed.'); + expect(pendingMessageCache.getCache()).to.have.length( + devices.length - ourDevices.length, + 'Messages should not be sent to our devices.' ); }); }); - describe('sendToGroup', () => { - it('can send to closed group', async () => { - const message = TestUtils.generateClosedGroupMessage(); - const success = await messageQueueStub.sendToGroup(message); - expect(success).to.equal(true, 'sending to group failed'); - }); - - it('uses correct parameters for sendToGroup with ClosedGroupMessage', async () => { - const message = TestUtils.generateClosedGroupMessage(); - const success = await messageQueueStub.sendToGroup(message); - - expect(success).to.equal(true, 'sending to group failed'); + describe('sendSyncMessage', () => { + it('should send a message to all our devices', async () => { + hasSessionStub.resolves(false); - // Check parameters - const previousArgs = sendMessageToDevicesSpy.lastCall.args as [ - Array<PubKey>, - ClosedGroupMessage - ]; - expect(previousArgs).to.have.length(2); + const ourOtherDevices = TestUtils.generateFakePubKeys(2); + const ourDevices = [ourDevice, ...ourOtherDevices]; + sandbox.stub(MultiDeviceProtocol, 'getAllDevices').resolves(ourDevices); - const argsClosedGroupMessage = previousArgs[1]; - expect(argsClosedGroupMessage instanceof ClosedGroupMessage).to.equal( - true, - 'message passed into sendMessageToDevices was not a ClosedGroupMessage' + await messageQueueStub.sendSyncMessage( + new TestSyncMessage({ timestamp: Date.now() }) ); - }); - it("won't send to invalid groupId", async () => { - const message = TestUtils.generateClosedGroupMessage('invalid-group-id'); - const success = await messageQueueStub.sendToGroup(message); + expect(pendingMessageCache.getCache()).to.have.length(ourOtherDevices.length); + expect(pendingMessageCache.getCache().map(c => c.device)).to.have.members(ourOtherDevices.map(d => d.key)); + }); + }); - // Ensure message parameter passed into sendToGroup is as expected - expect(success).to.equal( - false, - 'an invalid groupId was treated as valid' - ); - expect(sendToGroupSpy.callCount).to.equal(1); + describe('sendToGroup', () => { + describe('closed groups', async () => { + it('can send to closed group', async () => { + const members = TestUtils.generateFakePubKeys(4).map( + p => new PrimaryPubKey(p.key) + ); + sandbox.stub(GroupUtils, 'getGroupMembers').resolves(members); - const argsMessage = sendToGroupSpy.lastCall.args[0]; - expect(argsMessage instanceof ClosedGroupMessage).to.equal( - true, - 'message passed into sendToGroup was not a ClosedGroupMessage' - ); - expect(success).to.equal( - false, - 'invalid ClosedGroupMessage was propogated through sendToGroup' - ); - }); + const sendUsingMultiDeviceStub = sandbox + .stub(messageQueueStub, 'sendUsingMultiDevice') + .resolves(); - it('wont send message to empty closed group', async () => { - groupMembersStub.resolves(TestUtils.generateFakePubKeys(0)); + const message = TestUtils.generateClosedGroupMessage(); + const success = await messageQueueStub.sendToGroup(message); + expect(success).to.equal(true, 'sending to group failed'); + expect(sendUsingMultiDeviceStub.callCount).to.equal(members.length); - const message = TestUtils.generateClosedGroupMessage(); - const response = await messageQueueStub.sendToGroup(message); + const arg = sendUsingMultiDeviceStub.getCall(0).args; + expect(arg[1] instanceof ClosedGroupMessage).to.equal( + true, + 'message sent to group member was not a ClosedGroupMessage' + ); + }); - expect(response).to.equal( - false, - 'sendToGroup send a message to an empty group' - ); - }); + it('wont send message to empty closed group', async () => { + sandbox.stub(GroupUtils, 'getGroupMembers').resolves([]); + const sendUsingMultiDeviceStub = sandbox + .stub(messageQueueStub, 'sendUsingMultiDevice') + .resolves(); - it('can send to open group', async () => { - const message = TestUtils.generateOpenGroupMessage(); - const success = await messageQueueStub.sendToGroup(message); + const message = TestUtils.generateClosedGroupMessage(); + const response = await messageQueueStub.sendToGroup(message); - expect(success).to.equal(true, 'sending to group failed'); + expect(response).to.equal( + false, + 'sendToGroup sent a message to an empty group' + ); + expect(sendUsingMultiDeviceStub.callCount).to.equal(0); + }); }); - }); - describe('events', () => { - it('can send events on message sending success', async () => { - const successSpy = sandbox.spy(); - messageQueueStub.events.on('success', successSpy); - - const device = TestUtils.generateFakePubKey(); - const promise = messageQueueStub.processPending(device); - await expect(promise).to.be.fulfilled; + describe('open groups', async () => { + let sendToOpenGroupStub: sinon.SinonStub< + [OpenGroupMessage], + Promise<boolean> + >; + beforeEach(() => { + sendToOpenGroupStub = sandbox + .stub(MessageSender, 'sendToOpenGroup') + .resolves(true); + }); - await tick(); - expect(successSpy.callCount).to.equal(1); - }); + it('can send to open group', async () => { + const message = TestUtils.generateOpenGroupMessage(); + const success = await messageQueueStub.sendToGroup(message); + expect(sendToOpenGroupStub.callCount).to.equal(1); + expect(success).to.equal(true, 'Sending to open group failed'); + }); - it('can send events on message sending failure', async () => { - sendStub.throws(new Error('Failed to send message.')); + it('should emit a success event when send was successful', async () => { + const message = TestUtils.generateOpenGroupMessage(); + const eventPromise = TestUtils.waitForTask(complete => { + messageQueueStub.events.once('success', complete); + }, 2000); - const failureSpy = sandbox.spy(); - messageQueueStub.events.on('fail', failureSpy); + await messageQueueStub.sendToGroup(message); + await expect(eventPromise).to.be.fulfilled; + }); - const device = TestUtils.generateFakePubKey(); - const promise = messageQueueStub.processPending(device); - await expect(promise).to.be.fulfilled; + it('should emit a fail event if something went wrong', async () => { + sendToOpenGroupStub.resolves(false); + const message = TestUtils.generateOpenGroupMessage(); + const eventPromise = TestUtils.waitForTask(complete => { + messageQueueStub.events.once('fail', complete); + }, 2000); - await tick(); - expect(failureSpy.callCount).to.equal(1); + await messageQueueStub.sendToGroup(message); + await expect(eventPromise).to.be.fulfilled; + }); }); }); }); diff --git a/ts/test/session/sending/PendingMessageCache_test.ts b/ts/test/session/sending/PendingMessageCache_test.ts index 5ed1e211e..2dacef6f7 100644 --- a/ts/test/session/sending/PendingMessageCache_test.ts +++ b/ts/test/session/sending/PendingMessageCache_test.ts @@ -1,8 +1,9 @@ import { expect } from 'chai'; import * as _ from 'lodash'; import { MessageUtils } from '../../../session/utils'; -import { TestUtils } from '../../../test/test-utils'; +import { TestUtils, timeout } from '../../../test/test-utils'; import { PendingMessageCache } from '../../../session/sending/PendingMessageCache'; +import { initial } from 'lodash'; // Equivalent to Data.StorageItem interface StorageItem { @@ -36,7 +37,6 @@ describe('PendingMessageCache', () => { }); pendingMessageCacheStub = new PendingMessageCache(); - await pendingMessageCacheStub.isReady; }); afterEach(() => { @@ -44,7 +44,7 @@ describe('PendingMessageCache', () => { }); it('can initialize cache', async () => { - const cache = pendingMessageCacheStub.getAllPending(); + const cache = await pendingMessageCacheStub.getAllPending(); // We expect the cache to initialise as an empty array expect(cache).to.be.instanceOf(Array); @@ -59,7 +59,7 @@ describe('PendingMessageCache', () => { await pendingMessageCacheStub.add(device, message); // Verify that the message is in the cache - const finalCache = pendingMessageCacheStub.getAllPending(); + const finalCache = await pendingMessageCacheStub.getAllPending(); expect(finalCache).to.have.length(1); @@ -68,6 +68,22 @@ describe('PendingMessageCache', () => { expect(addedMessage.timestamp).to.deep.equal(rawMessage.timestamp); }); + it('can add multiple messages belonging to the same user', async () => { + const device = TestUtils.generateFakePubKey(); + + await pendingMessageCacheStub.add(device, TestUtils.generateChatMessage()); + // We have to timeout here otherwise it's processed too fast and messages start having the same timestamp + await timeout(5); + await pendingMessageCacheStub.add(device, TestUtils.generateChatMessage()); + await timeout(5); + await pendingMessageCacheStub.add(device, TestUtils.generateChatMessage()); + + // Verify that the message is in the cache + const finalCache = await pendingMessageCacheStub.getAllPending(); + + expect(finalCache).to.have.length(3); + }); + it('can remove from cache', async () => { const device = TestUtils.generateFakePubKey(); const message = TestUtils.generateChatMessage(); @@ -75,18 +91,43 @@ describe('PendingMessageCache', () => { await pendingMessageCacheStub.add(device, message); - const initialCache = pendingMessageCacheStub.getAllPending(); + const initialCache = await pendingMessageCacheStub.getAllPending(); expect(initialCache).to.have.length(1); // Remove the message await pendingMessageCacheStub.remove(rawMessage); - const finalCache = pendingMessageCacheStub.getAllPending(); + const finalCache = await pendingMessageCacheStub.getAllPending(); // Verify that the message was removed expect(finalCache).to.have.length(0); }); + it('should only remove messages with different timestamp and device', async () => { + const device = TestUtils.generateFakePubKey(); + const message = TestUtils.generateChatMessage(); + const rawMessage = MessageUtils.toRawMessage(device, message); + + await pendingMessageCacheStub.add(device, message); + await timeout(5); + await pendingMessageCacheStub.add( + device, + TestUtils.generateChatMessage(message.identifier) + ); + await pendingMessageCacheStub.add(TestUtils.generateFakePubKey(), message); + + const initialCache = await pendingMessageCacheStub.getAllPending(); + expect(initialCache).to.have.length(3); + + // Remove the message + await pendingMessageCacheStub.remove(rawMessage); + + const finalCache = await pendingMessageCacheStub.getAllPending(); + + // Verify that the message was removed + expect(finalCache).to.have.length(2); + }); + it('can get devices', async () => { const cacheItems = [ { @@ -103,16 +144,16 @@ describe('PendingMessageCache', () => { }, ]; - cacheItems.forEach(async item => { + for (const item of cacheItems) { await pendingMessageCacheStub.add(item.device, item.message); - }); + } - const cache = pendingMessageCacheStub.getAllPending(); + const cache = await pendingMessageCacheStub.getAllPending(); expect(cache).to.have.length(cacheItems.length); // Get list of devices const devicesKeys = cacheItems.map(item => item.device.key); - const pulledDevices = pendingMessageCacheStub.getDevices(); + const pulledDevices = await pendingMessageCacheStub.getDevices(); const pulledDevicesKeys = pulledDevices.map(d => d.key); // Verify that device list from cache is equivalent to devices added @@ -131,21 +172,21 @@ describe('PendingMessageCache', () => { }, ]; - cacheItems.forEach(async item => { + for (const item of cacheItems) { await pendingMessageCacheStub.add(item.device, item.message); - }); + } - const initialCache = pendingMessageCacheStub.getAllPending(); + const initialCache = await pendingMessageCacheStub.getAllPending(); expect(initialCache).to.have.length(cacheItems.length); // Get pending for each specific device - cacheItems.forEach(item => { - const pendingForDevice = pendingMessageCacheStub.getForDevice( + for (const item of cacheItems) { + const pendingForDevice = await pendingMessageCacheStub.getForDevice( item.device ); expect(pendingForDevice).to.have.length(1); expect(pendingForDevice[0].device).to.equal(item.device.key); - }); + } }); it('can find nothing when empty', async () => { @@ -164,7 +205,7 @@ describe('PendingMessageCache', () => { await pendingMessageCacheStub.add(device, message); - const finalCache = pendingMessageCacheStub.getAllPending(); + const finalCache = await pendingMessageCacheStub.getAllPending(); expect(finalCache).to.have.length(1); const foundMessage = pendingMessageCacheStub.find(rawMessage); @@ -188,17 +229,17 @@ describe('PendingMessageCache', () => { }, ]; - cacheItems.forEach(async item => { + for (const item of cacheItems) { await pendingMessageCacheStub.add(item.device, item.message); - }); + } - const initialCache = pendingMessageCacheStub.getAllPending(); + const initialCache = await pendingMessageCacheStub.getAllPending(); expect(initialCache).to.have.length(cacheItems.length); // Clear cache await pendingMessageCacheStub.clear(); - const finalCache = pendingMessageCacheStub.getAllPending(); + const finalCache = await pendingMessageCacheStub.getAllPending(); expect(finalCache).to.have.length(0); }); @@ -218,21 +259,20 @@ describe('PendingMessageCache', () => { }, ]; - cacheItems.forEach(async item => { + for (const item of cacheItems) { await pendingMessageCacheStub.add(item.device, item.message); - }); + } - const addedMessages = pendingMessageCacheStub.getAllPending(); + const addedMessages = await pendingMessageCacheStub.getAllPending(); expect(addedMessages).to.have.length(cacheItems.length); // Rebuild from DB const freshCache = new PendingMessageCache(); - await freshCache.isReady; // Verify messages - const rebuiltMessages = freshCache.getAllPending(); - - rebuiltMessages.forEach((message, index) => { + const rebuiltMessages = await freshCache.getAllPending(); + // tslint:disable-next-line: no-for-in no-for-in-array + for (const [index, message] of rebuiltMessages.entries()) { const addedMessage = addedMessages[index]; // Pull out plainTextBuffer for a separate check @@ -254,6 +294,6 @@ describe('PendingMessageCache', () => { true, 'cached messages were not rebuilt properly' ); - }); + } }); }); diff --git a/ts/test/test-utils/stubs/index.ts b/ts/test/test-utils/stubs/index.ts index 10ad19f0e..d287adc26 100644 --- a/ts/test/test-utils/stubs/index.ts +++ b/ts/test/test-utils/stubs/index.ts @@ -1 +1,2 @@ export * from './ciphers'; +export * from './sending'; diff --git a/ts/test/test-utils/stubs/messages/TestSyncMessage.ts b/ts/test/test-utils/stubs/messages/TestSyncMessage.ts new file mode 100644 index 000000000..c82eecfa4 --- /dev/null +++ b/ts/test/test-utils/stubs/messages/TestSyncMessage.ts @@ -0,0 +1,7 @@ +import { SyncMessage } from '../../../../session/messages/outgoing'; +import { SignalService } from '../../../../protobuf'; +export class TestSyncMessage extends SyncMessage { + protected syncProto(): SignalService.SyncMessage { + return SignalService.SyncMessage.create({}); + } +} diff --git a/ts/test/test-utils/stubs/sending/PendingMessageCacheStub.ts b/ts/test/test-utils/stubs/sending/PendingMessageCacheStub.ts new file mode 100644 index 000000000..fa1fc5129 --- /dev/null +++ b/ts/test/test-utils/stubs/sending/PendingMessageCacheStub.ts @@ -0,0 +1,22 @@ +import { PendingMessageCache } from '../../../../session/sending'; +import { RawMessage } from '../../../../session/types'; + +export class PendingMessageCacheStub extends PendingMessageCache { + public dbData: Array<RawMessage>; + constructor(dbData: Array<RawMessage> = []) { + super(); + this.dbData = dbData; + } + + public getCache(): Readonly<Array<RawMessage>> { + return this.cache; + } + + protected async getFromStorage() { + return this.dbData; + } + + protected async saveToDB() { + return; + } +} diff --git a/ts/test/test-utils/stubs/sending/index.ts b/ts/test/test-utils/stubs/sending/index.ts new file mode 100644 index 000000000..e9def4705 --- /dev/null +++ b/ts/test/test-utils/stubs/sending/index.ts @@ -0,0 +1 @@ +export * from './PendingMessageCacheStub'; diff --git a/ts/test/test-utils/testUtils.ts b/ts/test/test-utils/testUtils.ts index ef2efc0fb..079148170 100644 --- a/ts/test/test-utils/testUtils.ts +++ b/ts/test/test-utils/testUtils.ts @@ -85,10 +85,10 @@ export function generateFakePubKeys(amount: number): Array<PubKey> { return new Array(numPubKeys).fill(0).map(() => generateFakePubKey()); } -export function generateChatMessage(): ChatMessage { +export function generateChatMessage(identifier?: string): ChatMessage { return new ChatMessage({ body: 'Lorem ipsum dolor sit amet, consectetur adipiscing elit', - identifier: uuid(), + identifier: identifier ?? uuid(), timestamp: Date.now(), attachments: undefined, quote: undefined, @@ -124,3 +124,83 @@ export function generateClosedGroupMessage( chatMessage: generateChatMessage(), }); } + +type ArgFunction<T> = (arg: T) => void; +type MaybePromise<T> = Promise<T> | T; + +/** + * Create a promise which waits until `done` is called or until timeout period is reached. + * @param task The task to wait for. + * @param timeout The timeout period. + */ +// tslint:disable-next-line: no-shadowed-variable +export async function waitForTask<T>(task: (done: ArgFunction<T>) => MaybePromise<void>, timeout: number = 2000): Promise<T> { + const timeoutPromise = new Promise<T>((_, rej) => { + const wait = setTimeout(() => { + clearTimeout(wait); + rej(new Error('Task timed out.')); + }, timeout); + }); + + // tslint:disable-next-line: no-shadowed-variable + const taskPromise = new Promise(async (res, rej) => { + try { + const taskReturn = task(res); + return taskReturn instanceof Promise ? taskReturn : Promise.resolve(taskReturn); + } catch (e) { + rej(e); + } + }); + + return Promise.race([timeoutPromise, taskPromise]) as Promise<T>; +} + +/** + * Creates a promise which periodically calls the `check` until `done` is called or until timeout period is reached. + * @param check The check which runs every 100ms. + * @param timeout The time before an error is thrown. + */ +// tslint:disable-next-line: no-shadowed-variable +export async function periodicallyCheck(check: (done: ArgFunction<void>) => MaybePromise<void>, timeout: number = 1000): Promise<void> { + return waitForTask(complete => { + let interval: NodeJS.Timeout | undefined; + const cleanup = () => { + if (interval) { + clearInterval(interval); + interval = undefined; + } + }; + setTimeout(cleanup, timeout); + + const onDone = () => { + complete(); + cleanup(); + }; + interval = setInterval(async () => { + try { + await toPromise(check(onDone)); + } catch (e) { + cleanup(); + throw e; + } + }, 100); + }, timeout); +} + +/** + * Creates a promise which waits until `check` returns `true` or rejects if timeout preiod is reached. + * @param check The boolean check. + * @param timeout The time before an error is thrown. + */ +export async function waitUntil(check: () => MaybePromise<boolean>, timeout: number = 2000) { + return periodicallyCheck(async done => { + const result = await toPromise(check()); + if (result) { + done(); + } + }, timeout); +} + +async function toPromise<T>(maybePromise: MaybePromise<T>): Promise<T> { + return maybePromise instanceof Promise ? maybePromise : Promise.resolve(maybePromise); +} From 7fe6583608e539c471f7100e0eb6411dbe9ca3ab Mon Sep 17 00:00:00 2001 From: Maxim Shishmarev <msgmaxim@gmail.com> Date: Fri, 19 Jun 2020 17:03:33 +1000 Subject: [PATCH 5/7] Fix incorrect conversation id for incoming open group messages --- ts/receiver/receiver.ts | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/ts/receiver/receiver.ts b/ts/receiver/receiver.ts index 48bbd6908..08ccdb3b5 100644 --- a/ts/receiver/receiver.ts +++ b/ts/receiver/receiver.ts @@ -650,11 +650,9 @@ export async function handleMessageEvent(event: any): Promise<void> { confirm(); return; } - } - - if (source !== ourNumber) { - // Ignore auth from our devices - conversationId = primarySource.key; + } else if (source !== ourNumber) { + // Ignore auth from our devices + conversationId = primarySource.key; } // the conversation with the primary device of that source (can be the same as conversationOrigin) From 3bf5796cd511aa6a3a8541f0618c45abb6b3028a Mon Sep 17 00:00:00 2001 From: Mikunj <mikunj@live.com.au> Date: Fri, 19 Jun 2020 16:46:42 +1000 Subject: [PATCH 6/7] Modify periodic check implementation --- ts/session/sending/MessageQueue.ts | 8 +- ts/session/sending/PendingMessageCache.ts | 6 +- ts/session/utils/Promise.ts | 118 ++++++++++++++++++ ts/session/utils/index.ts | 9 +- ts/test/session/sending/MessageQueue_test.ts | 66 ++++++---- .../sending/PendingMessageCache_test.ts | 3 +- ts/test/test-utils/testUtils.ts | 80 ------------ 7 files changed, 180 insertions(+), 110 deletions(-) create mode 100644 ts/session/utils/Promise.ts diff --git a/ts/session/sending/MessageQueue.ts b/ts/session/sending/MessageQueue.ts index d5e62286b..f1c3a5525 100644 --- a/ts/session/sending/MessageQueue.ts +++ b/ts/session/sending/MessageQueue.ts @@ -63,7 +63,9 @@ export class MessageQueue implements MessageQueueInterface { const ourDevices = await MultiDeviceProtocol.getOurDevices(); // Remove our devices from currentDevices - currentDevices = currentDevices.filter(device => !ourDevices.some(d => device.isEqual(d))); + currentDevices = currentDevices.filter( + device => !ourDevices.some(d => device.isEqual(d)) + ); } const promises = currentDevices.map(async device => { @@ -129,7 +131,9 @@ export class MessageQueue implements MessageQueueInterface { } const ourDevices = await MultiDeviceProtocol.getOurDevices(); - const promises = ourDevices.map(async device => this.process(device, message)); + const promises = ourDevices.map(async device => + this.process(device, message) + ); return Promise.all(promises); } diff --git a/ts/session/sending/PendingMessageCache.ts b/ts/session/sending/PendingMessageCache.ts index fcc571cc6..a26cea8c6 100644 --- a/ts/session/sending/PendingMessageCache.ts +++ b/ts/session/sending/PendingMessageCache.ts @@ -67,7 +67,11 @@ export class PendingMessageCache { // Remove item from cache and sync with database const updatedCache = this.cache.filter( - cached => !(cached.device === message.device && cached.timestamp === message.timestamp) + cached => + !( + cached.device === message.device && + cached.timestamp === message.timestamp + ) ); this.cache = updatedCache; await this.saveToDB(); diff --git a/ts/session/utils/Promise.ts b/ts/session/utils/Promise.ts new file mode 100644 index 000000000..b106f6032 --- /dev/null +++ b/ts/session/utils/Promise.ts @@ -0,0 +1,118 @@ +type SimpleFunction<T> = (arg: T) => void; +type Return<T> = Promise<T> | T; + +async function toPromise<T>(value: Return<T>): Promise<T> { + return value instanceof Promise ? value : Promise.resolve(value); +} + +/** + * Create a promise which waits until `done` is called or until `timeout` period is reached. + * If `timeout` is reached then this will throw an Error. + * + * @param task The task to wait for. + * @param timeout The timeout period. + */ +export async function waitForTask<T>( + task: (done: SimpleFunction<T>) => Return<void>, + timeout: number = 2000 +): Promise<T> { + const timeoutPromise = new Promise<T>((_, rej) => { + const wait = setTimeout(() => { + clearTimeout(wait); + rej(new Error('Task timed out.')); + }, timeout); + }); + + const taskPromise = new Promise(async (res, rej) => { + try { + await toPromise(task(res)); + } catch (e) { + rej(e); + } + }); + + return Promise.race([timeoutPromise, taskPromise]) as Promise<T>; +} + +interface PollOptions { + timeout: number; + interval: number; +} + +/** + * Creates a promise which calls the `task` every `interval` until `done` is called or until `timeout` period is reached. + * If `timeout` is reached then this will throw an Error. + * + * @param check The check which runs every `interval` ms. + * @param options The polling options. + */ +export async function poll( + task: (done: SimpleFunction<void>) => Return<void>, + options: Partial<PollOptions> = {} +): Promise<void> { + const defaults: PollOptions = { + timeout: 2000, + interval: 1000, + }; + + const { timeout, interval } = { + ...defaults, + ...options, + }; + + const endTime = Date.now() + timeout; + let stop = false; + const finish = () => { + stop = true; + }; + + const _poll = async (resolve: any, reject: any) => { + if (stop) { + resolve(); + } else if (Date.now() >= endTime) { + finish(); + reject(new Error('Periodic check timeout')); + } else { + try { + await toPromise(task(finish)); + } catch (e) { + finish(); + reject(e); + return; + } + + setTimeout(() => { + void _poll(resolve, reject); + }, interval); + } + }; + + return new Promise((resolve, reject) => { + void _poll(resolve, reject); + }); +} + +/** + * Creates a promise which waits until `check` returns `true` or rejects if `timeout` preiod is reached. + * If `timeout` is reached then this will throw an Error. + * + * @param check The boolean check. + * @param timeout The time before an error is thrown. + */ +export async function waitUntil( + check: () => Return<boolean>, + timeout: number = 2000 +) { + // This is causing unhandled promise rejection somewhere in MessageQueue tests + return poll( + async done => { + const result = await toPromise(check()); + if (result) { + done(); + } + }, + { + timeout, + } + ); +} diff --git a/ts/session/utils/index.ts b/ts/session/utils/index.ts index c619b8d2f..dde73e928 100644 --- a/ts/session/utils/index.ts +++ b/ts/session/utils/index.ts @@ -2,8 +2,15 @@ import * as MessageUtils from './Messages'; import * as GroupUtils from './Groups'; import * as SyncMessageUtils from './SyncMessageUtils'; import * as StringUtils from './String'; +import * as PromiseUtils from './Promise'; export * from './TypedEmitter'; export * from './JobQueue'; -export { MessageUtils, SyncMessageUtils, GroupUtils, StringUtils }; +export { + MessageUtils, + SyncMessageUtils, + GroupUtils, + StringUtils, + PromiseUtils, +}; diff --git a/ts/test/session/sending/MessageQueue_test.ts b/ts/test/session/sending/MessageQueue_test.ts index 739bae2cf..813ae20c6 100644 --- a/ts/test/session/sending/MessageQueue_test.ts +++ b/ts/test/session/sending/MessageQueue_test.ts @@ -1,7 +1,11 @@ import chai from 'chai'; import * as sinon from 'sinon'; -import * as _ from 'lodash'; -import { GroupUtils, SyncMessageUtils } from '../../../session/utils'; +import _ from 'lodash'; +import { + GroupUtils, + PromiseUtils, + SyncMessageUtils, +} from '../../../session/utils'; import { Stubs, TestUtils } from '../../../test/test-utils'; import { MessageQueue } from '../../../session/sending/MessageQueue'; import { @@ -81,25 +85,27 @@ describe('MessageQueue', () => { describe('processPending', () => { it('will send session request message if no session', async () => { hasSessionStub.resolves(false); - isMediumGroupStub.resolves(false); + isMediumGroupStub.returns(false); const device = TestUtils.generateFakePubKey(); - const stubCallPromise = TestUtils.waitUntil(() => sendSessionRequestIfNeededStub.callCount === 1); - await messageQueueStub.processPending(device); - expect(stubCallPromise).to.be.fulfilled; + + const stubCallPromise = PromiseUtils.waitUntil( + () => sendSessionRequestIfNeededStub.callCount === 1 + ); + await expect(stubCallPromise).to.be.fulfilled; }); it('will send message if session exists', async () => { hasSessionStub.resolves(true); - isMediumGroupStub.resolves(false); + isMediumGroupStub.returns(false); sendStub.resolves(); const device = TestUtils.generateFakePubKey(); await pendingMessageCache.add(device, TestUtils.generateChatMessage()); - const successPromise = TestUtils.waitForTask(done => { + const successPromise = PromiseUtils.waitForTask(done => { messageQueueStub.events.once('success', done); }); @@ -112,13 +118,13 @@ describe('MessageQueue', () => { }); it('will send message if sending to medium group', async () => { - isMediumGroupStub.resolves(true); + isMediumGroupStub.returns(true); sendStub.resolves(); const device = TestUtils.generateFakePubKey(); await pendingMessageCache.add(device, TestUtils.generateChatMessage()); - const successPromise = TestUtils.waitForTask(done => { + const successPromise = PromiseUtils.waitForTask(done => { messageQueueStub.events.once('success', done); }); @@ -132,7 +138,7 @@ describe('MessageQueue', () => { it('should remove message from cache', async () => { hasSessionStub.resolves(true); - isMediumGroupStub.resolves(false); + isMediumGroupStub.returns(false); const events = ['success', 'fail']; for (const event of events) { @@ -149,25 +155,27 @@ describe('MessageQueue', () => { expect(initialMessages).to.have.length(1); await messageQueueStub.processPending(device); - const promise = TestUtils.waitUntil(async () => { + const promise = PromiseUtils.waitUntil(async () => { const messages = await pendingMessageCache.getForDevice(device); return messages.length === 0; }); - expect(promise).to.be.fulfilled; + await expect(promise).to.be.fulfilled; } }); describe('events', () => { it('should send a success event if message was sent', async () => { hasSessionStub.resolves(true); - isMediumGroupStub.resolves(false); + isMediumGroupStub.returns(false); sendStub.resolves(); const device = TestUtils.generateFakePubKey(); const message = TestUtils.generateChatMessage(); await pendingMessageCache.add(device, message); - const eventPromise = TestUtils.waitForTask<RawMessage | OpenGroupMessage>(complete => { + const eventPromise = PromiseUtils.waitForTask< + RawMessage | OpenGroupMessage + >(complete => { messageQueueStub.events.once('success', complete); }); @@ -180,7 +188,7 @@ describe('MessageQueue', () => { it('should send a fail event if something went wrong while sending', async () => { hasSessionStub.resolves(true); - isMediumGroupStub.resolves(false); + isMediumGroupStub.returns(false); sendStub.throws(new Error('failure')); const spy = sandbox.spy(); @@ -190,7 +198,9 @@ describe('MessageQueue', () => { const message = TestUtils.generateChatMessage(); await pendingMessageCache.add(device, message); - const eventPromise = TestUtils.waitForTask<[RawMessage | OpenGroupMessage, Error]>(complete => { + const eventPromise = PromiseUtils.waitForTask< + [RawMessage | OpenGroupMessage, Error] + >(complete => { messageQueueStub.events.once('fail', (...args) => { complete(args); }); @@ -231,8 +241,7 @@ describe('MessageQueue', () => { const message = TestUtils.generateChatMessage(); await messageQueueStub.sendMessageToDevices(devices, message); - const promise = TestUtils.waitUntil(() => pendingMessageCache.getCache().length === devices.length); - await expect(promise).to.be.fulfilled; + expect(pendingMessageCache.getCache()).to.have.length(devices.length); }); it('should send sync message if possible', async () => { @@ -268,7 +277,12 @@ describe('MessageQueue', () => { true, 'sendSyncMessage was not called.' ); - expect(pendingMessageCache.getCache().map(c => c.device)).to.not.have.members(ourDevices.map(d => d.key), 'Sending regular messages to our own device is not allowed.'); + expect( + pendingMessageCache.getCache().map(c => c.device) + ).to.not.have.members( + ourDevices.map(d => d.key), + 'Sending regular messages to our own device is not allowed.' + ); expect(pendingMessageCache.getCache()).to.have.length( devices.length - ourDevices.length, 'Messages should not be sent to our devices.' @@ -288,8 +302,12 @@ describe('MessageQueue', () => { new TestSyncMessage({ timestamp: Date.now() }) ); - expect(pendingMessageCache.getCache()).to.have.length(ourOtherDevices.length); - expect(pendingMessageCache.getCache().map(c => c.device)).to.have.members(ourOtherDevices.map(d => d.key)); + expect(pendingMessageCache.getCache()).to.have.length( + ourOtherDevices.length + ); + expect(pendingMessageCache.getCache().map(c => c.device)).to.have.members( + ourOtherDevices.map(d => d.key) + ); }); }); @@ -354,7 +372,7 @@ describe('MessageQueue', () => { it('should emit a success event when send was successful', async () => { const message = TestUtils.generateOpenGroupMessage(); - const eventPromise = TestUtils.waitForTask(complete => { + const eventPromise = PromiseUtils.waitForTask(complete => { messageQueueStub.events.once('success', complete); }, 2000); @@ -365,7 +383,7 @@ describe('MessageQueue', () => { it('should emit a fail event if something went wrong', async () => { sendToOpenGroupStub.resolves(false); const message = TestUtils.generateOpenGroupMessage(); - const eventPromise = TestUtils.waitForTask(complete => { + const eventPromise = PromiseUtils.waitForTask(complete => { messageQueueStub.events.once('fail', complete); }, 2000); diff --git a/ts/test/session/sending/PendingMessageCache_test.ts b/ts/test/session/sending/PendingMessageCache_test.ts index 2dacef6f7..dd6544364 100644 --- a/ts/test/session/sending/PendingMessageCache_test.ts +++ b/ts/test/session/sending/PendingMessageCache_test.ts @@ -3,7 +3,6 @@ import * as _ from 'lodash'; import { MessageUtils } from '../../../session/utils'; import { TestUtils, timeout } from '../../../test/test-utils'; import { PendingMessageCache } from '../../../session/sending/PendingMessageCache'; -import { initial } from 'lodash'; // Equivalent to Data.StorageItem interface StorageItem { @@ -271,7 +270,7 @@ describe('PendingMessageCache', () => { // Verify messages const rebuiltMessages = await freshCache.getAllPending(); - // tslint:disable-next-line: no-for-in no-for-in-array + for (const [index, message] of rebuiltMessages.entries()) { const addedMessage = addedMessages[index]; diff --git a/ts/test/test-utils/testUtils.ts b/ts/test/test-utils/testUtils.ts index 079148170..eb9644838 100644 --- a/ts/test/test-utils/testUtils.ts +++ b/ts/test/test-utils/testUtils.ts @@ -124,83 +124,3 @@ export function generateClosedGroupMessage( chatMessage: generateChatMessage(), }); } - -type ArgFunction<T> = (arg: T) => void; -type MaybePromise<T> = Promise<T> | T; - -/** - * Create a promise which waits until `done` is called or until timeout period is reached. - * @param task The task to wait for. - * @param timeout The timeout period. - */ -// tslint:disable-next-line: no-shadowed-variable -export async function waitForTask<T>(task: (done: ArgFunction<T>) => MaybePromise<void>, timeout: number = 2000): Promise<T> { - const timeoutPromise = new Promise<T>((_, rej) => { - const wait = setTimeout(() => { - clearTimeout(wait); - rej(new Error('Task timed out.')); - }, timeout); - }); - - // tslint:disable-next-line: no-shadowed-variable - const taskPromise = new Promise(async (res, rej) => { - try { - const taskReturn = task(res); - return taskReturn instanceof Promise ? taskReturn : Promise.resolve(taskReturn); - } catch (e) { - rej(e); - } - }); - - return Promise.race([timeoutPromise, taskPromise]) as Promise<T>; -} - -/** - * Creates a promise which periodically calls the `check` until `done` is called or until timeout period is reached. - * @param check The check which runs every 100ms. - * @param timeout The time before an error is thrown. - */ -// tslint:disable-next-line: no-shadowed-variable -export async function periodicallyCheck(check: (done: ArgFunction<void>) => MaybePromise<void>, timeout: number = 1000): Promise<void> { - return waitForTask(complete => { - let interval: NodeJS.Timeout | undefined; - const cleanup = () => { - if (interval) { - clearInterval(interval); - interval = undefined; - } - }; - setTimeout(cleanup, timeout); - - const onDone = () => { - complete(); - cleanup(); - }; - interval = setInterval(async () => { - try { - await toPromise(check(onDone)); - } catch (e) { - cleanup(); - throw e; - } - }, 100); - }, timeout); -} - -/** - * Creates a promise which waits until `check` returns `true` or rejects if timeout preiod is reached. - * @param check The boolean check. - * @param timeout The time before an error is thrown. - */ -export async function waitUntil(check: () => MaybePromise<boolean>, timeout: number = 2000) { - return periodicallyCheck(async done => { - const result = await toPromise(check()); - if (result) { - done(); - } - }, timeout); -} - -async function toPromise<T>(maybePromise: MaybePromise<T>): Promise<T> { - return maybePromise instanceof Promise ? maybePromise : Promise.resolve(maybePromise); -} From 8f492f8e745b313703be172248825dd8b548e8c1 Mon Sep 17 00:00:00 2001 From: Mikunj <mikunj@live.com.au> Date: Mon, 22 Jun 2020 09:10:57 +1000 Subject: [PATCH 7/7] Review changes --- ts/session/sending/MessageQueue.ts | 1 - ts/test/session/sending/MessageQueue_test.ts | 2 +- ts/test/session/sending/PendingMessageCache_test.ts | 8 ++++++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/ts/session/sending/MessageQueue.ts b/ts/session/sending/MessageQueue.ts index f1c3a5525..465dfb91a 100644 --- a/ts/session/sending/MessageQueue.ts +++ b/ts/session/sending/MessageQueue.ts @@ -21,7 +21,6 @@ import { PubKey } from '../types'; import { MessageSender } from '.'; import { MultiDeviceProtocol, SessionProtocol } from '../protocols'; import { UserUtil } from '../../util'; -import promise from 'redux-promise-middleware'; export class MessageQueue implements MessageQueueInterface { public readonly events: TypedEventEmitter<MessageQueueInterfaceEvents>; diff --git a/ts/test/session/sending/MessageQueue_test.ts b/ts/test/session/sending/MessageQueue_test.ts index 813ae20c6..c0d223ffc 100644 --- a/ts/test/session/sending/MessageQueue_test.ts +++ b/ts/test/session/sending/MessageQueue_test.ts @@ -161,7 +161,7 @@ describe('MessageQueue', () => { }); await expect(promise).to.be.fulfilled; } - }); + }).timeout(15000); describe('events', () => { it('should send a success event if message was sent', async () => { diff --git a/ts/test/session/sending/PendingMessageCache_test.ts b/ts/test/session/sending/PendingMessageCache_test.ts index dd6544364..128947bdb 100644 --- a/ts/test/session/sending/PendingMessageCache_test.ts +++ b/ts/test/session/sending/PendingMessageCache_test.ts @@ -109,11 +109,14 @@ describe('PendingMessageCache', () => { await pendingMessageCacheStub.add(device, message); await timeout(5); - await pendingMessageCacheStub.add( + const one = await pendingMessageCacheStub.add( device, TestUtils.generateChatMessage(message.identifier) ); - await pendingMessageCacheStub.add(TestUtils.generateFakePubKey(), message); + const two = await pendingMessageCacheStub.add( + TestUtils.generateFakePubKey(), + message + ); const initialCache = await pendingMessageCacheStub.getAllPending(); expect(initialCache).to.have.length(3); @@ -125,6 +128,7 @@ describe('PendingMessageCache', () => { // Verify that the message was removed expect(finalCache).to.have.length(2); + expect(finalCache).to.have.deep.members([one, two]); }); it('can get devices', async () => {