Split logic for getting swarm nodes from db, getting from lokinet and saving to db. Now tracking successfulRequests when making queries on top of the completedNodes list

pull/147/head
Beaudan 6 years ago
parent 9ece4e1fd8
commit f6d167eda1

@ -181,8 +181,10 @@
return conversation;
}
window.LokiSnodeAPI.replenishSwarm(id);
try {
conversation.attributes.swarmNodes = await window.LokiSnodeAPI.getFreshSwarmNodes(
id
);
await window.Signal.Data.saveConversation(conversation.attributes, {
Conversation: Whisper.Conversation,
});

@ -1,4 +1,5 @@
/* eslint-disable no-await-in-loop */
/* eslint-disable no-loop-func */
/* global log, dcodeIO, window, callWorker */
const fetch = require('node-fetch');
@ -35,6 +36,7 @@ class LokiMessageAPI {
throw err;
}
const completedNodes = [];
let successfulRequests = 0;
let canResolve = true;
const doRequest = async nodeUrl => {
@ -43,7 +45,7 @@ class LokiMessageAPI {
url: `${nodeUrl}${this.messageServerPort}/store`,
type: 'POST',
responseType: undefined,
timeout: 5000,
timeout: 10000,
};
const fetchOptions = {
@ -70,6 +72,7 @@ class LokiMessageAPI {
log.error(options.type, options.url, 0, 'Error sending message');
if (window.LokiSnodeAPI.unreachableNode(pubKey, nodeUrl)) {
completedNodes.push(nodeUrl);
swarmNodes = swarmNodes.filter(node => node !== nodeUrl);
}
return;
}
@ -88,6 +91,8 @@ class LokiMessageAPI {
if (response.status >= 0 && response.status < 400) {
completedNodes.push(nodeUrl);
swarmNodes = swarmNodes.filter(node => node !== nodeUrl);
successfulRequests += 1;
return;
}
log.error(
@ -100,27 +105,30 @@ class LokiMessageAPI {
};
let swarmNodes;
while (completedNodes.length < MINIMUM_SUCCESSFUL_REQUESTS) {
try {
swarmNodes = await window.LokiSnodeAPI.getSwarmNodesByPubkey(pubKey);
} catch (e) {
throw new window.textsecure.EmptySwarmError(pubKey, e);
}
while (successfulRequests < MINIMUM_SUCCESSFUL_REQUESTS) {
if (!canResolve) {
throw new window.textsecure.DNSResolutionError('Sending messages');
}
try {
swarmNodes = await window.LokiSnodeAPI.getSwarmNodesByPubkey(pubKey);
swarmNodes = swarmNodes.filter(node => !(node in completedNodes));
} catch (e) {
throw new window.textsecure.EmptySwarmError(pubKey, e);
}
if (!swarmNodes || swarmNodes.length === 0) {
if (completedNodes.length !== 0) {
// TODO: Decide how to handle some completed requests but not enough
return;
swarmNodes = await window.LokiSnodeAPI.getFreshSwarmNodes(pubKey);
swarmNodes = swarmNodes.filter(node => !(node in completedNodes));
if (!swarmNodes || swarmNodes.length === 0) {
if (successfulRequests !== 0) {
// TODO: Decide how to handle some completed requests but not enough
return;
}
throw new window.textsecure.EmptySwarmError(
pubKey,
new Error('Ran out of swarm nodes to query')
);
}
throw new window.textsecure.EmptySwarmError(
pubKey,
new Error('Ran out of swarm nodes to query')
);
await window.LokiSnodeAPI.saveSwarmNodes(pubKey, swarmNodes);
}
const remainingRequests =
MINIMUM_SUCCESSFUL_REQUESTS - completedNodes.length;
await Promise.all(
@ -135,6 +143,7 @@ class LokiMessageAPI {
const ourKey = window.textsecure.storage.user.getNumber();
const completedNodes = [];
let canResolve = true;
let successfulRequests = 0;
const doRequest = async (nodeUrl, nodeData) => {
// TODO: Confirm sensible timeout
@ -142,7 +151,7 @@ class LokiMessageAPI {
url: `${nodeUrl}${this.messageServerPort}/retrieve`,
type: 'GET',
responseType: 'json',
timeout: 5000,
timeout: 10000,
};
const headers = {
@ -167,8 +176,6 @@ class LokiMessageAPI {
canResolve = false;
return;
}
// TODO: Maybe we shouldn't immediately delete?
// And differentiate between different connectivity issues
log.error(
options.type,
options.url,
@ -177,6 +184,7 @@ class LokiMessageAPI {
);
if (window.LokiSnodeAPI.unreachableNode(ourKey, nodeUrl)) {
completedNodes.push(nodeUrl);
delete ourSwarmNodes[nodeUrl];
}
return;
}
@ -193,51 +201,54 @@ class LokiMessageAPI {
result = await response.text();
}
completedNodes.push(nodeUrl);
delete ourSwarmNodes[nodeUrl];
if (response.status === 200) {
if (result.lastHash) {
window.LokiSnodeAPI.updateLastHash(nodeUrl, result.lastHash);
callback(result.messages);
}
successfulRequests += 1;
return;
}
// Handle error from snode
log.error(options.type, options.url, response.status, 'Error');
};
while (completedNodes.length < MINIMUM_SUCCESSFUL_REQUESTS) {
let ourSwarmNodes;
try {
ourSwarmNodes = await window.LokiSnodeAPI.getOurSwarmNodes();
} catch (e) {
throw new window.textsecure.EmptySwarmError(
window.textsecure.storage.user.getNumber(),
e
);
}
while (successfulRequests < MINIMUM_SUCCESSFUL_REQUESTS) {
if (!canResolve) {
throw new window.textsecure.DNSResolutionError('Retrieving messages');
}
let ourSwarmNodes;
try {
ourSwarmNodes = await window.LokiSnodeAPI.getOurSwarmNodes();
// Filter out the nodes we have already got responses from
ourSwarmNodes = Object.keys(ourSwarmNodes)
.filter(node => !(node in completedNodes))
.reduce(
// eslint-disable-next-line no-loop-func
(obj, node) => ({
...obj,
[node]: ourSwarmNodes[node],
}),
{}
if (Object.keys(ourSwarmNodes).length === 0) {
try {
ourSwarmNodes = await window.LokiSnodeAPI.getOurSwarmNodes();
// Filter out the nodes we have already got responses from
completedNodes.forEach(nodeUrl => delete ourSwarmNodes[nodeUrl]);
} catch (e) {
throw new window.textsecure.EmptySwarmError(
window.textsecure.storage.user.getNumber(),
e
);
}
if (Object.keys(ourSwarmNodes).length === 0) {
if (successfulRequests !== 0) {
// TODO: Decide how to handle some completed requests but not enough
return;
}
throw new window.textsecure.EmptySwarmError(
window.textsecure.storage.user.getNumber(),
new Error('Ran out of swarm nodes to query')
);
} catch (e) {
throw new window.textsecure.EmptySwarmError(
window.textsecure.storage.user.getNumber(),
e
);
}
if (!ourSwarmNodes || Object.keys(ourSwarmNodes).length === 0) {
if (completedNodes.length !== 0) {
// TODO: Decide how to handle some completed requests but not enough
return;
}
throw new window.textsecure.EmptySwarmError(
window.textsecure.storage.user.getNumber(),
new Error('Ran out of swarm nodes to query')
);
}
const remainingRequests =

@ -1,3 +1,4 @@
/* eslint-disable class-methods-use-this */
/* global log, window, Whisper */
const fetch = require('node-fetch');
@ -107,15 +108,25 @@ class LokiSnodeAPI {
async getSwarmNodesByPubkey(pubKey) {
const swarmNodes = await window.Signal.Data.getSwarmNodesByPubkey(pubKey);
// TODO: Check if swarm list is below a threshold rather than empty
if (swarmNodes && swarmNodes.length !== 0) {
if (swarmNodes) {
return swarmNodes;
}
return this.replenishSwarm(pubKey);
return [];
}
async replenishSwarm(pubKey) {
async saveSwarmNodes(pubKey, swarmNodes) {
const conversation = window.ConversationController.get(pubKey);
conversation.set({ swarmNodes });
await window.Signal.Data.updateConversation(
conversation.id,
conversation.attributes,
{
Conversation: Whisper.Conversation,
}
);
}
async getFreshSwarmNodes(pubKey) {
if (!(pubKey in this.swarmsPendingReplenish)) {
this.swarmsPendingReplenish[pubKey] = new Promise(async resolve => {
let newSwarmNodes;
@ -125,14 +136,6 @@ class LokiSnodeAPI {
// TODO: Handle these errors sensibly
newSwarmNodes = [];
}
conversation.set({ swarmNodes: newSwarmNodes });
await window.Signal.Data.updateConversation(
conversation.id,
conversation.attributes,
{
Conversation: Whisper.Conversation,
}
);
resolve(newSwarmNodes);
});
}
@ -149,7 +152,7 @@ class LokiSnodeAPI {
url: `http://${node}${this.swarmServerPort}/json_rpc`,
type: 'POST',
responseType: 'json',
timeout: 5000,
timeout: 10000,
};
const body = {

Loading…
Cancel
Save