From a2ea02960e7314ccf00394339e213135f430f712 Mon Sep 17 00:00:00 2001 From: Audric Ackermann Date: Mon, 24 May 2021 16:03:52 +1000 Subject: [PATCH] add some retries for not already retries requests --- password_preload.js | 4 +- preload.js | 16 ++-- ts/session/onions/onionPath.ts | 2 +- ts/session/sending/LokiMessageApi.ts | 27 +++--- ts/session/snode_api/SNodeAPI.ts | 131 ++++++++++++++++----------- ts/session/snode_api/lokiRpc.ts | 9 +- ts/session/snode_api/onions.ts | 65 ++++++++++--- 7 files changed, 164 insertions(+), 90 deletions(-) diff --git a/password_preload.js b/password_preload.js index 187cf441b..e0fd3e13d 100644 --- a/password_preload.js +++ b/password_preload.js @@ -35,12 +35,12 @@ window.Signal = { window.Signal.Logs = require('./js/modules/logs'); window.resetDatabase = () => { - window?.log?.info('reset database'); + window.log.info('reset database'); ipcRenderer.send('resetDatabase'); }; window.restart = () => { - window?.log?.info('restart'); + window.log.info('restart'); ipc.send('restart'); }; diff --git a/preload.js b/preload.js index 8d2e0a9af..31dd86729 100644 --- a/preload.js +++ b/preload.js @@ -72,7 +72,7 @@ window.isBeforeVersion = (toCheck, baseVersion) => { try { return semver.lt(toCheck, baseVersion); } catch (error) { - window?.log?.error( + window.log.error( `isBeforeVersion error: toCheck: ${toCheck}, baseVersion: ${baseVersion}`, error && error.stack ? error.stack : error ); @@ -164,11 +164,11 @@ window.open = () => null; window.eval = global.eval = () => null; window.drawAttention = () => { - // window?.log?.debug('draw attention'); + // window.log.debug('draw attention'); ipc.send('draw-attention'); }; window.showWindow = () => { - window?.log?.info('show window'); + window.log.info('show window'); ipc.send('show-window'); }; @@ -177,12 +177,12 @@ window.setAutoHideMenuBar = autoHide => ipc.send('set-auto-hide-menu-bar', autoH window.setMenuBarVisibility = visibility => ipc.send('set-menu-bar-visibility', visibility); window.restart = () => { - window?.log?.info('restart'); + window.log.info('restart'); ipc.send('restart'); }; window.resetDatabase = () => { - window?.log?.info('reset database'); + window.log.info('reset database'); ipc.send('resetDatabase'); }; @@ -198,7 +198,7 @@ window.onUnblockNumber = async number => { const conversation = window.getConversationController().get(number); await conversation.unblock(); } catch (e) { - window?.log?.info('IPC on unblock: failed to fetch conversation for number: ', number); + window.log.info('IPC on unblock: failed to fetch conversation for number: ', number); } } }; @@ -278,7 +278,7 @@ window.setAutoUpdateEnabled = value => ipc.send('set-auto-update-setting', !!val ipc.on('get-ready-for-shutdown', async () => { const { shutdown } = window.Events || {}; if (!shutdown) { - window?.log?.error('preload shutdown handler: shutdown method not found'); + window.log.error('preload shutdown handler: shutdown method not found'); ipc.send('now-ready-for-shutdown'); return; } @@ -299,7 +299,7 @@ window.removeSetupMenuItems = () => ipc.send('remove-setup-menu-items'); require('./js/logging'); if (config.proxyUrl) { - window?.log?.info('Using provided proxy url'); + window.log.info('Using provided proxy url'); } window.nodeSetImmediate = setImmediate; diff --git a/ts/session/onions/onionPath.ts b/ts/session/onions/onionPath.ts index d73168615..32f7800f3 100644 --- a/ts/session/onions/onionPath.ts +++ b/ts/session/onions/onionPath.ts @@ -164,7 +164,7 @@ export async function incrementBadPathCountOrDrop(guardNodeEd25519: string) { // a guard node is dropped when the path is dropped completely (in dropPathStartingWithGuardNode) for (let index = 1; index < pathWithIssues.length; index++) { const snode = pathWithIssues[index]; - await incrementBadSnodeCountOrDrop(snode.pubkey_ed25519); + await incrementBadSnodeCountOrDrop({ snodeEd25519: snode.pubkey_ed25519 }); } if (newPathFailureCount >= pathFailureThreshold) { diff --git a/ts/session/sending/LokiMessageApi.ts b/ts/session/sending/LokiMessageApi.ts index 24e112b0d..eb5d7863e 100644 --- a/ts/session/sending/LokiMessageApi.ts +++ b/ts/session/sending/LokiMessageApi.ts @@ -1,20 +1,10 @@ import _ from 'lodash'; -import { SendParams, storeOnNode } from '../snode_api/SNodeAPI'; -import { getSwarmFor, Snode } from '../snode_api/snodePool'; +import { storeOnNode } from '../snode_api/SNodeAPI'; +import { getSwarmFor } from '../snode_api/snodePool'; import { firstTrue } from '../utils/Promise'; const DEFAULT_CONNECTIONS = 3; -async function openSendConnection(snode: Snode, params: SendParams) { - // TODO: Revert back to using snode address instead of IP - const successfulSend = await storeOnNode(snode, params); - if (successfulSend) { - return snode; - } - // should we mark snode as bad if it can't store our message? - return undefined; -} - /** * Refactor note: We should really clean this up ... it's very messy * @@ -57,7 +47,18 @@ export async function sendMessage( const usedNodes = _.slice(swarm, 0, DEFAULT_CONNECTIONS); - const promises = usedNodes.map(snodeConnection => openSendConnection(snodeConnection, params)); + const promises = usedNodes.map(async usedNode => { + // TODO: Revert back to using snode address instead of IP + // No pRetry here as if this is a bad path it will be handled and retried in lokiOnionFetch. + // the only case we could care about a retry would be when the usedNode is not correct, + // but considering we trigger this request with a few snode in //, this should be fine. + const successfulSend = await storeOnNode(usedNode, params); + if (successfulSend) { + return usedNode; + } + // should we mark snode as bad if it can't store our message? + return undefined; + }); let snode; try { diff --git a/ts/session/snode_api/SNodeAPI.ts b/ts/session/snode_api/SNodeAPI.ts index 281d54f73..20ce08103 100644 --- a/ts/session/snode_api/SNodeAPI.ts +++ b/ts/session/snode_api/SNodeAPI.ts @@ -15,6 +15,7 @@ import { getRandomSnode, getRandomSnodePool, requiredSnodesForAgreement, Snode } import { Constants } from '..'; import { sha256 } from '../crypto'; import _ from 'lodash'; +import pRetry from 'p-retry'; const getSslAgentForSeedNode = (seedNodeHost: string, isSsl = false) => { let filePrefix = ''; @@ -177,50 +178,68 @@ export type SendParams = { }; // get snodes for pubkey from random snode. Uses an existing snode -export async function requestSnodesForPubkey(pubKey: string): Promise> { - let targetNode; - try { - targetNode = await getRandomSnode(); - const result = await snodeRpc( - 'get_snodes_for_pubkey', - { - pubKey, - }, - targetNode, - pubKey + +async function requestSnodesForPubkeyRetryable( + pubKey: string, + targetNode: Snode +): Promise> { + const params = { + pubKey, + }; + const result = await snodeRpc('get_snodes_for_pubkey', params, targetNode, pubKey); + + if (!result) { + window?.log?.warn( + `LokiSnodeAPI::requestSnodesForPubkeyRetryable - lokiRpc on ${targetNode.ip}:${targetNode.port} returned falsish value`, + result ); + throw new Error('requestSnodesForPubkeyRetryable: Invalid result'); + } - if (!result) { + if (result.status !== 200) { + window?.log?.warn('Status is not 200 for get_snodes_for_pubkey'); + throw new Error('requestSnodesForPubkeyRetryable: Invalid status code'); + } + + try { + const json = JSON.parse(result.body); + + if (!json.snodes) { + // we hit this when snode gives 500s window?.log?.warn( - `LokiSnodeAPI::requestSnodesForPubkey - lokiRpc on ${targetNode.ip}:${targetNode.port} returned falsish value`, + `LokiSnodeAPI::requestSnodesForPubkeyRetryable - lokiRpc on ${targetNode.ip}:${targetNode.port} returned falsish value for snodes`, result ); - return []; + throw new Error('Invalid json (empty)'); } - if (result.status !== 200) { - window?.log?.warn('Status is not 200 for get_snodes_for_pubkey'); - return []; - } + const snodes = json.snodes.filter((tSnode: any) => tSnode.ip !== '0.0.0.0'); + return snodes; + } catch (e) { + throw new Error('Invalid json'); + } +} - try { - const json = JSON.parse(result.body); +export async function requestSnodesForPubkey(pubKey: string): Promise> { + try { + const targetNode = await getRandomSnode(); - if (!json.snodes) { - // we hit this when snode gives 500s - window?.log?.warn( - `LokiSnodeAPI::requestSnodesForPubkey - lokiRpc on ${targetNode.ip}:${targetNode.port} returned falsish value for snodes`, - result - ); - return []; + return await pRetry( + async () => { + return requestSnodesForPubkeyRetryable(pubKey, targetNode); + }, + { + retries: 10, // each path can fail 3 times before being dropped, we have 3 paths at most + factor: 2, + minTimeout: 200, + maxTimeout: 4000, + onFailedAttempt: e => { + window?.log?.warn( + `requestSnodesForPubkey attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left...` + ); + }, } - - const snodes = json.snodes.filter((tSnode: any) => tSnode.ip !== '0.0.0.0'); - return snodes; - } catch (e) { - window?.log?.warn('Invalid json'); - return []; - } + ); } catch (e) { window?.log?.error('LokiSnodeAPI::requestSnodesForPubkey - error', e); @@ -301,8 +320,7 @@ export async function getSnodePoolFromSnode(targetNode: Snode): Promise { try { + // no retry here. If an issue is with the path this is handled in lokiOnionFetch + // if there is an issue with the targetNode, we still send a few times this request to a few snodes in // already so it's handled const result = await snodeRpc('store', params, targetNode, params.pubKey); if (!result || result.status !== 200) { @@ -356,6 +376,7 @@ export async function storeOnNode(targetNode: Snode, params: SendParams): Promis return false; } +/** */ export async function retrieveNextMessages( targetNode: Snode, lastHash: string, @@ -367,26 +388,32 @@ export async function retrieveNextMessages( }; // let exceptions bubble up - const result = await snodeRpc('retrieve', params, targetNode, pubkey); + try { + // no retry for this one as this a call we do every few seconds while polling for messages + const result = await snodeRpc('retrieve', params, targetNode, pubkey); - if (!result) { - window?.log?.warn( - `loki_message:::_retrieveNextMessages - lokiRpc could not talk to ${targetNode.ip}:${targetNode.port}` - ); - return []; - } + if (!result) { + window?.log?.warn( + `loki_message:::_retrieveNextMessages - lokiRpc could not talk to ${targetNode.ip}:${targetNode.port}` + ); + return []; + } - if (result.status !== 200) { - window.log('retrieve result is not 200'); - return []; - } + if (result.status !== 200) { + window.log('retrieve result is not 200'); + return []; + } - try { - const json = JSON.parse(result.body); - return json.messages || []; - } catch (e) { - window?.log?.warn('exception while parsing json of nextMessage:', e); + try { + const json = JSON.parse(result.body); + return json.messages || []; + } catch (e) { + window?.log?.warn('exception while parsing json of nextMessage:', e); + return []; + } + } catch (e) { + window?.log?.warn('Got an error while retrieving next messages:', e); return []; } } diff --git a/ts/session/snode_api/lokiRpc.ts b/ts/session/snode_api/lokiRpc.ts index bb6d518fa..64cdd3e1a 100644 --- a/ts/session/snode_api/lokiRpc.ts +++ b/ts/session/snode_api/lokiRpc.ts @@ -63,8 +63,13 @@ async function lokiFetch( } } -// Wrapper for a JSON RPC request -// Annoyngly, this is used for Lokid requests too +/** + * This function will throw for a few reasons. + * The loki-important ones are + * -> if we try to make a request to a path which fails too many times => user will need to retry himself + * -> if the targetNode gets too many errors => we will need to try do to this request again with anoter target node + * The + */ export async function snodeRpc( method: string, params: any, diff --git a/ts/session/snode_api/onions.ts b/ts/session/snode_api/onions.ts index 5887b4d37..5de319f1f 100644 --- a/ts/session/snode_api/onions.ts +++ b/ts/session/snode_api/onions.ts @@ -30,6 +30,8 @@ export interface SnodeResponse { status: number; } +const NEXT_NODE_NOT_FOUND_PREFIX = 'Next node not found: '; + // Returns the actual ciphertext, symmetric key that will be used // for decryption, and an ephemeral_key to send to the next hop async function encryptForPubKey(pubKeyX25519hex: string, reqObj: any): Promise { @@ -232,10 +234,10 @@ async function processOnionRequestErrorAtDestination({ process406Error(statusCode); await process421Error(statusCode, body, associatedWith, destinationEd25519); if (destinationEd25519) { - await processAnyOtherErrorAtDestination(statusCode, destinationEd25519, associatedWith); + await processAnyOtherErrorAtDestination(statusCode, body, destinationEd25519, associatedWith); } else { console.warn( - 'processOnionRequestErrorAtDestination: destinationEd25519 unset. was it a group call?', + 'processOnionRequestErrorAtDestination: destinationEd25519 unset. was it an open group call?', statusCode ); } @@ -257,16 +259,21 @@ async function processAnyOtherErrorOnPath( ) { window?.log?.warn(`[path] Got status: ${status}`); // - const prefix = 'Next node not found: '; let nodeNotFound; - if (ciphertext?.startsWith(prefix)) { - nodeNotFound = ciphertext.substr(prefix.length); + if (ciphertext?.startsWith(NEXT_NODE_NOT_FOUND_PREFIX)) { + nodeNotFound = ciphertext.substr(NEXT_NODE_NOT_FOUND_PREFIX.length); } // If we have a specific node in fault we can exclude just this node. // Otherwise we increment the whole path failure count if (nodeNotFound) { - await incrementBadSnodeCountOrDrop(nodeNotFound, associatedWith); + await incrementBadSnodeCountOrDrop({ + snodeEd25519: nodeNotFound, + associatedWith, + isNodeNotFound: true, + }); + + // we are checking errors on the path, a nodeNotFound on the path should trigger a rebuild } else { await incrementBadPathCountOrDrop(guardNodeEd25519); } @@ -276,6 +283,7 @@ async function processAnyOtherErrorOnPath( async function processAnyOtherErrorAtDestination( status: number, + body: string, destinationEd25519: string, associatedWith?: string ) { @@ -289,7 +297,25 @@ async function processAnyOtherErrorAtDestination( ) { window?.log?.warn(`[path] Got status at destination: ${status}`); - await incrementBadSnodeCountOrDrop(destinationEd25519, associatedWith); + let nodeNotFound; + if (body?.startsWith(NEXT_NODE_NOT_FOUND_PREFIX)) { + nodeNotFound = body.substr(NEXT_NODE_NOT_FOUND_PREFIX.length); + + if (nodeNotFound) { + await incrementBadSnodeCountOrDrop({ snodeEd25519: destinationEd25519, associatedWith }); + // if we get a nodeNotFound at the desitnation. it means the targetNode to which we made the request is not found. + // We have to retry with another targetNode so it's not just rebuilding the path. We have to go one lever higher (lokiOnionFetch). + // status is 502 for a node not found + throw new pRetry.AbortError( + `Bad Path handled. Retry this request with another targetNode. Status: ${status}` + ); + } + } + + // If we have a specific node in fault we can exclude just this node. + // Otherwise we increment the whole path failure count + // if (nodeNotFound) { + await incrementBadSnodeCountOrDrop({ snodeEd25519: destinationEd25519, associatedWith }); throw new Error(`Bad Path handled. Retry this request. Status: ${status}`); } @@ -496,14 +522,28 @@ async function handle421InvalidSwarm(snodeEd25519: string, body: string, associa * * @param snodeEd25519 the snode ed25519 which cause issues * @param associatedWith if set, we will drop this snode from the swarm of the pubkey too + * @param isNodeNotFound if set, we will drop this snode right now as this is an invalid node for the network. */ -export async function incrementBadSnodeCountOrDrop(snodeEd25519: string, associatedWith?: string) { +export async function incrementBadSnodeCountOrDrop({ + snodeEd25519, + associatedWith, + isNodeNotFound, +}: { + snodeEd25519: string; + associatedWith?: string; + isNodeNotFound?: boolean; +}) { const oldFailureCount = snodeFailureCount[snodeEd25519] || 0; const newFailureCount = oldFailureCount + 1; snodeFailureCount[snodeEd25519] = newFailureCount; - if (newFailureCount >= snodeFailureThreshold) { - window?.log?.warn(`Failure threshold reached for: ${snodeEd25519}; dropping it.`); + if (newFailureCount >= snodeFailureThreshold || isNodeNotFound) { + if (isNodeNotFound) { + window?.log?.warn(`Node not found reported for: ${snodeEd25519}; dropping it.`); + } else { + window?.log?.warn(`Failure threshold reached for: ${snodeEd25519}; dropping it.`); + } + if (associatedWith) { window?.log?.info(`Dropping ${snodeEd25519} from swarm of ${associatedWith}`); await dropSnodeFromSwarmIfNeeded(associatedWith, snodeEd25519); @@ -753,7 +793,8 @@ export async function lokiOnionFetch( { retries: 10, factor: 1, - minTimeout: 1000, + minTimeout: 200, + maxTimeout: 2000, onFailedAttempt: e => { window?.log?.warn( `onionFetchRetryable attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left...` @@ -766,6 +807,6 @@ export async function lokiOnionFetch( } catch (e) { window?.log?.warn('onionFetchRetryable failed ', e); console.warn('error to show to user'); - return undefined; + throw e; } }