From 37ba762312f5c16e9f72625721d4b45b3e96189d Mon Sep 17 00:00:00 2001 From: Beaudan Date: Wed, 9 Jan 2019 12:33:21 +1100 Subject: [PATCH 1/7] First stuff for contacting specific nodes for each contact. Hard coded to hit the same bootstrap node for now plus doesn't handle unreachable nodes/errors well yet --- app/sql.js | 24 +++++++++- config/default.json | 6 ++- js/conversation_controller.js | 1 + js/models/conversations.js | 25 +++++++++- js/modules/data.js | 6 +++ js/modules/loki_message_api.js | 88 ++++++++++++++++++++++++++++++---- main.js | 2 + preload.js | 2 + 8 files changed, 139 insertions(+), 15 deletions(-) diff --git a/app/sql.js b/app/sql.js index c58e7c240..20be73737 100644 --- a/app/sql.js +++ b/app/sql.js @@ -81,6 +81,8 @@ module.exports = { removeSessionsByNumber, removeAllSessions, + getSwarmNodesByPubkey, + getConversationCount, saveConversation, saveConversations, @@ -385,6 +387,7 @@ async function updateToSchemaVersion4(currentVersion, instance) { type STRING, members TEXT, name TEXT, + swarmNodes TEXT, profileName TEXT );` ); @@ -1025,6 +1028,18 @@ async function removeAllFromTable(table) { // Conversations +async function getSwarmNodesByPubkey(pubkey) { + const row = await db.get('SELECT * FROM conversations WHERE id = $pubkey;', { + $pubkey: pubkey, + }); + + if (!row) { + return null; + } + + return jsonToObject(row.json).swarmNodes; +} + async function getConversationCount() { const row = await db.get('SELECT count(*) from conversations;'); @@ -1037,7 +1052,7 @@ async function getConversationCount() { async function saveConversation(data) { // eslint-disable-next-line camelcase - const { id, active_at, type, members, name, friendRequestStatus, profileName } = data; + const { id, active_at, type, members, name, friendRequestStatus, swarmNodes, profileName } = data; await db.run( `INSERT INTO conversations ( @@ -1049,6 +1064,7 @@ async function saveConversation(data) { members, name, friendRequestStatus, + swarmNodes, profileName ) values ( $id, @@ -1059,6 +1075,7 @@ async function saveConversation(data) { $members, $name, $friendRequestStatus, + $swarmNodes, $profileName );`, { @@ -1070,6 +1087,7 @@ async function saveConversation(data) { $members: members ? members.join(' ') : null, $name: name, $friendRequestStatus: friendRequestStatus, + $swarmNodes: swarmNodes ? swarmNodes.join(' ') : null, $profileName: profileName, } ); @@ -1093,7 +1111,7 @@ async function saveConversations(arrayOfConversations) { async function updateConversation(data) { // eslint-disable-next-line camelcase - const { id, active_at, type, members, name, friendRequestStatus, profileName } = data; + const { id, active_at, type, members, name, friendRequestStatus, swarmNodes, profileName } = data; await db.run( `UPDATE conversations SET @@ -1104,6 +1122,7 @@ async function updateConversation(data) { members = $members, name = $name, friendRequestStatus = $friendRequestStatus, + swarmNodes = $swarmNodes, profileName = $profileName WHERE id = $id;`, { @@ -1115,6 +1134,7 @@ async function updateConversation(data) { $members: members ? members.join(' ') : null, $name: name, $friendRequestStatus: friendRequestStatus, + $swarmNodes: swarmNodes ? swarmNodes.join(' ') : null, $profileName: profileName, } ); diff --git a/config/default.json b/config/default.json index cb8a907fa..2d4ec64a7 100644 --- a/config/default.json +++ b/config/default.json @@ -1,6 +1,8 @@ { - "serverUrl": "http://localhost:8080", - "cdnUrl": "http://localhost", + "serverUrl": "http://qp994mrc8z7fqmsynzdumd35b5918q599gno46br86e537f7qzzy.snode", + "cdnUrl": "http://qp994mrc8z7fqmsynzdumd35b5918q599gno46br86e537f7qzzy.snode", + "messageServerPort": ":8080", + "swarmServerPort": ":8079", "disableAutoUpdate": false, "openDevTools": false, "buildExpiration": 0, diff --git a/js/conversation_controller.js b/js/conversation_controller.js index 37c1eb6a6..2a72fa2fe 100644 --- a/js/conversation_controller.js +++ b/js/conversation_controller.js @@ -174,6 +174,7 @@ return conversation; } + conversation.refreshSwarmNodes(); try { await window.Signal.Data.saveConversation(conversation.attributes, { Conversation: Whisper.Conversation, diff --git a/js/models/conversations.js b/js/models/conversations.js index af8ce2409..0e052c310 100644 --- a/js/models/conversations.js +++ b/js/models/conversations.js @@ -86,6 +86,8 @@ friendRequestStatus: FriendRequestStatusEnum.none, unlockTimestamp: null, // Timestamp used for expiring friend requests. sessionResetStatus: SessionResetEnum.none, + swarmNodes: [], + refreshPromise: null, }; }, @@ -579,6 +581,22 @@ throw new Error('Invalid friend request state'); } }, + async refreshSwarmNodes() { + // Refresh promise to ensure that we are only doing a single query at a time + let refreshPromise = this.get('refreshPromise'); + if (refreshPromise === null) { + refreshPromise = (async () => { + const newSwarmNodes = await window.LokiAPI.getSwarmNodes(this.id); + this.set({ swarmNodes: _.union(this.get('swarmNodes'), newSwarmNodes) }); + await window.Signal.Data.updateConversation(this.id, this.attributes, { + Conversation: Whisper.Conversation, + }); + })(); + this.set({ refreshPromise }); + } + await refreshPromise; + this.set({ refreshPromise: null }); + }, async setFriendRequestStatus(newStatus) { // Ensure that the new status is a valid FriendStatusEnum value if (!(newStatus in Object.values(FriendRequestStatusEnum))) @@ -1198,7 +1216,10 @@ options.messageType = message.get('type'); // Add the message sending on another queue so that our UI doesn't get blocked - this.queueMessageSend(async () => + this.queueMessageSend(async () => { + if (this.get('swarmNodes').length === 0) { + await this.refreshSwarmNodes(); + } message.send( this.wrapSend( sendFunction( @@ -1213,7 +1234,7 @@ ) ) ) - ); + }); return true; }); diff --git a/js/modules/data.js b/js/modules/data.js index e7eb0614f..214d114b1 100644 --- a/js/modules/data.js +++ b/js/modules/data.js @@ -108,6 +108,8 @@ module.exports = { removeSessionsByNumber, removeAllSessions, + getSwarmNodesByPubkey, + getConversationCount, saveConversation, saveConversations, @@ -654,6 +656,10 @@ async function removeAllSessions(id) { // Conversation +async function getSwarmNodesByPubkey(pubkey) { + return channels.getSwarmNodesByPubkey(pubkey); +} + async function getConversationCount() { return channels.getConversationCount(); } diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index a72717ce4..8f54e7a11 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -5,8 +5,10 @@ const is = require('@sindresorhus/is'); class LokiServer { - constructor({ urls }) { + constructor({ urls, messageServerPort, swarmServerPort }) { this.nodes = []; + this.messageServerPort = messageServerPort; + this.swarmServerPort = swarmServerPort; urls.forEach(url => { if (!is.string(url)) { throw new Error('WebAPI.initialize: Invalid server url'); @@ -15,11 +17,78 @@ class LokiServer { }); } - async sendMessage(pubKey, data, messageTimeStamp, ttl) { - const data64 = dcodeIO.ByteBuffer.wrap(data).toString('base64'); - // Hardcoded to use a single node/server for now + async loadOurSwarm() { + const ourKey = window.textsecure.storage.user.getNumber(); + const nodeAddresses = await this.getSwarmNodes(ourKey); + this.ourSwarmNodes = []; + nodeAddresses.forEach(url => { + this.ourSwarmNodes.push({ url }); + }) + } + + async getSwarmNodes(pubKey) { const currentNode = this.nodes[0]; + const options = { + url: `${currentNode.url}${this.swarmServerPort}/json_rpc`, + type: 'POST', + responseType: 'json', + timeout: undefined, + }; + + const body = { + jsonrpc: '2.0', + id: '0', + method: 'get_swarm_list_for_messenger_pubkey', + params: { + pubkey: pubKey, + }, + } + + const fetchOptions = { + method: options.type, + body: JSON.stringify(body), + headers: { + 'Content-Type': 'application/json', + }, + timeout: options.timeout, + }; + + let response; + try { + response = await fetch(options.url, fetchOptions); + } catch (e) { + log.error(options.type, options.url, 0, 'Error'); + throw HTTPError('fetch error', 0, e.toString()); + } + + let result; + if ( + options.responseType === 'json' && + response.headers.get('Content-Type') === 'application/json' + ) { + result = await response.json(); + } else if (options.responseType === 'arraybuffer') { + result = await response.buffer(); + } else { + result = await response.text(); + } + + if (response.status >= 0 && response.status < 400) { + return result.nodes; + } + log.error(options.type, options.url, response.status, 'Error'); + throw HTTPError('sendMessage: error response', response.status, result); + } + + async sendMessage(pubKey, data, messageTimeStamp, ttl) { + const swarmNodes = await window.Signal.Data.getSwarmNodesByPubkey(pubKey); + if (!swarmNodes || swarmNodes.length === 0) { + // TODO: Refresh the swarm nodes list + throw Error('No swarm nodes to query!'); + } + + const data64 = dcodeIO.ByteBuffer.wrap(data).toString('base64'); const timestamp = Math.floor(Date.now() / 1000); // Nonce is returned as a base64 string to include in header let nonce; @@ -37,7 +106,7 @@ class LokiServer { } const options = { - url: `${currentNode.url}/store`, + url: `${swarmNodes[0]}${this.messageServerPort}/store`, type: 'POST', responseType: undefined, timeout: undefined, @@ -83,11 +152,12 @@ class LokiServer { } async retrieveMessages(pubKey) { - // Hardcoded to use a single node/server for now - const currentNode = this.nodes[0]; - + if (!this.ourSwarmNodes || this.ourSwarmNodes.length === 0) { + await this.loadOurSwarm(); + } + const currentNode = this.ourSwarmNodes[0]; const options = { - url: `${currentNode.url}/retrieve`, + url: `${currentNode.url}${this.messageServerPort}/retrieve`, type: 'GET', responseType: 'json', timeout: undefined, diff --git a/main.js b/main.js index c29c01ca5..9bf782da2 100644 --- a/main.js +++ b/main.js @@ -144,6 +144,8 @@ function prepareURL(pathSegments, moreKeys) { buildExpiration: config.get('buildExpiration'), serverUrl: config.get('serverUrl'), cdnUrl: config.get('cdnUrl'), + messageServerPort: config.get('messageServerPort'), + swarmServerPort: config.get('swarmServerPort'), certificateAuthority: config.get('certificateAuthority'), environment: config.environment, node_version: process.versions.node, diff --git a/preload.js b/preload.js index 189f41cd3..e9dfa7b5e 100644 --- a/preload.js +++ b/preload.js @@ -269,6 +269,8 @@ const { LokiServer } = require('./js/modules/loki_message_api'); window.LokiAPI = new LokiServer({ urls: [config.serverUrl], + messageServerPort: config.messageServerPort, + swarmServerPort: config.swarmServerPort, }); window.mnemonic = require('./libloki/mnemonic'); From f09f0f5721dc7825205df268213ce910fabd9398 Mon Sep 17 00:00:00 2001 From: Beaudan Date: Wed, 9 Jan 2019 14:43:58 +1100 Subject: [PATCH 2/7] Moved replenishSwarm into libloki-protocol and attached to the window --- js/conversation_controller.js | 2 +- js/models/conversations.js | 19 +------------------ 2 files changed, 2 insertions(+), 19 deletions(-) diff --git a/js/conversation_controller.js b/js/conversation_controller.js index 2a72fa2fe..ca0d5658b 100644 --- a/js/conversation_controller.js +++ b/js/conversation_controller.js @@ -174,7 +174,7 @@ return conversation; } - conversation.refreshSwarmNodes(); + window.libloki.replenishSwarm(id); try { await window.Signal.Data.saveConversation(conversation.attributes, { Conversation: Whisper.Conversation, diff --git a/js/models/conversations.js b/js/models/conversations.js index 0e052c310..c18984757 100644 --- a/js/models/conversations.js +++ b/js/models/conversations.js @@ -87,7 +87,6 @@ unlockTimestamp: null, // Timestamp used for expiring friend requests. sessionResetStatus: SessionResetEnum.none, swarmNodes: [], - refreshPromise: null, }; }, @@ -581,22 +580,6 @@ throw new Error('Invalid friend request state'); } }, - async refreshSwarmNodes() { - // Refresh promise to ensure that we are only doing a single query at a time - let refreshPromise = this.get('refreshPromise'); - if (refreshPromise === null) { - refreshPromise = (async () => { - const newSwarmNodes = await window.LokiAPI.getSwarmNodes(this.id); - this.set({ swarmNodes: _.union(this.get('swarmNodes'), newSwarmNodes) }); - await window.Signal.Data.updateConversation(this.id, this.attributes, { - Conversation: Whisper.Conversation, - }); - })(); - this.set({ refreshPromise }); - } - await refreshPromise; - this.set({ refreshPromise: null }); - }, async setFriendRequestStatus(newStatus) { // Ensure that the new status is a valid FriendStatusEnum value if (!(newStatus in Object.values(FriendRequestStatusEnum))) @@ -1218,7 +1201,7 @@ // Add the message sending on another queue so that our UI doesn't get blocked this.queueMessageSend(async () => { if (this.get('swarmNodes').length === 0) { - await this.refreshSwarmNodes(); + await window.libloki.replenishSwarm(destination); } message.send( this.wrapSend( From 714a5ab8b107d0bc43d19de79cd498c58db325b2 Mon Sep 17 00:00:00 2001 From: Beaudan Date: Fri, 11 Jan 2019 13:48:58 +1100 Subject: [PATCH 3/7] Update consolidateLists function to take a selector function and updated tests --- libloki/service_nodes.js | 20 ++++++++++++------- libloki/test/service_nodes_test.js | 32 ++++++++++++++++++++++++++++-- 2 files changed, 43 insertions(+), 9 deletions(-) diff --git a/libloki/service_nodes.js b/libloki/service_nodes.js index 58698bd7e..1c0ff8aaf 100644 --- a/libloki/service_nodes.js +++ b/libloki/service_nodes.js @@ -4,30 +4,36 @@ (function () { window.libloki = window.libloki || {}; - function consolidateLists(lists, threshold = 1){ + function consolidateLists(lists, threshold, selector = (x) => x){ if (typeof threshold !== 'number') { throw Error('Provided threshold is not a number'); } + if (typeof selector !== 'function') { + throw Error('Provided selector is not a function'); + } // calculate list size manually since `Set` // does not have a `length` attribute let numLists = 0; const occurences = {}; + const values = {}; lists.forEach(list => { numLists += 1; list.forEach(item => { - if (!(item in occurences)) { - occurences[item] = 1; + const key = selector(item); + if (!(key in occurences)) { + occurences[key] = 1; + values[key] = item; } else { - occurences[item] += 1; + occurences[key] += 1; } }); }); const scaledThreshold = numLists * threshold; - return Object.entries(occurences) - .filter(keyValue => keyValue[1] >= scaledThreshold) - .map(keyValue => keyValue[0]); + return Object.keys(occurences) + .filter(key => occurences[key] >= scaledThreshold) + .map(key => values[key]); } window.libloki.serviceNodes = { diff --git a/libloki/test/service_nodes_test.js b/libloki/test/service_nodes_test.js index 1d72d0b92..59636743a 100644 --- a/libloki/test/service_nodes_test.js +++ b/libloki/test/service_nodes_test.js @@ -17,13 +17,22 @@ describe('ServiceNodes', () => { ); }); + it('should throw when provided a non-function selector', () => { + [1, 'a', 0xffffffff, { really: 'not a function' }].forEach(x => { + assert.throws(() => + libloki.serviceNodes.consolidateLists([], 1, x), + 'Provided selector is not a function' + ) + }); + }); + it('should return an empty array when the input is an empty array', () => { - const result = libloki.serviceNodes.consolidateLists([]); + const result = libloki.serviceNodes.consolidateLists([], 1); assert.deepEqual(result, []); }); it('should return the input when only 1 list is provided', () => { - const result = libloki.serviceNodes.consolidateLists([['a', 'b', 'c']]); + const result = libloki.serviceNodes.consolidateLists([['a', 'b', 'c']], 1); assert.deepEqual(result, ['a', 'b', 'c']); }); @@ -36,6 +45,25 @@ describe('ServiceNodes', () => { assert.deepEqual(result.sort(), ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']); }); + it('should use the selector to identify the elements', () => { + const result = libloki.serviceNodes.consolidateLists([ + [{ id: 1, val: 'a'}, { id: 2, val: 'b'}, { id: 3, val: 'c'}, { id: 8, val: 'h'}], + [{ id: 4, val: 'd'}, { id: 5, val: 'e'}, { id: 6, val: 'f'}, { id: 7, val: 'g'}], + [{ id: 7, val: 'g'}, { id: 8, val: 'h'}], + ], 0, x => x.id); + const expected = [ + { id: 1, val: 'a'}, + { id: 2, val: 'b'}, + { id: 3, val: 'c'}, + { id: 4, val: 'd'}, + { id: 5, val: 'e'}, + { id: 6, val: 'f'}, + { id: 7, val: 'g'}, + { id: 8, val: 'h'}, + ]; + assert.deepEqual(result.sort((a, b) => a.val > b.val), expected); + }); + it('should return the intersection of all lists when threshold is 1', () => { const result = libloki.serviceNodes.consolidateLists([ ['a', 'b', 'c', 'd'], From 7b1799c418b474a38fbea10ee4ce6e4745ef335d Mon Sep 17 00:00:00 2001 From: Beaudan Date: Mon, 14 Jan 2019 15:41:57 +1100 Subject: [PATCH 4/7] Big ol' hunk o' chunk. Now using random.snode to populate swarm lists, now making multiple requests from said lists and they are processed as they complete rather than waiting for all to resolve --- config/default.json | 8 +- js/conversation_controller.js | 2 +- js/models/conversations.js | 3 - js/modules/loki_message_api.js | 300 +++++++++++++----------------- js/modules/loki_snode_api.js | 170 +++++++++++++++++ libtextsecure/http-resources.js | 36 ++-- libtextsecure/message_receiver.js | 4 +- libtextsecure/outgoing_message.js | 4 +- preload.js | 14 +- 9 files changed, 337 insertions(+), 204 deletions(-) create mode 100644 js/modules/loki_snode_api.js diff --git a/config/default.json b/config/default.json index 2d4ec64a7..3b605aa6d 100644 --- a/config/default.json +++ b/config/default.json @@ -1,8 +1,8 @@ { - "serverUrl": "http://qp994mrc8z7fqmsynzdumd35b5918q599gno46br86e537f7qzzy.snode", - "cdnUrl": "http://qp994mrc8z7fqmsynzdumd35b5918q599gno46br86e537f7qzzy.snode", - "messageServerPort": ":8080", - "swarmServerPort": ":8079", + "serverUrl": "random.snode", + "cdnUrl": "random.snode", + "messageServerPort": "8080", + "swarmServerPort": "8079", "disableAutoUpdate": false, "openDevTools": false, "buildExpiration": 0, diff --git a/js/conversation_controller.js b/js/conversation_controller.js index ca0d5658b..3abe9561a 100644 --- a/js/conversation_controller.js +++ b/js/conversation_controller.js @@ -174,7 +174,7 @@ return conversation; } - window.libloki.replenishSwarm(id); + window.LokiSnodeAPI.replenishSwarm(id); try { await window.Signal.Data.saveConversation(conversation.attributes, { Conversation: Whisper.Conversation, diff --git a/js/models/conversations.js b/js/models/conversations.js index c18984757..a360f5f82 100644 --- a/js/models/conversations.js +++ b/js/models/conversations.js @@ -1200,9 +1200,6 @@ // Add the message sending on another queue so that our UI doesn't get blocked this.queueMessageSend(async () => { - if (this.get('swarmNodes').length === 0) { - await window.libloki.replenishSwarm(destination); - } message.send( this.wrapSend( sendFunction( diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index 8f54e7a11..c80b72727 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -1,90 +1,25 @@ +/* eslint-disable no-await-in-loop */ /* global log, dcodeIO, window, callWorker */ const fetch = require('node-fetch'); -const is = require('@sindresorhus/is'); -class LokiServer { +// eslint-disable-next-line +const invert = p => new Promise((res, rej) => p.then(rej, res)); +const firstOf = ps => invert(Promise.all(ps.map(invert))); - constructor({ urls, messageServerPort, swarmServerPort }) { - this.nodes = []; - this.messageServerPort = messageServerPort; - this.swarmServerPort = swarmServerPort; - urls.forEach(url => { - if (!is.string(url)) { - throw new Error('WebAPI.initialize: Invalid server url'); - } - this.nodes.push({ url }); - }); - } - - async loadOurSwarm() { - const ourKey = window.textsecure.storage.user.getNumber(); - const nodeAddresses = await this.getSwarmNodes(ourKey); - this.ourSwarmNodes = []; - nodeAddresses.forEach(url => { - this.ourSwarmNodes.push({ url }); - }) - } - - async getSwarmNodes(pubKey) { - const currentNode = this.nodes[0]; - - const options = { - url: `${currentNode.url}${this.swarmServerPort}/json_rpc`, - type: 'POST', - responseType: 'json', - timeout: undefined, - }; - - const body = { - jsonrpc: '2.0', - id: '0', - method: 'get_swarm_list_for_messenger_pubkey', - params: { - pubkey: pubKey, - }, - } - - const fetchOptions = { - method: options.type, - body: JSON.stringify(body), - headers: { - 'Content-Type': 'application/json', - }, - timeout: options.timeout, - }; +// Will be raised (to 3?) when we get more nodes +const MINIMUM_SUCCESSFUL_REQUESTS = 2; +class LokiMessageAPI { - let response; - try { - response = await fetch(options.url, fetchOptions); - } catch (e) { - log.error(options.type, options.url, 0, 'Error'); - throw HTTPError('fetch error', 0, e.toString()); - } - - let result; - if ( - options.responseType === 'json' && - response.headers.get('Content-Type') === 'application/json' - ) { - result = await response.json(); - } else if (options.responseType === 'arraybuffer') { - result = await response.buffer(); - } else { - result = await response.text(); - } - - if (response.status >= 0 && response.status < 400) { - return result.nodes; - } - log.error(options.type, options.url, response.status, 'Error'); - throw HTTPError('sendMessage: error response', response.status, result); + constructor({ messageServerPort }) { + this.messageServerPort = messageServerPort + ? `:${messageServerPort}` + : ''; } async sendMessage(pubKey, data, messageTimeStamp, ttl) { - const swarmNodes = await window.Signal.Data.getSwarmNodesByPubkey(pubKey); + const swarmNodes = await window.LokiSnodeAPI.getSwarmNodesByPubkey(pubKey) if (!swarmNodes || swarmNodes.length === 0) { - // TODO: Refresh the swarm nodes list throw Error('No swarm nodes to query!'); } @@ -101,110 +36,143 @@ class LokiServer { nonce = await callWorker('calcPoW', timestamp, ttl, pubKey, data64, development); } catch (err) { // Something went horribly wrong - // TODO: Handle gracefully throw err; } - const options = { - url: `${swarmNodes[0]}${this.messageServerPort}/store`, - type: 'POST', - responseType: undefined, - timeout: undefined, - }; - - const fetchOptions = { - method: options.type, - body: data64, - headers: { - 'X-Loki-pow-nonce': nonce, - 'X-Loki-timestamp': timestamp.toString(), - 'X-Loki-ttl': ttl.toString(), - 'X-Loki-recipient': pubKey, - }, - timeout: options.timeout, - }; - - let response; - try { - response = await fetch(options.url, fetchOptions); - } catch (e) { - log.error(options.type, options.url, 0, 'Error'); - throw HTTPError('fetch error', 0, e.toString()); - } + const requests = swarmNodes.map(async node => { + const options = { + url: `${node}${this.messageServerPort}/store`, + type: 'POST', + responseType: undefined, + timeout: undefined, + }; + + const fetchOptions = { + method: options.type, + body: data64, + headers: { + 'X-Loki-pow-nonce': nonce, + 'X-Loki-timestamp': timestamp.toString(), + 'X-Loki-ttl': ttl.toString(), + 'X-Loki-recipient': pubKey, + }, + timeout: options.timeout, + }; + + let response; + try { + response = await fetch(options.url, fetchOptions); + } catch (e) { + log.error(options.type, options.url, 0, 'Error sending message'); + window.LokiSnodeAPI.unreachableNode(pubKey, node); + throw HTTPError('fetch error', 0, e.toString()); + } - let result; - if ( - options.responseType === 'json' && - response.headers.get('Content-Type') === 'application/json' - ) { - result = await response.json(); - } else if (options.responseType === 'arraybuffer') { - result = await response.buffer(); - } else { - result = await response.text(); - } + let result; + if ( + options.responseType === 'json' && + response.headers.get('Content-Type') === 'application/json' + ) { + result = await response.json(); + } else if (options.responseType === 'arraybuffer') { + result = await response.buffer(); + } else { + result = await response.text(); + } - if (response.status >= 0 && response.status < 400) { + if (response.status >= 0 && response.status < 400) { + return result; + } + log.error(options.type, options.url, response.status, 'Error sending message'); + throw HTTPError('sendMessage: error response', response.status, result); + }); + try { + // TODO: Possibly change this to require more than a single response? + const result = await firstOf(requests); return result; + } catch(err) { + throw err; } - log.error(options.type, options.url, response.status, 'Error'); - throw HTTPError('sendMessage: error response', response.status, result); } - async retrieveMessages(pubKey) { - if (!this.ourSwarmNodes || this.ourSwarmNodes.length === 0) { - await this.loadOurSwarm(); - } - const currentNode = this.ourSwarmNodes[0]; - const options = { - url: `${currentNode.url}${this.messageServerPort}/retrieve`, - type: 'GET', - responseType: 'json', - timeout: undefined, - }; - - const headers = { - 'X-Loki-recipient': pubKey, - }; - - if (currentNode.lastHash) { - headers['X-Loki-last-hash'] = currentNode.lastHash; - } + async retrieveMessages(callback) { + let ourSwarmNodes = await window.LokiSnodeAPI.getOurSwarmNodes(); + const ourKey = window.textsecure.storage.user.getNumber(); + let completedRequests = 0; + + const doRequest = async (nodeUrl, nodeData) => { + const options = { + url: `${nodeUrl}${this.messageServerPort}/retrieve`, + type: 'GET', + responseType: 'json', + timeout: undefined, + }; + + const headers = { + 'X-Loki-recipient': ourKey, + }; + + if (nodeData.lastHash) { + headers['X-Loki-last-hash'] = nodeData.lastHash; + } - const fetchOptions = { - method: options.type, - headers, - timeout: options.timeout, - }; + const fetchOptions = { + method: options.type, + headers, + timeout: options.timeout, + }; + let response; + try { + response = await fetch(options.url, fetchOptions); + } catch (e) { + // TODO: Maybe we shouldn't immediately delete? + log.error(options.type, options.url, 0, `Error retrieving messages from ${nodeUrl}`); + window.LokiSnodeAPI.unreachableNode(ourKey, nodeUrl); + throw HTTPError('fetch error', 0, e.toString()); + } - let response; - try { - response = await fetch(options.url, fetchOptions); - } catch (e) { - log.error(options.type, options.url, 0, 'Error'); - throw HTTPError('fetch error', 0, e.toString()); - } + let result; + if ( + options.responseType === 'json' && + response.headers.get('Content-Type') === 'application/json' + ) { + result = await response.json(); + } else if (options.responseType === 'arraybuffer') { + result = await response.buffer(); + } else { + result = await response.text(); + } + completedRequests += 1; - let result; - if ( - options.responseType === 'json' && - response.headers.get('Content-Type') === 'application/json' - ) { - result = await response.json(); - } else if (options.responseType === 'arraybuffer') { - result = await response.buffer(); - } else { - result = await response.text(); + if (response.status >= 0 && response.status < 400) { + if (result.lastHash) { + window.LokiSnodeAPI.updateLastHash(nodeUrl, result.lastHash); + } + return result; + } + log.error(options.type, options.url, response.status, 'Error'); + throw HTTPError('retrieveMessages: error response', response.status, result); } - if (response.status >= 0 && response.status < 400) { - if (result.lastHash) { - currentNode.lastHash = result.lastHash; + while (completedRequests < MINIMUM_SUCCESSFUL_REQUESTS) { + const remainingRequests = MINIMUM_SUCCESSFUL_REQUESTS - completedRequests; + ourSwarmNodes = await window.LokiSnodeAPI.getOurSwarmNodes(); + if (Object.keys(ourSwarmNodes).length < remainingRequests) { + if (completedRequests !== 0) { + // TODO: Decide how to handle some completed requests but not enough + } + return; } - return result; + + const requests = await Promise.all( + Object.entries(ourSwarmNodes) + .splice(0, remainingRequests) + .map(([nodeUrl, lastHash]) => doRequest(nodeUrl, lastHash).catch(() => null)) + ); + // Requests is now an array of null for failed requests and the json for success + requests.filter(v => v !== null && 'messages' in v) + .forEach(v => callback(v.messages)); } - log.error(options.type, options.url, response.status, 'Error'); - throw HTTPError('retrieveMessages: error response', response.status, result); } } @@ -223,5 +191,5 @@ function HTTPError(message, providedCode, response, stack) { } module.exports = { - LokiServer, + LokiMessageAPI, }; diff --git a/js/modules/loki_snode_api.js b/js/modules/loki_snode_api.js new file mode 100644 index 000000000..ff43940b4 --- /dev/null +++ b/js/modules/loki_snode_api.js @@ -0,0 +1,170 @@ +/* global log, window, Whisper */ + +const fetch = require('node-fetch'); +const is = require('@sindresorhus/is'); +const dns = require('dns'); + +// Will be raised (to 3?) when we get more nodes +const MINIMUM_SWARM_NODES = 1; + +class LokiSnodeAPI { + + constructor({ url, swarmServerPort }) { + if (!is.string(url)) { + throw new Error('WebAPI.initialize: Invalid server url'); + } + this.url = url; + this.swarmServerPort = swarmServerPort + ? `:${swarmServerPort}` + : ''; + this.swarmsPendingReplenish = {}; + this.ourSwarmNodes = {}; + } + + getRandomSnodeAddress() { + /* resolve random snode */ + return new Promise((resolve, reject) => { + dns.resolveCname(this.url, (err, address) => { + if(err) { + reject(err); + } else { + resolve(address[0]); + } + }); + }); + } + + unreachableNode(pubKey, nodeUrl) { + if (pubKey === window.textsecure.storage.user.getNumber()) { + delete this.ourSwarmNodes[nodeUrl]; + } + } + + updateLastHash(nodeUrl, hash) { + if (!this.ourSwarmNodes[nodeUrl]) { + this.ourSwarmNodes[nodeUrl] = { + lastHash: hash, + } + } else { + this.ourSwarmNodes[nodeUrl].lastHash = hash; + } + } + + async getOurSwarmNodes() { + if ( + !this.ourSwarmNodes || + Object.keys(this.ourSwarmNodes).length < MINIMUM_SWARM_NODES + ) { + // Try refresh our swarm list once + const ourKey = window.textsecure.storage.user.getNumber(); + const nodeAddresses = await window.LokiSnodeAPI.getSwarmNodes(ourKey); + + this.ourSwarmNodes = {}; + nodeAddresses.forEach(url => { + this.ourSwarmNodes[url] = {}; + }) + if (!this.ourSwarmNodes || Object.keys(this.ourSwarmNodes).length === 0) { + throw Error('Could not load our swarm') + } + } + return this.ourSwarmNodes; + } + + async getSwarmNodesByPubkey(pubKey) { + const swarmNodes = await window.Signal.Data.getSwarmNodesByPubkey(pubKey); + // TODO: Check if swarm list is below a threshold rather than empty + if (swarmNodes && swarmNodes.length !== 0) { + return swarmNodes; + } + return this.replenishSwarm(pubKey); + } + + async replenishSwarm(pubKey) { + const conversation = window.ConversationController.get(pubKey); + if (!(pubKey in this.swarmsPendingReplenish)) { + this.swarmsPendingReplenish[pubKey] = new Promise(async (resolve) => { + const newSwarmNodes = await this.getSwarmNodes(pubKey); + conversation.set({ swarmNodes: newSwarmNodes }); + await window.Signal.Data.updateConversation(conversation.id, conversation.attributes, { + Conversation: Whisper.Conversation, + }); + resolve(newSwarmNodes); + }); + } + const newSwarmNodes = await this.swarmsPendingReplenish[pubKey]; + delete this.swarmsPendingReplenish[pubKey]; + return newSwarmNodes; + } + + async getSwarmNodes(pubKey) { + const node = await this.getRandomSnodeAddress(); + const options = { + url: `http://${node}${this.swarmServerPort}/json_rpc`, + type: 'POST', + responseType: 'json', + timeout: undefined, + }; + + const body = { + jsonrpc: '2.0', + id: '0', + method: 'get_swarm_list_for_messenger_pubkey', + params: { + pubkey: pubKey, + }, + } + + const fetchOptions = { + method: options.type, + body: JSON.stringify(body), + headers: { + 'Content-Type': 'application/json', + }, + timeout: options.timeout, + }; + + let response; + try { + response = await fetch(options.url, fetchOptions); + } catch (e) { + log.error(options.type, options.url, 0, `Error getting swarm nodes for ${pubKey}`); + throw HTTPError('fetch error', 0, e.toString()); + } + + let result; + if ( + options.responseType === 'json' && + response.headers.get('Content-Type') === 'application/json' + ) { + result = await response.json(); + } else if (options.responseType === 'arraybuffer') { + result = await response.buffer(); + } else { + result = await response.text(); + } + + if (response.status >= 0 && response.status < 400) { + return result.nodes; + } + log.error(options.type, options.url, response.status, `Error getting swarm nodes for ${pubKey}`); + throw HTTPError('sendMessage: error response', response.status, result); + } +} + +function HTTPError(message, providedCode, response, stack) { + const code = providedCode > 999 || providedCode < 100 ? -1 : providedCode; + const e = new Error(`${message}; code: ${code}`); + e.name = 'HTTPError'; + e.code = code; + if (stack) { + e.stack += `\nOriginal stack:\n${stack}`; + } + if (response) { + e.response = response; + } + return e; +} + +module.exports = { + LokiSnodeAPI, +}; diff --git a/libtextsecure/http-resources.js b/libtextsecure/http-resources.js index ffeb2c071..df98603bd 100644 --- a/libtextsecure/http-resources.js +++ b/libtextsecure/http-resources.js @@ -1,4 +1,4 @@ -/* global window, dcodeIO, textsecure, StringView */ +/* global window, dcodeIO, textsecure */ // eslint-disable-next-line func-names (function () { @@ -62,26 +62,8 @@ }; let connected = false; - this.startPolling = async function pollServer(callBack) { - const myKeys = await textsecure.storage.protocol.getIdentityKeyPair(); - const pubKey = StringView.arrayBufferToHex(myKeys.pubKey) - let result; - try { - result = await server.retrieveMessages(pubKey); - connected = true; - } catch (err) { - connected = false; - setTimeout(() => { pollServer(callBack); }, pollTime); - return; - } - if (typeof callBack === 'function') { - callBack(connected); - } - if (!result.messages) { - setTimeout(() => { pollServer(callBack); }, pollTime); - return; - } - const newMessages = await filterIncomingMessages(result.messages); + const processMessages = async messages => { + const newMessages = await filterIncomingMessages(messages); newMessages.forEach(async message => { const { data } = message; const dataPlaintext = stringToArrayBufferBase64(data); @@ -97,7 +79,17 @@ ); } }); - setTimeout(() => { pollServer(callBack); }, pollTime); + } + + this.startPolling = async function pollServer(callback) { + try { + await server.retrieveMessages(processMessages); + connected = true; + } catch (err) { + connected = false; + } + callback(connected); + setTimeout(() => { pollServer(callback); }, pollTime); }; this.isConnected = function isConnected() { diff --git a/libtextsecure/message_receiver.js b/libtextsecure/message_receiver.js index ac61def0f..d0492ab30 100644 --- a/libtextsecure/message_receiver.js +++ b/libtextsecure/message_receiver.js @@ -22,7 +22,7 @@ function MessageReceiver(username, password, signalingKey, options = {}) { this.signalingKey = signalingKey; this.username = username; this.password = password; - this.lokiserver = window.LokiAPI; + this.lokiMessageAPI = window.LokiMessageAPI; if (!options.serverTrustRoot) { throw new Error('Server trust root is required!'); @@ -67,7 +67,7 @@ MessageReceiver.prototype.extend({ } this.hasConnected = true; - this.httpPollingResource = new HttpResource(this.lokiserver, { + this.httpPollingResource = new HttpResource(this.lokiMessageAPI, { handleRequest: this.handleRequest.bind(this), }); this.httpPollingResource.startPolling((connected) => { diff --git a/libtextsecure/outgoing_message.js b/libtextsecure/outgoing_message.js index 79426037a..4a33d9a10 100644 --- a/libtextsecure/outgoing_message.js +++ b/libtextsecure/outgoing_message.js @@ -34,7 +34,7 @@ function OutgoingMessage( this.callback = callback; this.silent = silent; - this.lokiserver = window.LokiAPI; + this.lokiMessageAPI = window.LokiMessageAPI; this.numbersCompleted = 0; this.errors = []; @@ -184,7 +184,7 @@ OutgoingMessage.prototype = { async transmitMessage(number, data, timestamp, ttl = 24 * 60 * 60) { const pubKey = number; try { - const result = await this.lokiserver.sendMessage(pubKey, data, timestamp, ttl); + const result = await this.lokiMessageAPI.sendMessage(pubKey, data, timestamp, ttl); return result; } catch (e) { if (e.name === 'HTTPError' && (e.code !== 409 && e.code !== 410)) { diff --git a/preload.js b/preload.js index e9dfa7b5e..7e4ecf73f 100644 --- a/preload.js +++ b/preload.js @@ -265,14 +265,20 @@ window.WebAPI = initializeWebAPI({ proxyUrl: config.proxyUrl, }); -const { LokiServer } = require('./js/modules/loki_message_api'); +const { LokiSnodeAPI } = require('./js/modules/loki_snode_api'); -window.LokiAPI = new LokiServer({ - urls: [config.serverUrl], - messageServerPort: config.messageServerPort, +window.LokiSnodeAPI = new LokiSnodeAPI({ + url: config.serverUrl, swarmServerPort: config.swarmServerPort, }); +const { LokiMessageAPI } = require('./js/modules/loki_message_api'); + +window.LokiMessageAPI = new LokiMessageAPI({ + url: config.serverUrl, + messageServerPort: config.messageServerPort, +}); + window.mnemonic = require('./libloki/mnemonic'); const { WorkerInterface } = require('./js/modules/util_worker_interface'); From 24553e29e75850b19147257a4256ef7f3474ee1a Mon Sep 17 00:00:00 2001 From: Beaudan Date: Tue, 15 Jan 2019 13:38:41 +1100 Subject: [PATCH 5/7] Made swarm requests ACTUALLY not wait for them all to finish and stop saving swarmNodes as DB column --- app/sql.js | 10 ++-------- js/modules/loki_message_api.js | 20 +++++++++----------- 2 files changed, 11 insertions(+), 19 deletions(-) diff --git a/app/sql.js b/app/sql.js index 20be73737..cea84fd08 100644 --- a/app/sql.js +++ b/app/sql.js @@ -387,7 +387,6 @@ async function updateToSchemaVersion4(currentVersion, instance) { type STRING, members TEXT, name TEXT, - swarmNodes TEXT, profileName TEXT );` ); @@ -1052,7 +1051,7 @@ async function getConversationCount() { async function saveConversation(data) { // eslint-disable-next-line camelcase - const { id, active_at, type, members, name, friendRequestStatus, swarmNodes, profileName } = data; + const { id, active_at, type, members, name, friendRequestStatus, profileName } = data; await db.run( `INSERT INTO conversations ( @@ -1064,7 +1063,6 @@ async function saveConversation(data) { members, name, friendRequestStatus, - swarmNodes, profileName ) values ( $id, @@ -1075,7 +1073,6 @@ async function saveConversation(data) { $members, $name, $friendRequestStatus, - $swarmNodes, $profileName );`, { @@ -1087,7 +1084,6 @@ async function saveConversation(data) { $members: members ? members.join(' ') : null, $name: name, $friendRequestStatus: friendRequestStatus, - $swarmNodes: swarmNodes ? swarmNodes.join(' ') : null, $profileName: profileName, } ); @@ -1111,7 +1107,7 @@ async function saveConversations(arrayOfConversations) { async function updateConversation(data) { // eslint-disable-next-line camelcase - const { id, active_at, type, members, name, friendRequestStatus, swarmNodes, profileName } = data; + const { id, active_at, type, members, name, friendRequestStatus, profileName } = data; await db.run( `UPDATE conversations SET @@ -1122,7 +1118,6 @@ async function updateConversation(data) { members = $members, name = $name, friendRequestStatus = $friendRequestStatus, - swarmNodes = $swarmNodes, profileName = $profileName WHERE id = $id;`, { @@ -1134,7 +1129,6 @@ async function updateConversation(data) { $members: members ? members.join(' ') : null, $name: name, $friendRequestStatus: friendRequestStatus, - $swarmNodes: swarmNodes ? swarmNodes.join(' ') : null, $profileName: profileName, } ); diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index c80b72727..db8b9997f 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -96,7 +96,6 @@ class LokiMessageAPI { } async retrieveMessages(callback) { - let ourSwarmNodes = await window.LokiSnodeAPI.getOurSwarmNodes(); const ourKey = window.textsecure.storage.user.getNumber(); let completedRequests = 0; @@ -126,9 +125,10 @@ class LokiMessageAPI { response = await fetch(options.url, fetchOptions); } catch (e) { // TODO: Maybe we shouldn't immediately delete? + // And differentiate between different connectivity issues log.error(options.type, options.url, 0, `Error retrieving messages from ${nodeUrl}`); window.LokiSnodeAPI.unreachableNode(ourKey, nodeUrl); - throw HTTPError('fetch error', 0, e.toString()); + return; } let result; @@ -144,19 +144,20 @@ class LokiMessageAPI { } completedRequests += 1; - if (response.status >= 0 && response.status < 400) { + if (response.status === 200) { if (result.lastHash) { window.LokiSnodeAPI.updateLastHash(nodeUrl, result.lastHash); + callback(result.messages); } - return result; + return; } + // Handle error from snode log.error(options.type, options.url, response.status, 'Error'); - throw HTTPError('retrieveMessages: error response', response.status, result); } while (completedRequests < MINIMUM_SUCCESSFUL_REQUESTS) { const remainingRequests = MINIMUM_SUCCESSFUL_REQUESTS - completedRequests; - ourSwarmNodes = await window.LokiSnodeAPI.getOurSwarmNodes(); + const ourSwarmNodes = await window.LokiSnodeAPI.getOurSwarmNodes(); if (Object.keys(ourSwarmNodes).length < remainingRequests) { if (completedRequests !== 0) { // TODO: Decide how to handle some completed requests but not enough @@ -164,14 +165,11 @@ class LokiMessageAPI { return; } - const requests = await Promise.all( + await Promise.all( Object.entries(ourSwarmNodes) .splice(0, remainingRequests) - .map(([nodeUrl, lastHash]) => doRequest(nodeUrl, lastHash).catch(() => null)) + .map(([nodeUrl, lastHash]) => doRequest(nodeUrl, lastHash)) ); - // Requests is now an array of null for failed requests and the json for success - requests.filter(v => v !== null && 'messages' in v) - .forEach(v => callback(v.messages)); } } } From bdbdf15469a00db4263e0eed80edbc380e064bd6 Mon Sep 17 00:00:00 2001 From: Beaudan Date: Wed, 16 Jan 2019 14:37:33 +1100 Subject: [PATCH 6/7] Added timeouts for requests so they don't for ages. Changed swarmNodes to be a set to work property with merge, now removing contact swarmNodes if they timeout --- app/sql.js | 1 - js/models/conversations.js | 2 +- js/modules/data.js | 38 +++++++++++++++++++++++++++------- js/modules/loki_message_api.js | 12 ++++++----- js/modules/loki_snode_api.js | 29 ++++++++++++++++++-------- 5 files changed, 59 insertions(+), 23 deletions(-) diff --git a/app/sql.js b/app/sql.js index cea84fd08..2134db456 100644 --- a/app/sql.js +++ b/app/sql.js @@ -1,4 +1,3 @@ -const fs = require('fs'); const path = require('path'); const mkdirp = require('mkdirp'); const rimraf = require('rimraf'); diff --git a/js/models/conversations.js b/js/models/conversations.js index a360f5f82..2e3b70b7e 100644 --- a/js/models/conversations.js +++ b/js/models/conversations.js @@ -86,7 +86,7 @@ friendRequestStatus: FriendRequestStatusEnum.none, unlockTimestamp: null, // Timestamp used for expiring friend requests. sessionResetStatus: SessionResetEnum.none, - swarmNodes: [], + swarmNodes: new Set([]), }; }, diff --git a/js/modules/data.js b/js/modules/data.js index 214d114b1..b02728108 100644 --- a/js/modules/data.js +++ b/js/modules/data.js @@ -656,8 +656,21 @@ async function removeAllSessions(id) { // Conversation +function setifyProperty(data, propertyName) { + if (!data) return data; + const returnData = data; + if (returnData[propertyName]) { + returnData[propertyName] = new Set(returnData[propertyName]); + } + return returnData; +} + async function getSwarmNodesByPubkey(pubkey) { - return channels.getSwarmNodesByPubkey(pubkey); + let swarmNodes = await channels.getSwarmNodesByPubkey(pubkey); + if (Array.isArray(swarmNodes)) { + swarmNodes = new Set(swarmNodes); + } + return swarmNodes; } async function getConversationCount() { @@ -665,7 +678,11 @@ async function getConversationCount() { } async function saveConversation(data) { - await channels.saveConversation(data); + const storeData = data; + if (storeData.swarmNodes) { + storeData.swarmNodes = Array.from(storeData.swarmNodes); + } + await channels.saveConversation(storeData); } async function saveConversations(data) { @@ -673,7 +690,7 @@ async function saveConversations(data) { } async function getConversationById(id, { Conversation }) { - const data = await channels.getConversationById(id); + const data = setifyProperty(await channels.getConversationById(id), 'swarmNodes'); return new Conversation(data); } @@ -684,6 +701,9 @@ async function updateConversation(id, data, { Conversation }) { } const merged = merge({}, existing.attributes, data); + if (merged.swarmNodes instanceof Set) { + merged.swarmNodes = Array.from(merged.swarmNodes); + } await channels.updateConversation(merged); } @@ -704,7 +724,8 @@ async function _removeConversations(ids) { } async function getAllConversations({ ConversationCollection }) { - const conversations = await channels.getAllConversations(); + const conversations = (await channels.getAllConversations()) + .map(c => setifyProperty(c, 'swarmNodes')); const collection = new ConversationCollection(); collection.add(conversations); @@ -717,7 +738,8 @@ async function getAllConversationIds() { } async function getAllPrivateConversations({ ConversationCollection }) { - const conversations = await channels.getAllPrivateConversations(); + const conversations = (await channels.getAllPrivateConversations()) + .map(c => setifyProperty(c, 'swarmNodes')); const collection = new ConversationCollection(); collection.add(conversations); @@ -725,7 +747,8 @@ async function getAllPrivateConversations({ ConversationCollection }) { } async function getAllGroupsInvolvingId(id, { ConversationCollection }) { - const conversations = await channels.getAllGroupsInvolvingId(id); + const conversations = (await channels.getAllGroupsInvolvingId(id)) + .map(c => setifyProperty(c, 'swarmNodes')); const collection = new ConversationCollection(); collection.add(conversations); @@ -733,7 +756,8 @@ async function getAllGroupsInvolvingId(id, { ConversationCollection }) { } async function searchConversations(query, { ConversationCollection }) { - const conversations = await channels.searchConversations(query); + const conversations = (await channels.searchConversations(query)) + .map(c => setifyProperty(c, 'swarmNodes')); const collection = new ConversationCollection(); collection.add(conversations); diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index db8b9997f..dbeb50dd7 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -19,7 +19,7 @@ class LokiMessageAPI { async sendMessage(pubKey, data, messageTimeStamp, ttl) { const swarmNodes = await window.LokiSnodeAPI.getSwarmNodesByPubkey(pubKey) - if (!swarmNodes || swarmNodes.length === 0) { + if (!swarmNodes || swarmNodes.size === 0) { throw Error('No swarm nodes to query!'); } @@ -39,12 +39,13 @@ class LokiMessageAPI { throw err; } - const requests = swarmNodes.map(async node => { + const requests = Array.from(swarmNodes).map(async node => { + // TODO: Confirm sensible timeout const options = { url: `${node}${this.messageServerPort}/store`, type: 'POST', responseType: undefined, - timeout: undefined, + timeout: 5000, }; const fetchOptions = { @@ -100,11 +101,12 @@ class LokiMessageAPI { let completedRequests = 0; const doRequest = async (nodeUrl, nodeData) => { + // TODO: Confirm sensible timeout const options = { url: `${nodeUrl}${this.messageServerPort}/retrieve`, type: 'GET', responseType: 'json', - timeout: undefined, + timeout: 5000, }; const headers = { @@ -159,10 +161,10 @@ class LokiMessageAPI { const remainingRequests = MINIMUM_SUCCESSFUL_REQUESTS - completedRequests; const ourSwarmNodes = await window.LokiSnodeAPI.getOurSwarmNodes(); if (Object.keys(ourSwarmNodes).length < remainingRequests) { + // This means we don't have enough swarm nodes to meet the minimum threshold if (completedRequests !== 0) { // TODO: Decide how to handle some completed requests but not enough } - return; } await Promise.all( diff --git a/js/modules/loki_snode_api.js b/js/modules/loki_snode_api.js index ff43940b4..5cbab41af 100644 --- a/js/modules/loki_snode_api.js +++ b/js/modules/loki_snode_api.js @@ -34,9 +34,18 @@ class LokiSnodeAPI { }); } - unreachableNode(pubKey, nodeUrl) { + async unreachableNode(pubKey, nodeUrl) { if (pubKey === window.textsecure.storage.user.getNumber()) { delete this.ourSwarmNodes[nodeUrl]; + return; + } + const conversation = window.ConversationController.get(pubKey); + const swarmNodes = conversation.get('swarmNodes'); + if (swarmNodes.delete(nodeUrl)) { + conversation.set({ swarmNodes }); + await window.Signal.Data.updateConversation(conversation.id, conversation.attributes, { + Conversation: Whisper.Conversation, + }); } } @@ -55,17 +64,17 @@ class LokiSnodeAPI { !this.ourSwarmNodes || Object.keys(this.ourSwarmNodes).length < MINIMUM_SWARM_NODES ) { + this.ourSwarmNodes = {}; // Try refresh our swarm list once const ourKey = window.textsecure.storage.user.getNumber(); const nodeAddresses = await window.LokiSnodeAPI.getSwarmNodes(ourKey); + if (!nodeAddresses || nodeAddresses.length === 0) { + throw Error('Could not load our swarm') + } - this.ourSwarmNodes = {}; nodeAddresses.forEach(url => { this.ourSwarmNodes[url] = {}; - }) - if (!this.ourSwarmNodes || Object.keys(this.ourSwarmNodes).length === 0) { - throw Error('Could not load our swarm') - } + }); } return this.ourSwarmNodes; } @@ -73,7 +82,7 @@ class LokiSnodeAPI { async getSwarmNodesByPubkey(pubKey) { const swarmNodes = await window.Signal.Data.getSwarmNodesByPubkey(pubKey); // TODO: Check if swarm list is below a threshold rather than empty - if (swarmNodes && swarmNodes.length !== 0) { + if (swarmNodes && swarmNodes.size !== 0) { return swarmNodes; } return this.replenishSwarm(pubKey); @@ -83,7 +92,7 @@ class LokiSnodeAPI { const conversation = window.ConversationController.get(pubKey); if (!(pubKey in this.swarmsPendingReplenish)) { this.swarmsPendingReplenish[pubKey] = new Promise(async (resolve) => { - const newSwarmNodes = await this.getSwarmNodes(pubKey); + const newSwarmNodes = new Set(await this.getSwarmNodes(pubKey)); conversation.set({ swarmNodes: newSwarmNodes }); await window.Signal.Data.updateConversation(conversation.id, conversation.attributes, { Conversation: Whisper.Conversation, @@ -97,12 +106,14 @@ class LokiSnodeAPI { } async getSwarmNodes(pubKey) { + // TODO: Hit multiple random nodes and merge lists? const node = await this.getRandomSnodeAddress(); + // TODO: Confirm final API URL and sensible timeout const options = { url: `http://${node}${this.swarmServerPort}/json_rpc`, type: 'POST', responseType: 'json', - timeout: undefined, + timeout: 5000, }; const body = { From badaf40ca84952e3a5364348f6ba2981d9130385 Mon Sep 17 00:00:00 2001 From: Beaudan Date: Wed, 16 Jan 2019 15:15:23 +1100 Subject: [PATCH 7/7] bit of readability and catching error (but just silencing for now) --- js/modules/data.js | 3 ++- js/modules/loki_snode_api.js | 8 +++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/js/modules/data.js b/js/modules/data.js index b02728108..9a1d34649 100644 --- a/js/modules/data.js +++ b/js/modules/data.js @@ -690,7 +690,8 @@ async function saveConversations(data) { } async function getConversationById(id, { Conversation }) { - const data = setifyProperty(await channels.getConversationById(id), 'swarmNodes'); + const rawData = await channels.getConversationById(id) + const data = setifyProperty(rawData, 'swarmNodes'); return new Conversation(data); } diff --git a/js/modules/loki_snode_api.js b/js/modules/loki_snode_api.js index 5cbab41af..d4e0d1553 100644 --- a/js/modules/loki_snode_api.js +++ b/js/modules/loki_snode_api.js @@ -92,7 +92,13 @@ class LokiSnodeAPI { const conversation = window.ConversationController.get(pubKey); if (!(pubKey in this.swarmsPendingReplenish)) { this.swarmsPendingReplenish[pubKey] = new Promise(async (resolve) => { - const newSwarmNodes = new Set(await this.getSwarmNodes(pubKey)); + let newSwarmNodes + try { + newSwarmNodes = new Set(await this.getSwarmNodes(pubKey)); + } catch (e) { + // TODO: Handle these errors sensibly + newSwarmNodes = new Set([]); + } conversation.set({ swarmNodes: newSwarmNodes }); await window.Signal.Data.updateConversation(conversation.id, conversation.attributes, { Conversation: Whisper.Conversation,