Merge pull request #1007 from neuroscr/ssvertriage

Storage Server Version Triage
pull/1019/head
Ryan Tharp 5 years ago committed by GitHub
commit 7412b2d6fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -16,7 +16,7 @@
LokiFileServerAPI.secureRpcPubKey
);
let nextWaitSeconds = 1;
let nextWaitSeconds = 5;
const checkForUpgrades = async () => {
const result = await window.tokenlessFileServerAdnAPI.serverRequest(
'loki/v1/version/client/desktop'
@ -67,9 +67,7 @@
return res(expiredVersion);
}
log.info(
'Delaying sending checks for',
nextWaitSeconds,
's, no version yet'
`Delaying sending checks for ${nextWaitSeconds}s, no version yet`
);
setTimeout(waitForVersion, nextWaitSeconds * 1000);
return true;
@ -85,11 +83,7 @@
window.extension.expired = cb => {
if (expiredVersion === null) {
// just give it another second
log.info(
'Delaying expire banner determination for',
nextWaitSeconds,
's'
);
log.info(`Delaying expire banner determination for ${nextWaitSeconds}s`);
setTimeout(() => {
window.extension.expired(cb);
}, nextWaitSeconds * 1000);

@ -32,6 +32,8 @@ const snodeHttpsAgent = new https.Agent({
rejectUnauthorized: false,
});
const timeoutDelay = ms => new Promise(resolve => setTimeout(resolve, ms));
const sendToProxy = async (
srvPubKey,
endpoint,
@ -44,8 +46,6 @@ const sendToProxy = async (
);
return {};
}
const randSnode = await lokiSnodeAPI.getRandomSnodeAddress();
const url = `https://${randSnode.ip}:${randSnode.port}/file_proxy`;
const fetchOptions = pFetchOptions; // make lint happy
// safety issue with file server, just safer to have this
@ -61,6 +61,7 @@ const sendToProxy = async (
};
// from https://github.com/sindresorhus/is-stream/blob/master/index.js
let fileUpload = false;
if (
payloadObj.body &&
typeof payloadObj.body === 'object' &&
@ -74,8 +75,22 @@ const sendToProxy = async (
payloadObj.body = {
fileUpload: fData.toString('base64'),
};
fileUpload = true;
}
// use nodes that support more than 1mb
const randomFunc = fileUpload
? 'getRandomProxySnodeAddress'
: 'getRandomSnodeAddress';
const randSnode = await lokiSnodeAPI[randomFunc]();
if (randSnode === false) {
log.warn('proxy random snode pool is not ready, retrying 10s', endpoint);
// no nodes in the pool yet, give it some time and retry
await timeoutDelay(1000);
return sendToProxy(srvPubKey, endpoint, pFetchOptions, options);
}
const url = `https://${randSnode.ip}:${randSnode.port}/file_proxy`;
// convert our payload to binary buffer
const payloadData = Buffer.from(
dcodeIO.ByteBuffer.wrap(JSON.stringify(payloadObj)).toArrayBuffer()
@ -138,7 +153,7 @@ const sendToProxy = async (
);
// retry (hopefully with new snode)
// FIXME: max number of retries...
return sendToProxy(srvPubKey, endpoint, fetchOptions);
return sendToProxy(srvPubKey, endpoint, fetchOptions, options);
}
let response = {};

@ -136,33 +136,32 @@ const processOnionResponse = async (reqIdx, response, sharedKey, useAesGcm) => {
// detect SNode is not ready (not in swarm; not done syncing)
if (response.status === 503) {
log.warn('Got 503: snode not ready');
log.warn(`(${reqIdx}) [path] Got 503: snode not ready`);
return BAD_PATH;
}
if (response.status === 504) {
log.warn('Got 504: Gateway timeout');
log.warn(`(${reqIdx}) [path] Got 504: Gateway timeout`);
return BAD_PATH;
}
if (response.status === 404) {
// Why would we get this error on testnet?
log.warn('Got 404: Gateway timeout');
log.warn(`(${reqIdx}) [path] Got 404: Gateway timeout`);
return BAD_PATH;
}
if (response.status !== 200) {
log.warn(
'lokiRpc sendToProxy fetch unhandled error code:',
response.status
`(${reqIdx}) [path] fetch unhandled error code: ${response.status}`
);
return false;
}
const ciphertext = await response.text();
if (!ciphertext) {
log.warn('[path]: Target node return empty ciphertext');
log.warn(`(${reqIdx}) [path]: Target node return empty ciphertext`);
return false;
}
@ -183,9 +182,9 @@ const processOnionResponse = async (reqIdx, response, sharedKey, useAesGcm) => {
const textDecoder = new TextDecoder();
plaintext = textDecoder.decode(plaintextBuffer);
} catch (e) {
log.error(`(${reqIdx}) lokiRpc sendToProxy decode error`);
log.error(`(${reqIdx}) [path] decode error`);
if (ciphertextBuffer) {
log.error('ciphertextBuffer', ciphertextBuffer);
log.error(`(${reqIdx}) [path] ciphertextBuffer`, ciphertextBuffer);
}
return false;
}
@ -198,22 +197,13 @@ const processOnionResponse = async (reqIdx, response, sharedKey, useAesGcm) => {
const res = JSON.parse(jsonRes.body);
return res;
} catch (e) {
log.error(
`(${reqIdx}) lokiRpc sendToProxy parse error json: `,
jsonRes.body
);
log.error(`(${reqIdx}) [path] parse error json: `, jsonRes.body);
}
return false;
};
return jsonRes;
} catch (e) {
log.error(
'lokiRpc sendToProxy parse error',
e.code,
e.message,
`json:`,
plaintext
);
log.error('[path] parse error', e.code, e.message, `json:`, plaintext);
return false;
}
};
@ -280,8 +270,9 @@ const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => {
// we got a ton of randomPool nodes, let's just not worry about this one
lokiSnodeAPI.markRandomNodeUnreachable(randSnode);
const randomPoolRemainingCount = lokiSnodeAPI.getRandomPoolLength();
const ciphertext = await response.text();
log.warn(
`lokiRpc sendToProxy`,
`lokiRpc:::sendToProxy -`,
`snode ${randSnode.ip}:${randSnode.port} to ${targetNode.ip}:${
targetNode.port
}`,
@ -297,16 +288,13 @@ const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => {
// detect SNode is not ready (not in swarm; not done syncing)
if (response.status === 503 || response.status === 500) {
const ciphertext = await response.text();
// we shouldn't do these,
// it's seems to be not the random node that's always bad
// but the target node
// we got a ton of randomPool nodes, let's just not worry about this one
// this doesn't mean the random node is bad, it could be the target node
// but we got a ton of randomPool nodes, let's just not worry about this one
lokiSnodeAPI.markRandomNodeUnreachable(randSnode);
const randomPoolRemainingCount = lokiSnodeAPI.getRandomPoolLength();
const ciphertext = await response.text();
log.warn(
`lokiRpc sendToProxy`,
`lokiRpc:::sendToProxy -`,
`snode ${randSnode.ip}:${randSnode.port} to ${targetNode.ip}:${
targetNode.port
}`,
@ -346,7 +334,7 @@ const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => {
if (response.status !== 200) {
// let us know we need to create handlers for new unhandled codes
log.warn(
'lokiRpc sendToProxy fetch non-200 statusCode',
'lokiRpc:::sendToProxy - fetch non-200 statusCode',
response.status,
`from snode ${randSnode.ip}:${randSnode.port} to ${targetNode.ip}:${
targetNode.port
@ -381,7 +369,7 @@ const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => {
plaintext = textDecoder.decode(plaintextBuffer);
} catch (e) {
log.error(
'lokiRpc sendToProxy decode error',
'lokiRpc:::sendToProxy - decode error',
e.code,
e.message,
`from ${randSnode.ip}:${randSnode.port} to ${targetNode.ip}:${
@ -403,7 +391,7 @@ const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => {
return JSON.parse(jsonRes.body);
} catch (e) {
log.error(
'lokiRpc sendToProxy parse error',
'lokiRpc:::sendToProxy - parse error',
e.code,
e.message,
`from ${randSnode.ip}:${randSnode.port} json:`,
@ -414,7 +402,7 @@ const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => {
};
if (retryNumber) {
log.info(
`lokiRpc sendToProxy request succeeded,`,
`lokiRpc:::sendToProxy - request succeeded,`,
`snode ${randSnode.ip}:${randSnode.port} to ${targetNode.ip}:${
targetNode.port
}`,
@ -424,7 +412,7 @@ const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => {
return jsonRes;
} catch (e) {
log.error(
'lokiRpc sendToProxy parse error',
'lokiRpc:::sendToProxy - parse error',
e.code,
e.message,
`from ${randSnode.ip}:${randSnode.port} json:`,
@ -515,7 +503,7 @@ const lokiFetch = async (url, options = {}, targetNode = null) => {
fetchOptions.agent = snodeHttpsAgent;
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
} else {
log.info('lokiRpc http communication', url);
log.info('lokirpc:::lokiFetch - http communication', url);
}
const response = await nodeFetch(url, fetchOptions);
// restore TLS checking

@ -3,11 +3,19 @@
const is = require('@sindresorhus/is');
const { lokiRpc } = require('./loki_rpc');
const https = require('https');
const nodeFetch = require('node-fetch');
const semver = require('semver');
const snodeHttpsAgent = new https.Agent({
rejectUnauthorized: false,
});
const RANDOM_SNODES_TO_USE_FOR_PUBKEY_SWARM = 3;
const SEED_NODE_RETRIES = 3;
const timeoutDelay = ms => new Promise(resolve => setTimeout(resolve, ms));
class LokiSnodeAPI {
constructor({ serverUrl, localUrl }) {
if (!is.string(serverUrl)) {
@ -18,6 +26,9 @@ class LokiSnodeAPI {
this.randomSnodePool = [];
this.swarmsPendingReplenish = {};
this.refreshRandomPoolPromise = false;
this.versionPools = {};
this.versionMap = {}; // reverse version look up
this.versionsRetrieved = false; // to mark when it's done getting versions
this.onionPaths = [];
this.guardNodes = [];
@ -30,6 +41,10 @@ class LokiSnodeAPI {
return this.randomSnodePool;
}
getRandomPoolLength() {
return this.randomSnodePool.length;
}
async testGuardNode(snode) {
log.info('Testing a candidate guard node ', snode);
@ -202,7 +217,7 @@ class LokiSnodeAPI {
log.warn(
`could not find some guard nodes: ${this.guardNodes.length}/${
edKeys.length
}`
} left`
);
}
}
@ -249,11 +264,85 @@ class LokiSnodeAPI {
if (this.randomSnodePool.length === 0) {
throw new window.textsecure.SeedNodeError('Invalid seed node response');
}
// FIXME: _.sample?
return this.randomSnodePool[
Math.floor(Math.random() * this.randomSnodePool.length)
];
}
// use nodes that support more than 1mb
async getRandomProxySnodeAddress() {
/* resolve random snode */
if (this.randomSnodePool.length === 0) {
// allow exceptions to pass through upwards
await this.refreshRandomPool();
}
if (this.randomSnodePool.length === 0) {
throw new window.textsecure.SeedNodeError('Invalid seed node response');
}
const goodVersions = Object.keys(this.versionPools).filter(version =>
semver.gt(version, '2.0.1')
);
if (!goodVersions.length) {
return false;
}
// FIXME: _.sample?
const goodVersion =
goodVersions[Math.floor(Math.random() * goodVersions.length)];
const pool = this.versionPools[goodVersion];
// FIXME: _.sample?
return pool[Math.floor(Math.random() * pool.length)];
}
// WARNING: this leaks our IP to all snodes but with no other identifying information
// except that a client started up or ran out of random pool snodes
async getVersion(node) {
try {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
const result = await nodeFetch(
`https://${node.ip}:${node.port}/get_stats/v1`,
{ agent: snodeHttpsAgent }
);
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '1';
const data = await result.json();
if (data.version) {
if (this.versionPools[data.version] === undefined) {
this.versionPools[data.version] = [node];
} else {
this.versionPools[data.version].push(node);
}
// set up reverse mapping for removal lookup
this.versionMap[`${node.ip}:${node.port}`] = data.version;
}
} catch (e) {
// ECONNREFUSED likely means it's just offline...
// ECONNRESET seems to retry and fail as ECONNREFUSED (so likely a node going offline)
// ETIMEDOUT not sure what to do about these
// retry for now but maybe we should be marking bad...
if (e.code === 'ECONNREFUSED') {
this.markRandomNodeUnreachable(node, { versionPoolFailure: true });
const randomNodesLeft = this.getRandomPoolLength();
// clean up these error messages to be a little neater
log.warn(
`loki_snode:::getVersion - ${node.ip}:${
node.port
} is offline, removing, leaving ${randomNodesLeft} in the randomPool`
);
} else {
// mostly ECONNRESETs
// ENOTFOUND could mean no internet or hiccup
log.warn(
'loki_snode:::getVersion - Error',
e.code,
e.message,
`on ${node.ip}:${node.port} retrying in 1s`
);
await timeoutDelay(1000);
await this.getVersion(node);
}
}
}
async refreshRandomPool(seedNodes = [...window.seedNodeList]) {
// if currently not in progress
if (this.refreshRandomPoolPromise === false) {
@ -295,6 +384,12 @@ class LokiSnodeAPI {
snodes = response.result.service_node_states.filter(
snode => snode.public_ip !== '0.0.0.0'
);
// make sure order of the list is random, so we get version in a non-deterministic way
snodes = _.shuffle(snodes);
// commit changes to be live
// we'll update the version (in case they upgrade) every cycle
this.versionPools = {};
this.versionsRetrieved = false;
this.randomSnodePool = snodes.map(snode => ({
ip: snode.public_ip,
port: snode.storage_port,
@ -312,7 +407,35 @@ class LokiSnodeAPI {
clearTimeout(timeoutTimer);
timeoutTimer = null;
}
// start polling versions
resolve();
// now get version for all snodes
// also acts an early online test/purge of bad nodes
let c = 0;
const verionStart = Date.now();
const t = this.randomSnodePool.length;
const noticeEvery = parseInt(t / 10, 10);
// eslint-disable-next-line no-restricted-syntax
for (const node of this.randomSnodePool) {
c += 1;
// eslint-disable-next-line no-await-in-loop
await this.getVersion(node);
if (c % noticeEvery === 0) {
// give stats
const diff = Date.now() - verionStart;
log.info(
`${c}/${t} pool version status update, has taken ${diff.toLocaleString()}ms`
);
Object.keys(this.versionPools).forEach(version => {
const nodes = this.versionPools[version].length;
log.info(
`version ${version} has ${nodes.toLocaleString()} snodes`
);
});
}
}
log.info('Versions retrieved from network!');
this.versionsRetrieved = true;
} catch (e) {
log.warn(
'loki_snodes:::refreshRandomPoolPromise - error',
@ -402,15 +525,45 @@ class LokiSnodeAPI {
return filteredNodes;
}
markRandomNodeUnreachable(snode) {
this.randomSnodePool = _.without(
this.randomSnodePool,
_.find(this.randomSnodePool, { ip: snode.ip, port: snode.port })
);
}
getRandomPoolLength() {
return this.randomSnodePool.length;
markRandomNodeUnreachable(snode, options = {}) {
// avoid retries when we can't get the version because they're offline
if (!options.versionPoolFailure) {
const snodeVersion = this.versionMap[`${snode.ip}:${snode.port}`];
if (this.versionPools[snodeVersion]) {
this.versionPools[snodeVersion] = _.without(
this.versionPools[snodeVersion],
snode
);
} else {
if (snodeVersion) {
// reverse map (versionMap) is out of sync with versionPools
log.error(
'loki_snode:::markRandomNodeUnreachable - No snodes for version',
snodeVersion,
'retrying in 10s'
);
} else {
// we don't know our version yet
// and if we're offline, we'll likely not get it until it restarts if it does...
log.warn(
'loki_snode:::markRandomNodeUnreachable - No version for snode',
`${snode.ip}:${snode.port}`,
'retrying in 10s'
);
}
// make sure we don't retry past 15 mins (10s * 100 ~ 1000s)
const retries = options.retries || 0;
if (retries < 100) {
setTimeout(() => {
this.markRandomNodeUnreachable(snode, {
...options,
retries: retries + 1,
});
}, 10000);
}
}
}
this.randomSnodePool = _.without(this.randomSnodePool, snode);
}
async updateLastHash(snode, hash, expiresAt) {

Loading…
Cancel
Save