diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index d6c2e65e6..a53fcefca 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -5,8 +5,7 @@ const _ = require('lodash'); const { rpc } = require('./loki_rpc'); -// Will be raised (to 3?) when we get more nodes -const MINIMUM_SUCCESSFUL_REQUESTS = 2; +const DEFAULT_CONNECTIONS = 2; const LOKI_LONGPOLL_HEADER = 'X-Loki-Long-Poll'; function sleepFor(time) { @@ -88,7 +87,8 @@ class LokiMessageAPI { this.sendingSwarmNodes = {}; } - async sendMessage(numConnections, pubKey, data, messageTimeStamp, ttl, isPing = false) { + async sendMessage(pubKey, data, messageTimeStamp, ttl, options = {}) { + const { isPing = false, numConnections = DEFAULT_CONNECTIONS } = options; // Data required to identify a message in a conversation const messageEventData = { pubKey, @@ -267,114 +267,6 @@ class LokiMessageAPI { await Promise.all(promises); } - // stale function, kept around to reduce diff noise - // TODO: remove - async retrieveMessages(callback) { - const ourKey = window.textsecure.storage.user.getNumber(); - const completedNodes = []; - let canResolve = true; - let successfulRequests = 0; - - let ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes(); - - const nodeComplete = nodeUrl => { - completedNodes.push(nodeUrl); - delete ourSwarmNodes[nodeUrl]; - }; - - const doRequest = async (nodeUrl, nodeData) => { - const params = { - pubKey: ourKey, - lastHash: nodeData.lastHash || '', - }; - const options = { - timeout: 40000, - headers: { - [LOKI_LONGPOLL_HEADER]: true, - }, - }; - - try { - const result = await rpc( - `http://${nodeUrl}`, - this.snodeServerPort, - 'retrieve', - params, - options - ); - - nodeComplete(nodeUrl); - successfulRequests += 1; - - if (Array.isArray(result.messages) && result.messages.length) { - const lastMessage = _.last(result.messages); - lokiSnodeAPI.updateLastHash( - nodeUrl, - lastMessage.hash, - lastMessage.expiration - ); - const filteredMessages = await this.jobQueue.add(() => - filterIncomingMessages(result.messages) - ); - if (filteredMessages.length) { - callback(filteredMessages); - } - } - } catch (e) { - log.warn('Loki retrieve messages:', e); - if (e instanceof textsecure.WrongSwarmError) { - const { newSwarm } = e; - await lokiSnodeAPI.updateOurSwarmNodes(newSwarm); - completedNodes.push(nodeUrl); - } else if (e instanceof textsecure.NotFoundError) { - canResolve = false; - } else if (e instanceof textsecure.HTTPError) { - // We mark the node as complete as we could still reach it - nodeComplete(nodeUrl); - } else { - const removeNode = await lokiSnodeAPI.unreachableNode( - ourKey, - nodeUrl - ); - if (removeNode) { - log.error('Loki retrieve messages:', e); - nodeComplete(nodeUrl); - } - } - } - }; - - while (successfulRequests < MINIMUM_SUCCESSFUL_REQUESTS) { - if (!canResolve) { - throw new window.textsecure.DNSResolutionError('Retrieving messages'); - } - if (Object.keys(ourSwarmNodes).length === 0) { - ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes(); - // Filter out the nodes we have already got responses from - completedNodes.forEach(nodeUrl => delete ourSwarmNodes[nodeUrl]); - - if (Object.keys(ourSwarmNodes).length === 0) { - if (successfulRequests !== 0) { - // TODO: Decide how to handle some completed requests but not enough - return; - } - throw new window.textsecure.EmptySwarmError( - ourKey, - 'Ran out of swarm nodes to query' - ); - } - } - - const remainingRequests = - MINIMUM_SUCCESSFUL_REQUESTS - successfulRequests; - - await Promise.all( - Object.entries(ourSwarmNodes) - .splice(0, remainingRequests) - .map(([nodeUrl, nodeData]) => doRequest(nodeUrl, nodeData)) - ); - } - } } module.exports = LokiMessageAPI; diff --git a/libtextsecure/outgoing_message.js b/libtextsecure/outgoing_message.js index 5d2610f17..26b66bc35 100644 --- a/libtextsecure/outgoing_message.js +++ b/libtextsecure/outgoing_message.js @@ -12,6 +12,7 @@ /* eslint-disable more/no-then */ /* eslint-disable no-unreachable */ +const NUM_SEND_CONNECTIONS = 2; function OutgoingMessage( server, @@ -188,13 +189,16 @@ OutgoingMessage.prototype = { const pubKey = number; try { // TODO: Make NUM_CONCURRENT_CONNECTIONS a global constant + const options = { + numConnections: NUM_SEND_CONNECTIONS, + isPing: this.isPing, + } await lokiMessageAPI.sendMessage( - 2, pubKey, data, timestamp, ttl, - this.isPing + options ); } catch (e) { if (e.name === 'HTTPError' && (e.code !== 409 && e.code !== 410)) {