Merge pull request #1170 from Mikunj/message-sender-retry

Message sender retry
pull/1173/head
Mikunj Varsani 5 years ago committed by GitHub
commit 542615961c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -90,6 +90,7 @@
"node-fetch": "2.3.0",
"node-sass": "4.9.3",
"os-locale": "2.1.0",
"p-retry": "^4.2.0",
"pify": "3.0.0",
"protobufjs": "6.8.6",
"rc-slider": "^8.7.1",

@ -4,6 +4,10 @@ import { libloki, libsignal, Signal, textsecure } from '../../window';
import { UserUtil } from '../../util';
import { CipherTextObject } from '../../../libtextsecure/libsignal-protocol';
/**
* Add padding to a message buffer
* @param messageBuffer The buffer to add padding to.
*/
export function padPlainTextBuffer(messageBuffer: Uint8Array): Uint8Array {
const plaintext = new Uint8Array(
getPaddedMessageLength(messageBuffer.byteLength + 1) - 1

@ -6,25 +6,33 @@ import { SignalService } from '../../protobuf';
import { UserUtil } from '../../util';
import { MessageEncrypter } from '../crypto';
import { lokiMessageAPI, lokiPublicChatAPI } from '../../window';
import pRetry from 'p-retry';
// ================ Regular ================
/**
* Check if we can send to service nodes.
*/
export function canSendToSnode(): boolean {
// Seems like lokiMessageAPI is not always guaranteed to be initialized
return Boolean(lokiMessageAPI);
}
export async function send({
device,
plainTextBuffer,
encryption,
timestamp,
ttl,
}: RawMessage): Promise<void> {
/**
* Send a message via service nodes.
*
* @param message The message to send.
* @param attempts The amount of times to attempt sending. Minimum value is 1.
*/
export async function send(
message: RawMessage,
attempts: number = 3
): Promise<void> {
if (!canSendToSnode()) {
throw new Error('lokiMessageAPI is not initialized.');
}
const { device, plainTextBuffer, encryption, timestamp, ttl } = message;
const { envelopeType, cipherText } = await MessageEncrypter.encrypt(
device,
plainTextBuffer,
@ -33,8 +41,13 @@ export async function send({
const envelope = await buildEnvelope(envelopeType, timestamp, cipherText);
const data = wrapEnvelope(envelope);
// TODO: Somehow differentiate between Retryable and Regular erros
return lokiMessageAPI.sendMessage(device, data, timestamp, ttl);
return pRetry(
async () => lokiMessageAPI.sendMessage(device, data, timestamp, ttl),
{
retries: Math.max(attempts - 1, 0),
factor: 1,
}
);
}
async function buildEnvelope(
@ -76,9 +89,18 @@ function wrapEnvelope(envelope: SignalService.Envelope): Uint8Array {
// ================ Open Group ================
/**
* Send a message to an open group.
* @param message The open group message.
*/
export async function sendToOpenGroup(
message: OpenGroupMessage
): Promise<boolean> {
/*
Note: Retrying wasn't added to this but it can be added in the future if needed.
The only problem is that `channelAPI.sendMessage` returns true/false and doesn't throw any error so we can never be sure why sending failed.
This should be fixed and we shouldn't rely on returning true/false, rather return nothing (success) or throw an error (failure)
*/
const { group, quote, attachments, preview, body } = message;
const channelAPI = await lokiPublicChatAPI.findOrCreateChannel(
group.server,
@ -87,7 +109,6 @@ export async function sendToOpenGroup(
);
// Don't think returning true/false on `sendMessage` is a good way
// We should either: return nothing (success) or throw an error (failure)
return channelAPI.sendMessage({
quote,
attachments: attachments || [],

@ -21,11 +21,25 @@ describe('MessageSender', () => {
TestUtils.restoreStubs();
});
describe('canSendToSnode', () => {
it('should return the correct value', () => {
const stub = TestUtils.stubWindow('lokiMessageAPI', undefined);
expect(MessageSender.canSendToSnode()).to.equal(
false,
'We cannot send if lokiMessageAPI is not set'
);
stub.set(sandbox.createStubInstance(LokiMessageAPI));
expect(MessageSender.canSendToSnode()).to.equal(
true,
'We can send if lokiMessageAPI is set'
);
});
});
describe('send', () => {
const ourNumber = 'ourNumber';
let lokiMessageAPIStub: sinon.SinonStubbedInstance<LokiMessageAPI>;
let messageEncyrptReturnEnvelopeType =
SignalService.Envelope.Type.CIPHERTEXT;
let encryptStub: sinon.SinonStub<[string, Uint8Array, EncryptionType]>;
beforeEach(() => {
// We can do this because LokiMessageAPI has a module export in it
@ -34,76 +48,87 @@ describe('MessageSender', () => {
});
TestUtils.stubWindow('lokiMessageAPI', lokiMessageAPIStub);
encryptStub = sandbox.stub(MessageEncrypter, 'encrypt').resolves({
envelopeType: SignalService.Envelope.Type.CIPHERTEXT,
cipherText: crypto.randomBytes(10),
});
sandbox.stub(UserUtil, 'getCurrentDevicePubKey').resolves(ourNumber);
sandbox
.stub(MessageEncrypter, 'encrypt')
.callsFake(async (_device, plainTextBuffer, _type) => ({
envelopeType: messageEncyrptReturnEnvelopeType,
cipherText: plainTextBuffer,
}));
});
it('should pass the correct values to lokiMessageAPI', async () => {
const device = '0';
const timestamp = Date.now();
const ttl = 100;
await MessageSender.send({
describe('retry', () => {
const rawMessage = {
identifier: '1',
device,
device: '0',
plainTextBuffer: crypto.randomBytes(10),
encryption: EncryptionType.Signal,
timestamp,
ttl,
timestamp: Date.now(),
ttl: 100,
};
it('should not retry if an error occurred during encryption', async () => {
encryptStub.throws(new Error('Failed to encrypt.'));
const promise = MessageSender.send(rawMessage);
await expect(promise).is.rejectedWith('Failed to encrypt.');
expect(lokiMessageAPIStub.sendMessage.callCount).to.equal(0);
});
const args = lokiMessageAPIStub.sendMessage.getCall(0).args;
expect(args[0]).to.equal(device);
expect(args[2]).to.equal(timestamp);
expect(args[3]).to.equal(ttl);
});
it('should only call lokiMessageAPI once if no errors occured', async () => {
await MessageSender.send(rawMessage);
expect(lokiMessageAPIStub.sendMessage.callCount).to.equal(1);
});
it('should correctly build the envelope', async () => {
messageEncyrptReturnEnvelopeType = SignalService.Envelope.Type.CIPHERTEXT;
it('should only retry the specified amount of times before throwing', async () => {
lokiMessageAPIStub.sendMessage.throws(new Error('API error'));
const attempts = 2;
const promise = MessageSender.send(rawMessage, attempts);
await expect(promise).is.rejectedWith('API error');
expect(lokiMessageAPIStub.sendMessage.callCount).to.equal(attempts);
});
// This test assumes the encryption stub returns the plainText passed into it.
const plainTextBuffer = crypto.randomBytes(10);
const timestamp = Date.now();
it('should not throw error if successful send occurs within the retry limit', async () => {
lokiMessageAPIStub.sendMessage
.onFirstCall()
.throws(new Error('API error'));
await MessageSender.send(rawMessage, 3);
expect(lokiMessageAPIStub.sendMessage.callCount).to.equal(2);
});
});
await MessageSender.send({
identifier: '1',
device: '0',
plainTextBuffer,
encryption: EncryptionType.Signal,
timestamp,
ttl: 1,
describe('logic', () => {
let messageEncyrptReturnEnvelopeType =
SignalService.Envelope.Type.CIPHERTEXT;
beforeEach(() => {
encryptStub.callsFake(async (_device, plainTextBuffer, _type) => ({
envelopeType: messageEncyrptReturnEnvelopeType,
cipherText: plainTextBuffer,
}));
});
const data = lokiMessageAPIStub.sendMessage.getCall(0).args[1];
const webSocketMessage = SignalService.WebSocketMessage.decode(data);
expect(webSocketMessage.request?.body).to.not.equal(
undefined,
'Request body should not be undefined'
);
expect(webSocketMessage.request?.body).to.not.equal(
null,
'Request body should not be null'
);
it('should pass the correct values to lokiMessageAPI', async () => {
const device = '0';
const timestamp = Date.now();
const ttl = 100;
const envelope = SignalService.Envelope.decode(
webSocketMessage.request?.body as Uint8Array
);
expect(envelope.type).to.equal(SignalService.Envelope.Type.CIPHERTEXT);
expect(envelope.source).to.equal(ourNumber);
expect(envelope.sourceDevice).to.equal(1);
expect(toNumber(envelope.timestamp)).to.equal(timestamp);
expect(envelope.content).to.deep.equal(plainTextBuffer);
});
await MessageSender.send({
identifier: '1',
device,
plainTextBuffer: crypto.randomBytes(10),
encryption: EncryptionType.Signal,
timestamp,
ttl,
});
const args = lokiMessageAPIStub.sendMessage.getCall(0).args;
expect(args[0]).to.equal(device);
expect(args[2]).to.equal(timestamp);
expect(args[3]).to.equal(ttl);
});
describe('UNIDENTIFIED_SENDER', () => {
it('should set the envelope source to be empty', async () => {
it('should correctly build the envelope', async () => {
messageEncyrptReturnEnvelopeType =
SignalService.Envelope.Type.UNIDENTIFIED_SENDER;
SignalService.Envelope.Type.CIPHERTEXT;
// This test assumes the encryption stub returns the plainText passed into it.
const plainTextBuffer = crypto.randomBytes(10);
@ -132,13 +157,53 @@ describe('MessageSender', () => {
const envelope = SignalService.Envelope.decode(
webSocketMessage.request?.body as Uint8Array
);
expect(envelope.type).to.equal(
SignalService.Envelope.Type.UNIDENTIFIED_SENDER
);
expect(envelope.source).to.equal(
'',
'envelope source should be empty in UNIDENTIFIED_SENDER'
);
expect(envelope.type).to.equal(SignalService.Envelope.Type.CIPHERTEXT);
expect(envelope.source).to.equal(ourNumber);
expect(envelope.sourceDevice).to.equal(1);
expect(toNumber(envelope.timestamp)).to.equal(timestamp);
expect(envelope.content).to.deep.equal(plainTextBuffer);
});
describe('UNIDENTIFIED_SENDER', () => {
it('should set the envelope source to be empty', async () => {
messageEncyrptReturnEnvelopeType =
SignalService.Envelope.Type.UNIDENTIFIED_SENDER;
// This test assumes the encryption stub returns the plainText passed into it.
const plainTextBuffer = crypto.randomBytes(10);
const timestamp = Date.now();
await MessageSender.send({
identifier: '1',
device: '0',
plainTextBuffer,
encryption: EncryptionType.Signal,
timestamp,
ttl: 1,
});
const data = lokiMessageAPIStub.sendMessage.getCall(0).args[1];
const webSocketMessage = SignalService.WebSocketMessage.decode(data);
expect(webSocketMessage.request?.body).to.not.equal(
undefined,
'Request body should not be undefined'
);
expect(webSocketMessage.request?.body).to.not.equal(
null,
'Request body should not be null'
);
const envelope = SignalService.Envelope.decode(
webSocketMessage.request?.body as Uint8Array
);
expect(envelope.type).to.equal(
SignalService.Envelope.Type.UNIDENTIFIED_SENDER
);
expect(envelope.source).to.equal(
'',
'envelope source should be empty in UNIDENTIFIED_SENDER'
);
});
});
});
});

@ -398,6 +398,11 @@
dependencies:
redux "^3.6.0"
"@types/retry@^0.12.0":
version "0.12.0"
resolved "https://registry.yarnpkg.com/@types/retry/-/retry-0.12.0.tgz#2b35eccfcee7d38cd72ad99232fbd58bffb3c84d"
integrity sha512-wWKOClTTiizcZhXnPY4wikVAwmdYHp8q6DmC+EJUzAMsycb7HB32Kh9RN4+0gExjmPmZSAQjgURXIGATPegAvA==
"@types/rimraf@2.0.2":
version "2.0.2"
resolved "https://registry.yarnpkg.com/@types/rimraf/-/rimraf-2.0.2.tgz#7f0fc3cf0ff0ad2a99bb723ae1764f30acaf8b6e"
@ -7086,6 +7091,14 @@ p-map@^1.1.1:
resolved "https://registry.yarnpkg.com/p-map/-/p-map-1.2.0.tgz#e4e94f311eabbc8633a1e79908165fca26241b6b"
integrity sha512-r6zKACMNhjPJMTl8KcFH4li//gkrXWfbD6feV8l6doRHlzljFWGJ2AP6iKaCJXyZmAUMOPtvbW7EXkbWO/pLEA==
p-retry@^4.2.0:
version "4.2.0"
resolved "https://registry.yarnpkg.com/p-retry/-/p-retry-4.2.0.tgz#ea9066c6b44f23cab4cd42f6147cdbbc6604da5d"
integrity sha512-jPH38/MRh263KKcq0wBNOGFJbm+U6784RilTmHjB/HM9kH9V8WlCpVUcdOmip9cjXOh6MxZ5yk1z2SjDUJfWmA==
dependencies:
"@types/retry" "^0.12.0"
retry "^0.12.0"
p-try@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/p-try/-/p-try-1.0.0.tgz#cbc79cdbaf8fd4228e13f621f2b1a237c1b207b3"
@ -8823,6 +8836,11 @@ ret@~0.1.10:
resolved "https://registry.yarnpkg.com/ret/-/ret-0.1.15.tgz#b8a4825d5bdb1fc3f6f53c2bc33f81388681c7bc"
integrity sha512-TTlYpa+OL+vMMNG24xSlQGEJ3B/RzEfUlLct7b5G/ytav+wPrplCpVMFuwzXbkecJrb6IYo1iFb0S9v37754mg==
retry@^0.12.0:
version "0.12.0"
resolved "https://registry.yarnpkg.com/retry/-/retry-0.12.0.tgz#1b42a6266a21f07421d1b0b54b7dc167b01c013b"
integrity sha1-G0KmJmoh8HQh0bC1S33BZ7AcATs=
rgb2hex@^0.1.9:
version "0.1.10"
resolved "https://registry.yarnpkg.com/rgb2hex/-/rgb2hex-0.1.10.tgz#4fdd432665273e2d5900434940ceba0a04c8a8a8"

Loading…
Cancel
Save