diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index 102ea3090..dedb833aa 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -9,6 +9,28 @@ const { rpc } = require('./loki_rpc'); const MINIMUM_SUCCESSFUL_REQUESTS = 2; const LOKI_LONGPOLL_HEADER = 'X-Loki-Long-Poll'; +async function sleep_for(time) { + return new Promise(resolve => { + setTimeout(() => resolve(), time); + }); +} + +const filterIncomingMessages = async messages => { + const incomingHashes = messages.map(m => m.hash); + const dupHashes = await window.Signal.Data.getSeenMessagesByHashList( + incomingHashes + ); + const newMessages = messages.filter(m => !dupHashes.includes(m.hash)); + if (newMessages.length) { + const newHashes = newMessages.map(m => ({ + expiresAt: m.expiration, + hash: m.hash, + })); + await window.Signal.Data.saveSeenMessageHashes(newHashes); + } + return newMessages; +}; + class LokiMessageAPI { constructor({ snodeServerPort }) { this.snodeServerPort = snodeServerPort ? `:${snodeServerPort}` : ''; @@ -155,6 +177,90 @@ class LokiMessageAPI { log.info(`Successful storage message to ${pubKey}`); } + async *retrieveNextMessage(nodeUrl) { + const params = { + pubKey: ourKey, + lastHash: nodeData.lastHash || '', + }; + const options = { + timeout: 40000, + headers: { + [LOKI_LONGPOLL_HEADER]: true, + }, + }; + while (true) { + const result = await rpc( + `http://${nodeUrl}`, + this.snodeServerPort, + 'retrieve', + params, + options + ); + if (Array.isArray(result.messages) && result.messages.length) { + const filteredMessages = await this.jobQueue.add(() => + filterIncomingMessages(result.messages) + ); + if (filteredMessages.length) { + yield filteredMessages; + } + } + } + } + + async openConnection(callback) { + while (this.ourSwarmNodes.length > 0) { + const url = this.ourSwarmNodes.pop(); + const successive_failures = 0; + while (true) { + // loop breaks upon error + try { + for await (let messages of retrieveNextMessages(url)) { + const lastMessage = _.last(message.messages); + lokiSnodeAPI.updateLastHash( + url, + lastMessage.hash, + lastMessage.expiration + ); + callback(messages); + successive_failures = 0; + } + } catch (e) { + log.warn('Loki retrieve messages:', e); + if (e instanceof textsecure.WrongSwarmError) { + const { newSwarm } = e; + await lokiSnodeAPI.updateOurSwarmNodes(newSwarm); + // Try another snode + break; + } else if (e instanceof textsecure.NotFoundError) { + // DNS/Lokinet error, needs to bubble up + throw new window.textsecure.DNSResolutionError('Retrieving messages'); + } + } + + successive_failures += 1; + + if (successive_failures >= 3) + // Try another snode + break; + + await sleep_for(successive_failures * 1000); + } + } + } + + async startLongPolling(numConnections, callback) { + this.ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes(); + + for (let i = 0; i < numConnections; i += 1) + promises.push(this.openConnection(callback)); + + // blocks until all snodes in our swarms have been removed from the list + // or if there is network issues (ENOUTFOUND due to lokinet) + await Promise.all(promises); + } + + // stale function, kept around to reduce diff noise + // TODO: remove async retrieveMessages(callback) { const ourKey = window.textsecure.storage.user.getNumber(); const completedNodes = []; diff --git a/libtextsecure/http-resources.js b/libtextsecure/http-resources.js index e70901349..c3b009740 100644 --- a/libtextsecure/http-resources.js +++ b/libtextsecure/http-resources.js @@ -3,8 +3,8 @@ // eslint-disable-next-line func-names (function() { let server; - const SUCCESS_POLL_TIME = 100; - const FAIL_POLL_TIME = 2000; + const EXHAUSTED_SNODES_RETRY_DELAY = 5000; + const NUM_CONCURRENT_CONNECTIONS = 3; function stringToArrayBufferBase64(string) { return dcodeIO.ByteBuffer.wrap(string, 'base64').toArrayBuffer(); @@ -78,24 +78,31 @@ } }; + // Note: calling callback(false) is currently not necessary this.pollServer = async callback => { + // This blocking call will return only when all attempts + // at reaching snodes are exhausted or a DNS error occured try { - await server.retrieveMessages(messages => { - messages.forEach(message => { - const { data } = message; - this.handleMessage(data); - }); - }); - connected = true; - } catch (err) { - window.log.error('Polling error: ', err); - connected = false; + await server.startLongPolling( + NUM_CONCURRENT_CONNECTIONS, + messages => { + connected = true; + callback(connected); + messages.forEach(message => { + const { data } = message; + this.handleMessage(data); + }); + } + ); + } catch(e) { + // we'll try again anyway } - const pollTime = connected ? SUCCESS_POLL_TIME : FAIL_POLL_TIME; - callback(connected); + + connected = false; + // Exhausted all our snodes urls, trying again later from scratch setTimeout(() => { this.pollServer(callback); - }, pollTime); + }, EXHAUSTED_SNODES_RETRY_DELAY); }; this.isConnected = function isConnected() {