Modify periodic check implementation

pull/1185/head
Mikunj 5 years ago
parent d862269f8d
commit 3bf5796cd5

@ -63,7 +63,9 @@ export class MessageQueue implements MessageQueueInterface {
const ourDevices = await MultiDeviceProtocol.getOurDevices();
// Remove our devices from currentDevices
currentDevices = currentDevices.filter(device => !ourDevices.some(d => device.isEqual(d)));
currentDevices = currentDevices.filter(
device => !ourDevices.some(d => device.isEqual(d))
);
}
const promises = currentDevices.map(async device => {
@ -129,7 +131,9 @@ export class MessageQueue implements MessageQueueInterface {
}
const ourDevices = await MultiDeviceProtocol.getOurDevices();
const promises = ourDevices.map(async device => this.process(device, message));
const promises = ourDevices.map(async device =>
this.process(device, message)
);
return Promise.all(promises);
}

@ -67,7 +67,11 @@ export class PendingMessageCache {
// Remove item from cache and sync with database
const updatedCache = this.cache.filter(
cached => !(cached.device === message.device && cached.timestamp === message.timestamp)
cached =>
!(
cached.device === message.device &&
cached.timestamp === message.timestamp
)
);
this.cache = updatedCache;
await this.saveToDB();

@ -0,0 +1,118 @@
type SimpleFunction<T> = (arg: T) => void;
type Return<T> = Promise<T> | T;
async function toPromise<T>(value: Return<T>): Promise<T> {
return value instanceof Promise ? value : Promise.resolve(value);
}
/**
* Create a promise which waits until `done` is called or until `timeout` period is reached.
* If `timeout` is reached then this will throw an Error.
*
* @param task The task to wait for.
* @param timeout The timeout period.
*/
export async function waitForTask<T>(
task: (done: SimpleFunction<T>) => Return<void>,
timeout: number = 2000
): Promise<T> {
const timeoutPromise = new Promise<T>((_, rej) => {
const wait = setTimeout(() => {
clearTimeout(wait);
rej(new Error('Task timed out.'));
}, timeout);
});
const taskPromise = new Promise(async (res, rej) => {
try {
await toPromise(task(res));
} catch (e) {
rej(e);
}
});
return Promise.race([timeoutPromise, taskPromise]) as Promise<T>;
}
interface PollOptions {
timeout: number;
interval: number;
}
/**
* Creates a promise which calls the `task` every `interval` until `done` is called or until `timeout` period is reached.
* If `timeout` is reached then this will throw an Error.
*
* @param check The check which runs every `interval` ms.
* @param options The polling options.
*/
export async function poll(
task: (done: SimpleFunction<void>) => Return<void>,
options: Partial<PollOptions> = {}
): Promise<void> {
const defaults: PollOptions = {
timeout: 2000,
interval: 1000,
};
const { timeout, interval } = {
...defaults,
...options,
};
const endTime = Date.now() + timeout;
let stop = false;
const finish = () => {
stop = true;
};
const _poll = async (resolve: any, reject: any) => {
if (stop) {
resolve();
} else if (Date.now() >= endTime) {
finish();
reject(new Error('Periodic check timeout'));
} else {
try {
await toPromise(task(finish));
} catch (e) {
finish();
reject(e);
return;
}
setTimeout(() => {
void _poll(resolve, reject);
}, interval);
}
};
return new Promise((resolve, reject) => {
void _poll(resolve, reject);
});
}
/**
* Creates a promise which waits until `check` returns `true` or rejects if `timeout` preiod is reached.
* If `timeout` is reached then this will throw an Error.
*
* @param check The boolean check.
* @param timeout The time before an error is thrown.
*/
export async function waitUntil(
check: () => Return<boolean>,
timeout: number = 2000
) {
// This is causing unhandled promise rejection somewhere in MessageQueue tests
return poll(
async done => {
const result = await toPromise(check());
if (result) {
done();
}
},
{
timeout,
}
);
}

@ -2,8 +2,15 @@ import * as MessageUtils from './Messages';
import * as GroupUtils from './Groups';
import * as SyncMessageUtils from './SyncMessageUtils';
import * as StringUtils from './String';
import * as PromiseUtils from './Promise';
export * from './TypedEmitter';
export * from './JobQueue';
export { MessageUtils, SyncMessageUtils, GroupUtils, StringUtils };
export {
MessageUtils,
SyncMessageUtils,
GroupUtils,
StringUtils,
PromiseUtils,
};

@ -1,7 +1,11 @@
import chai from 'chai';
import * as sinon from 'sinon';
import * as _ from 'lodash';
import { GroupUtils, SyncMessageUtils } from '../../../session/utils';
import _ from 'lodash';
import {
GroupUtils,
PromiseUtils,
SyncMessageUtils,
} from '../../../session/utils';
import { Stubs, TestUtils } from '../../../test/test-utils';
import { MessageQueue } from '../../../session/sending/MessageQueue';
import {
@ -81,25 +85,27 @@ describe('MessageQueue', () => {
describe('processPending', () => {
it('will send session request message if no session', async () => {
hasSessionStub.resolves(false);
isMediumGroupStub.resolves(false);
isMediumGroupStub.returns(false);
const device = TestUtils.generateFakePubKey();
const stubCallPromise = TestUtils.waitUntil(() => sendSessionRequestIfNeededStub.callCount === 1);
await messageQueueStub.processPending(device);
expect(stubCallPromise).to.be.fulfilled;
const stubCallPromise = PromiseUtils.waitUntil(
() => sendSessionRequestIfNeededStub.callCount === 1
);
await expect(stubCallPromise).to.be.fulfilled;
});
it('will send message if session exists', async () => {
hasSessionStub.resolves(true);
isMediumGroupStub.resolves(false);
isMediumGroupStub.returns(false);
sendStub.resolves();
const device = TestUtils.generateFakePubKey();
await pendingMessageCache.add(device, TestUtils.generateChatMessage());
const successPromise = TestUtils.waitForTask(done => {
const successPromise = PromiseUtils.waitForTask(done => {
messageQueueStub.events.once('success', done);
});
@ -112,13 +118,13 @@ describe('MessageQueue', () => {
});
it('will send message if sending to medium group', async () => {
isMediumGroupStub.resolves(true);
isMediumGroupStub.returns(true);
sendStub.resolves();
const device = TestUtils.generateFakePubKey();
await pendingMessageCache.add(device, TestUtils.generateChatMessage());
const successPromise = TestUtils.waitForTask(done => {
const successPromise = PromiseUtils.waitForTask(done => {
messageQueueStub.events.once('success', done);
});
@ -132,7 +138,7 @@ describe('MessageQueue', () => {
it('should remove message from cache', async () => {
hasSessionStub.resolves(true);
isMediumGroupStub.resolves(false);
isMediumGroupStub.returns(false);
const events = ['success', 'fail'];
for (const event of events) {
@ -149,25 +155,27 @@ describe('MessageQueue', () => {
expect(initialMessages).to.have.length(1);
await messageQueueStub.processPending(device);
const promise = TestUtils.waitUntil(async () => {
const promise = PromiseUtils.waitUntil(async () => {
const messages = await pendingMessageCache.getForDevice(device);
return messages.length === 0;
});
expect(promise).to.be.fulfilled;
await expect(promise).to.be.fulfilled;
}
});
describe('events', () => {
it('should send a success event if message was sent', async () => {
hasSessionStub.resolves(true);
isMediumGroupStub.resolves(false);
isMediumGroupStub.returns(false);
sendStub.resolves();
const device = TestUtils.generateFakePubKey();
const message = TestUtils.generateChatMessage();
await pendingMessageCache.add(device, message);
const eventPromise = TestUtils.waitForTask<RawMessage | OpenGroupMessage>(complete => {
const eventPromise = PromiseUtils.waitForTask<
RawMessage | OpenGroupMessage
>(complete => {
messageQueueStub.events.once('success', complete);
});
@ -180,7 +188,7 @@ describe('MessageQueue', () => {
it('should send a fail event if something went wrong while sending', async () => {
hasSessionStub.resolves(true);
isMediumGroupStub.resolves(false);
isMediumGroupStub.returns(false);
sendStub.throws(new Error('failure'));
const spy = sandbox.spy();
@ -190,7 +198,9 @@ describe('MessageQueue', () => {
const message = TestUtils.generateChatMessage();
await pendingMessageCache.add(device, message);
const eventPromise = TestUtils.waitForTask<[RawMessage | OpenGroupMessage, Error]>(complete => {
const eventPromise = PromiseUtils.waitForTask<
[RawMessage | OpenGroupMessage, Error]
>(complete => {
messageQueueStub.events.once('fail', (...args) => {
complete(args);
});
@ -231,8 +241,7 @@ describe('MessageQueue', () => {
const message = TestUtils.generateChatMessage();
await messageQueueStub.sendMessageToDevices(devices, message);
const promise = TestUtils.waitUntil(() => pendingMessageCache.getCache().length === devices.length);
await expect(promise).to.be.fulfilled;
expect(pendingMessageCache.getCache()).to.have.length(devices.length);
});
it('should send sync message if possible', async () => {
@ -268,7 +277,12 @@ describe('MessageQueue', () => {
true,
'sendSyncMessage was not called.'
);
expect(pendingMessageCache.getCache().map(c => c.device)).to.not.have.members(ourDevices.map(d => d.key), 'Sending regular messages to our own device is not allowed.');
expect(
pendingMessageCache.getCache().map(c => c.device)
).to.not.have.members(
ourDevices.map(d => d.key),
'Sending regular messages to our own device is not allowed.'
);
expect(pendingMessageCache.getCache()).to.have.length(
devices.length - ourDevices.length,
'Messages should not be sent to our devices.'
@ -288,8 +302,12 @@ describe('MessageQueue', () => {
new TestSyncMessage({ timestamp: Date.now() })
);
expect(pendingMessageCache.getCache()).to.have.length(ourOtherDevices.length);
expect(pendingMessageCache.getCache().map(c => c.device)).to.have.members(ourOtherDevices.map(d => d.key));
expect(pendingMessageCache.getCache()).to.have.length(
ourOtherDevices.length
);
expect(pendingMessageCache.getCache().map(c => c.device)).to.have.members(
ourOtherDevices.map(d => d.key)
);
});
});
@ -354,7 +372,7 @@ describe('MessageQueue', () => {
it('should emit a success event when send was successful', async () => {
const message = TestUtils.generateOpenGroupMessage();
const eventPromise = TestUtils.waitForTask(complete => {
const eventPromise = PromiseUtils.waitForTask(complete => {
messageQueueStub.events.once('success', complete);
}, 2000);
@ -365,7 +383,7 @@ describe('MessageQueue', () => {
it('should emit a fail event if something went wrong', async () => {
sendToOpenGroupStub.resolves(false);
const message = TestUtils.generateOpenGroupMessage();
const eventPromise = TestUtils.waitForTask(complete => {
const eventPromise = PromiseUtils.waitForTask(complete => {
messageQueueStub.events.once('fail', complete);
}, 2000);

@ -3,7 +3,6 @@ import * as _ from 'lodash';
import { MessageUtils } from '../../../session/utils';
import { TestUtils, timeout } from '../../../test/test-utils';
import { PendingMessageCache } from '../../../session/sending/PendingMessageCache';
import { initial } from 'lodash';
// Equivalent to Data.StorageItem
interface StorageItem {
@ -271,7 +270,7 @@ describe('PendingMessageCache', () => {
// Verify messages
const rebuiltMessages = await freshCache.getAllPending();
// tslint:disable-next-line: no-for-in no-for-in-array
for (const [index, message] of rebuiltMessages.entries()) {
const addedMessage = addedMessages[index];

@ -124,83 +124,3 @@ export function generateClosedGroupMessage(
chatMessage: generateChatMessage(),
});
}
type ArgFunction<T> = (arg: T) => void;
type MaybePromise<T> = Promise<T> | T;
/**
* Create a promise which waits until `done` is called or until timeout period is reached.
* @param task The task to wait for.
* @param timeout The timeout period.
*/
// tslint:disable-next-line: no-shadowed-variable
export async function waitForTask<T>(task: (done: ArgFunction<T>) => MaybePromise<void>, timeout: number = 2000): Promise<T> {
const timeoutPromise = new Promise<T>((_, rej) => {
const wait = setTimeout(() => {
clearTimeout(wait);
rej(new Error('Task timed out.'));
}, timeout);
});
// tslint:disable-next-line: no-shadowed-variable
const taskPromise = new Promise(async (res, rej) => {
try {
const taskReturn = task(res);
return taskReturn instanceof Promise ? taskReturn : Promise.resolve(taskReturn);
} catch (e) {
rej(e);
}
});
return Promise.race([timeoutPromise, taskPromise]) as Promise<T>;
}
/**
* Creates a promise which periodically calls the `check` until `done` is called or until timeout period is reached.
* @param check The check which runs every 100ms.
* @param timeout The time before an error is thrown.
*/
// tslint:disable-next-line: no-shadowed-variable
export async function periodicallyCheck(check: (done: ArgFunction<void>) => MaybePromise<void>, timeout: number = 1000): Promise<void> {
return waitForTask(complete => {
let interval: NodeJS.Timeout | undefined;
const cleanup = () => {
if (interval) {
clearInterval(interval);
interval = undefined;
}
};
setTimeout(cleanup, timeout);
const onDone = () => {
complete();
cleanup();
};
interval = setInterval(async () => {
try {
await toPromise(check(onDone));
} catch (e) {
cleanup();
throw e;
}
}, 100);
}, timeout);
}
/**
* Creates a promise which waits until `check` returns `true` or rejects if timeout preiod is reached.
* @param check The boolean check.
* @param timeout The time before an error is thrown.
*/
export async function waitUntil(check: () => MaybePromise<boolean>, timeout: number = 2000) {
return periodicallyCheck(async done => {
const result = await toPromise(check());
if (result) {
done();
}
}, timeout);
}
async function toPromise<T>(maybePromise: MaybePromise<T>): Promise<T> {
return maybePromise instanceof Promise ? maybePromise : Promise.resolve(maybePromise);
}

Loading…
Cancel
Save