this.versionsRetrieved, handle ECONNREFUSED as bad otherwise retry in 1s, randomize list of snodes, only report status every 10%

pull/1007/head
Ryan Tharp 5 years ago
parent c8b097c60e
commit 40951f0579

@ -14,6 +14,8 @@ const snodeHttpsAgent = new https.Agent({
const RANDOM_SNODES_TO_USE_FOR_PUBKEY_SWARM = 3;
const SEED_NODE_RETRIES = 3;
const timeoutDelay = ms => new Promise(resolve => setTimeout(resolve, ms));
class LokiSnodeAPI {
constructor({ serverUrl, localUrl }) {
if (!is.string(serverUrl)) {
@ -25,7 +27,8 @@ class LokiSnodeAPI {
this.swarmsPendingReplenish = {};
this.refreshRandomPoolPromise = false;
this.versionPools = {};
this.versionMap = {};
this.versionMap = {}; // reverse version look up
this.versionsRetrieved = false; // to mark when it's done getting versions
this.onionPaths = [];
this.guardNodes = [];
@ -214,7 +217,7 @@ class LokiSnodeAPI {
log.warn(
`could not find some guard nodes: ${this.guardNodes.length}/${
edKeys.length
}`
} left`
);
}
}
@ -261,6 +264,7 @@ class LokiSnodeAPI {
if (this.randomSnodePool.length === 0) {
throw new window.textsecure.SeedNodeError('Invalid seed node response');
}
// FIXME: _.sample?
return this.randomSnodePool[
Math.floor(Math.random() * this.randomSnodePool.length)
];
@ -282,13 +286,15 @@ class LokiSnodeAPI {
if (!goodVersions.length) {
return false;
}
// FIXME: _.sample?
const goodVersion =
goodVersions[Math.floor(Math.random() * goodVersions.length)];
const pool = this.versionPools[goodVersion];
// FIXME: _.sample?
return pool[Math.floor(Math.random() * pool.length)];
}
async getVersion(node, count, total) {
async getVersion(node) {
try {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
const result = await nodeFetch(
@ -297,11 +303,6 @@ class LokiSnodeAPI {
);
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '1';
const data = await result.json();
log.info(
`${count}/${total} ${node.ip}:${node.port}`,
'is on',
data.version
);
if (data.version) {
if (this.versionPools[data.version] === undefined) {
this.versionPools[data.version] = [node];
@ -312,16 +313,28 @@ class LokiSnodeAPI {
this.versionMap[`${node.ip}:${node.port}`] = data.version;
}
} catch (e) {
this.markRandomNodeUnreachable(node);
const randomNodesLeft = this.getRandomPoolLength();
log.warn(
'loki_snode:::getVersion - Error',
e.code,
e.message,
`removing ${node.ip}:${
node.port
} leaving ${randomNodesLeft} in the randomPool`
);
// ECONNREFUSED likely means it's just offline...
if (e.code === 'ECONNREFUSED') {
this.markRandomNodeUnreachable(node);
const randomNodesLeft = this.getRandomPoolLength();
// clean up these error messages to be a little neater
log.warn(
`loki_snode:::getVersion - ${node.ip}:${
node.port
} is offline, removing, leaving ${randomNodesLeft} in the randomPool`
);
} else {
// mostly ECONNRESETs
// ENOTFOUND could mean no internet or hiccup
log.warn(
'loki_snode:::getVersion - Error',
e.code,
e.message,
`on ${node.ip}:${node.port} retrying in 1s`
);
await timeoutDelay(1000);
await this.getVersion(node);
}
}
}
@ -366,9 +379,12 @@ class LokiSnodeAPI {
snodes = response.result.service_node_states.filter(
snode => snode.public_ip !== '0.0.0.0'
);
// make sure order of the list is random, so we get version in a non-deterministic way
snodes = _.shuffle(snodes);
// commit changes to be live
// we'll update the version (in case they upgrade) every cycle
this.versionPools = {};
this.versionsRetrieved = false;
this.randomSnodePool = snodes.map(snode => ({
ip: snode.public_ip,
port: snode.storage_port,
@ -389,11 +405,34 @@ class LokiSnodeAPI {
// start polling versions
resolve();
let c = 0;
const verionStart = Date.now();
const t = this.randomSnodePool.length;
this.randomSnodePool.forEach(async node => {
c += 1;
await this.getVersion(node, c, t);
});
const noticeEvery = parseInt(t / 10, 10);
const finalPromise = this.randomSnodePool.reduce(
async (p, node) => {
if (p) {
await p;
c += 1;
if (c % noticeEvery === 0) {
// give stats
const diff = Date.now() - verionStart;
log.info(
`${c}/${t} pool version status update, has taken ${diff.toLocaleString()}ms`
);
Object.keys(this.versionPools).forEach(version => {
const nodes = this.versionPools[version].length;
log.info(
`version ${version} has ${nodes.toLocaleString()} snodes`
);
});
}
}
return this.getVersion(node);
}
);
await finalPromise;
log.info('Versions retrieved from network!');
this.versionsRetrieved = true;
} catch (e) {
log.warn(
'loki_snodes:::refreshRandomPoolPromise - error',

Loading…
Cancel
Save