Purge retrieving snodes

pull/311/head
Beaudan 6 years ago
parent d12f6b6d32
commit c5c01b0ac8

@ -66,28 +66,6 @@ const trySendP2p = async (pubKey, data64, isPing, messageEventData) => {
} }
}; };
const retrieveNextMessages = async (nodeUrl, nodeData, ourKey) => {
const params = {
pubKey: ourKey,
lastHash: nodeData.lastHash || '',
};
const options = {
timeout: 40000,
headers: {
[LOKI_LONGPOLL_HEADER]: true,
},
};
const result = await rpc(
`https://${nodeUrl}`,
nodeData.port,
'retrieve',
params,
options
);
return result.messages || [];
};
class LokiMessageAPI { class LokiMessageAPI {
constructor() { constructor() {
this.jobQueue = new window.JobQueue(); this.jobQueue = new window.JobQueue();
@ -228,7 +206,7 @@ class LokiMessageAPI {
return false; return false;
} }
async openConnection(callback) { async openRetrieveConnection(callback) {
while (!_.isEmpty(this.ourSwarmNodes)) { while (!_.isEmpty(this.ourSwarmNodes)) {
const address = Object.keys(this.ourSwarmNodes)[0]; const address = Object.keys(this.ourSwarmNodes)[0];
const nodeData = this.ourSwarmNodes[address]; const nodeData = this.ourSwarmNodes[address];
@ -239,12 +217,12 @@ class LokiMessageAPI {
try { try {
// TODO: Revert back to using snode address instead of IP // TODO: Revert back to using snode address instead of IP
let messages = await retrieveNextMessages(nodeData.ip, nodeData); let messages = await this.retrieveNextMessages(nodeData.ip, nodeData);
successiveFailures = 0; successiveFailures = 0;
if (messages.length) { if (messages.length) {
const lastMessage = _.last(messages); const lastMessage = _.last(messages);
nodeData.lashHash = lastMessage.hash; nodeData.lastHash = lastMessage.hash;
lokiSnodeAPI.updateLastHash( await lokiSnodeAPI.updateLastHash(
address, address,
lastMessage.hash, lastMessage.hash,
lastMessage.expiration lastMessage.expiration
@ -264,7 +242,7 @@ class LokiMessageAPI {
const lastHash = await window.Signal.Data.getLastHashBySnode( const lastHash = await window.Signal.Data.getLastHashBySnode(
newSwarm[i] newSwarm[i]
); );
this.ourSwarmnewSwarm[newSwarm[i]] = { this.ourSwarmNodes[newSwarm[i]] = {
lastHash, lastHash,
}; };
} }
@ -279,9 +257,34 @@ class LokiMessageAPI {
successiveFailures += 1; successiveFailures += 1;
} }
} }
if (successiveFailures >= 3) {
await lokiSnodeAPI.unreachableNode(this.ourKey, address);
}
} }
} }
async retrieveNextMessages(nodeUrl, nodeData) {
const params = {
pubKey: this.ourKey,
lastHash: nodeData.lastHash || '',
};
const options = {
timeout: 40000,
headers: {
[LOKI_LONGPOLL_HEADER]: true,
},
};
const result = await rpc(
`https://${nodeUrl}`,
nodeData.port,
'retrieve',
params,
options
);
return result.messages || [];
};
async startLongPolling(numConnections, callback) { async startLongPolling(numConnections, callback) {
this.ourSwarmNodes = {}; this.ourSwarmNodes = {};
let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey); let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey);
@ -290,16 +293,18 @@ class LokiMessageAPI {
nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey); nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey);
} }
for (let i = 0; i < nodes.length; i += 1) { for (let i = 0; i < nodes.length; i += 1) {
const lastHash = await window.Signal.Data.getLastHashBySnode(nodes[i]); const lastHash = await window.Signal.Data.getLastHashBySnode(nodes[i].address);
this.ourSwarmNodes[nodes[i]] = { this.ourSwarmNodes[nodes[i].address] = {
lastHash, lastHash,
ip: nodes[i].ip,
port: nodes[i].port,
}; };
} }
const promises = []; const promises = [];
for (let i = 0; i < numConnections; i += 1) for (let i = 0; i < numConnections; i += 1)
promises.push(this.openConnection(callback)); promises.push(this.openRetrieveConnection(callback));
// blocks until all snodes in our swarms have been removed from the list // blocks until all snodes in our swarms have been removed from the list
// or if there is network issues (ENOUTFOUND due to lokinet) // or if there is network issues (ENOUTFOUND due to lokinet)

@ -89,10 +89,8 @@ class LokiSnodeAPI {
async unreachableNode(pubKey, nodeUrl) { async unreachableNode(pubKey, nodeUrl) {
const conversation = ConversationController.get(pubKey); const conversation = ConversationController.get(pubKey);
const swarmNodes = [...conversation.get('swarmNodes')]; const swarmNodes = [...conversation.get('swarmNodes')];
if (swarmNodes.includes(nodeUrl)) { const filteredNodes = swarmNodes.filter(node => node.address !== nodeUrl);
const filteredNodes = swarmNodes.filter(node => node !== nodeUrl); await conversation.updateSwarmNodes(filteredNodes);
await conversation.updateSwarmNodes(filteredNodes);
}
} }
async updateLastHash(nodeUrl, lastHash, expiresAt) { async updateLastHash(nodeUrl, lastHash, expiresAt) {

Loading…
Cancel
Save