Sending messages now in a while loop like retrieving messages, added a failureCount and only delete node urls if they fail 3 times

pull/147/head
Beaudan 6 years ago
parent e6ab7010da
commit 561d60cfd5

@ -3,10 +3,6 @@
const fetch = require('node-fetch');
// eslint-disable-next-line
const invert = p => new Promise((res, rej) => p.then(rej, res));
const firstOf = ps => invert(Promise.all(ps.map(invert)));
// Will be raised (to 3?) when we get more nodes
const MINIMUM_SUCCESSFUL_REQUESTS = 2;
class LokiMessageAPI {
@ -15,11 +11,6 @@ class LokiMessageAPI {
}
async sendMessage(pubKey, data, messageTimeStamp, ttl) {
const swarmNodes = await window.LokiSnodeAPI.getSwarmNodesByPubkey(pubKey);
if (!swarmNodes || swarmNodes.size === 0) {
throw Error('No swarm nodes to query!');
}
const data64 = dcodeIO.ByteBuffer.wrap(data).toString('base64');
const timestamp = Math.floor(Date.now() / 1000);
// Nonce is returned as a base64 string to include in header
@ -42,11 +33,12 @@ class LokiMessageAPI {
// Something went horribly wrong
throw err;
}
let completedRequests = 0;
const requests = Array.from(swarmNodes).map(async node => {
const doRequest = async nodeUrl => {
// TODO: Confirm sensible timeout
const options = {
url: `${node}${this.messageServerPort}/store`,
url: `${nodeUrl}${this.messageServerPort}/store`,
type: 'POST',
responseType: undefined,
timeout: 5000,
@ -69,8 +61,8 @@ class LokiMessageAPI {
response = await fetch(options.url, fetchOptions);
} catch (e) {
log.error(options.type, options.url, 0, 'Error sending message');
window.LokiSnodeAPI.unreachableNode(pubKey, node);
throw HTTPError('fetch error', 0, e.toString());
window.LokiSnodeAPI.unreachableNode(pubKey, nodeUrl);
return;
}
let result;
@ -86,7 +78,8 @@ class LokiMessageAPI {
}
if (response.status >= 0 && response.status < 400) {
return result;
completedRequests += 1;
return;
}
log.error(
options.type,
@ -95,13 +88,32 @@ class LokiMessageAPI {
'Error sending message'
);
throw HTTPError('sendMessage: error response', response.status, result);
});
try {
// TODO: Possibly change this to require more than a single response?
const result = await firstOf(requests);
return result;
} catch (err) {
throw err;
};
let swarmNodes;
while (completedRequests < MINIMUM_SUCCESSFUL_REQUESTS) {
try {
swarmNodes = await window.LokiSnodeAPI.getSwarmNodesByPubkey(pubKey);
} catch (e) {
throw new window.textsecure.EmptySwarmError(pubKey, e);
}
if (!swarmNodes || swarmNodes.size === 0) {
if (completedRequests !== 0) {
// TODO: Decide how to handle some completed requests but not enough
return;
}
throw new window.textsecure.EmptySwarmError(
pubKey,
new Error('Ran out of swarm nodes to query')
);
}
const remainingRequests = MINIMUM_SUCCESSFUL_REQUESTS - completedRequests;
await Promise.all(
Array.from(swarmNodes)
.splice(0, remainingRequests)
.map(nodeUrl => doRequest(nodeUrl))
);
}
}
@ -172,15 +184,27 @@ class LokiMessageAPI {
};
while (completedRequests < MINIMUM_SUCCESSFUL_REQUESTS) {
const remainingRequests = MINIMUM_SUCCESSFUL_REQUESTS - completedRequests;
const ourSwarmNodes = await window.LokiSnodeAPI.getOurSwarmNodes();
if (Object.keys(ourSwarmNodes).length < remainingRequests) {
// This means we don't have enough swarm nodes to meet the minimum threshold
let ourSwarmNodes;
try {
ourSwarmNodes = await window.LokiSnodeAPI.getOurSwarmNodes();
} catch (e) {
throw window.textsecure.EmptySwarmError(
window.textsecure.storage.user.getNumber(),
e
);
}
if (!ourSwarmNodes || Object.keys(ourSwarmNodes).length === 0) {
if (completedRequests !== 0) {
// TODO: Decide how to handle some completed requests but not enough
return;
}
throw window.textsecure.EmptySwarmError(
window.textsecure.storage.user.getNumber(),
new Error('Ran out of swarm nodes to query')
);
}
const remainingRequests = MINIMUM_SUCCESSFUL_REQUESTS - completedRequests;
await Promise.all(
Object.entries(ourSwarmNodes)
.splice(0, remainingRequests)

@ -6,6 +6,7 @@ const dns = require('dns');
// Will be raised (to 3?) when we get more nodes
const MINIMUM_SWARM_NODES = 1;
const FAILURE_THRESHOLD = 3;
class LokiSnodeAPI {
constructor({ url, swarmServerPort }) {
@ -16,6 +17,7 @@ class LokiSnodeAPI {
this.swarmServerPort = swarmServerPort ? `:${swarmServerPort}` : '';
this.swarmsPendingReplenish = {};
this.ourSwarmNodes = {};
this.contactSwarmNodes = {};
}
getRandomSnodeAddress() {
@ -33,7 +35,26 @@ class LokiSnodeAPI {
async unreachableNode(pubKey, nodeUrl) {
if (pubKey === window.textsecure.storage.user.getNumber()) {
delete this.ourSwarmNodes[nodeUrl];
if (!this.ourSwarmNodes[nodeUrl]) {
this.ourSwarmNodes[nodeUrl] = {
failureCount: 1,
};
} else {
this.ourSwarmNodes[nodeUrl].failureCount += 1;
}
if (this.ourSwarmNodes[nodeUrl].failureCount >= FAILURE_THRESHOLD) {
delete this.ourSwarmNodes[nodeUrl];
}
return;
}
if (!this.contactSwarmNodes[nodeUrl]) {
this.contactSwarmNodes[nodeUrl] = {
failureCount: 1,
};
} else {
this.contactSwarmNodes[nodeUrl].failureCount += 1;
}
if (this.contactSwarmNodes[nodeUrl].failureCount < FAILURE_THRESHOLD) {
return;
}
const conversation = window.ConversationController.get(pubKey);
@ -47,6 +68,7 @@ class LokiSnodeAPI {
Conversation: Whisper.Conversation,
}
);
delete this.contactSwarmNodes[nodeUrl];
}
}
@ -74,7 +96,9 @@ class LokiSnodeAPI {
}
nodeAddresses.forEach(url => {
this.ourSwarmNodes[url] = {};
this.ourSwarmNodes[url] = {
failureCount: 0,
};
});
}
return this.ourSwarmNodes;
@ -155,7 +179,7 @@ class LokiSnodeAPI {
0,
`Error getting swarm nodes for ${pubKey}`
);
throw HTTPError('fetch error', 0, e.toString());
throw HTTPError('getSwarmNodes fetch error', 0, e.toString());
}
let result;
@ -179,7 +203,7 @@ class LokiSnodeAPI {
response.status,
`Error getting swarm nodes for ${pubKey}`
);
throw HTTPError('sendMessage: error response', response.status, result);
throw HTTPError('getSwarmNodes: error response', response.status, result);
}
}

@ -127,6 +127,21 @@
}
inherit(Error, UnregisteredUserError);
function EmptySwarmError(number, error) {
// eslint-disable-next-line prefer-destructuring
this.number = number.split('.')[0];
ReplayableError.call(this, {
name: 'EmptySwarmError',
message: 'Could not get any swarm nodes to query',
});
if (error) {
appendStack(this, error);
}
}
inherit(ReplayableError, PoWError);
function PoWError(number, error) {
// eslint-disable-next-line prefer-destructuring
this.number = number.split('.')[0];
@ -151,4 +166,5 @@
window.textsecure.MessageError = MessageError;
window.textsecure.SignedPreKeyRotationError = SignedPreKeyRotationError;
window.textsecure.PoWError = PoWError;
window.textsecure.EmptySwarmError = EmptySwarmError;
})();

@ -186,13 +186,7 @@ OutgoingMessage.prototype = {
async transmitMessage(number, data, timestamp, ttl = 24 * 60 * 60) {
const pubKey = number;
try {
const result = await this.lokiMessageAPI.sendMessage(
pubKey,
data,
timestamp,
ttl
);
return result;
await this.lokiMessageAPI.sendMessage(pubKey, data, timestamp, ttl);
} catch (e) {
if (e.name === 'HTTPError' && (e.code !== 409 && e.code !== 410)) {
// 409 and 410 should bubble and be handled by doSendMessage

Loading…
Cancel
Save