diff --git a/config/default.json b/config/default.json index 8b7d786f9..0699b7a06 100644 --- a/config/default.json +++ b/config/default.json @@ -3,8 +3,7 @@ "localUrl": "localhost.loki", "cdnUrl": "random.snode", "localServerPort": "8081", - "messageServerPort": "8080", - "swarmServerPort": "8079", + "snodeServerPort": "8080", "disableAutoUpdate": false, "openDevTools": false, "buildExpiration": 0, diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index 4ec1b1b41..ca7404871 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -1,121 +1,16 @@ /* eslint-disable no-await-in-loop */ /* eslint-disable no-loop-func */ -/* global log, dcodeIO, window, callWorker, lokiP2pAPI, lokiSnodeAPI, libloki */ +/* global log, dcodeIO, window, callWorker, lokiP2pAPI, lokiSnodeAPI, textsecure */ -const nodeFetch = require('node-fetch'); const _ = require('lodash'); -const { parse } = require('url'); - -const endpointBase = '/v1/storage_rpc'; -const LOKI_EPHEMKEY_HEADER = 'X-Loki-EphemKey'; - -class HTTPError extends Error { - constructor(response) { - super(response.statusText); - this.name = 'HTTPError'; - this.response = response; - } -} - -class NotFoundError extends Error { - constructor() { - super('ENOTFOUND'); - this.name = 'NotFoundError'; - } -} - -// A small wrapper around node-fetch which deserializes response -const fetch = async (url, options = {}) => { - const timeout = options.timeout || 10000; - const method = options.method || 'GET'; - - const address = parse(url).hostname; - const doEncryptChannel = address.endsWith('.snode'); - if (doEncryptChannel) { - try { - // eslint-disable-next-line no-param-reassign - options.body = await libloki.crypto.snodeCipher.encrypt( - address, - options.body - ); - // eslint-disable-next-line no-param-reassign - options.headers = { - ...options.headers, - 'Content-Type': 'text/plain', - [LOKI_EPHEMKEY_HEADER]: libloki.crypto.snodeCipher.getChannelPublicKeyHex(), - }; - } catch (e) { - log.warn(`Could not encrypt channel for ${address}: `, e); - } - } - - try { - const response = await nodeFetch(url, { - ...options, - timeout, - method, - }); - - if (!response.ok) { - throw new HTTPError(response); - } - - let result; - if (response.headers.get('Content-Type') === 'application/json') { - result = await response.json(); - } else if (options.responseType === 'arraybuffer') { - result = await response.buffer(); - } else { - result = await response.text(); - if (doEncryptChannel) { - try { - result = await libloki.crypto.snodeCipher.decrypt(address, result); - } catch (e) { - log.warn(`Could not decrypt response from ${address}`, e); - } - try { - result = JSON.parse(result); - } catch (e) { - log.warn(`Could not parse string to json ${result}`, e); - } - } - } - - return result; - } catch (e) { - if (e.code === 'ENOTFOUND') { - throw new NotFoundError(); - } - - throw e; - } -}; - -// Wrapper for a JSON RPC request -const rpc = (address, port, method, params, options = {}) => { - const headers = options.headers || {}; - const url = `${address}${port}${endpointBase}`; - const body = { - method, - params, - }; - - const fetchOptions = { - method: 'POST', - ...options, - body: JSON.stringify(body), - headers, - }; - - return fetch(url, fetchOptions); -}; +const { rpc } = require('./loki_rpc'); // Will be raised (to 3?) when we get more nodes const MINIMUM_SUCCESSFUL_REQUESTS = 2; class LokiMessageAPI { - constructor({ messageServerPort }) { - this.messageServerPort = messageServerPort ? `:${messageServerPort}` : ''; + constructor({ snodeServerPort }) { + this.snodeServerPort = snodeServerPort ? `:${snodeServerPort}` : ''; } async sendMessage(pubKey, data, messageTimeStamp, ttl, isPing = false) { @@ -195,15 +90,19 @@ class LokiMessageAPI { }; try { - await rpc(nodeUrl, this.messageServerPort, 'store', params); + await rpc(`http://${nodeUrl}`, this.snodeServerPort, 'store', params); nodeComplete(nodeUrl); successfulRequests += 1; } catch (e) { log.warn('Loki send message:', e); - if (e instanceof NotFoundError) { + if (e instanceof textsecure.WrongSwarmError) { + const { newSwarm } = e; + await lokiSnodeAPI.updateSwarmNodes(pubKey, newSwarm); + completedNodes.push(nodeUrl); + } else if (e instanceof textsecure.NotFoundError) { canResolve = false; - } else if (e instanceof HTTPError) { + } else if (e instanceof textsecure.HTTPError) { // We mark the node as complete as we could still reach it nodeComplete(nodeUrl); } else { @@ -270,29 +169,34 @@ class LokiMessageAPI { const doRequest = async (nodeUrl, nodeData) => { const params = { pubKey: ourKey, - lastHash: nodeData.lastHash, + lastHash: nodeData.lastHash || '', }; try { const result = await rpc( - nodeUrl, - this.messageServerPort, + `http://${nodeUrl}`, + this.snodeServerPort, 'retrieve', params ); nodeComplete(nodeUrl); - if (result.lastHash) { - lokiSnodeAPI.updateLastHash(nodeUrl, result.lastHash); + if (Array.isArray(result.messages) && result.messages.length) { + const lastHash = _.last(result.messages).hash; + lokiSnodeAPI.updateLastHash(nodeUrl, lastHash); callback(result.messages); } successfulRequests += 1; } catch (e) { log.warn('Loki retrieve messages:', e); - if (e instanceof NotFoundError) { + if (e instanceof textsecure.WrongSwarmError) { + const { newSwarm } = e; + lokiSnodeAPI.updateOurSwarmNodes(newSwarm); + completedNodes.push(nodeUrl); + } else if (e instanceof textsecure.NotFoundError) { canResolve = false; - } else if (e instanceof HTTPError) { + } else if (e instanceof textsecure.HTTPError) { // We mark the node as complete as we could still reach it nodeComplete(nodeUrl); } else { diff --git a/js/modules/loki_rpc.js b/js/modules/loki_rpc.js new file mode 100644 index 000000000..378f948a7 --- /dev/null +++ b/js/modules/loki_rpc.js @@ -0,0 +1,116 @@ +/* global log, libloki, textsecure */ + +const nodeFetch = require('node-fetch'); +const { parse } = require('url'); + +const LOKI_EPHEMKEY_HEADER = 'X-Loki-EphemKey'; +const endpointBase = '/v1/storage_rpc'; + +// A small wrapper around node-fetch which deserializes response +const fetch = async (url, options = {}) => { + const timeout = options.timeout || 10000; + const method = options.method || 'GET'; + + const address = parse(url).hostname; + const doEncryptChannel = address.endsWith('.snode'); + if (doEncryptChannel) { + try { + // eslint-disable-next-line no-param-reassign + options.body = await libloki.crypto.snodeCipher.encrypt( + address, + options.body + ); + // eslint-disable-next-line no-param-reassign + options.headers = { + ...options.headers, + 'Content-Type': 'text/plain', + [LOKI_EPHEMKEY_HEADER]: libloki.crypto.snodeCipher.getChannelPublicKeyHex(), + }; + } catch (e) { + log.warn(`Could not encrypt channel for ${address}: `, e); + } + } + + try { + const response = await nodeFetch(url, { + ...options, + timeout, + method, + }); + + if (response.status === 421) { + let newSwarm = await response.text(); + if (doEncryptChannel) { + try { + newSwarm = await libloki.crypto.snodeCipher.decrypt( + address, + newSwarm + ); + } catch (e) { + log.warn(`Could not decrypt response from ${address}`, e); + } + try { + newSwarm = newSwarm === '' ? {} : JSON.parse(newSwarm); + } catch (e) { + log.warn(`Could not parse string to json ${newSwarm}`, e); + } + } + throw new textsecure.WrongSwarmError(newSwarm); + } + + if (!response.ok) { + throw new textsecure.HTTPError('Loki_rpc error', response); + } + + let result; + if (response.headers.get('Content-Type') === 'application/json') { + result = await response.json(); + } else if (options.responseType === 'arraybuffer') { + result = await response.buffer(); + } else { + result = await response.text(); + if (doEncryptChannel) { + try { + result = await libloki.crypto.snodeCipher.decrypt(address, result); + } catch (e) { + log.warn(`Could not decrypt response from ${address}`, e); + } + try { + result = result === '' ? {} : JSON.parse(result); + } catch (e) { + log.warn(`Could not parse string to json ${result}`, e); + } + } + } + + return result; + } catch (e) { + if (e.code === 'ENOTFOUND') { + throw new textsecure.NotFoundError('Failed to resolve address', e); + } + throw e; + } +}; + +// Wrapper for a JSON RPC request +const rpc = (address, port, method, params, options = {}) => { + const headers = options.headers || {}; + const url = `${address}${port}${endpointBase}`; + const body = { + method, + params, + }; + + const fetchOptions = { + method: 'POST', + ...options, + body: JSON.stringify(body), + headers, + }; + + return fetch(url, fetchOptions); +}; + +module.exports = { + rpc, +}; diff --git a/js/modules/loki_snode_api.js b/js/modules/loki_snode_api.js index 0f83e7cc3..8edd758ba 100644 --- a/js/modules/loki_snode_api.js +++ b/js/modules/loki_snode_api.js @@ -1,10 +1,10 @@ /* eslint-disable class-methods-use-this */ /* global window, ConversationController */ -const fetch = require('node-fetch'); const is = require('@sindresorhus/is'); const dns = require('dns'); const process = require('process'); +const { rpc } = require('./loki_rpc'); // Will be raised (to 3?) when we get more nodes const MINIMUM_SWARM_NODES = 1; @@ -33,13 +33,13 @@ const resolveCname = url => }); class LokiSnodeAPI { - constructor({ serverUrl, localUrl, swarmServerPort }) { + constructor({ serverUrl, localUrl, snodeServerPort }) { if (!is.string(serverUrl)) { throw new Error('WebAPI.initialize: Invalid server url'); } this.serverUrl = serverUrl; this.localUrl = localUrl; - this.swarmServerPort = swarmServerPort ? `:${swarmServerPort}` : ''; + this.snodeServerPort = snodeServerPort ? `:${snodeServerPort}` : ''; this.swarmsPendingReplenish = {}; this.ourSwarmNodes = {}; this.contactSwarmNodes = {}; @@ -139,6 +139,15 @@ class LokiSnodeAPI { } } + updateOurSwarmNodes(newNodes) { + this.ourSwarmNodes = {}; + newNodes.forEach(url => { + this.ourSwarmNodes[url] = { + failureCount: 0, + }; + }); + } + async getOurSwarmNodes() { if ( !this.ourSwarmNodes || @@ -183,65 +192,17 @@ class LokiSnodeAPI { async getSwarmNodes(pubKey) { // TODO: Hit multiple random nodes and merge lists? - const node = await this.getRandomSnodeAddress(); - // TODO: Confirm final API URL and sensible timeout - const options = { - url: `http://${node}${this.swarmServerPort}/json_rpc`, - type: 'POST', - responseType: 'json', - timeout: 10000, - }; - - const body = { - jsonrpc: '2.0', - id: '0', - method: 'get_swarm_list_for_messenger_pubkey', - params: { - pubkey: pubKey, - }, - }; - - const fetchOptions = { - method: options.type, - body: JSON.stringify(body), - headers: { - 'Content-Type': 'application/json', - }, - timeout: options.timeout, - }; - - let response; - try { - response = await fetch(options.url, fetchOptions); - } catch (e) { - throw new window.textsecure.EmptySwarmError( - pubKey, - 'Could not retrieve swarm nodes' - ); - } + const nodeUrl = await this.getRandomSnodeAddress(); - let result; - if ( - options.responseType === 'json' && - response.headers.get('Content-Type') === 'application/json' - ) { - result = await response.json(); - } else if (options.responseType === 'arraybuffer') { - result = await response.buffer(); - } else { - result = await response.text(); - } - - // TODO: Handle wrong swarm error from snode - - if (!response.ok || !result.nodes || result.nodes === []) { - throw new window.textsecure.EmptySwarmError( + const result = await rpc( + `http://${nodeUrl}`, + this.snodeServerPort, + 'get_snodes_for_pubkey', + { pubKey, - 'Could not retrieve swarm nodes' - ); - } - - return result.nodes; + } + ); + return result.snodes; } } diff --git a/libtextsecure/errors.js b/libtextsecure/errors.js index 12aee4d25..78f9fcfb1 100644 --- a/libtextsecure/errors.js +++ b/libtextsecure/errors.js @@ -179,6 +179,49 @@ appendStack(this, resolutionError); } + function NotFoundError(message, error) { + this.name = 'NotFoundError'; + this.message = message; + this.error = error; + + Error.call(this, message); + + // Maintains proper stack trace, where our error was thrown (only available on V8) + // via https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Error + if (Error.captureStackTrace) { + Error.captureStackTrace(this); + } + + appendStack(this, error); + } + + function HTTPError(message, response) { + this.name = 'HTTPError'; + this.message = `${response.status} Error: ${message}`; + this.response = response; + + Error.call(this, message); + + // Maintains proper stack trace, where our error was thrown (only available on V8) + // via https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Error + if (Error.captureStackTrace) { + Error.captureStackTrace(this); + } + } + + function WrongSwarmError(newSwarm) { + this.name = 'WrongSwarmError'; + this.newSwarm = newSwarm; + + Error.call(this, this.name); + + // Maintains proper stack trace, where our error was thrown (only available on V8) + // via https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Error + if (Error.captureStackTrace) { + Error.captureStackTrace(this); + } + } + window.textsecure.UnregisteredUserError = UnregisteredUserError; window.textsecure.SendMessageNetworkError = SendMessageNetworkError; window.textsecure.IncomingIdentityKeyError = IncomingIdentityKeyError; @@ -191,4 +234,7 @@ window.textsecure.EmptySwarmError = EmptySwarmError; window.textsecure.DNSResolutionError = DNSResolutionError; window.textsecure.LokiIpError = LokiIpError; + window.textsecure.HTTPError = HTTPError; + window.textsecure.NotFoundError = NotFoundError; + window.textsecure.WrongSwarmError = WrongSwarmError; })(); diff --git a/main.js b/main.js index ebbed0fb8..fa4578769 100644 --- a/main.js +++ b/main.js @@ -147,8 +147,7 @@ function prepareURL(pathSegments, moreKeys) { serverUrl: config.get('serverUrl'), localUrl: config.get('localUrl'), cdnUrl: config.get('cdnUrl'), - messageServerPort: config.get('messageServerPort'), - swarmServerPort: config.get('swarmServerPort'), + snodeServerPort: config.get('snodeServerPort'), localServerPort: config.get('localServerPort'), certificateAuthority: config.get('certificateAuthority'), environment: config.environment, diff --git a/preload.js b/preload.js index 0942eaf55..64586c599 100644 --- a/preload.js +++ b/preload.js @@ -292,7 +292,7 @@ const LokiSnodeAPI = require('./js/modules/loki_snode_api'); window.lokiSnodeAPI = new LokiSnodeAPI({ serverUrl: config.serverUrl, localUrl: config.localUrl, - swarmServerPort: config.swarmServerPort, + snodeServerPort: config.snodeServerPort, }); window.LokiP2pAPI = require('./js/modules/loki_p2p_api'); @@ -301,7 +301,7 @@ const LokiMessageAPI = require('./js/modules/loki_message_api'); window.lokiMessageAPI = new LokiMessageAPI({ url: config.serverUrl, - messageServerPort: config.messageServerPort, + snodeServerPort: config.snodeServerPort, }); const LocalLokiServer = require('./libloki/modules/local_loki_server');