Merge branch 'clearnet' into folder-restructure

pull/1184/head
Mikunj 5 years ago
commit b85348569b

@ -408,11 +408,7 @@ class LokiSnodeAPI {
});
}
// FIXME: need a lock because it is being called multiple times in parallel
async buildNewOnionPaths() {
// Note: this function may be called concurrently, so
// might consider blocking the other calls
async buildNewOnionPathsWorker() {
const _ = window.Lodash;
log.info('LokiSnodeAPI::buildNewOnionPaths - building new onion paths');
@ -490,6 +486,14 @@ class LokiSnodeAPI {
log.info(`Built ${this.onionPaths.length} onion paths`, this.onionPaths);
}
async buildNewOnionPaths() {
// this function may be called concurrently make sure we only have one inflight
return primitives.allowOnlyOneAtATime(
'buildNewOnionPaths',
this.buildNewOnionPathsWorker
);
}
async getRandomSnodeAddress() {
// resolve random snode
if (this.randomSnodePool.length === 0) {

@ -650,9 +650,7 @@ export async function handleMessageEvent(event: any): Promise<void> {
confirm();
return;
}
}
if (source !== ourNumber) {
} else if (source !== ourNumber) {
// Ignore auth from our devices
conversationId = primarySource.key;
}

@ -8,6 +8,7 @@ import {
ContentMessage,
OpenGroupMessage,
SessionRequestMessage,
SyncMessage,
} from '../messages/outgoing';
import { PendingMessageCache } from './PendingMessageCache';
import {
@ -26,9 +27,9 @@ export class MessageQueue implements MessageQueueInterface {
private readonly jobQueues: Map<PubKey, JobQueue> = new Map();
private readonly pendingMessageCache: PendingMessageCache;
constructor() {
constructor(cache?: PendingMessageCache) {
this.events = new EventEmitter();
this.pendingMessageCache = new PendingMessageCache();
this.pendingMessageCache = cache ?? new PendingMessageCache();
void this.processAllPending();
}
@ -50,20 +51,20 @@ export class MessageQueue implements MessageQueueInterface {
// Sync to our devices if syncable
if (SyncMessageUtils.canSync(message)) {
const currentDevice = await UserUtil.getCurrentDevicePubKey();
if (currentDevice) {
const ourDevices = await MultiDeviceProtocol.getAllDevices(
currentDevice
const syncMessage = SyncMessageUtils.from(message);
if (!syncMessage) {
throw new Error(
'MessageQueue internal error occured: failed to make sync message'
);
}
await this.sendSyncMessage(message, ourDevices);
await this.sendSyncMessage(syncMessage);
// Remove our devices from currentDevices
currentDevices = currentDevices.filter(device =>
ourDevices.some(d => device.isEqual(d))
);
}
const ourDevices = await MultiDeviceProtocol.getOurDevices();
// Remove our devices from currentDevices
currentDevices = currentDevices.filter(
device => !ourDevices.some(d => device.isEqual(d))
);
}
const promises = currentDevices.map(async device => {
@ -79,30 +80,42 @@ export class MessageQueue implements MessageQueueInterface {
// Closed groups
if (message instanceof ClosedGroupMessage) {
// Get devices in closed group
const groupPubKey = PubKey.from(message.groupId);
if (!groupPubKey) {
const recipients = await GroupUtils.getGroupMembers(message.groupId);
if (recipients.length === 0) {
return false;
}
const recipients = await GroupUtils.getGroupMembers(groupPubKey);
// Send to all devices of members
await Promise.all(
recipients.map(async recipient =>
this.sendUsingMultiDevice(recipient, message)
)
);
if (recipients.length) {
await this.sendMessageToDevices(recipients, message);
return true;
}
return true;
}
// Open groups
if (message instanceof OpenGroupMessage) {
// No queue needed for Open Groups; send directly
const error = new Error('Failed to send message to open group.');
// This is absolutely yucky ... we need to make it not use Promise<boolean>
try {
await MessageSender.sendToOpenGroup(message);
this.events.emit('success', message);
const result = await MessageSender.sendToOpenGroup(message);
if (result) {
this.events.emit('success', message);
} else {
this.events.emit('fail', message, error);
}
return true;
return result;
} catch (e) {
this.events.emit('fail', message, e);
console.warn(
`Failed to send message to open group: ${message.group.server}`,
e
);
this.events.emit('fail', message, error);
return false;
}
@ -111,21 +124,22 @@ export class MessageQueue implements MessageQueueInterface {
return false;
}
public async sendSyncMessage(message: ContentMessage, sendTo: Array<PubKey>) {
// Sync with our devices
const promises = sendTo.map(async device => {
const syncMessage = SyncMessageUtils.from(message);
return this.process(device, syncMessage);
});
public async sendSyncMessage(message: SyncMessage | undefined): Promise<any> {
if (!message) {
return;
}
const ourDevices = await MultiDeviceProtocol.getOurDevices();
const promises = ourDevices.map(async device =>
this.process(device, message)
);
return Promise.all(promises);
}
public async processPending(device: PubKey) {
const messages = this.pendingMessageCache.getForDevice(device);
const messages = await this.pendingMessageCache.getForDevice(device);
const isMediumGroup = GroupUtils.isMediumGroup(device);
const isMediumGroup = GroupUtils.isMediumGroup(device.key);
const hasSession = await SessionProtocol.hasSession(device);
if (!isMediumGroup && !hasSession) {
@ -143,23 +157,28 @@ export class MessageQueue implements MessageQueueInterface {
await jobQueue.addWithId(messageId, async () =>
MessageSender.send(message)
);
void this.pendingMessageCache.remove(message);
this.events.emit('success', message);
} catch (e) {
this.events.emit('fail', message, e);
} finally {
// Remove from the cache because retrying is done in the sender
void this.pendingMessageCache.remove(message);
}
}
});
}
private async processAllPending() {
const devices = this.pendingMessageCache.getDevices();
const devices = await this.pendingMessageCache.getDevices();
const promises = devices.map(async device => this.processPending(device));
return Promise.all(promises);
}
private async process(device: PubKey, message?: ContentMessage) {
private async process(
device: PubKey,
message?: ContentMessage
): Promise<void> {
// Don't send to ourselves
const currentDevice = await UserUtil.getCurrentDevicePubKey();
if (!message || (currentDevice && device.isEqual(currentDevice))) {
@ -173,7 +192,7 @@ export class MessageQueue implements MessageQueueInterface {
}
await this.pendingMessageCache.add(device, message);
await this.processPending(device);
void this.processPending(device);
}
private getJobQueue(device: PubKey): JobQueue {

@ -2,6 +2,7 @@ import {
ClosedGroupMessage,
ContentMessage,
OpenGroupMessage,
SyncMessage,
} from '../messages/outgoing';
import { RawMessage } from '../types/RawMessage';
import { TypedEventEmitter } from '../utils';
@ -19,8 +20,5 @@ export interface MessageQueueInterface {
sendUsingMultiDevice(user: PubKey, message: ContentMessage): void;
send(device: PubKey, message: ContentMessage): void;
sendToGroup(message: GroupMessageType): void;
sendSyncMessage(
message: ContentMessage,
sendTo: Array<PubKey>
): Promise<Array<void>>;
sendSyncMessage(message: SyncMessage | undefined): Promise<any>;
}

@ -1,3 +1,4 @@
import _ from 'lodash';
import { createOrUpdateItem, getItemById } from '../../../js/modules/data';
import { PartialRawMessage, RawMessage } from '../types/RawMessage';
import { ContentMessage } from '../messages/outgoing';
@ -12,33 +13,25 @@ import { MessageUtils } from '../utils';
// memory and sync its state with the database on modification (add or remove).
export class PendingMessageCache {
public readonly isReady: Promise<boolean>;
private cache: Array<RawMessage>;
protected loadPromise: Promise<void> | undefined;
protected cache: Array<RawMessage> = [];
constructor() {
// Load pending messages from the database
// You should await isReady on making a new PendingMessageCache
// if you'd like to have instant access to the cache
this.cache = [];
this.isReady = new Promise(async resolve => {
await this.loadFromDB();
resolve(true);
});
}
public getAllPending(): Array<RawMessage> {
public async getAllPending(): Promise<Array<RawMessage>> {
await this.loadFromDBIfNeeded();
// Get all pending from cache, sorted with oldest first
return [...this.cache].sort((a, b) => a.timestamp - b.timestamp);
}
public getForDevice(device: PubKey): Array<RawMessage> {
return this.getAllPending().filter(m => m.device === device.key);
public async getForDevice(device: PubKey): Promise<Array<RawMessage>> {
const pending = await this.getAllPending();
return pending.filter(m => m.device === device.key);
}
public getDevices(): Array<PubKey> {
public async getDevices(): Promise<Array<PubKey>> {
await this.loadFromDBIfNeeded();
// Gets all unique devices with pending messages
const pubkeyStrings = [...new Set(this.cache.map(m => m.device))];
const pubkeyStrings = _.uniq(this.cache.map(m => m.device));
return pubkeyStrings.map(PubKey.from).filter((k): k is PubKey => !!k);
}
@ -47,6 +40,7 @@ export class PendingMessageCache {
device: PubKey,
message: ContentMessage
): Promise<RawMessage> {
await this.loadFromDBIfNeeded();
const rawMessage = MessageUtils.toRawMessage(device, message);
// Does it exist in cache already?
@ -63,6 +57,7 @@ export class PendingMessageCache {
public async remove(
message: RawMessage
): Promise<Array<RawMessage> | undefined> {
await this.loadFromDBIfNeeded();
// Should only be called after message is processed
// Return if message doesn't exist in cache
@ -72,7 +67,11 @@ export class PendingMessageCache {
// Remove item from cache and sync with database
const updatedCache = this.cache.filter(
m => m.identifier !== message.identifier
cached =>
!(
cached.device === message.device &&
cached.timestamp === message.timestamp
)
);
this.cache = updatedCache;
await this.saveToDB();
@ -93,12 +92,20 @@ export class PendingMessageCache {
await this.saveToDB();
}
private async loadFromDB() {
protected async loadFromDBIfNeeded() {
if (!this.loadPromise) {
this.loadPromise = this.loadFromDB();
}
await this.loadPromise;
}
protected async loadFromDB() {
const messages = await this.getFromStorage();
this.cache = messages;
}
private async getFromStorage(): Promise<Array<RawMessage>> {
protected async getFromStorage(): Promise<Array<RawMessage>> {
const data = await getItemById('pendingMessages');
if (!data || !data.value) {
return [];
@ -117,7 +124,7 @@ export class PendingMessageCache {
});
}
private async saveToDB() {
protected async saveToDB() {
// For each plainTextBuffer in cache, save in as a simple Array<number> to avoid
// Node issues with JSON stringifying Buffer without strict typing
const encodedCache = [...this.cache].map(item => {

@ -1,7 +1,11 @@
import { PubKey } from '../types';
import _ from 'lodash';
import { PrimaryPubKey } from '../types';
import { MultiDeviceProtocol } from '../protocols';
export async function getGroupMembers(groupId: PubKey): Promise<Array<PubKey>> {
const groupConversation = window.ConversationController.get(groupId.key);
export async function getGroupMembers(
groupId: string
): Promise<Array<PrimaryPubKey>> {
const groupConversation = window.ConversationController.get(groupId);
const groupMembers = groupConversation
? groupConversation.attributes.members
: undefined;
@ -10,11 +14,16 @@ export async function getGroupMembers(groupId: PubKey): Promise<Array<PubKey>> {
return [];
}
return groupMembers.map((member: string) => new PubKey(member));
const promises = (groupMembers as Array<string>).map(async (member: string) =>
MultiDeviceProtocol.getPrimaryDevice(member)
);
const primaryDevices = await Promise.all(promises);
return _.uniqWith(primaryDevices, (a, b) => a.isEqual(b));
}
export function isMediumGroup(groupId: PubKey): boolean {
const conversation = window.ConversationController.get(groupId.key);
export function isMediumGroup(groupId: string): boolean {
const conversation = window.ConversationController.get(groupId);
if (!conversation) {
return false;

@ -0,0 +1,118 @@
type SimpleFunction<T> = (arg: T) => void;
type Return<T> = Promise<T> | T;
async function toPromise<T>(value: Return<T>): Promise<T> {
return value instanceof Promise ? value : Promise.resolve(value);
}
/**
* Create a promise which waits until `done` is called or until `timeout` period is reached.
* If `timeout` is reached then this will throw an Error.
*
* @param task The task to wait for.
* @param timeout The timeout period.
*/
export async function waitForTask<T>(
task: (done: SimpleFunction<T>) => Return<void>,
timeout: number = 2000
): Promise<T> {
const timeoutPromise = new Promise<T>((_, rej) => {
const wait = setTimeout(() => {
clearTimeout(wait);
rej(new Error('Task timed out.'));
}, timeout);
});
const taskPromise = new Promise(async (res, rej) => {
try {
await toPromise(task(res));
} catch (e) {
rej(e);
}
});
return Promise.race([timeoutPromise, taskPromise]) as Promise<T>;
}
interface PollOptions {
timeout: number;
interval: number;
}
/**
* Creates a promise which calls the `task` every `interval` until `done` is called or until `timeout` period is reached.
* If `timeout` is reached then this will throw an Error.
*
* @param check The check which runs every `interval` ms.
* @param options The polling options.
*/
export async function poll(
task: (done: SimpleFunction<void>) => Return<void>,
options: Partial<PollOptions> = {}
): Promise<void> {
const defaults: PollOptions = {
timeout: 2000,
interval: 1000,
};
const { timeout, interval } = {
...defaults,
...options,
};
const endTime = Date.now() + timeout;
let stop = false;
const finish = () => {
stop = true;
};
const _poll = async (resolve: any, reject: any) => {
if (stop) {
resolve();
} else if (Date.now() >= endTime) {
finish();
reject(new Error('Periodic check timeout'));
} else {
try {
await toPromise(task(finish));
} catch (e) {
finish();
reject(e);
return;
}
setTimeout(() => {
void _poll(resolve, reject);
}, interval);
}
};
return new Promise((resolve, reject) => {
void _poll(resolve, reject);
});
}
/**
* Creates a promise which waits until `check` returns `true` or rejects if `timeout` preiod is reached.
* If `timeout` is reached then this will throw an Error.
*
* @param check The boolean check.
* @param timeout The time before an error is thrown.
*/
export async function waitUntil(
check: () => Return<boolean>,
timeout: number = 2000
) {
// This is causing unhandled promise rejection somewhere in MessageQueue tests
return poll(
async done => {
const result = await toPromise(check());
if (result) {
done();
}
},
{
timeout,
}
);
}

@ -5,7 +5,9 @@ import { ContentMessage, SyncMessage } from '../messages/outgoing';
import { MultiDeviceProtocol } from '../protocols';
export function from(message: ContentMessage): SyncMessage | undefined {
// const { timestamp, identifier } = message;
if (message instanceof SyncMessage) {
return message;
}
// Stubbed for now
return undefined;

@ -2,8 +2,15 @@ import * as MessageUtils from './Messages';
import * as GroupUtils from './Groups';
import * as SyncMessageUtils from './SyncMessageUtils';
import * as StringUtils from './String';
import * as PromiseUtils from './Promise';
export * from './TypedEmitter';
export * from './JobQueue';
export { MessageUtils, SyncMessageUtils, GroupUtils, StringUtils };
export {
MessageUtils,
SyncMessageUtils,
GroupUtils,
StringUtils,
PromiseUtils,
};

@ -1,96 +1,56 @@
import { expect } from 'chai';
import chai from 'chai';
import * as sinon from 'sinon';
import * as _ from 'lodash';
import { GroupUtils, SyncMessageUtils } from '../../../session/utils';
import _ from 'lodash';
import {
GroupUtils,
PromiseUtils,
SyncMessageUtils,
} from '../../../session/utils';
import { Stubs, TestUtils } from '../../../test/test-utils';
import { MessageQueue } from '../../../session/sending/MessageQueue';
import {
ChatMessage,
ClosedGroupMessage,
ContentMessage,
OpenGroupMessage,
} from '../../../session/messages/outgoing';
import { PubKey, RawMessage } from '../../../session/types';
import { PrimaryPubKey, PubKey, RawMessage } from '../../../session/types';
import { UserUtil } from '../../../util';
import { MessageSender, PendingMessageCache } from '../../../session/sending';
import { toRawMessage } from '../../../session/utils/Messages';
import { MessageSender } from '../../../session/sending';
import {
MultiDeviceProtocol,
SessionProtocol,
} from '../../../session/protocols';
import { PendingMessageCacheStub } from '../../test-utils/stubs';
import { describe } from 'mocha';
import { TestSyncMessage } from '../../test-utils/stubs/messages/TestSyncMessage';
// Equivalent to Data.StorageItem
interface StorageItem {
id: string;
value: any;
}
// Helper function to force sequential on events checks
async function tick() {
return new Promise(resolve => {
// tslint:disable-next-line: no-string-based-set-timeout
setTimeout(resolve, 0);
});
}
// tslint:disable-next-line: no-require-imports no-var-requires
const chaiAsPromised = require('chai-as-promised');
chai.use(chaiAsPromised);
const { expect } = chai;
describe('MessageQueue', () => {
// Initialize new stubbed cache
let data: StorageItem;
const sandbox = sinon.createSandbox();
const ourDevice = TestUtils.generateFakePubKey();
const ourNumber = ourDevice.key;
const pairedDevices = TestUtils.generateFakePubKeys(2);
// Initialize new stubbed queue
let pendingMessageCache: PendingMessageCacheStub;
let messageQueueStub: MessageQueue;
// Spies
let sendMessageToDevicesSpy: sinon.SinonSpy<
[Array<PubKey>, ContentMessage],
Promise<Array<void>>
>;
let sendSyncMessageSpy: sinon.SinonSpy<
[ContentMessage, Array<PubKey>],
Promise<Array<void>>
>;
let sendToGroupSpy: sinon.SinonSpy<
[OpenGroupMessage | ClosedGroupMessage],
Promise<boolean>
>;
// Message Sender Stubs
let sendStub: sinon.SinonStub<[RawMessage, (number | undefined)?]>;
let sendToOpenGroupStub: sinon.SinonStub<[OpenGroupMessage]>;
// Utils Stubs
let groupMembersStub: sinon.SinonStub;
let canSyncStub: sinon.SinonStub<[ContentMessage], boolean>;
let isMediumGroupStub: sinon.SinonStub<[string], boolean>;
// Session Protocol Stubs
let hasSessionStub: sinon.SinonStub<[PubKey]>;
let sendSessionRequestIfNeededStub: sinon.SinonStub<[PubKey], Promise<void>>;
beforeEach(async () => {
// Stub out methods which touch the database
const storageID = 'pendingMessages';
data = {
id: storageID,
value: '[]',
};
// Pending Message Cache Data Stubs
TestUtils.stubData('getItemById')
.withArgs('pendingMessages')
.resolves(data);
TestUtils.stubData('createOrUpdateItem').callsFake((item: StorageItem) => {
if (item.id === storageID) {
data = item;
}
});
// Utils Stubs
canSyncStub = sandbox.stub(SyncMessageUtils, 'canSync');
canSyncStub.returns(false);
sandbox.stub(UserUtil, 'getCurrentDevicePubKey').resolves(ourNumber);
sandbox.stub(MultiDeviceProtocol, 'getAllDevices').resolves(pairedDevices);
TestUtils.stubWindow('libsignal', {
SignalProtocolAddress: sandbox.stub(),
@ -99,15 +59,11 @@ describe('MessageQueue', () => {
// Message Sender Stubs
sendStub = sandbox.stub(MessageSender, 'send').resolves();
sendToOpenGroupStub = sandbox
.stub(MessageSender, 'sendToOpenGroup')
.resolves(true);
// Group Utils Stubs
sandbox.stub(GroupUtils, 'isMediumGroup').returns(false);
groupMembersStub = sandbox
.stub(GroupUtils, 'getGroupMembers' as any)
.resolves(TestUtils.generateFakePubKeys(10));
isMediumGroupStub = sandbox
.stub(GroupUtils, 'isMediumGroup')
.returns(false);
// Session Protocol Stubs
sandbox.stub(SessionProtocol, 'sendSessionRequest').resolves();
@ -116,37 +72,9 @@ describe('MessageQueue', () => {
.stub(SessionProtocol, 'sendSessionRequestIfNeeded')
.resolves();
// Pending Mesage Cache Stubs
const chatMessages = Array.from(
{ length: 10 },
TestUtils.generateChatMessage
);
const rawMessage = toRawMessage(
TestUtils.generateFakePubKey(),
TestUtils.generateChatMessage()
);
sandbox.stub(PendingMessageCache.prototype, 'add').resolves(rawMessage);
sandbox.stub(PendingMessageCache.prototype, 'remove').resolves();
sandbox
.stub(PendingMessageCache.prototype, 'getDevices')
.returns(TestUtils.generateFakePubKeys(10));
sandbox
.stub(PendingMessageCache.prototype, 'getForDevice')
.returns(
chatMessages.map(m => toRawMessage(TestUtils.generateFakePubKey(), m))
);
// Spies
sendSyncMessageSpy = sandbox.spy(MessageQueue.prototype, 'sendSyncMessage');
sendMessageToDevicesSpy = sandbox.spy(
MessageQueue.prototype,
'sendMessageToDevices'
);
sendToGroupSpy = sandbox.spy(MessageQueue.prototype, 'sendToGroup');
// Init Queue
messageQueueStub = new MessageQueue();
pendingMessageCache = new PendingMessageCacheStub();
messageQueueStub = new MessageQueue(pendingMessageCache);
});
afterEach(() => {
@ -154,233 +82,314 @@ describe('MessageQueue', () => {
sandbox.restore();
});
describe('send', () => {
it('can send to a single device', async () => {
const device = TestUtils.generateFakePubKey();
const message = TestUtils.generateChatMessage();
const promise = messageQueueStub.send(device, message);
await expect(promise).to.be.fulfilled;
});
it('can send sync message', async () => {
const devices = TestUtils.generateFakePubKeys(3);
const message = TestUtils.generateChatMessage();
const promise = messageQueueStub.sendSyncMessage(message, devices);
expect(promise).to.be.fulfilled;
});
});
describe('processPending', () => {
it('will send session request message if no session', async () => {
hasSessionStub.resolves(false);
isMediumGroupStub.returns(false);
const device = TestUtils.generateFakePubKey();
const promise = messageQueueStub.processPending(device);
await expect(promise).to.be.fulfilled;
expect(sendSessionRequestIfNeededStub.callCount).to.equal(1);
await messageQueueStub.processPending(device);
const stubCallPromise = PromiseUtils.waitUntil(
() => sendSessionRequestIfNeededStub.callCount === 1
);
await expect(stubCallPromise).to.be.fulfilled;
});
it('will send message if session exists', async () => {
hasSessionStub.resolves(true);
isMediumGroupStub.returns(false);
sendStub.resolves();
const device = TestUtils.generateFakePubKey();
const hasSession = await hasSessionStub(device);
await pendingMessageCache.add(device, TestUtils.generateChatMessage());
const promise = messageQueueStub.processPending(device);
await expect(promise).to.be.fulfilled;
const successPromise = PromiseUtils.waitForTask(done => {
messageQueueStub.events.once('success', done);
});
expect(hasSession).to.equal(true, 'session does not exist');
expect(sendSessionRequestIfNeededStub.callCount).to.equal(0);
await messageQueueStub.processPending(device);
await expect(successPromise).to.be.fulfilled;
expect(sendSessionRequestIfNeededStub.called).to.equal(
false,
'Session request triggered when we have a session.'
);
});
});
describe('sendUsingMultiDevice', () => {
it('can send using multidevice', async () => {
it('will send message if sending to medium group', async () => {
isMediumGroupStub.returns(true);
sendStub.resolves();
const device = TestUtils.generateFakePubKey();
const message = TestUtils.generateChatMessage();
await pendingMessageCache.add(device, TestUtils.generateChatMessage());
const promise = messageQueueStub.sendUsingMultiDevice(device, message);
await expect(promise).to.be.fulfilled;
const successPromise = PromiseUtils.waitForTask(done => {
messageQueueStub.events.once('success', done);
});
// Ensure the arguments passed into sendMessageToDevices are correct
const previousArgs = sendMessageToDevicesSpy.lastCall.args as [
Array<PubKey>,
ChatMessage
];
await messageQueueStub.processPending(device);
await expect(successPromise).to.be.fulfilled;
expect(sendSessionRequestIfNeededStub.called).to.equal(
false,
'Session request triggered on medium group'
);
});
// Check that instances are equal
expect(previousArgs).to.have.length(2);
it('should remove message from cache', async () => {
hasSessionStub.resolves(true);
isMediumGroupStub.returns(false);
const events = ['success', 'fail'];
for (const event of events) {
if (event === 'success') {
sendStub.resolves();
} else {
sendStub.throws(new Error('fail'));
}
const device = TestUtils.generateFakePubKey();
await pendingMessageCache.add(device, TestUtils.generateChatMessage());
const initialMessages = await pendingMessageCache.getForDevice(device);
expect(initialMessages).to.have.length(1);
await messageQueueStub.processPending(device);
const promise = PromiseUtils.waitUntil(async () => {
const messages = await pendingMessageCache.getForDevice(device);
return messages.length === 0;
});
await expect(promise).to.be.fulfilled;
}
}).timeout(15000);
const argsPairedDevices = previousArgs[0];
const argsChatMessage = previousArgs[1];
describe('events', () => {
it('should send a success event if message was sent', async () => {
hasSessionStub.resolves(true);
isMediumGroupStub.returns(false);
sendStub.resolves();
expect(argsChatMessage instanceof ChatMessage).to.equal(
true,
'message passed into sendMessageToDevices was not a valid ChatMessage'
);
expect(argsChatMessage.isEqual(message)).to.equal(
true,
'message passed into sendMessageToDevices has been mutated'
);
const device = TestUtils.generateFakePubKey();
const message = TestUtils.generateChatMessage();
await pendingMessageCache.add(device, message);
argsPairedDevices.forEach((argsPaired: PubKey, index: number) => {
expect(argsPaired instanceof PubKey).to.equal(
true,
'a device passed into sendMessageToDevices was not a PubKey'
);
expect(argsPaired.isEqual(pairedDevices[index])).to.equal(
true,
'a device passed into sendMessageToDevices did not match MessageDeviceProtocol.getAllDevices'
);
const eventPromise = PromiseUtils.waitForTask<
RawMessage | OpenGroupMessage
>(complete => {
messageQueueStub.events.once('success', complete);
});
await messageQueueStub.processPending(device);
await expect(eventPromise).to.be.fulfilled;
const rawMessage = await eventPromise;
expect(rawMessage.identifier).to.equal(message.identifier);
});
it('should send a fail event if something went wrong while sending', async () => {
hasSessionStub.resolves(true);
isMediumGroupStub.returns(false);
sendStub.throws(new Error('failure'));
const spy = sandbox.spy();
messageQueueStub.events.on('fail', spy);
const device = TestUtils.generateFakePubKey();
const message = TestUtils.generateChatMessage();
await pendingMessageCache.add(device, message);
const eventPromise = PromiseUtils.waitForTask<
[RawMessage | OpenGroupMessage, Error]
>(complete => {
messageQueueStub.events.once('fail', (...args) => {
complete(args);
});
});
await messageQueueStub.processPending(device);
await expect(eventPromise).to.be.fulfilled;
const [rawMessage, error] = await eventPromise;
expect(rawMessage.identifier).to.equal(message.identifier);
expect(error.message).to.equal('failure');
});
});
});
describe('sendUsingMultiDevice', () => {
it('should send the message to all the devices', async () => {
const devices = TestUtils.generateFakePubKeys(3);
sandbox.stub(MultiDeviceProtocol, 'getAllDevices').resolves(devices);
const stub = sandbox
.stub(messageQueueStub, 'sendMessageToDevices')
.resolves();
const message = TestUtils.generateChatMessage();
await messageQueueStub.sendUsingMultiDevice(devices[0], message);
const args = stub.lastCall.args as [Array<PubKey>, ContentMessage];
expect(args[0]).to.have.same.members(devices);
expect(args[1]).to.equal(message);
});
});
describe('sendMessageToDevices', () => {
it('can send to many devices', async () => {
const devices = TestUtils.generateFakePubKeys(10);
hasSessionStub.resolves(false);
const devices = TestUtils.generateFakePubKeys(5);
const message = TestUtils.generateChatMessage();
const promise = messageQueueStub.sendMessageToDevices(devices, message);
await expect(promise).to.be.fulfilled;
await messageQueueStub.sendMessageToDevices(devices, message);
expect(pendingMessageCache.getCache()).to.have.length(devices.length);
});
it('can send sync message and confirm canSync is valid', async () => {
canSyncStub.returns(true);
it('should send sync message if possible', async () => {
hasSessionStub.returns(false);
const devices = TestUtils.generateFakePubKeys(3);
const message = TestUtils.generateChatMessage();
const pairedDeviceKeys = pairedDevices.map(device => device.key);
sandbox.stub(SyncMessageUtils, 'canSync').returns(true);
sandbox
.stub(SyncMessageUtils, 'from')
.returns(new TestSyncMessage({ timestamp: Date.now() }));
const promise = messageQueueStub.sendMessageToDevices(devices, message);
await expect(promise).to.be.fulfilled;
// This stub ensures that the message won't process
const sendSyncMessageStub = sandbox
.stub(messageQueueStub, 'sendSyncMessage')
.resolves();
// Check sendSyncMessage parameters
const previousArgs = sendSyncMessageSpy.lastCall.args as [
ChatMessage,
Array<PubKey>
];
expect(sendSyncMessageSpy.callCount).to.equal(1);
const ourDevices = [ourDevice, ...TestUtils.generateFakePubKeys(2)];
sandbox
.stub(MultiDeviceProtocol, 'getAllDevices')
.callsFake(async user => {
if (ourDevice.isEqual(user)) {
return ourDevices;
}
// Check that instances are equal
expect(previousArgs).to.have.length(2);
return [];
});
const argsChatMessage = previousArgs[0];
const argsPairedKeys = [...previousArgs[1]].map(d => d.key);
const devices = [...ourDevices, ...TestUtils.generateFakePubKeys(3)];
const message = TestUtils.generateChatMessage();
expect(argsChatMessage instanceof ChatMessage).to.equal(
await messageQueueStub.sendMessageToDevices(devices, message);
expect(sendSyncMessageStub.called).to.equal(
true,
'message passed into sendMessageToDevices was not a valid ChatMessage'
'sendSyncMessage was not called.'
);
expect(argsChatMessage.isEqual(message)).to.equal(
true,
'message passed into sendMessageToDevices has been mutated'
expect(
pendingMessageCache.getCache().map(c => c.device)
).to.not.have.members(
ourDevices.map(d => d.key),
'Sending regular messages to our own device is not allowed.'
);
// argsPairedKeys and pairedDeviceKeys should contain the same values
const keyArgsValid = _.isEmpty(_.xor(argsPairedKeys, pairedDeviceKeys));
expect(keyArgsValid).to.equal(
true,
'devices passed into sendSyncMessage were invalid'
expect(pendingMessageCache.getCache()).to.have.length(
devices.length - ourDevices.length,
'Messages should not be sent to our devices.'
);
});
});
describe('sendToGroup', () => {
it('can send to closed group', async () => {
const message = TestUtils.generateClosedGroupMessage();
const success = await messageQueueStub.sendToGroup(message);
expect(success).to.equal(true, 'sending to group failed');
});
it('uses correct parameters for sendToGroup with ClosedGroupMessage', async () => {
const message = TestUtils.generateClosedGroupMessage();
const success = await messageQueueStub.sendToGroup(message);
expect(success).to.equal(true, 'sending to group failed');
// Check parameters
const previousArgs = sendMessageToDevicesSpy.lastCall.args as [
Array<PubKey>,
ClosedGroupMessage
];
expect(previousArgs).to.have.length(2);
const argsClosedGroupMessage = previousArgs[1];
expect(argsClosedGroupMessage instanceof ClosedGroupMessage).to.equal(
true,
'message passed into sendMessageToDevices was not a ClosedGroupMessage'
);
});
describe('sendSyncMessage', () => {
it('should send a message to all our devices', async () => {
hasSessionStub.resolves(false);
it("won't send to invalid groupId", async () => {
const message = TestUtils.generateClosedGroupMessage('invalid-group-id');
const success = await messageQueueStub.sendToGroup(message);
const ourOtherDevices = TestUtils.generateFakePubKeys(2);
const ourDevices = [ourDevice, ...ourOtherDevices];
sandbox.stub(MultiDeviceProtocol, 'getAllDevices').resolves(ourDevices);
// Ensure message parameter passed into sendToGroup is as expected
expect(success).to.equal(
false,
'an invalid groupId was treated as valid'
await messageQueueStub.sendSyncMessage(
new TestSyncMessage({ timestamp: Date.now() })
);
expect(sendToGroupSpy.callCount).to.equal(1);
const argsMessage = sendToGroupSpy.lastCall.args[0];
expect(argsMessage instanceof ClosedGroupMessage).to.equal(
true,
'message passed into sendToGroup was not a ClosedGroupMessage'
expect(pendingMessageCache.getCache()).to.have.length(
ourOtherDevices.length
);
expect(success).to.equal(
false,
'invalid ClosedGroupMessage was propogated through sendToGroup'
expect(pendingMessageCache.getCache().map(c => c.device)).to.have.members(
ourOtherDevices.map(d => d.key)
);
});
});
it('wont send message to empty closed group', async () => {
groupMembersStub.resolves(TestUtils.generateFakePubKeys(0));
const message = TestUtils.generateClosedGroupMessage();
const response = await messageQueueStub.sendToGroup(message);
describe('sendToGroup', () => {
describe('closed groups', async () => {
it('can send to closed group', async () => {
const members = TestUtils.generateFakePubKeys(4).map(
p => new PrimaryPubKey(p.key)
);
sandbox.stub(GroupUtils, 'getGroupMembers').resolves(members);
expect(response).to.equal(
false,
'sendToGroup send a message to an empty group'
);
});
const sendUsingMultiDeviceStub = sandbox
.stub(messageQueueStub, 'sendUsingMultiDevice')
.resolves();
it('can send to open group', async () => {
const message = TestUtils.generateOpenGroupMessage();
const success = await messageQueueStub.sendToGroup(message);
const message = TestUtils.generateClosedGroupMessage();
const success = await messageQueueStub.sendToGroup(message);
expect(success).to.equal(true, 'sending to group failed');
expect(sendUsingMultiDeviceStub.callCount).to.equal(members.length);
expect(success).to.equal(true, 'sending to group failed');
});
});
const arg = sendUsingMultiDeviceStub.getCall(0).args;
expect(arg[1] instanceof ClosedGroupMessage).to.equal(
true,
'message sent to group member was not a ClosedGroupMessage'
);
});
describe('events', () => {
it('can send events on message sending success', async () => {
const successSpy = sandbox.spy();
messageQueueStub.events.on('success', successSpy);
it('wont send message to empty closed group', async () => {
sandbox.stub(GroupUtils, 'getGroupMembers').resolves([]);
const sendUsingMultiDeviceStub = sandbox
.stub(messageQueueStub, 'sendUsingMultiDevice')
.resolves();
const device = TestUtils.generateFakePubKey();
const promise = messageQueueStub.processPending(device);
await expect(promise).to.be.fulfilled;
const message = TestUtils.generateClosedGroupMessage();
const response = await messageQueueStub.sendToGroup(message);
await tick();
expect(successSpy.callCount).to.equal(1);
expect(response).to.equal(
false,
'sendToGroup sent a message to an empty group'
);
expect(sendUsingMultiDeviceStub.callCount).to.equal(0);
});
});
it('can send events on message sending failure', async () => {
sendStub.throws(new Error('Failed to send message.'));
describe('open groups', async () => {
let sendToOpenGroupStub: sinon.SinonStub<
[OpenGroupMessage],
Promise<boolean>
>;
beforeEach(() => {
sendToOpenGroupStub = sandbox
.stub(MessageSender, 'sendToOpenGroup')
.resolves(true);
});
const failureSpy = sandbox.spy();
messageQueueStub.events.on('fail', failureSpy);
it('can send to open group', async () => {
const message = TestUtils.generateOpenGroupMessage();
const success = await messageQueueStub.sendToGroup(message);
expect(sendToOpenGroupStub.callCount).to.equal(1);
expect(success).to.equal(true, 'Sending to open group failed');
});
const device = TestUtils.generateFakePubKey();
const promise = messageQueueStub.processPending(device);
await expect(promise).to.be.fulfilled;
it('should emit a success event when send was successful', async () => {
const message = TestUtils.generateOpenGroupMessage();
const eventPromise = PromiseUtils.waitForTask(complete => {
messageQueueStub.events.once('success', complete);
}, 2000);
await messageQueueStub.sendToGroup(message);
await expect(eventPromise).to.be.fulfilled;
});
it('should emit a fail event if something went wrong', async () => {
sendToOpenGroupStub.resolves(false);
const message = TestUtils.generateOpenGroupMessage();
const eventPromise = PromiseUtils.waitForTask(complete => {
messageQueueStub.events.once('fail', complete);
}, 2000);
await tick();
expect(failureSpy.callCount).to.equal(1);
await messageQueueStub.sendToGroup(message);
await expect(eventPromise).to.be.fulfilled;
});
});
});
});

@ -36,7 +36,6 @@ describe('PendingMessageCache', () => {
});
pendingMessageCacheStub = new PendingMessageCache();
await pendingMessageCacheStub.isReady;
});
afterEach(() => {
@ -44,7 +43,7 @@ describe('PendingMessageCache', () => {
});
it('can initialize cache', async () => {
const cache = pendingMessageCacheStub.getAllPending();
const cache = await pendingMessageCacheStub.getAllPending();
// We expect the cache to initialise as an empty array
expect(cache).to.be.instanceOf(Array);
@ -59,7 +58,7 @@ describe('PendingMessageCache', () => {
await pendingMessageCacheStub.add(device, message);
// Verify that the message is in the cache
const finalCache = pendingMessageCacheStub.getAllPending();
const finalCache = await pendingMessageCacheStub.getAllPending();
expect(finalCache).to.have.length(1);
@ -68,6 +67,22 @@ describe('PendingMessageCache', () => {
expect(addedMessage.timestamp).to.deep.equal(rawMessage.timestamp);
});
it('can add multiple messages belonging to the same user', async () => {
const device = TestUtils.generateFakePubKey();
await pendingMessageCacheStub.add(device, TestUtils.generateChatMessage());
// We have to timeout here otherwise it's processed too fast and messages start having the same timestamp
await TestUtils.timeout(5);
await pendingMessageCacheStub.add(device, TestUtils.generateChatMessage());
await TestUtils.timeout(5);
await pendingMessageCacheStub.add(device, TestUtils.generateChatMessage());
// Verify that the message is in the cache
const finalCache = await pendingMessageCacheStub.getAllPending();
expect(finalCache).to.have.length(3);
});
it('can remove from cache', async () => {
const device = TestUtils.generateFakePubKey();
const message = TestUtils.generateChatMessage();
@ -75,18 +90,47 @@ describe('PendingMessageCache', () => {
await pendingMessageCacheStub.add(device, message);
const initialCache = pendingMessageCacheStub.getAllPending();
const initialCache = await pendingMessageCacheStub.getAllPending();
expect(initialCache).to.have.length(1);
// Remove the message
await pendingMessageCacheStub.remove(rawMessage);
const finalCache = pendingMessageCacheStub.getAllPending();
const finalCache = await pendingMessageCacheStub.getAllPending();
// Verify that the message was removed
expect(finalCache).to.have.length(0);
});
it('should only remove messages with different timestamp and device', async () => {
const device = TestUtils.generateFakePubKey();
const message = TestUtils.generateChatMessage();
const rawMessage = MessageUtils.toRawMessage(device, message);
await pendingMessageCacheStub.add(device, message);
await TestUtils.timeout(5);
const one = await pendingMessageCacheStub.add(
device,
TestUtils.generateChatMessage(message.identifier)
);
const two = await pendingMessageCacheStub.add(
TestUtils.generateFakePubKey(),
message
);
const initialCache = await pendingMessageCacheStub.getAllPending();
expect(initialCache).to.have.length(3);
// Remove the message
await pendingMessageCacheStub.remove(rawMessage);
const finalCache = await pendingMessageCacheStub.getAllPending();
// Verify that the message was removed
expect(finalCache).to.have.length(2);
expect(finalCache).to.have.deep.members([one, two]);
});
it('can get devices', async () => {
const cacheItems = [
{
@ -103,16 +147,16 @@ describe('PendingMessageCache', () => {
},
];
cacheItems.forEach(async item => {
for (const item of cacheItems) {
await pendingMessageCacheStub.add(item.device, item.message);
});
}
const cache = pendingMessageCacheStub.getAllPending();
const cache = await pendingMessageCacheStub.getAllPending();
expect(cache).to.have.length(cacheItems.length);
// Get list of devices
const devicesKeys = cacheItems.map(item => item.device.key);
const pulledDevices = pendingMessageCacheStub.getDevices();
const pulledDevices = await pendingMessageCacheStub.getDevices();
const pulledDevicesKeys = pulledDevices.map(d => d.key);
// Verify that device list from cache is equivalent to devices added
@ -131,21 +175,21 @@ describe('PendingMessageCache', () => {
},
];
cacheItems.forEach(async item => {
for (const item of cacheItems) {
await pendingMessageCacheStub.add(item.device, item.message);
});
}
const initialCache = pendingMessageCacheStub.getAllPending();
const initialCache = await pendingMessageCacheStub.getAllPending();
expect(initialCache).to.have.length(cacheItems.length);
// Get pending for each specific device
cacheItems.forEach(item => {
const pendingForDevice = pendingMessageCacheStub.getForDevice(
for (const item of cacheItems) {
const pendingForDevice = await pendingMessageCacheStub.getForDevice(
item.device
);
expect(pendingForDevice).to.have.length(1);
expect(pendingForDevice[0].device).to.equal(item.device.key);
});
}
});
it('can find nothing when empty', async () => {
@ -164,7 +208,7 @@ describe('PendingMessageCache', () => {
await pendingMessageCacheStub.add(device, message);
const finalCache = pendingMessageCacheStub.getAllPending();
const finalCache = await pendingMessageCacheStub.getAllPending();
expect(finalCache).to.have.length(1);
const foundMessage = pendingMessageCacheStub.find(rawMessage);
@ -188,17 +232,17 @@ describe('PendingMessageCache', () => {
},
];
cacheItems.forEach(async item => {
for (const item of cacheItems) {
await pendingMessageCacheStub.add(item.device, item.message);
});
}
const initialCache = pendingMessageCacheStub.getAllPending();
const initialCache = await pendingMessageCacheStub.getAllPending();
expect(initialCache).to.have.length(cacheItems.length);
// Clear cache
await pendingMessageCacheStub.clear();
const finalCache = pendingMessageCacheStub.getAllPending();
const finalCache = await pendingMessageCacheStub.getAllPending();
expect(finalCache).to.have.length(0);
});
@ -218,21 +262,20 @@ describe('PendingMessageCache', () => {
},
];
cacheItems.forEach(async item => {
for (const item of cacheItems) {
await pendingMessageCacheStub.add(item.device, item.message);
});
}
const addedMessages = pendingMessageCacheStub.getAllPending();
const addedMessages = await pendingMessageCacheStub.getAllPending();
expect(addedMessages).to.have.length(cacheItems.length);
// Rebuild from DB
const freshCache = new PendingMessageCache();
await freshCache.isReady;
// Verify messages
const rebuiltMessages = freshCache.getAllPending();
const rebuiltMessages = await freshCache.getAllPending();
rebuiltMessages.forEach((message, index) => {
for (const [index, message] of rebuiltMessages.entries()) {
const addedMessage = addedMessages[index];
// Pull out plainTextBuffer for a separate check
@ -254,6 +297,6 @@ describe('PendingMessageCache', () => {
true,
'cached messages were not rebuilt properly'
);
});
}
});
});

@ -1 +1,2 @@
export * from './ciphers';
export * from './sending';

@ -0,0 +1,7 @@
import { SyncMessage } from '../../../../session/messages/outgoing';
import { SignalService } from '../../../../protobuf';
export class TestSyncMessage extends SyncMessage {
protected syncProto(): SignalService.SyncMessage {
return SignalService.SyncMessage.create({});
}
}

@ -0,0 +1,22 @@
import { PendingMessageCache } from '../../../../session/sending';
import { RawMessage } from '../../../../session/types';
export class PendingMessageCacheStub extends PendingMessageCache {
public dbData: Array<RawMessage>;
constructor(dbData: Array<RawMessage> = []) {
super();
this.dbData = dbData;
}
public getCache(): Readonly<Array<RawMessage>> {
return this.cache;
}
protected async getFromStorage() {
return this.dbData;
}
protected async saveToDB() {
return;
}
}

@ -0,0 +1 @@
export * from './PendingMessageCacheStub';

@ -7,10 +7,10 @@ import { v4 as uuid } from 'uuid';
import { OpenGroup } from '../../../session/types';
import { generateFakePubKey } from './pubkey';
export function generateChatMessage(): ChatMessage {
export function generateChatMessage(identifier?: string): ChatMessage {
return new ChatMessage({
body: 'Lorem ipsum dolor sit amet, consectetur adipiscing elit',
identifier: uuid(),
identifier: identifier ?? uuid(),
timestamp: Date.now(),
attachments: undefined,
quote: undefined,

Loading…
Cancel
Save