diff --git a/js/modules/.eslintrc b/js/modules/.eslintrc index b540ae7df..705203794 100644 --- a/js/modules/.eslintrc +++ b/js/modules/.eslintrc @@ -5,7 +5,8 @@ "node": false }, "globals": { - "console": true + "console": true, + "setTimeout": true }, "parserOptions": { "sourceType": "module" diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index dedb833aa..eb9f8e3b2 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -9,7 +9,7 @@ const { rpc } = require('./loki_rpc'); const MINIMUM_SUCCESSFUL_REQUESTS = 2; const LOKI_LONGPOLL_HEADER = 'X-Loki-Long-Poll'; -async function sleep_for(time) { +function sleepFor(time) { return new Promise(resolve => { setTimeout(() => resolve(), time); }); @@ -177,7 +177,7 @@ class LokiMessageAPI { log.info(`Successful storage message to ${pubKey}`); } - async *retrieveNextMessage(nodeUrl) { + async retrieveNextMessages(nodeUrl, nodeData, ourKey) { const params = { pubKey: ourKey, lastHash: nodeData.lastHash || '', @@ -188,42 +188,52 @@ class LokiMessageAPI { [LOKI_LONGPOLL_HEADER]: true, }, }; - while (true) { - const result = await rpc( - `http://${nodeUrl}`, - this.snodeServerPort, - 'retrieve', - params, - options + + const result = await rpc( + `http://${nodeUrl}`, + this.snodeServerPort, + 'retrieve', + params, + options + ); + if (Array.isArray(result.messages) && result.messages.length) { + const filteredMessages = await this.jobQueue.add(() => + filterIncomingMessages(result.messages) ); - if (Array.isArray(result.messages) && result.messages.length) { - const filteredMessages = await this.jobQueue.add(() => - filterIncomingMessages(result.messages) - ); - if (filteredMessages.length) { - yield filteredMessages; - } + if (filteredMessages.length) { + return filteredMessages; } } + return []; } async openConnection(callback) { + const ourKey = window.textsecure.storage.user.getNumber(); while (this.ourSwarmNodes.length > 0) { - const url = this.ourSwarmNodes.pop(); - const successive_failures = 0; - while (true) { - // loop breaks upon error + const url = Object.keys(this.ourSwarmNodes)[0]; + const nodeData = this.ourSwarmNodes[url]; + delete this.ourSwarmNodes[url]; + let successiveFailures = 0; + while (successiveFailures < 3) { + await sleepFor(successiveFailures * 1000); + try { - for await (let messages of retrieveNextMessages(url)) { - const lastMessage = _.last(message.messages); + const messages = await this.retrieveNextMessages( + url, + nodeData, + ourKey + ); + successiveFailures = 0; + if (messages.length) { + const lastMessage = _.last(messages); + nodeData.lashHash = lastMessage.hash; lokiSnodeAPI.updateLastHash( url, lastMessage.hash, lastMessage.expiration ); - callback(messages); - successive_failures = 0; } + callback(messages); } catch (e) { log.warn('Loki retrieve messages:', e); if (e instanceof textsecure.WrongSwarmError) { @@ -233,17 +243,12 @@ class LokiMessageAPI { break; } else if (e instanceof textsecure.NotFoundError) { // DNS/Lokinet error, needs to bubble up - throw new window.textsecure.DNSResolutionError('Retrieving messages'); + throw new window.textsecure.DNSResolutionError( + 'Retrieving messages' + ); } + successiveFailures += 1; } - - successive_failures += 1; - - if (successive_failures >= 3) - // Try another snode - break; - - await sleep_for(successive_failures * 1000); } } } @@ -251,6 +256,8 @@ class LokiMessageAPI { async startLongPolling(numConnections, callback) { this.ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes(); + const promises = []; + for (let i = 0; i < numConnections; i += 1) promises.push(this.openConnection(callback)); @@ -269,22 +276,6 @@ class LokiMessageAPI { let ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes(); - const filterIncomingMessages = async messages => { - const incomingHashes = messages.map(m => m.hash); - const dupHashes = await window.Signal.Data.getSeenMessagesByHashList( - incomingHashes - ); - const newMessages = messages.filter(m => !dupHashes.includes(m.hash)); - if (newMessages.length) { - const newHashes = newMessages.map(m => ({ - expiresAt: m.expiration, - hash: m.hash, - })); - await window.Signal.Data.saveSeenMessageHashes(newHashes); - } - return newMessages; - }; - const nodeComplete = nodeUrl => { completedNodes.push(nodeUrl); delete ourSwarmNodes[nodeUrl]; diff --git a/libtextsecure/http-resources.js b/libtextsecure/http-resources.js index c3b009740..9f3314c6b 100644 --- a/libtextsecure/http-resources.js +++ b/libtextsecure/http-resources.js @@ -83,18 +83,15 @@ // This blocking call will return only when all attempts // at reaching snodes are exhausted or a DNS error occured try { - await server.startLongPolling( - NUM_CONCURRENT_CONNECTIONS, - messages => { - connected = true; - callback(connected); - messages.forEach(message => { - const { data } = message; - this.handleMessage(data); - }); - } - ); - } catch(e) { + await server.startLongPolling(NUM_CONCURRENT_CONNECTIONS, messages => { + connected = true; + callback(connected); + messages.forEach(message => { + const { data } = message; + this.handleMessage(data); + }); + }); + } catch (e) { // we'll try again anyway }