From b43978ece153a410f0538e0845b2762894ee21aa Mon Sep 17 00:00:00 2001 From: Beaudan Date: Fri, 31 May 2019 13:58:01 +1000 Subject: [PATCH 1/4] Initial refactoring of sendmessage --- js/modules/loki_message_api.js | 229 +++++++++++++++--------------- js/modules/loki_snode_api.js | 28 +--- libtextsecure/outgoing_message.js | 2 + 3 files changed, 122 insertions(+), 137 deletions(-) diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index 8b442dfb7..d6c2e65e6 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -31,15 +31,64 @@ const filterIncomingMessages = async messages => { return newMessages; }; +const calcNonce = async (messageEventData, pubKey, data64, timestamp, ttl) => { + // Nonce is returned as a base64 string to include in header + try { + window.Whisper.events.trigger('calculatingPoW', messageEventData); + const development = window.getEnvironment() !== 'production'; + return callWorker( + 'calcPoW', + timestamp, + ttl, + pubKey, + data64, + development + ); + } catch (err) { + // Something went horribly wrong + throw err; + } +} + +const trySendP2p = async (pubKey, data64, isPing, messageEventData) => { + const p2pDetails = lokiP2pAPI.getContactP2pDetails(pubKey); + if (!p2pDetails || (!isPing && !p2pDetails.isOnline)) { + return false; + } + try { + const port = p2pDetails.port ? `:${p2pDetails.port}` : ''; + + await rpc(p2pDetails.address, port, 'store', { + data: data64, + }); + lokiP2pAPI.setContactOnline(pubKey); + window.Whisper.events.trigger('p2pMessageSent', messageEventData); + if (isPing) { + log.info(`Successfully pinged ${pubKey}`); + } else { + log.info(`Successful p2p message to ${pubKey}`); + } + return true; + } catch (e) { + lokiP2pAPI.setContactOffline(pubKey); + if (isPing) { + // If this was just a ping, we don't bother sending to storage server + log.warn('Ping failed, contact marked offline', e); + return true; + } + log.warn('Failed to send P2P message, falling back to storage', e); + return false; + } +} + class LokiMessageAPI { constructor({ snodeServerPort }) { this.snodeServerPort = snodeServerPort ? `:${snodeServerPort}` : ''; this.jobQueue = new window.JobQueue(); + this.sendingSwarmNodes = {}; } - async sendMessage(pubKey, data, messageTimeStamp, ttl, isPing = false) { - const timestamp = Date.now(); - + async sendMessage(numConnections, pubKey, data, messageTimeStamp, ttl, isPing = false) { // Data required to identify a message in a conversation const messageEventData = { pubKey, @@ -47,134 +96,90 @@ class LokiMessageAPI { }; const data64 = dcodeIO.ByteBuffer.wrap(data).toString('base64'); - const p2pDetails = lokiP2pAPI.getContactP2pDetails(pubKey); - if (p2pDetails && (isPing || p2pDetails.isOnline)) { - try { - const port = p2pDetails.port ? `:${p2pDetails.port}` : ''; - - await rpc(p2pDetails.address, port, 'store', { - data: data64, - }); - lokiP2pAPI.setContactOnline(pubKey); - window.Whisper.events.trigger('p2pMessageSent', messageEventData); - if (isPing) { - log.info(`Successfully pinged ${pubKey}`); - } else { - log.info(`Successful p2p message to ${pubKey}`); - } - return; - } catch (e) { - lokiP2pAPI.setContactOffline(pubKey); - if (isPing) { - // If this was just a ping, we don't bother sending to storage server - log.warn('Ping failed, contact marked offline', e); - return; - } - log.warn('Failed to send P2P message, falling back to storage', e); - } + const p2pSuccess = await trySendP2p(pubKey, data64, isPing, messageEventData); + if (p2pSuccess) { + return; } - // Nonce is returned as a base64 string to include in header - let nonce; - try { - window.Whisper.events.trigger('calculatingPoW', messageEventData); - const development = window.getEnvironment() !== 'production'; - nonce = await callWorker( - 'calcPoW', - timestamp, - ttl, - pubKey, - data64, - development - ); - } catch (err) { - // Something went horribly wrong - throw err; + const timestamp = Date.now(); + const nonce = await calcNonce(messageEventData, pubKey, data64, timestamp, ttl); + // Using timestamp as a unique identifier + this.sendingSwarmNodes[timestamp] = lokiSnodeAPI.getSwarmNodesForPubKey(pubKey); + if (this.sendingSwarmNodes[timestamp].length < numConnections) { + const freshNodes = await lokiSnodeAPI.getFreshSwarmNodes(pubKey); + await lokiSnodeAPI.updateSwarmNodes(pubKey, freshNodes); + this.sendingSwarmNodes[timestamp] = freshNodes; } - const completedNodes = []; - const failedNodes = []; - let successfulRequests = 0; - let canResolve = true; - - let swarmNodes = await lokiSnodeAPI.getSwarmNodesForPubKey(pubKey); - - const nodeComplete = nodeUrl => { - completedNodes.push(nodeUrl); - swarmNodes = swarmNodes.filter(node => node !== nodeUrl); + const params = { + pubKey, + ttl: ttl.toString(), + nonce, + timestamp: timestamp.toString(), + data: data64, }; + const promises = []; + for (let i = 0; i < numConnections; i += 1) { + promises.push(this.openSendConnection(params)); + } - const doRequest = async nodeUrl => { - const params = { + const results = await Promise.all(promises); + delete this.sendingSwarmNodes[timestamp]; + if (results.every(value => value === false)) { + throw new window.textsecure.EmptySwarmError( pubKey, - ttl: ttl.toString(), - nonce, - timestamp: timestamp.toString(), - data: data64, - }; + 'Ran out of swarm nodes to query' + ); + } + if (results.every(value => value === true)) { + log.info(`Successful storage message to ${pubKey}`); + } else { + log.warn(`Partially successful storage message to ${pubKey}`); + } + } - try { - await rpc(`http://${nodeUrl}`, this.snodeServerPort, 'store', params); + async openSendConnection(params) { + while (!_.isEmpty(this.sendingSwarmNodes[params.timestamp])) { + const url = this.sendingSwarmNodes[params.timestamp].shift(); + const successfulSend = await this.sendToNode(url, params); + if (successfulSend) { + return true; + } + } + return false; + } - nodeComplete(nodeUrl); - successfulRequests += 1; + async sendToNode(url, params) { + let successiveFailures = 0; + while (successiveFailures < 3) { + await sleepFor(successiveFailures * 500); + try { + await rpc(`http://${url}`, this.snodeServerPort, 'store', params); + return true; } catch (e) { log.warn('Loki send message:', e); if (e instanceof textsecure.WrongSwarmError) { const { newSwarm } = e; - await lokiSnodeAPI.updateSwarmNodes(pubKey, newSwarm); - completedNodes.push(nodeUrl); + await lokiSnodeAPI.updateSwarmNodes(params.pubKey, newSwarm); + this.sendingSwarmNodes[params.timestamp] = newSwarm; + return false; } else if (e instanceof textsecure.NotFoundError) { - canResolve = false; + // TODO: Handle resolution error + successiveFailures += 1; } else if (e instanceof textsecure.HTTPError) { - // We mark the node as complete as we could still reach it - nodeComplete(nodeUrl); + // TODO: Handle working connection but error response + successiveFailures += 1; } else { - const removeNode = await lokiSnodeAPI.unreachableNode( - pubKey, - nodeUrl - ); - if (removeNode) { - log.error('Loki send message:', e); - nodeComplete(nodeUrl); - failedNodes.push(nodeUrl); - } - } - } - }; - - while (successfulRequests < MINIMUM_SUCCESSFUL_REQUESTS) { - if (!canResolve) { - throw new window.textsecure.DNSResolutionError('Sending messages'); - } - if (swarmNodes.length === 0) { - const freshNodes = await lokiSnodeAPI.getFreshSwarmNodes(pubKey); - const goodNodes = _.difference(freshNodes, failedNodes); - await lokiSnodeAPI.updateSwarmNodes(pubKey, goodNodes); - swarmNodes = _.difference(freshNodes, completedNodes); - if (swarmNodes.length === 0) { - if (successfulRequests !== 0) { - // TODO: Decide how to handle some completed requests but not enough - log.warn(`Partially successful storage message to ${pubKey}`); - return; - } - throw new window.textsecure.EmptySwarmError( - pubKey, - 'Ran out of swarm nodes to query' - ); + successiveFailures += 1; } } - - const remainingRequests = - MINIMUM_SUCCESSFUL_REQUESTS - successfulRequests; - - await Promise.all( - swarmNodes - .splice(0, remainingRequests) - .map(nodeUrl => doRequest(nodeUrl)) - ); } - log.info(`Successful storage message to ${pubKey}`); + log.error(`Failed to send to node: ${url}`); + await lokiSnodeAPI.unreachableNode( + params.pubKey, + url + ); + return false; } async retrieveNextMessages(nodeUrl, nodeData, ourKey) { diff --git a/js/modules/loki_snode_api.js b/js/modules/loki_snode_api.js index 79667f13f..88a92b35d 100644 --- a/js/modules/loki_snode_api.js +++ b/js/modules/loki_snode_api.js @@ -73,29 +73,10 @@ class LokiSnodeAPI { async unreachableNode(pubKey, nodeUrl) { if (pubKey === window.textsecure.storage.user.getNumber()) { - if (!this.ourSwarmNodes[nodeUrl]) { - this.ourSwarmNodes[nodeUrl] = { - failureCount: 1, - }; - } else { - this.ourSwarmNodes[nodeUrl].failureCount += 1; - } - if (this.ourSwarmNodes[nodeUrl].failureCount < FAILURE_THRESHOLD) { - return false; - } delete this.ourSwarmNodes[nodeUrl]; - return true; - } - if (!this.contactSwarmNodes[nodeUrl]) { - this.contactSwarmNodes[nodeUrl] = { - failureCount: 1, - }; - } else { - this.contactSwarmNodes[nodeUrl].failureCount += 1; - } - if (this.contactSwarmNodes[nodeUrl].failureCount < FAILURE_THRESHOLD) { - return false; + return; } + const conversation = ConversationController.get(pubKey); const swarmNodes = [...conversation.get('swarmNodes')]; if (swarmNodes.includes(nodeUrl)) { @@ -103,14 +84,12 @@ class LokiSnodeAPI { await conversation.updateSwarmNodes(filteredNodes); delete this.contactSwarmNodes[nodeUrl]; } - return true; } async updateLastHash(nodeUrl, lastHash, expiresAt) { await window.Signal.Data.updateLastHash({ nodeUrl, lastHash, expiresAt }); if (!this.ourSwarmNodes[nodeUrl]) { this.ourSwarmNodes[nodeUrl] = { - failureCount: 0, lastHash, }; } else { @@ -118,7 +97,7 @@ class LokiSnodeAPI { } } - async getSwarmNodesForPubKey(pubKey) { + getSwarmNodesForPubKey(pubKey) { try { const conversation = ConversationController.get(pubKey); const swarmNodes = [...conversation.get('swarmNodes')]; @@ -146,7 +125,6 @@ class LokiSnodeAPI { const ps = newNodes.map(async url => { const lastHash = await window.Signal.Data.getLastHashBySnode(url); this.ourSwarmNodes[url] = { - failureCount: 0, lastHash, }; }); diff --git a/libtextsecure/outgoing_message.js b/libtextsecure/outgoing_message.js index bf30cdf69..5d2610f17 100644 --- a/libtextsecure/outgoing_message.js +++ b/libtextsecure/outgoing_message.js @@ -187,7 +187,9 @@ OutgoingMessage.prototype = { async transmitMessage(number, data, timestamp, ttl = 24 * 60 * 60 * 1000) { const pubKey = number; try { + // TODO: Make NUM_CONCURRENT_CONNECTIONS a global constant await lokiMessageAPI.sendMessage( + 2, pubKey, data, timestamp, From 709db4bf54b7f6a7e2ad08db2c95b04afd0f513e Mon Sep 17 00:00:00 2001 From: Beaudan Date: Mon, 3 Jun 2019 10:26:42 +1000 Subject: [PATCH 2/4] Make sendMessage take options, remove redundant retrieve function and get constant --- js/modules/loki_message_api.js | 114 +----------------------------- libtextsecure/outgoing_message.js | 8 ++- 2 files changed, 9 insertions(+), 113 deletions(-) diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index d6c2e65e6..a53fcefca 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -5,8 +5,7 @@ const _ = require('lodash'); const { rpc } = require('./loki_rpc'); -// Will be raised (to 3?) when we get more nodes -const MINIMUM_SUCCESSFUL_REQUESTS = 2; +const DEFAULT_CONNECTIONS = 2; const LOKI_LONGPOLL_HEADER = 'X-Loki-Long-Poll'; function sleepFor(time) { @@ -88,7 +87,8 @@ class LokiMessageAPI { this.sendingSwarmNodes = {}; } - async sendMessage(numConnections, pubKey, data, messageTimeStamp, ttl, isPing = false) { + async sendMessage(pubKey, data, messageTimeStamp, ttl, options = {}) { + const { isPing = false, numConnections = DEFAULT_CONNECTIONS } = options; // Data required to identify a message in a conversation const messageEventData = { pubKey, @@ -267,114 +267,6 @@ class LokiMessageAPI { await Promise.all(promises); } - // stale function, kept around to reduce diff noise - // TODO: remove - async retrieveMessages(callback) { - const ourKey = window.textsecure.storage.user.getNumber(); - const completedNodes = []; - let canResolve = true; - let successfulRequests = 0; - - let ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes(); - - const nodeComplete = nodeUrl => { - completedNodes.push(nodeUrl); - delete ourSwarmNodes[nodeUrl]; - }; - - const doRequest = async (nodeUrl, nodeData) => { - const params = { - pubKey: ourKey, - lastHash: nodeData.lastHash || '', - }; - const options = { - timeout: 40000, - headers: { - [LOKI_LONGPOLL_HEADER]: true, - }, - }; - - try { - const result = await rpc( - `http://${nodeUrl}`, - this.snodeServerPort, - 'retrieve', - params, - options - ); - - nodeComplete(nodeUrl); - successfulRequests += 1; - - if (Array.isArray(result.messages) && result.messages.length) { - const lastMessage = _.last(result.messages); - lokiSnodeAPI.updateLastHash( - nodeUrl, - lastMessage.hash, - lastMessage.expiration - ); - const filteredMessages = await this.jobQueue.add(() => - filterIncomingMessages(result.messages) - ); - if (filteredMessages.length) { - callback(filteredMessages); - } - } - } catch (e) { - log.warn('Loki retrieve messages:', e); - if (e instanceof textsecure.WrongSwarmError) { - const { newSwarm } = e; - await lokiSnodeAPI.updateOurSwarmNodes(newSwarm); - completedNodes.push(nodeUrl); - } else if (e instanceof textsecure.NotFoundError) { - canResolve = false; - } else if (e instanceof textsecure.HTTPError) { - // We mark the node as complete as we could still reach it - nodeComplete(nodeUrl); - } else { - const removeNode = await lokiSnodeAPI.unreachableNode( - ourKey, - nodeUrl - ); - if (removeNode) { - log.error('Loki retrieve messages:', e); - nodeComplete(nodeUrl); - } - } - } - }; - - while (successfulRequests < MINIMUM_SUCCESSFUL_REQUESTS) { - if (!canResolve) { - throw new window.textsecure.DNSResolutionError('Retrieving messages'); - } - if (Object.keys(ourSwarmNodes).length === 0) { - ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes(); - // Filter out the nodes we have already got responses from - completedNodes.forEach(nodeUrl => delete ourSwarmNodes[nodeUrl]); - - if (Object.keys(ourSwarmNodes).length === 0) { - if (successfulRequests !== 0) { - // TODO: Decide how to handle some completed requests but not enough - return; - } - throw new window.textsecure.EmptySwarmError( - ourKey, - 'Ran out of swarm nodes to query' - ); - } - } - - const remainingRequests = - MINIMUM_SUCCESSFUL_REQUESTS - successfulRequests; - - await Promise.all( - Object.entries(ourSwarmNodes) - .splice(0, remainingRequests) - .map(([nodeUrl, nodeData]) => doRequest(nodeUrl, nodeData)) - ); - } - } } module.exports = LokiMessageAPI; diff --git a/libtextsecure/outgoing_message.js b/libtextsecure/outgoing_message.js index 5d2610f17..26b66bc35 100644 --- a/libtextsecure/outgoing_message.js +++ b/libtextsecure/outgoing_message.js @@ -12,6 +12,7 @@ /* eslint-disable more/no-then */ /* eslint-disable no-unreachable */ +const NUM_SEND_CONNECTIONS = 2; function OutgoingMessage( server, @@ -188,13 +189,16 @@ OutgoingMessage.prototype = { const pubKey = number; try { // TODO: Make NUM_CONCURRENT_CONNECTIONS a global constant + const options = { + numConnections: NUM_SEND_CONNECTIONS, + isPing: this.isPing, + } await lokiMessageAPI.sendMessage( - 2, pubKey, data, timestamp, ttl, - this.isPing + options ); } catch (e) { if (e.name === 'HTTPError' && (e.code !== 409 && e.code !== 410)) { From c02d5d4053e5cd3850c9cec8d737b43b7e9765f3 Mon Sep 17 00:00:00 2001 From: Beaudan Date: Mon, 3 Jun 2019 10:29:11 +1000 Subject: [PATCH 3/4] Lint --- js/modules/loki_message_api.js | 53 ++++++++++++++----------------- js/modules/loki_snode_api.js | 1 - libtextsecure/outgoing_message.js | 10 ++---- 3 files changed, 25 insertions(+), 39 deletions(-) diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index a53fcefca..be09c24c6 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -32,22 +32,10 @@ const filterIncomingMessages = async messages => { const calcNonce = async (messageEventData, pubKey, data64, timestamp, ttl) => { // Nonce is returned as a base64 string to include in header - try { - window.Whisper.events.trigger('calculatingPoW', messageEventData); - const development = window.getEnvironment() !== 'production'; - return callWorker( - 'calcPoW', - timestamp, - ttl, - pubKey, - data64, - development - ); - } catch (err) { - // Something went horribly wrong - throw err; - } -} + window.Whisper.events.trigger('calculatingPoW', messageEventData); + const development = window.getEnvironment() !== 'production'; + return callWorker('calcPoW', timestamp, ttl, pubKey, data64, development); +}; const trySendP2p = async (pubKey, data64, isPing, messageEventData) => { const p2pDetails = lokiP2pAPI.getContactP2pDetails(pubKey); @@ -78,7 +66,7 @@ const trySendP2p = async (pubKey, data64, isPing, messageEventData) => { log.warn('Failed to send P2P message, falling back to storage', e); return false; } -} +}; class LokiMessageAPI { constructor({ snodeServerPort }) { @@ -96,15 +84,28 @@ class LokiMessageAPI { }; const data64 = dcodeIO.ByteBuffer.wrap(data).toString('base64'); - const p2pSuccess = await trySendP2p(pubKey, data64, isPing, messageEventData); + const p2pSuccess = await trySendP2p( + pubKey, + data64, + isPing, + messageEventData + ); if (p2pSuccess) { return; } const timestamp = Date.now(); - const nonce = await calcNonce(messageEventData, pubKey, data64, timestamp, ttl); + const nonce = await calcNonce( + messageEventData, + pubKey, + data64, + timestamp, + ttl + ); // Using timestamp as a unique identifier - this.sendingSwarmNodes[timestamp] = lokiSnodeAPI.getSwarmNodesForPubKey(pubKey); + this.sendingSwarmNodes[timestamp] = lokiSnodeAPI.getSwarmNodesForPubKey( + pubKey + ); if (this.sendingSwarmNodes[timestamp].length < numConnections) { const freshNodes = await lokiSnodeAPI.getFreshSwarmNodes(pubKey); await lokiSnodeAPI.updateSwarmNodes(pubKey, freshNodes); @@ -175,10 +176,7 @@ class LokiMessageAPI { } } log.error(`Failed to send to node: ${url}`); - await lokiSnodeAPI.unreachableNode( - params.pubKey, - url - ); + await lokiSnodeAPI.unreachableNode(params.pubKey, url); return false; } @@ -215,11 +213,7 @@ class LokiMessageAPI { await sleepFor(successiveFailures * 1000); try { - let messages = await this.retrieveNextMessages( - url, - nodeData, - ourKey - ); + let messages = await this.retrieveNextMessages(url, nodeData, ourKey); successiveFailures = 0; if (messages.length) { const lastMessage = _.last(messages); @@ -266,7 +260,6 @@ class LokiMessageAPI { // or if there is network issues (ENOUTFOUND due to lokinet) await Promise.all(promises); } - } module.exports = LokiMessageAPI; diff --git a/js/modules/loki_snode_api.js b/js/modules/loki_snode_api.js index 88a92b35d..1432e20a6 100644 --- a/js/modules/loki_snode_api.js +++ b/js/modules/loki_snode_api.js @@ -8,7 +8,6 @@ const { rpc } = require('./loki_rpc'); // Will be raised (to 3?) when we get more nodes const MINIMUM_SWARM_NODES = 1; -const FAILURE_THRESHOLD = 3; const resolve4 = url => new Promise((resolve, reject) => { diff --git a/libtextsecure/outgoing_message.js b/libtextsecure/outgoing_message.js index 26b66bc35..5700b860d 100644 --- a/libtextsecure/outgoing_message.js +++ b/libtextsecure/outgoing_message.js @@ -192,14 +192,8 @@ OutgoingMessage.prototype = { const options = { numConnections: NUM_SEND_CONNECTIONS, isPing: this.isPing, - } - await lokiMessageAPI.sendMessage( - pubKey, - data, - timestamp, - ttl, - options - ); + }; + await lokiMessageAPI.sendMessage(pubKey, data, timestamp, ttl, options); } catch (e) { if (e.name === 'HTTPError' && (e.code !== 409 && e.code !== 410)) { // 409 and 410 should bubble and be handled by doSendMessage From 18a87d5463ba80af57805da0088ccf532482ad2d Mon Sep 17 00:00:00 2001 From: Beaudan Date: Mon, 3 Jun 2019 13:35:58 +1000 Subject: [PATCH 4/4] Review changes --- js/modules/loki_message_api.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index be09c24c6..b5e0b9ffe 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -30,7 +30,7 @@ const filterIncomingMessages = async messages => { return newMessages; }; -const calcNonce = async (messageEventData, pubKey, data64, timestamp, ttl) => { +const calcNonce = (messageEventData, pubKey, data64, timestamp, ttl) => { // Nonce is returned as a base64 string to include in header window.Whisper.events.trigger('calculatingPoW', messageEventData); const development = window.getEnvironment() !== 'production';