convert reduce back to for...of loop per Maxim, markRandomNodeUnreachable() make handle edge removal cases and optimize snode lookup

pull/1007/head
Ryan Tharp 6 years ago
parent 055ba2aa66
commit 6fdde32948

@ -314,8 +314,11 @@ class LokiSnodeAPI {
} }
} catch (e) { } catch (e) {
// ECONNREFUSED likely means it's just offline... // ECONNREFUSED likely means it's just offline...
// ECONNRESET seems to retry and fail as ECONNREFUSED (so likely a node going offline)
// ETIMEDOUT not sure what to do about these
// retry for now but maybe we should be marking bad...
if (e.code === 'ECONNREFUSED') { if (e.code === 'ECONNREFUSED') {
this.markRandomNodeUnreachable(node); this.markRandomNodeUnreachable(node, { versionPoolFailure: true });
const randomNodesLeft = this.getRandomPoolLength(); const randomNodesLeft = this.getRandomPoolLength();
// clean up these error messages to be a little neater // clean up these error messages to be a little neater
log.warn( log.warn(
@ -408,29 +411,25 @@ class LokiSnodeAPI {
const verionStart = Date.now(); const verionStart = Date.now();
const t = this.randomSnodePool.length; const t = this.randomSnodePool.length;
const noticeEvery = parseInt(t / 10, 10); const noticeEvery = parseInt(t / 10, 10);
const finalPromise = this.randomSnodePool.reduce( // eslint-disable-next-line no-restricted-syntax
async (p, node) => { for (const node of this.randomSnodePool) {
if (p) { c += 1;
await p; // eslint-disable-next-line no-await-in-loop
c += 1; await this.getVersion(node);
if (c % noticeEvery === 0) { if (c % noticeEvery === 0) {
// give stats // give stats
const diff = Date.now() - verionStart; const diff = Date.now() - verionStart;
log.info( log.info(
`${c}/${t} pool version status update, has taken ${diff.toLocaleString()}ms` `${c}/${t} pool version status update, has taken ${diff.toLocaleString()}ms`
); );
Object.keys(this.versionPools).forEach(version => { Object.keys(this.versionPools).forEach(version => {
const nodes = this.versionPools[version].length; const nodes = this.versionPools[version].length;
log.info( log.info(
`version ${version} has ${nodes.toLocaleString()} snodes` `version ${version} has ${nodes.toLocaleString()} snodes`
); );
}); });
}
}
return this.getVersion(node);
} }
); }
await finalPromise;
log.info('Versions retrieved from network!'); log.info('Versions retrieved from network!');
this.versionsRetrieved = true; this.versionsRetrieved = true;
} catch (e) { } catch (e) {
@ -522,21 +521,43 @@ class LokiSnodeAPI {
return filteredNodes; return filteredNodes;
} }
markRandomNodeUnreachable(snode) { markRandomNodeUnreachable(snode, options = {}) {
const snodeVersion = this.versionMap[`${snode.ip}:${snode.port}`]; // avoid retries when we can't get the version because they're offline
if (this.versionPools[snodeVersion]) { if (!options.versionPoolFailure) {
this.versionPools[snodeVersion] = _.without( const snodeVersion = this.versionMap[`${snode.ip}:${snode.port}`];
this.versionPools[snodeVersion], if (this.versionPools[snodeVersion]) {
_.find(this.versionPools[snodeVersion], { this.versionPools[snodeVersion] = _.without(
ip: snode.ip, this.versionPools[snodeVersion],
port: snode.port, snode
}) );
); } else {
if (snodeVersion) {
// reverse map (versionMap) is out of sync with versionPools
log.error(
'loki_snode:::markRandomNodeUnreachable - No snodes for version',
snodeVersion,
'retrying in 10s'
);
} else {
// we don't know our version yet
log.warn(
'loki_snode:::markRandomNodeUnreachable - No version for snode',
`${snode.ip}:${snode.port}`,
'retrying in 10s'
);
}
// make sure we don't retry past 15 mins (10s * 100 ~ 1000s)
if (options.retries < 100) {
setTimeout(() => {
this.markRandomNodeUnreachable(snode, {
...options,
retries: options.retries + 1,
});
}, 10000);
}
}
} }
this.randomSnodePool = _.without( this.randomSnodePool = _.without(this.randomSnodePool, snode);
this.randomSnodePool,
_.find(this.randomSnodePool, { ip: snode.ip, port: snode.port })
);
} }
async updateLastHash(snode, hash, expiresAt) { async updateLastHash(snode, hash, expiresAt) {

Loading…
Cancel
Save