diff --git a/js/modules/loki_snode_api.js b/js/modules/loki_snode_api.js index 1176dbad6..ba2a9feb4 100644 --- a/js/modules/loki_snode_api.js +++ b/js/modules/loki_snode_api.js @@ -14,6 +14,8 @@ const snodeHttpsAgent = new https.Agent({ const RANDOM_SNODES_TO_USE_FOR_PUBKEY_SWARM = 3; const SEED_NODE_RETRIES = 3; +const timeoutDelay = ms => new Promise(resolve => setTimeout(resolve, ms)); + class LokiSnodeAPI { constructor({ serverUrl, localUrl }) { if (!is.string(serverUrl)) { @@ -25,7 +27,8 @@ class LokiSnodeAPI { this.swarmsPendingReplenish = {}; this.refreshRandomPoolPromise = false; this.versionPools = {}; - this.versionMap = {}; + this.versionMap = {}; // reverse version look up + this.versionsRetrieved = false; // to mark when it's done getting versions this.onionPaths = []; this.guardNodes = []; @@ -214,7 +217,7 @@ class LokiSnodeAPI { log.warn( `could not find some guard nodes: ${this.guardNodes.length}/${ edKeys.length - }` + } left` ); } } @@ -261,6 +264,7 @@ class LokiSnodeAPI { if (this.randomSnodePool.length === 0) { throw new window.textsecure.SeedNodeError('Invalid seed node response'); } + // FIXME: _.sample? return this.randomSnodePool[ Math.floor(Math.random() * this.randomSnodePool.length) ]; @@ -282,13 +286,15 @@ class LokiSnodeAPI { if (!goodVersions.length) { return false; } + // FIXME: _.sample? const goodVersion = goodVersions[Math.floor(Math.random() * goodVersions.length)]; const pool = this.versionPools[goodVersion]; + // FIXME: _.sample? return pool[Math.floor(Math.random() * pool.length)]; } - async getVersion(node, count, total) { + async getVersion(node) { try { process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'; const result = await nodeFetch( @@ -297,11 +303,6 @@ class LokiSnodeAPI { ); process.env.NODE_TLS_REJECT_UNAUTHORIZED = '1'; const data = await result.json(); - log.info( - `${count}/${total} ${node.ip}:${node.port}`, - 'is on', - data.version - ); if (data.version) { if (this.versionPools[data.version] === undefined) { this.versionPools[data.version] = [node]; @@ -312,16 +313,28 @@ class LokiSnodeAPI { this.versionMap[`${node.ip}:${node.port}`] = data.version; } } catch (e) { - this.markRandomNodeUnreachable(node); - const randomNodesLeft = this.getRandomPoolLength(); - log.warn( - 'loki_snode:::getVersion - Error', - e.code, - e.message, - `removing ${node.ip}:${ - node.port - } leaving ${randomNodesLeft} in the randomPool` - ); + // ECONNREFUSED likely means it's just offline... + if (e.code === 'ECONNREFUSED') { + this.markRandomNodeUnreachable(node); + const randomNodesLeft = this.getRandomPoolLength(); + // clean up these error messages to be a little neater + log.warn( + `loki_snode:::getVersion - ${node.ip}:${ + node.port + } is offline, removing, leaving ${randomNodesLeft} in the randomPool` + ); + } else { + // mostly ECONNRESETs + // ENOTFOUND could mean no internet or hiccup + log.warn( + 'loki_snode:::getVersion - Error', + e.code, + e.message, + `on ${node.ip}:${node.port} retrying in 1s` + ); + await timeoutDelay(1000); + await this.getVersion(node); + } } } @@ -366,9 +379,12 @@ class LokiSnodeAPI { snodes = response.result.service_node_states.filter( snode => snode.public_ip !== '0.0.0.0' ); + // make sure order of the list is random, so we get version in a non-deterministic way + snodes = _.shuffle(snodes); // commit changes to be live // we'll update the version (in case they upgrade) every cycle this.versionPools = {}; + this.versionsRetrieved = false; this.randomSnodePool = snodes.map(snode => ({ ip: snode.public_ip, port: snode.storage_port, @@ -389,11 +405,34 @@ class LokiSnodeAPI { // start polling versions resolve(); let c = 0; + const verionStart = Date.now(); const t = this.randomSnodePool.length; - this.randomSnodePool.forEach(async node => { - c += 1; - await this.getVersion(node, c, t); - }); + const noticeEvery = parseInt(t / 10, 10); + const finalPromise = this.randomSnodePool.reduce( + async (p, node) => { + if (p) { + await p; + c += 1; + if (c % noticeEvery === 0) { + // give stats + const diff = Date.now() - verionStart; + log.info( + `${c}/${t} pool version status update, has taken ${diff.toLocaleString()}ms` + ); + Object.keys(this.versionPools).forEach(version => { + const nodes = this.versionPools[version].length; + log.info( + `version ${version} has ${nodes.toLocaleString()} snodes` + ); + }); + } + } + return this.getVersion(node); + } + ); + await finalPromise; + log.info('Versions retrieved from network!'); + this.versionsRetrieved = true; } catch (e) { log.warn( 'loki_snodes:::refreshRandomPoolPromise - error',