diff --git a/config/default.json b/config/default.json index 2d4ec64a7..3b605aa6d 100644 --- a/config/default.json +++ b/config/default.json @@ -1,8 +1,8 @@ { - "serverUrl": "http://qp994mrc8z7fqmsynzdumd35b5918q599gno46br86e537f7qzzy.snode", - "cdnUrl": "http://qp994mrc8z7fqmsynzdumd35b5918q599gno46br86e537f7qzzy.snode", - "messageServerPort": ":8080", - "swarmServerPort": ":8079", + "serverUrl": "random.snode", + "cdnUrl": "random.snode", + "messageServerPort": "8080", + "swarmServerPort": "8079", "disableAutoUpdate": false, "openDevTools": false, "buildExpiration": 0, diff --git a/js/conversation_controller.js b/js/conversation_controller.js index ca0d5658b..3abe9561a 100644 --- a/js/conversation_controller.js +++ b/js/conversation_controller.js @@ -174,7 +174,7 @@ return conversation; } - window.libloki.replenishSwarm(id); + window.LokiSnodeAPI.replenishSwarm(id); try { await window.Signal.Data.saveConversation(conversation.attributes, { Conversation: Whisper.Conversation, diff --git a/js/models/conversations.js b/js/models/conversations.js index c18984757..a360f5f82 100644 --- a/js/models/conversations.js +++ b/js/models/conversations.js @@ -1200,9 +1200,6 @@ // Add the message sending on another queue so that our UI doesn't get blocked this.queueMessageSend(async () => { - if (this.get('swarmNodes').length === 0) { - await window.libloki.replenishSwarm(destination); - } message.send( this.wrapSend( sendFunction( diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index 8f54e7a11..c80b72727 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -1,90 +1,25 @@ +/* eslint-disable no-await-in-loop */ /* global log, dcodeIO, window, callWorker */ const fetch = require('node-fetch'); -const is = require('@sindresorhus/is'); -class LokiServer { +// eslint-disable-next-line +const invert = p => new Promise((res, rej) => p.then(rej, res)); +const firstOf = ps => invert(Promise.all(ps.map(invert))); - constructor({ urls, messageServerPort, swarmServerPort }) { - this.nodes = []; - this.messageServerPort = messageServerPort; - this.swarmServerPort = swarmServerPort; - urls.forEach(url => { - if (!is.string(url)) { - throw new Error('WebAPI.initialize: Invalid server url'); - } - this.nodes.push({ url }); - }); - } - - async loadOurSwarm() { - const ourKey = window.textsecure.storage.user.getNumber(); - const nodeAddresses = await this.getSwarmNodes(ourKey); - this.ourSwarmNodes = []; - nodeAddresses.forEach(url => { - this.ourSwarmNodes.push({ url }); - }) - } - - async getSwarmNodes(pubKey) { - const currentNode = this.nodes[0]; - - const options = { - url: `${currentNode.url}${this.swarmServerPort}/json_rpc`, - type: 'POST', - responseType: 'json', - timeout: undefined, - }; - - const body = { - jsonrpc: '2.0', - id: '0', - method: 'get_swarm_list_for_messenger_pubkey', - params: { - pubkey: pubKey, - }, - } - - const fetchOptions = { - method: options.type, - body: JSON.stringify(body), - headers: { - 'Content-Type': 'application/json', - }, - timeout: options.timeout, - }; +// Will be raised (to 3?) when we get more nodes +const MINIMUM_SUCCESSFUL_REQUESTS = 2; +class LokiMessageAPI { - let response; - try { - response = await fetch(options.url, fetchOptions); - } catch (e) { - log.error(options.type, options.url, 0, 'Error'); - throw HTTPError('fetch error', 0, e.toString()); - } - - let result; - if ( - options.responseType === 'json' && - response.headers.get('Content-Type') === 'application/json' - ) { - result = await response.json(); - } else if (options.responseType === 'arraybuffer') { - result = await response.buffer(); - } else { - result = await response.text(); - } - - if (response.status >= 0 && response.status < 400) { - return result.nodes; - } - log.error(options.type, options.url, response.status, 'Error'); - throw HTTPError('sendMessage: error response', response.status, result); + constructor({ messageServerPort }) { + this.messageServerPort = messageServerPort + ? `:${messageServerPort}` + : ''; } async sendMessage(pubKey, data, messageTimeStamp, ttl) { - const swarmNodes = await window.Signal.Data.getSwarmNodesByPubkey(pubKey); + const swarmNodes = await window.LokiSnodeAPI.getSwarmNodesByPubkey(pubKey) if (!swarmNodes || swarmNodes.length === 0) { - // TODO: Refresh the swarm nodes list throw Error('No swarm nodes to query!'); } @@ -101,110 +36,143 @@ class LokiServer { nonce = await callWorker('calcPoW', timestamp, ttl, pubKey, data64, development); } catch (err) { // Something went horribly wrong - // TODO: Handle gracefully throw err; } - const options = { - url: `${swarmNodes[0]}${this.messageServerPort}/store`, - type: 'POST', - responseType: undefined, - timeout: undefined, - }; - - const fetchOptions = { - method: options.type, - body: data64, - headers: { - 'X-Loki-pow-nonce': nonce, - 'X-Loki-timestamp': timestamp.toString(), - 'X-Loki-ttl': ttl.toString(), - 'X-Loki-recipient': pubKey, - }, - timeout: options.timeout, - }; - - let response; - try { - response = await fetch(options.url, fetchOptions); - } catch (e) { - log.error(options.type, options.url, 0, 'Error'); - throw HTTPError('fetch error', 0, e.toString()); - } + const requests = swarmNodes.map(async node => { + const options = { + url: `${node}${this.messageServerPort}/store`, + type: 'POST', + responseType: undefined, + timeout: undefined, + }; + + const fetchOptions = { + method: options.type, + body: data64, + headers: { + 'X-Loki-pow-nonce': nonce, + 'X-Loki-timestamp': timestamp.toString(), + 'X-Loki-ttl': ttl.toString(), + 'X-Loki-recipient': pubKey, + }, + timeout: options.timeout, + }; + + let response; + try { + response = await fetch(options.url, fetchOptions); + } catch (e) { + log.error(options.type, options.url, 0, 'Error sending message'); + window.LokiSnodeAPI.unreachableNode(pubKey, node); + throw HTTPError('fetch error', 0, e.toString()); + } - let result; - if ( - options.responseType === 'json' && - response.headers.get('Content-Type') === 'application/json' - ) { - result = await response.json(); - } else if (options.responseType === 'arraybuffer') { - result = await response.buffer(); - } else { - result = await response.text(); - } + let result; + if ( + options.responseType === 'json' && + response.headers.get('Content-Type') === 'application/json' + ) { + result = await response.json(); + } else if (options.responseType === 'arraybuffer') { + result = await response.buffer(); + } else { + result = await response.text(); + } - if (response.status >= 0 && response.status < 400) { + if (response.status >= 0 && response.status < 400) { + return result; + } + log.error(options.type, options.url, response.status, 'Error sending message'); + throw HTTPError('sendMessage: error response', response.status, result); + }); + try { + // TODO: Possibly change this to require more than a single response? + const result = await firstOf(requests); return result; + } catch(err) { + throw err; } - log.error(options.type, options.url, response.status, 'Error'); - throw HTTPError('sendMessage: error response', response.status, result); } - async retrieveMessages(pubKey) { - if (!this.ourSwarmNodes || this.ourSwarmNodes.length === 0) { - await this.loadOurSwarm(); - } - const currentNode = this.ourSwarmNodes[0]; - const options = { - url: `${currentNode.url}${this.messageServerPort}/retrieve`, - type: 'GET', - responseType: 'json', - timeout: undefined, - }; - - const headers = { - 'X-Loki-recipient': pubKey, - }; - - if (currentNode.lastHash) { - headers['X-Loki-last-hash'] = currentNode.lastHash; - } + async retrieveMessages(callback) { + let ourSwarmNodes = await window.LokiSnodeAPI.getOurSwarmNodes(); + const ourKey = window.textsecure.storage.user.getNumber(); + let completedRequests = 0; + + const doRequest = async (nodeUrl, nodeData) => { + const options = { + url: `${nodeUrl}${this.messageServerPort}/retrieve`, + type: 'GET', + responseType: 'json', + timeout: undefined, + }; + + const headers = { + 'X-Loki-recipient': ourKey, + }; + + if (nodeData.lastHash) { + headers['X-Loki-last-hash'] = nodeData.lastHash; + } - const fetchOptions = { - method: options.type, - headers, - timeout: options.timeout, - }; + const fetchOptions = { + method: options.type, + headers, + timeout: options.timeout, + }; + let response; + try { + response = await fetch(options.url, fetchOptions); + } catch (e) { + // TODO: Maybe we shouldn't immediately delete? + log.error(options.type, options.url, 0, `Error retrieving messages from ${nodeUrl}`); + window.LokiSnodeAPI.unreachableNode(ourKey, nodeUrl); + throw HTTPError('fetch error', 0, e.toString()); + } - let response; - try { - response = await fetch(options.url, fetchOptions); - } catch (e) { - log.error(options.type, options.url, 0, 'Error'); - throw HTTPError('fetch error', 0, e.toString()); - } + let result; + if ( + options.responseType === 'json' && + response.headers.get('Content-Type') === 'application/json' + ) { + result = await response.json(); + } else if (options.responseType === 'arraybuffer') { + result = await response.buffer(); + } else { + result = await response.text(); + } + completedRequests += 1; - let result; - if ( - options.responseType === 'json' && - response.headers.get('Content-Type') === 'application/json' - ) { - result = await response.json(); - } else if (options.responseType === 'arraybuffer') { - result = await response.buffer(); - } else { - result = await response.text(); + if (response.status >= 0 && response.status < 400) { + if (result.lastHash) { + window.LokiSnodeAPI.updateLastHash(nodeUrl, result.lastHash); + } + return result; + } + log.error(options.type, options.url, response.status, 'Error'); + throw HTTPError('retrieveMessages: error response', response.status, result); } - if (response.status >= 0 && response.status < 400) { - if (result.lastHash) { - currentNode.lastHash = result.lastHash; + while (completedRequests < MINIMUM_SUCCESSFUL_REQUESTS) { + const remainingRequests = MINIMUM_SUCCESSFUL_REQUESTS - completedRequests; + ourSwarmNodes = await window.LokiSnodeAPI.getOurSwarmNodes(); + if (Object.keys(ourSwarmNodes).length < remainingRequests) { + if (completedRequests !== 0) { + // TODO: Decide how to handle some completed requests but not enough + } + return; } - return result; + + const requests = await Promise.all( + Object.entries(ourSwarmNodes) + .splice(0, remainingRequests) + .map(([nodeUrl, lastHash]) => doRequest(nodeUrl, lastHash).catch(() => null)) + ); + // Requests is now an array of null for failed requests and the json for success + requests.filter(v => v !== null && 'messages' in v) + .forEach(v => callback(v.messages)); } - log.error(options.type, options.url, response.status, 'Error'); - throw HTTPError('retrieveMessages: error response', response.status, result); } } @@ -223,5 +191,5 @@ function HTTPError(message, providedCode, response, stack) { } module.exports = { - LokiServer, + LokiMessageAPI, }; diff --git a/js/modules/loki_snode_api.js b/js/modules/loki_snode_api.js new file mode 100644 index 000000000..ff43940b4 --- /dev/null +++ b/js/modules/loki_snode_api.js @@ -0,0 +1,170 @@ +/* global log, window, Whisper */ + +const fetch = require('node-fetch'); +const is = require('@sindresorhus/is'); +const dns = require('dns'); + +// Will be raised (to 3?) when we get more nodes +const MINIMUM_SWARM_NODES = 1; + +class LokiSnodeAPI { + + constructor({ url, swarmServerPort }) { + if (!is.string(url)) { + throw new Error('WebAPI.initialize: Invalid server url'); + } + this.url = url; + this.swarmServerPort = swarmServerPort + ? `:${swarmServerPort}` + : ''; + this.swarmsPendingReplenish = {}; + this.ourSwarmNodes = {}; + } + + getRandomSnodeAddress() { + /* resolve random snode */ + return new Promise((resolve, reject) => { + dns.resolveCname(this.url, (err, address) => { + if(err) { + reject(err); + } else { + resolve(address[0]); + } + }); + }); + } + + unreachableNode(pubKey, nodeUrl) { + if (pubKey === window.textsecure.storage.user.getNumber()) { + delete this.ourSwarmNodes[nodeUrl]; + } + } + + updateLastHash(nodeUrl, hash) { + if (!this.ourSwarmNodes[nodeUrl]) { + this.ourSwarmNodes[nodeUrl] = { + lastHash: hash, + } + } else { + this.ourSwarmNodes[nodeUrl].lastHash = hash; + } + } + + async getOurSwarmNodes() { + if ( + !this.ourSwarmNodes || + Object.keys(this.ourSwarmNodes).length < MINIMUM_SWARM_NODES + ) { + // Try refresh our swarm list once + const ourKey = window.textsecure.storage.user.getNumber(); + const nodeAddresses = await window.LokiSnodeAPI.getSwarmNodes(ourKey); + + this.ourSwarmNodes = {}; + nodeAddresses.forEach(url => { + this.ourSwarmNodes[url] = {}; + }) + if (!this.ourSwarmNodes || Object.keys(this.ourSwarmNodes).length === 0) { + throw Error('Could not load our swarm') + } + } + return this.ourSwarmNodes; + } + + async getSwarmNodesByPubkey(pubKey) { + const swarmNodes = await window.Signal.Data.getSwarmNodesByPubkey(pubKey); + // TODO: Check if swarm list is below a threshold rather than empty + if (swarmNodes && swarmNodes.length !== 0) { + return swarmNodes; + } + return this.replenishSwarm(pubKey); + } + + async replenishSwarm(pubKey) { + const conversation = window.ConversationController.get(pubKey); + if (!(pubKey in this.swarmsPendingReplenish)) { + this.swarmsPendingReplenish[pubKey] = new Promise(async (resolve) => { + const newSwarmNodes = await this.getSwarmNodes(pubKey); + conversation.set({ swarmNodes: newSwarmNodes }); + await window.Signal.Data.updateConversation(conversation.id, conversation.attributes, { + Conversation: Whisper.Conversation, + }); + resolve(newSwarmNodes); + }); + } + const newSwarmNodes = await this.swarmsPendingReplenish[pubKey]; + delete this.swarmsPendingReplenish[pubKey]; + return newSwarmNodes; + } + + async getSwarmNodes(pubKey) { + const node = await this.getRandomSnodeAddress(); + const options = { + url: `http://${node}${this.swarmServerPort}/json_rpc`, + type: 'POST', + responseType: 'json', + timeout: undefined, + }; + + const body = { + jsonrpc: '2.0', + id: '0', + method: 'get_swarm_list_for_messenger_pubkey', + params: { + pubkey: pubKey, + }, + } + + const fetchOptions = { + method: options.type, + body: JSON.stringify(body), + headers: { + 'Content-Type': 'application/json', + }, + timeout: options.timeout, + }; + + let response; + try { + response = await fetch(options.url, fetchOptions); + } catch (e) { + log.error(options.type, options.url, 0, `Error getting swarm nodes for ${pubKey}`); + throw HTTPError('fetch error', 0, e.toString()); + } + + let result; + if ( + options.responseType === 'json' && + response.headers.get('Content-Type') === 'application/json' + ) { + result = await response.json(); + } else if (options.responseType === 'arraybuffer') { + result = await response.buffer(); + } else { + result = await response.text(); + } + + if (response.status >= 0 && response.status < 400) { + return result.nodes; + } + log.error(options.type, options.url, response.status, `Error getting swarm nodes for ${pubKey}`); + throw HTTPError('sendMessage: error response', response.status, result); + } +} + +function HTTPError(message, providedCode, response, stack) { + const code = providedCode > 999 || providedCode < 100 ? -1 : providedCode; + const e = new Error(`${message}; code: ${code}`); + e.name = 'HTTPError'; + e.code = code; + if (stack) { + e.stack += `\nOriginal stack:\n${stack}`; + } + if (response) { + e.response = response; + } + return e; +} + +module.exports = { + LokiSnodeAPI, +}; diff --git a/libtextsecure/http-resources.js b/libtextsecure/http-resources.js index ffeb2c071..df98603bd 100644 --- a/libtextsecure/http-resources.js +++ b/libtextsecure/http-resources.js @@ -1,4 +1,4 @@ -/* global window, dcodeIO, textsecure, StringView */ +/* global window, dcodeIO, textsecure */ // eslint-disable-next-line func-names (function () { @@ -62,26 +62,8 @@ }; let connected = false; - this.startPolling = async function pollServer(callBack) { - const myKeys = await textsecure.storage.protocol.getIdentityKeyPair(); - const pubKey = StringView.arrayBufferToHex(myKeys.pubKey) - let result; - try { - result = await server.retrieveMessages(pubKey); - connected = true; - } catch (err) { - connected = false; - setTimeout(() => { pollServer(callBack); }, pollTime); - return; - } - if (typeof callBack === 'function') { - callBack(connected); - } - if (!result.messages) { - setTimeout(() => { pollServer(callBack); }, pollTime); - return; - } - const newMessages = await filterIncomingMessages(result.messages); + const processMessages = async messages => { + const newMessages = await filterIncomingMessages(messages); newMessages.forEach(async message => { const { data } = message; const dataPlaintext = stringToArrayBufferBase64(data); @@ -97,7 +79,17 @@ ); } }); - setTimeout(() => { pollServer(callBack); }, pollTime); + } + + this.startPolling = async function pollServer(callback) { + try { + await server.retrieveMessages(processMessages); + connected = true; + } catch (err) { + connected = false; + } + callback(connected); + setTimeout(() => { pollServer(callback); }, pollTime); }; this.isConnected = function isConnected() { diff --git a/libtextsecure/message_receiver.js b/libtextsecure/message_receiver.js index ac61def0f..d0492ab30 100644 --- a/libtextsecure/message_receiver.js +++ b/libtextsecure/message_receiver.js @@ -22,7 +22,7 @@ function MessageReceiver(username, password, signalingKey, options = {}) { this.signalingKey = signalingKey; this.username = username; this.password = password; - this.lokiserver = window.LokiAPI; + this.lokiMessageAPI = window.LokiMessageAPI; if (!options.serverTrustRoot) { throw new Error('Server trust root is required!'); @@ -67,7 +67,7 @@ MessageReceiver.prototype.extend({ } this.hasConnected = true; - this.httpPollingResource = new HttpResource(this.lokiserver, { + this.httpPollingResource = new HttpResource(this.lokiMessageAPI, { handleRequest: this.handleRequest.bind(this), }); this.httpPollingResource.startPolling((connected) => { diff --git a/libtextsecure/outgoing_message.js b/libtextsecure/outgoing_message.js index 79426037a..4a33d9a10 100644 --- a/libtextsecure/outgoing_message.js +++ b/libtextsecure/outgoing_message.js @@ -34,7 +34,7 @@ function OutgoingMessage( this.callback = callback; this.silent = silent; - this.lokiserver = window.LokiAPI; + this.lokiMessageAPI = window.LokiMessageAPI; this.numbersCompleted = 0; this.errors = []; @@ -184,7 +184,7 @@ OutgoingMessage.prototype = { async transmitMessage(number, data, timestamp, ttl = 24 * 60 * 60) { const pubKey = number; try { - const result = await this.lokiserver.sendMessage(pubKey, data, timestamp, ttl); + const result = await this.lokiMessageAPI.sendMessage(pubKey, data, timestamp, ttl); return result; } catch (e) { if (e.name === 'HTTPError' && (e.code !== 409 && e.code !== 410)) { diff --git a/preload.js b/preload.js index e9dfa7b5e..7e4ecf73f 100644 --- a/preload.js +++ b/preload.js @@ -265,14 +265,20 @@ window.WebAPI = initializeWebAPI({ proxyUrl: config.proxyUrl, }); -const { LokiServer } = require('./js/modules/loki_message_api'); +const { LokiSnodeAPI } = require('./js/modules/loki_snode_api'); -window.LokiAPI = new LokiServer({ - urls: [config.serverUrl], - messageServerPort: config.messageServerPort, +window.LokiSnodeAPI = new LokiSnodeAPI({ + url: config.serverUrl, swarmServerPort: config.swarmServerPort, }); +const { LokiMessageAPI } = require('./js/modules/loki_message_api'); + +window.LokiMessageAPI = new LokiMessageAPI({ + url: config.serverUrl, + messageServerPort: config.messageServerPort, +}); + window.mnemonic = require('./libloki/mnemonic'); const { WorkerInterface } = require('./js/modules/util_worker_interface');