Refactor long polling for better concurrent requests

pull/270/head
sachaaaaa 6 years ago
parent e99cec9c09
commit 09a9cfbf37

@ -9,6 +9,28 @@ const { rpc } = require('./loki_rpc');
const MINIMUM_SUCCESSFUL_REQUESTS = 2;
const LOKI_LONGPOLL_HEADER = 'X-Loki-Long-Poll';
async function sleep_for(time) {
return new Promise(resolve => {
setTimeout(() => resolve(), time);
});
}
const filterIncomingMessages = async messages => {
const incomingHashes = messages.map(m => m.hash);
const dupHashes = await window.Signal.Data.getSeenMessagesByHashList(
incomingHashes
);
const newMessages = messages.filter(m => !dupHashes.includes(m.hash));
if (newMessages.length) {
const newHashes = newMessages.map(m => ({
expiresAt: m.expiration,
hash: m.hash,
}));
await window.Signal.Data.saveSeenMessageHashes(newHashes);
}
return newMessages;
};
class LokiMessageAPI {
constructor({ snodeServerPort }) {
this.snodeServerPort = snodeServerPort ? `:${snodeServerPort}` : '';
@ -155,6 +177,90 @@ class LokiMessageAPI {
log.info(`Successful storage message to ${pubKey}`);
}
async *retrieveNextMessage(nodeUrl) {
const params = {
pubKey: ourKey,
lastHash: nodeData.lastHash || '',
};
const options = {
timeout: 40000,
headers: {
[LOKI_LONGPOLL_HEADER]: true,
},
};
while (true) {
const result = await rpc(
`http://${nodeUrl}`,
this.snodeServerPort,
'retrieve',
params,
options
);
if (Array.isArray(result.messages) && result.messages.length) {
const filteredMessages = await this.jobQueue.add(() =>
filterIncomingMessages(result.messages)
);
if (filteredMessages.length) {
yield filteredMessages;
}
}
}
}
async openConnection(callback) {
while (this.ourSwarmNodes.length > 0) {
const url = this.ourSwarmNodes.pop();
const successive_failures = 0;
while (true) {
// loop breaks upon error
try {
for await (let messages of retrieveNextMessages(url)) {
const lastMessage = _.last(message.messages);
lokiSnodeAPI.updateLastHash(
url,
lastMessage.hash,
lastMessage.expiration
);
callback(messages);
successive_failures = 0;
}
} catch (e) {
log.warn('Loki retrieve messages:', e);
if (e instanceof textsecure.WrongSwarmError) {
const { newSwarm } = e;
await lokiSnodeAPI.updateOurSwarmNodes(newSwarm);
// Try another snode
break;
} else if (e instanceof textsecure.NotFoundError) {
// DNS/Lokinet error, needs to bubble up
throw new window.textsecure.DNSResolutionError('Retrieving messages');
}
}
successive_failures += 1;
if (successive_failures >= 3)
// Try another snode
break;
await sleep_for(successive_failures * 1000);
}
}
}
async startLongPolling(numConnections, callback) {
this.ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes();
for (let i = 0; i < numConnections; i += 1)
promises.push(this.openConnection(callback));
// blocks until all snodes in our swarms have been removed from the list
// or if there is network issues (ENOUTFOUND due to lokinet)
await Promise.all(promises);
}
// stale function, kept around to reduce diff noise
// TODO: remove
async retrieveMessages(callback) {
const ourKey = window.textsecure.storage.user.getNumber();
const completedNodes = [];

@ -3,8 +3,8 @@
// eslint-disable-next-line func-names
(function() {
let server;
const SUCCESS_POLL_TIME = 100;
const FAIL_POLL_TIME = 2000;
const EXHAUSTED_SNODES_RETRY_DELAY = 5000;
const NUM_CONCURRENT_CONNECTIONS = 3;
function stringToArrayBufferBase64(string) {
return dcodeIO.ByteBuffer.wrap(string, 'base64').toArrayBuffer();
@ -78,24 +78,31 @@
}
};
// Note: calling callback(false) is currently not necessary
this.pollServer = async callback => {
// This blocking call will return only when all attempts
// at reaching snodes are exhausted or a DNS error occured
try {
await server.retrieveMessages(messages => {
messages.forEach(message => {
const { data } = message;
this.handleMessage(data);
});
});
connected = true;
} catch (err) {
window.log.error('Polling error: ', err);
connected = false;
await server.startLongPolling(
NUM_CONCURRENT_CONNECTIONS,
messages => {
connected = true;
callback(connected);
messages.forEach(message => {
const { data } = message;
this.handleMessage(data);
});
}
);
} catch(e) {
// we'll try again anyway
}
const pollTime = connected ? SUCCESS_POLL_TIME : FAIL_POLL_TIME;
callback(connected);
connected = false;
// Exhausted all our snodes urls, trying again later from scratch
setTimeout(() => {
this.pollServer(callback);
}, pollTime);
}, EXHAUSTED_SNODES_RETRY_DELAY);
};
this.isConnected = function isConnected() {

Loading…
Cancel
Save