From 455bfa4ab754c33aef6ff46c2d0719c4fde89fea Mon Sep 17 00:00:00 2001 From: Ryan Tharp Date: Wed, 18 Mar 2020 17:33:36 -0700 Subject: [PATCH] result guard, mark internal-only intended functions with _ prefix and simplify parameters, logging improvements --- js/modules/loki_message_api.js | 130 +++++++++++++++++++++------------ 1 file changed, 84 insertions(+), 46 deletions(-) diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index 88b02b7af..838bc5ef6 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -94,6 +94,7 @@ class LokiMessageAPI { await this.refreshSendingSwarm(pubKey, timestamp); } + // send parameters const params = { pubKey, ttl: ttl.toString(), @@ -104,7 +105,7 @@ class LokiMessageAPI { const promises = []; let completedConnections = 0; for (let i = 0; i < numConnections; i += 1) { - const connectionPromise = this.openSendConnection(params).finally(() => { + const connectionPromise = this._openSendConnection(params).finally(() => { completedConnections += 1; if (completedConnections >= numConnections) { delete this.sendingData[timestamp]; @@ -151,7 +152,7 @@ class LokiMessageAPI { 'Ran out of swarm nodes to query' ); } - log.info(`Successful storage message to ${pubKey}`); + log.info(`loki_message: Successfully stored message to ${pubKey}`); } async refreshSendingSwarm(pubKey, timestamp) { @@ -162,16 +163,11 @@ class LokiMessageAPI { return true; } - async openSendConnection(params) { + async _openSendConnection(params) { while (!_.isEmpty(this.sendingData[params.timestamp].swarm)) { const snode = this.sendingData[params.timestamp].swarm.shift(); // TODO: Revert back to using snode address instead of IP - const successfulSend = await this.sendToNode( - snode.ip, - snode.port, - snode, - params - ); + const successfulSend = await this._sendToNode(snode, params); if (successfulSend) { return true; } @@ -189,19 +185,19 @@ class LokiMessageAPI { } await this.sendingData[params.timestamp].refreshPromise; // Retry with a fresh list again - return this.openSendConnection(params); + return this._openSendConnection(params); } return false; } - async sendToNode(address, port, targetNode, params) { + async _sendToNode(targetNode, params) { let successiveFailures = 0; while (successiveFailures < MAX_ACCEPTABLE_FAILURES) { await sleepFor(successiveFailures * 500); try { const result = await lokiRpc( - `https://${address}`, - port, + `https://${targetNode.ip}`, + targetNode.port, 'store', params, {}, @@ -211,17 +207,19 @@ class LokiMessageAPI { // Make sure we aren't doing too much PoW const currentDifficulty = window.storage.get('PoWDifficulty', null); - const newDifficulty = result.difficulty; - if (newDifficulty != null && newDifficulty !== currentDifficulty) { - window.storage.put('PoWDifficulty', newDifficulty); - } + if (result && result.difficulty) { + const newDifficulty = result.difficulty; + if (newDifficulty != null && newDifficulty !== currentDifficulty) { + window.storage.put('PoWDifficulty', newDifficulty); + } + } // else should we return false? return true; } catch (e) { log.warn( - 'Loki send message error:', + 'loki_message: send error:', e.code, e.message, - `from ${address}` + `destination ${targetNode.ip}:${targetNode.port}` ); if (e instanceof textsecure.WrongSwarmError) { const { newSwarm } = e; @@ -238,26 +236,35 @@ class LokiMessageAPI { } else if (e instanceof textsecure.NotFoundError) { // TODO: Handle resolution error } else if (e instanceof textsecure.TimestampError) { - log.warn('Timestamp is invalid'); + log.warn('loki_message: Timestamp is invalid'); throw e; } else if (e instanceof textsecure.HTTPError) { // TODO: Handle working connection but error response const body = await e.response.text(); - log.warn('HTTPError body:', body); + log.warn('loki_message: HTTPError body:', body); } successiveFailures += 1; } } - log.error(`Failed to send to node: ${address}`); - await lokiSnodeAPI.unreachableNode(params.pubKey, address); + const remainingSwarmSnodes = await lokiSnodeAPI.unreachableNode( + params.pubKey, + targetNode + ); + log.error( + `loki_message: Too many successive failures trying to send to node ${ + targetNode.ip + }:${targetNode.port}, ${remainingSwarmSnodes.lengt} remaining swarm nodes` + ); return false; } - async openRetrieveConnection(stopPollingPromise, callback) { + async _openRetrieveConnection(stopPollingPromise, callback) { let stopPollingResult = false; + // When message_receiver restarts from onoffline/ononline events it closes // http-resources, which will then resolve the stopPollingPromise with true. We then // want to cancel these polling connections because new ones will be created + // eslint-disable-next-line more/no-then stopPollingPromise.then(result => { stopPollingResult = result; @@ -274,9 +281,13 @@ class LokiMessageAPI { ) { await sleepFor(successiveFailures * 1000); + // TODO: Revert back to using snode address instead of IP try { - // TODO: Revert back to using snode address instead of IP - let messages = await this.retrieveNextMessages(nodeData.ip, nodeData); + // in general, I think we want exceptions to bubble up + // so the user facing UI can report unhandled errors + // except in this case of living inside http-resource pollServer + // because it just restarts more connections... + let messages = await this._retrieveNextMessages(nodeData); // this only tracks retrieval failures // won't include parsing failures... successiveFailures = 0; @@ -296,7 +307,7 @@ class LokiMessageAPI { callback(messages); } catch (e) { log.warn( - 'Loki retrieve messages error:', + 'loki_message: retrieve error:', e.code, e.message, `on ${nodeData.ip}:${nodeData.port}` @@ -324,40 +335,49 @@ class LokiMessageAPI { } } if (successiveFailures >= MAX_ACCEPTABLE_FAILURES) { + const remainingSwarmSnodes = await lokiSnodeAPI.unreachableNode( + this.ourKey, + nodeData + ); log.warn( - `removing ${nodeData.ip}:${ + `loki_message: removing ${nodeData.ip}:${ nodeData.port } from our swarm pool. We have ${ Object.keys(this.ourSwarmNodes).length - } usable swarm nodes left` + } usable swarm nodes left (${ + remainingSwarmSnodes.length + } in local db)` ); - await lokiSnodeAPI.unreachableNode(this.ourKey, address); } } // if not stopPollingResult if (_.isEmpty(this.ourSwarmNodes)) { log.error( - 'We no longer have any swarm nodes available to try in pool, closing retrieve connection' + 'loki_message: We no longer have any swarm nodes available to try in pool, closing retrieve connection' ); return false; } return true; } - async retrieveNextMessages(nodeUrl, nodeData) { + // we don't throw or catch here + // mark private (_ prefix) since no error handling is done here... + async _retrieveNextMessages(nodeData) { const params = { pubKey: this.ourKey, lastHash: nodeData.lastHash || '', }; const options = { timeout: 40000, + ourPubKey: this.ourKey, headers: { [LOKI_LONGPOLL_HEADER]: true, }, }; + // let exceptions bubble up const result = await lokiRpc( - `https://${nodeUrl}`, + `https://${nodeData.ip}`, nodeData.port, 'retrieve', params, @@ -365,34 +385,39 @@ class LokiMessageAPI { '/storage_rpc/v1', nodeData ); + return result.messages || []; } + // we don't throw or catch here async startLongPolling(numConnections, stopPolling, callback) { - log.info('startLongPolling for', numConnections, 'connections'); this.ourSwarmNodes = {}; + // load from local DB let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey); - log.info('swarmNodes', nodes.length, 'for', this.ourKey); - Object.keys(nodes).forEach(j => { - const node = nodes[j]; - log.info(`${j} ${node.ip}:${node.port}`); - }); if (nodes.length < numConnections) { log.warn( - 'Not enough SwarmNodes for our pubkey in local database, getting current list from blockchain' + 'loki_message: Not enough SwarmNodes for our pubkey in local database, getting current list from blockchain' ); + // load from blockchain nodes = await lokiSnodeAPI.refreshSwarmNodesForPubKey(this.ourKey); if (nodes.length < numConnections) { log.error( - 'Could not get enough SwarmNodes for our pubkey from blockchain' + 'loki_message: Could not get enough SwarmNodes for our pubkey from blockchain' ); } } log.info( - `There are currently ${ - nodes.length - } swarmNodes for pubKey in our local database` + 'loki_message: startLongPolling for', + numConnections, + 'connections. We have swarmNodes', + nodes.length, + 'for', + this.ourKey ); + Object.keys(nodes).forEach(j => { + const node = nodes[j]; + log.info(`loki_message: ${j} ${node.ip}:${node.port}`); + }); for (let i = 0; i < nodes.length; i += 1) { const lastHash = await window.Signal.Data.getLastHashBySnode( @@ -406,15 +431,28 @@ class LokiMessageAPI { const promises = []; + let unresolved = numConnections; for (let i = 0; i < numConnections; i += 1) { - promises.push(this.openRetrieveConnection(stopPolling, callback)); + promises.push( + // eslint-disable-next-line more/no-then + this._openRetrieveConnection(stopPolling, callback).then(() => { + unresolved -= 1; + log.info( + 'loki_message: There are', + unresolved, + 'open retrieve connections left' + ); + }) + ); } // blocks until numConnections snodes in our swarms have been removed from the list // less than numConnections being active is fine, only need to restart if none per Niels 20/02/13 // or if there is network issues (ENOUTFOUND due to lokinet) await Promise.all(promises); - log.error('All our long poll swarm connections have been removed'); + log.error( + 'loki_message: All our long poll swarm connections have been removed' + ); // should we just call ourself again? // no, our caller already handles this... }