diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index bccd59d4d..7dfc99d35 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -66,28 +66,6 @@ const trySendP2p = async (pubKey, data64, isPing, messageEventData) => { } }; -const retrieveNextMessages = async (nodeUrl, nodeData, ourKey) => { - const params = { - pubKey: ourKey, - lastHash: nodeData.lastHash || '', - }; - const options = { - timeout: 40000, - headers: { - [LOKI_LONGPOLL_HEADER]: true, - }, - }; - - const result = await rpc( - `https://${nodeUrl}`, - nodeData.port, - 'retrieve', - params, - options - ); - return result.messages || []; -}; - class LokiMessageAPI { constructor() { this.jobQueue = new window.JobQueue(); @@ -228,7 +206,7 @@ class LokiMessageAPI { return false; } - async openConnection(callback) { + async openRetrieveConnection(callback) { while (!_.isEmpty(this.ourSwarmNodes)) { const address = Object.keys(this.ourSwarmNodes)[0]; const nodeData = this.ourSwarmNodes[address]; @@ -239,12 +217,12 @@ class LokiMessageAPI { try { // TODO: Revert back to using snode address instead of IP - let messages = await retrieveNextMessages(nodeData.ip, nodeData); + let messages = await this.retrieveNextMessages(nodeData.ip, nodeData); successiveFailures = 0; if (messages.length) { const lastMessage = _.last(messages); - nodeData.lashHash = lastMessage.hash; - lokiSnodeAPI.updateLastHash( + nodeData.lastHash = lastMessage.hash; + await lokiSnodeAPI.updateLastHash( address, lastMessage.hash, lastMessage.expiration @@ -264,7 +242,7 @@ class LokiMessageAPI { const lastHash = await window.Signal.Data.getLastHashBySnode( newSwarm[i] ); - this.ourSwarmnewSwarm[newSwarm[i]] = { + this.ourSwarmNodes[newSwarm[i]] = { lastHash, }; } @@ -279,9 +257,34 @@ class LokiMessageAPI { successiveFailures += 1; } } + if (successiveFailures >= 3) { + await lokiSnodeAPI.unreachableNode(this.ourKey, address); + } } } + async retrieveNextMessages(nodeUrl, nodeData) { + const params = { + pubKey: this.ourKey, + lastHash: nodeData.lastHash || '', + }; + const options = { + timeout: 40000, + headers: { + [LOKI_LONGPOLL_HEADER]: true, + }, + }; + + const result = await rpc( + `https://${nodeUrl}`, + nodeData.port, + 'retrieve', + params, + options + ); + return result.messages || []; + }; + async startLongPolling(numConnections, callback) { this.ourSwarmNodes = {}; let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey); @@ -290,16 +293,18 @@ class LokiMessageAPI { nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey); } for (let i = 0; i < nodes.length; i += 1) { - const lastHash = await window.Signal.Data.getLastHashBySnode(nodes[i]); - this.ourSwarmNodes[nodes[i]] = { + const lastHash = await window.Signal.Data.getLastHashBySnode(nodes[i].address); + this.ourSwarmNodes[nodes[i].address] = { lastHash, + ip: nodes[i].ip, + port: nodes[i].port, }; } const promises = []; for (let i = 0; i < numConnections; i += 1) - promises.push(this.openConnection(callback)); + promises.push(this.openRetrieveConnection(callback)); // blocks until all snodes in our swarms have been removed from the list // or if there is network issues (ENOUTFOUND due to lokinet) diff --git a/js/modules/loki_snode_api.js b/js/modules/loki_snode_api.js index 9347ef7d2..4a23208c3 100644 --- a/js/modules/loki_snode_api.js +++ b/js/modules/loki_snode_api.js @@ -89,10 +89,8 @@ class LokiSnodeAPI { async unreachableNode(pubKey, nodeUrl) { const conversation = ConversationController.get(pubKey); const swarmNodes = [...conversation.get('swarmNodes')]; - if (swarmNodes.includes(nodeUrl)) { - const filteredNodes = swarmNodes.filter(node => node !== nodeUrl); - await conversation.updateSwarmNodes(filteredNodes); - } + const filteredNodes = swarmNodes.filter(node => node.address !== nodeUrl); + await conversation.updateSwarmNodes(filteredNodes); } async updateLastHash(nodeUrl, lastHash, expiresAt) {