refactor refreshRandomPool into small functions, exception clean up

pull/1061/head
Ryan Tharp 5 years ago
parent 2e3ebc0c8a
commit 90d2bbb338

@ -16,6 +16,131 @@ const SEED_NODE_RETRIES = 3;
const timeoutDelay = ms => new Promise(resolve => setTimeout(resolve, ms));
// just get the filtered list
async function tryGetSnodeListFromLokidSeednode(
seedNodes = [...window.seedNodeList]
) {
// Removed limit until there is a way to get snode info
// for individual nodes (needed for guard nodes); this way
// we get all active nodes
const params = {
active_only: true,
fields: {
public_ip: true,
storage_port: true,
pubkey_x25519: true,
pubkey_ed25519: true,
},
};
// FIXME: use sample
const seedNode = seedNodes.splice(
Math.floor(Math.random() * seedNodes.length),
1
)[0];
let snodes = [];
try {
const response = await lokiRpc(
`http://${seedNode.ip}`,
seedNode.port,
'get_n_service_nodes',
params,
{}, // Options
'/json_rpc' // Seed request endpoint
);
// Filter 0.0.0.0 nodes which haven't submitted uptime proofs
snodes = response.result.service_node_states.filter(
snode => snode.public_ip !== '0.0.0.0'
);
// throw before clearing the lock, so the retries can kick in
if (snodes.length === 0) {
throw new window.textsecure.SeedNodeError('Failed to contact seed node');
}
return snodes;
} catch (e) {
log.warn(
'loki_snodes:::tryGetSnodeListFromLokidSeednode - error',
e.code,
e.message
);
if (snodes.length === 0) {
throw new window.textsecure.SeedNodeError('Failed to contact seed node');
}
}
return [];
}
async function getSnodeListFromLokidSeednode(
seedNodes = [...window.seedNodeList],
retries = 0
) {
let snodes = [];
try {
snodes = await tryGetSnodeListFromLokidSeednode(seedNodes);
} catch (e) {
log.warn(
'loki_snodes:::getSnodeListFromLokidSeednode - error',
e.code,
e.message
);
// handle retries in case of temporary hiccups
if (retries < SEED_NODE_RETRIES) {
setTimeout(() => {
log.info(
'loki_snodes:::refreshRandomPoolPromise - Retrying initialising random snode pool, try #',
retries
);
getSnodeListFromLokidSeednode(seedNodes, retries + 1);
}, retries * retries * 5000);
} else {
log.error('loki_snodes:::getSnodeListFromLokidSeednode - failing');
throw new window.textsecure.SeedNodeError('Failed to contact seed node');
}
}
return snodes;
}
// FIXME: move out to more generic adv promise library
const snodeGlobalLocks = {};
async function allowOnlyOneAtATime(name, process, timeout) {
// if currently not in progress
if (snodeGlobalLocks[name] === undefined) {
// set lock
snodeGlobalLocks[name] = new Promise(async (resolve, reject) => {
// set up timeout feature
let timeoutTimer = null;
if (timeout) {
timeoutTimer = setTimeout(() => {
log.warn(
`loki_snodes:::allowOnlyOneAtATime - TIMEDOUT after ${timeout}s`
);
delete snodeGlobalLocks[name]; // clear lock
reject();
}, timeout);
}
// do actual work
await process();
// clear timeout timer
if (timeout) {
if (timeoutTimer !== null) {
clearTimeout(timeoutTimer);
timeoutTimer = null;
}
}
delete snodeGlobalLocks[name]; // clear lock
// release the kraken
resolve();
});
}
try {
await snodeGlobalLocks[name];
} catch (e) {
// we will throw for each time initialiseRandomPool has been called in parallel
log.error('loki_snodes:::allowOnlyOneAtATime - error', e.code, e.message);
throw e;
}
log.info('loki_snodes:::allowOnlyOneAtATime - RESOLVED');
}
class LokiSnodeAPI {
constructor({ serverUrl, localUrl }) {
if (!is.string(serverUrl)) {
@ -25,7 +150,7 @@ class LokiSnodeAPI {
this.localUrl = localUrl; // localhost.loki
this.randomSnodePool = [];
this.swarmsPendingReplenish = {};
this.refreshRandomPoolPromise = false;
this.refreshRandomPoolPromise = undefined;
this.versionPools = {};
this.versionMap = {}; // reverse version look up
this.versionsRetrieved = false; // to mark when it's done getting versions
@ -36,7 +161,12 @@ class LokiSnodeAPI {
async getRandomSnodePool() {
if (this.randomSnodePool.length === 0) {
await this.refreshRandomPool();
// allow exceptions to pass through upwards without the unhandled promise rejection
try {
await this.refreshRandomPool();
} catch (e) {
throw e;
}
}
return this.randomSnodePool;
}
@ -97,14 +227,14 @@ class LokiSnodeAPI {
async selectGuardNodes() {
const _ = window.Lodash;
let nodePool = await this.getRandomSnodePool();
const nodePool = await this.getRandomSnodePool();
if (nodePool.length === 0) {
log.error(`Could not select guarn nodes: node pool is empty`);
return [];
}
let shuffled = _.shuffle(nodePool);
const shuffled = _.shuffle(nodePool);
let guardNodes = [];
@ -127,7 +257,6 @@ class LokiSnodeAPI {
return [];
}
}
// The use of await inside while is intentional:
// we only want to repeat if the await fails
// eslint-disable-next-line-no-await-in-loop
@ -274,13 +403,17 @@ class LokiSnodeAPI {
}
async getRandomSnodeAddress() {
/* resolve random snode */
if (this.randomSnodePool.length === 0) {
// allow exceptions to pass through upwards
await this.refreshRandomPool();
}
// resolve random snode
if (this.randomSnodePool.length === 0) {
throw new window.textsecure.SeedNodeError('Invalid seed node response');
// allow exceptions to pass through upwards without the unhandled promise rejection
try {
await this.refreshRandomPool();
} catch (e) {
throw e;
}
if (this.randomSnodePool.length === 0) {
throw new window.textsecure.SeedNodeError('Invalid seed node response');
}
}
// FIXME: _.sample?
return this.randomSnodePool[
@ -300,13 +433,18 @@ class LokiSnodeAPI {
// use nodes that support more than 1mb
async getRandomProxySnodeAddress() {
/* resolve random snode */
// resolve random snode
if (this.randomSnodePool.length === 0) {
// allow exceptions to pass through upwards
// allow exceptions to pass through upwards without the unhandled promise rejection
try {
await this.refreshRandomPool();
} catch (e) {
throw e;
}
await this.refreshRandomPool();
}
if (this.randomSnodePool.length === 0) {
throw new window.textsecure.SeedNodeError('Invalid seed node response');
if (this.randomSnodePool.length === 0) {
throw new window.textsecure.SeedNodeError('Invalid seed node response');
}
}
const goodVersions = Object.keys(this.versionPools).filter(version =>
semver.gt(version, '2.0.1')
@ -324,6 +462,7 @@ class LokiSnodeAPI {
// WARNING: this leaks our IP to all snodes but with no other identifying information
// except that a client started up or ran out of random pool snodes
// and the order of the list is randomized, so a snode can't tell if it just started or not
async getVersion(node) {
try {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
@ -371,47 +510,94 @@ class LokiSnodeAPI {
}
}
async getAllVerionsForRandomSnodePool() {
// now get version for all snodes
// also acts an early online test/purge of bad nodes
let count = 0;
const verionStart = Date.now();
const total = this.randomSnodePool.length;
const noticeEvery = parseInt(total / 10, 10);
// eslint-disable-next-line no-restricted-syntax
for (const node of this.randomSnodePool) {
count += 1;
// eslint-disable-next-line no-await-in-loop
await this.getVersion(node);
if (count % noticeEvery === 0) {
// give stats
const diff = Date.now() - verionStart;
log.info(
`loki_snode:::getAllVerionsForRandomSnodePool - ${count}/${total} pool version status update, has taken ${diff.toLocaleString()}ms`
);
Object.keys(this.versionPools).forEach(version => {
const nodes = this.versionPools[version].length;
log.info(
`loki_snode:::getAllVerionsForRandomSnodePool - version ${version} has ${nodes.toLocaleString()} snodes`
);
});
}
}
log.info('Versions retrieved from network!');
this.versionsRetrieved = true;
}
async refreshRandomPool(seedNodes = [...window.seedNodeList]) {
await allowOnlyOneAtATime('refreshRandomPool', async () => {
let snodes = [];
try {
snodes = await getSnodeListFromLokidSeednode(seedNodes);
// make sure order of the list is random, so we get version in a non-deterministic way
snodes = _.shuffle(snodes);
// commit changes to be live
// we'll update the version (in case they upgrade) every cycle
this.versionPools = {};
this.versionsRetrieved = false;
this.randomSnodePool = snodes.map(snode => ({
ip: snode.public_ip,
port: snode.storage_port,
pubkey_x25519: snode.pubkey_x25519,
pubkey_ed25519: snode.pubkey_ed25519,
}));
log.info(
'loki_snodes:::refreshRandomPoolPromise - Refreshed random snode pool with',
this.randomSnodePool.length,
'snodes'
);
// start polling versions but no need to await it
this.getAllVerionsForRandomSnodePool();
} catch (e) {
log.warn(
'loki_snodes:::refreshRandomPoolPromise - error',
e.code,
e.message
);
/*
log.error(
'loki_snodes:::refreshRandomPoolPromise - Giving up trying to contact seed node'
);
*/
if (snodes.length === 0) {
throw new window.textsecure.SeedNodeError(
'Failed to contact seed node'
);
}
}
});
}
async refreshRandomPool2(seedNodes = [...window.seedNodeList]) {
// if currently not in progress
if (this.refreshRandomPoolPromise === false) {
if (this.refreshRandomPoolPromise) {
// set lock
this.refreshRandomPoolPromise = new Promise(async (resolve, reject) => {
let timeoutTimer = null;
// private retry container
const trySeedNode = async (consecutiveErrors = 0) => {
// Removed limit until there is a way to get snode info
// for individual nodes (needed for guard nodes); this way
// we get all active nodes
const params = {
active_only: true,
fields: {
public_ip: true,
storage_port: true,
pubkey_x25519: true,
pubkey_ed25519: true,
},
};
const seedNode = seedNodes.splice(
Math.floor(Math.random() * seedNodes.length),
1
)[0];
let snodes = [];
try {
log.info(
'loki_snodes:::refreshRandomPoolPromise - Refreshing random snode pool'
);
const response = await lokiRpc(
`http://${seedNode.ip}`,
seedNode.port,
'get_n_service_nodes',
params,
{}, // Options
'/json_rpc' // Seed request endpoint
);
// Filter 0.0.0.0 nodes which haven't submitted uptime proofs
snodes = response.result.service_node_states.filter(
snode => snode.public_ip !== '0.0.0.0'
);
snodes = await getSnodeListFromLokidSeednode(seedNodes);
// make sure order of the list is random, so we get version in a non-deterministic way
snodes = _.shuffle(snodes);
// commit changes to be live
@ -430,46 +616,21 @@ class LokiSnodeAPI {
'snodes'
);
// clear lock
this.refreshRandomPoolPromise = null;
delete this.refreshRandomPoolPromise;
if (timeoutTimer !== null) {
clearTimeout(timeoutTimer);
timeoutTimer = null;
}
// start polling versions
// start polling versions but no need to await it
this.getAllVerionsForRandomSnodePool();
resolve();
// now get version for all snodes
// also acts an early online test/purge of bad nodes
let c = 0;
const verionStart = Date.now();
const t = this.randomSnodePool.length;
const noticeEvery = parseInt(t / 10, 10);
// eslint-disable-next-line no-restricted-syntax
for (const node of this.randomSnodePool) {
c += 1;
// eslint-disable-next-line no-await-in-loop
await this.getVersion(node);
if (c % noticeEvery === 0) {
// give stats
const diff = Date.now() - verionStart;
log.info(
`${c}/${t} pool version status update, has taken ${diff.toLocaleString()}ms`
);
Object.keys(this.versionPools).forEach(version => {
const nodes = this.versionPools[version].length;
log.info(
`version ${version} has ${nodes.toLocaleString()} snodes`
);
});
}
}
log.info('Versions retrieved from network!');
this.versionsRetrieved = true;
} catch (e) {
log.warn(
'loki_snodes:::refreshRandomPoolPromise - error',
e.code,
e.message
);
// handle retries in case of temporary hiccups
if (consecutiveErrors < SEED_NODE_RETRIES) {
// retry after a possible delay
setTimeout(() => {
@ -489,7 +650,9 @@ class LokiSnodeAPI {
clearTimeout(timeoutTimer);
timeoutTimer = null;
}
reject();
throw new window.textsecure.SeedNodeError(
'Failed to contact seed node'
);
}
}
}
@ -515,10 +678,9 @@ class LokiSnodeAPI {
e.code,
e.message
);
throw new window.textsecure.SeedNodeError('Failed to contact seed node');
throw e;
}
log.info('loki_snodes:::refreshRandomPoolPromise - RESOLVED');
delete this.refreshRandomPoolPromise; // clear any lock
}
// unreachableNode.url is like 9hrje1bymy7hu6nmtjme9idyu3rm8gr3mkstakjyuw1997t7w4ny.snode
@ -564,12 +726,13 @@ class LokiSnodeAPI {
snode
);
} else {
const retries = options.retries || 0;
if (snodeVersion) {
// reverse map (versionMap) is out of sync with versionPools
log.error(
'loki_snode:::markRandomNodeUnreachable - No snodes for version',
snodeVersion,
'retrying in 10s'
`try #${retries} retrying in 10s`
);
} else {
// we don't know our version yet
@ -577,18 +740,17 @@ class LokiSnodeAPI {
log.warn(
'loki_snode:::markRandomNodeUnreachable - No version for snode',
`${snode.ip}:${snode.port}`,
'retrying in 10s'
`try #${retries} retrying in 10s`
);
}
// make sure we don't retry past 15 mins (10s * 100 ~ 1000s)
const retries = options.retries || 0;
if (retries < 100) {
setTimeout(() => {
this.markRandomNodeUnreachable(snode, {
...options,
retries: retries + 1,
});
}, 10000);
}, 10 * 1000);
}
}
}

Loading…
Cancel
Save