@ -15,59 +15,22 @@ import { sendOnionRequestLsrpcDest, snodeHttpsAgent, SnodeResponse } from './oni
export { sendOnionRequestLsrpcDest } ;
import { getRandomSnodeAddress , markNodeUnreachable , Snode , updateSnodesFor } from './snodePool' ;
import {
getRandomSnodeAddress ,
getRandomSnodePool ,
getSwarm ,
markNodeUnreachable ,
requiredSnodesForAgreement ,
Snode ,
updateSnodesFor ,
} from './snodePool' ;
import { Constants } from '..' ;
import { sleepFor } from '../utils/Promise' ;
import { sha256 } from '../crypto' ;
import pRetry from 'p-retry' ;
import _ from 'lodash' ;
/ * *
* Currently unused . If we need it again , be sure to update it to onion routing rather
* than using a plain nodeFetch
* /
export async function getVersion ( node : Snode , retries : number = 0 ) : Promise < string | boolean > {
const SNODE_VERSION_RETRIES = 3 ;
const { log } = window ;
try {
window . log . warn ( 'insecureNodeFetch => plaintext for getVersion' ) ;
const result = await insecureNodeFetch ( ` https:// ${ node . ip } : ${ node . port } /get_stats/v1 ` , {
agent : snodeHttpsAgent ,
} ) ;
const data = await result . json ( ) ;
if ( data . version ) {
return data . version ;
} else {
return false ;
}
} catch ( e ) {
// ECONNREFUSED likely means it's just offline...
// ECONNRESET seems to retry and fail as ECONNREFUSED (so likely a node going offline)
// ETIMEDOUT not sure what to do about these
// retry for now but maybe we should be marking bad...
if ( e . code === 'ECONNREFUSED' ) {
markNodeUnreachable ( node ) ;
// clean up these error messages to be a little neater
log . warn ( ` LokiSnodeAPI::_getVersion - ${ node . ip } : ${ node . port } is offline, removing ` ) ;
// if not ECONNREFUSED, it's mostly ECONNRESETs
// ENOTFOUND could mean no internet or hiccup
} else if ( retries < SNODE_VERSION_RETRIES ) {
log . warn (
'LokiSnodeAPI::_getVersion - Error' ,
e . code ,
e . message ,
` on ${ node . ip } : ${ node . port } retrying in 1s `
) ;
await sleepFor ( 1000 ) ;
return getVersion ( node , retries + 1 ) ;
} else {
markNodeUnreachable ( node ) ;
log . warn ( ` LokiSnodeAPI::_getVersion - failing to get version for ${ node . ip } : ${ node . port } ` ) ;
}
// maybe throw?
return false ;
}
}
const maxAcceptableFailuresStoreOnNode = 10 ;
const getSslAgentForSeedNode = ( seedNodeHost : string , isSsl = false ) = > {
let filePrefix = '' ;
@ -235,39 +198,37 @@ export type SendParams = {
export async function requestSnodesForPubkey ( pubKey : string ) : Promise < Array < Snode > > {
const { log } = window ;
let sn ode;
let targetN ode;
try {
sn ode = await getRandomSnodeAddress ( ) ;
targetN ode = await getRandomSnodeAddress ( ) ;
const result = await snodeRpc (
'get_snodes_for_pubkey' ,
{
pubKey ,
} ,
sn ode
targetN ode
) ;
if ( ! result ) {
log . warn (
` LokiSnodeAPI::requestSnodesForPubkey - lokiRpc on ${ snode. ip } : ${ sn ode. port } returned falsish value ` ,
` LokiSnodeAPI::requestSnodesForPubkey - lokiRpc on ${ targetNode. ip } : ${ targetN ode. port } returned falsish value ` ,
result
) ;
return [ ] ;
}
const res = result as SnodeResponse ;
if ( res . status !== 200 ) {
if ( result . status !== 200 ) {
log . warn ( 'Status is not 200 for get_snodes_for_pubkey' ) ;
return [ ] ;
}
try {
const json = JSON . parse ( res . body ) ;
const json = JSON . parse ( res ult . body ) ;
if ( ! json . snodes ) {
// we hit this when snode gives 500s
log . warn (
` LokiSnodeAPI::requestSnodesForPubkey - lokiRpc on ${ snode. ip } : ${ sn ode. port } returned falsish value for snodes ` ,
` LokiSnodeAPI::requestSnodesForPubkey - lokiRpc on ${ targetNode. ip } : ${ targetN ode. port } returned falsish value for snodes ` ,
result
) ;
return [ ] ;
@ -282,19 +243,18 @@ export async function requestSnodesForPubkey(pubKey: string): Promise<Array<Snod
} catch ( e ) {
log . error ( 'LokiSnodeAPI::requestSnodesForPubkey - error' , e . code , e . message ) ;
if ( sn ode) {
markNodeUnreachable ( sn ode) ;
if ( targetN ode) {
markNodeUnreachable ( targetN ode) ;
}
return [ ] ;
}
}
export async function requestLnsMapping ( n ode: Snode , nameHash : any ) {
export async function requestLnsMapping ( targetN ode: Snode , nameHash : any ) {
const { log } = window ;
log . debug ( '[lns] lns requests to {}:{}' , node . ip , node . port ) ;
log . debug ( '[lns] lns requests to {}:{}' , targetNode . ip , targetNode ) ;
try {
// TODO: Check response status
return snodeRpc (
@ -302,28 +262,134 @@ export async function requestLnsMapping(node: Snode, nameHash: any) {
{
name_hash : nameHash ,
} ,
n ode
targetN ode
) ;
} catch ( e ) {
log . warn ( 'exception caught making lns requests to a node' , n ode, e ) ;
log . warn ( 'exception caught making lns requests to a node' , targetN ode, e ) ;
return false ;
}
}
function checkResponse ( response : SnodeResponse ) : void {
const { log , textsecure } = window ;
/ * *
* Try to fetch from 3 different snodes an updated list of snodes .
* If we get less than 24 common snodes in those result , we consider the request to failed and an exception is thrown .
* Return the list of nodes all snodes agreed on .
* /
export async function getSnodePoolFromSnodes() {
const existingSnodePool = await getRandomSnodePool ( ) ;
if ( existingSnodePool . length < 3 ) {
window . log . warn ( 'cannot get snodes from snodes; not enough snodes' , existingSnodePool . length ) ;
return ;
}
if ( response . status === 406 ) {
throw new textsecure . TimestampError ( 'Invalid Timestamp (check your clock)' ) ;
// Note intersectionWith only works with 3 at most array to find the common snodes.
const nodesToRequest = _ . sampleSize ( existingSnodePool , 3 ) ;
const results = await Promise . all (
nodesToRequest . map ( async node = > {
return pRetry (
async ( ) = > {
return getSnodePoolFromSnode ( node ) ;
} ,
{
retries : 3 ,
factor : 1 ,
minTimeout : 1000 ,
}
) ;
} )
) ;
// we want those at least `requiredSnodesForAgreement` snodes common between all the result
const commonSnodes = _ . intersectionWith (
results [ 0 ] ,
results [ 1 ] ,
results [ 2 ] ,
( s1 : Snode , s2 : Snode ) = > {
return s1 . ip === s2 . ip && s1 . port === s2 . port ;
}
) ;
// We want the snodes to agree on at least this many snodes
if ( commonSnodes . length < requiredSnodesForAgreement ) {
throw new Error ( 'inconsistentSnodePools' ) ;
}
return commonSnodes ;
}
/ * *
* Returns a list of uniq snodes got from the specified targetNode
* /
async function getSnodePoolFromSnode ( targetNode : Snode ) : Promise < Array < Snode > > {
const params = {
endpoint : 'get_service_nodes' ,
params : {
active_only : true ,
// limit: 256,
fields : {
public_ip : true ,
storage_port : true ,
pubkey_x25519 : true ,
pubkey_ed25519 : true ,
} ,
} ,
} ;
const method = 'oxend_request' ;
const result = await snodeRpc ( method , params , targetNode ) ;
if ( ! result || result . status !== 200 ) {
throw new Error ( 'Invalid result' ) ;
}
try {
const json = JSON . parse ( result . body ) ;
if ( ! json || ! json . result || ! json . result . service_node_states ? . length ) {
window . log . error (
'loki_snode_api:::getSnodePoolFromSnode - invalid result from seed' ,
result . body
) ;
return [ ] ;
}
// Filter 0.0.0.0 nodes which haven't submitted uptime proofs
const snodes = json . result . service_node_states
. filter ( ( snode : any ) = > snode . public_ip !== '0.0.0.0' )
. map ( ( snode : any ) = > ( {
ip : snode.public_ip ,
port : snode.storage_port ,
pubkey_x25519 : snode.pubkey_x25519 ,
pubkey_ed25519 : snode.pubkey_ed25519 ,
version : '' ,
} ) ) as Array < Snode > ;
// we the return list by the snode is already made of uniq snodes
return _ . compact ( snodes ) ;
} catch ( e ) {
window . log . error ( 'Invalid json response' ) ;
return [ ] ;
}
}
const json = JSON . parse ( response . body ) ;
function checkResponse ( response : SnodeResponse ) : void {
if ( response . status === 406 ) {
throw new window . textsecure . TimestampError ( 'Invalid Timestamp (check your clock)' ) ;
}
// Wrong swarm
// Wrong /invalid swarm
if ( response . status === 421 ) {
log . warn ( 'Wrong swarm, now looking at snodes' , json . snodes ) ;
const newSwarm = json . snodes ? json . snodes : [ ] ;
throw new textsecure . WrongSwarmError ( newSwarm ) ;
let json ;
try {
json = JSON . parse ( response . body ) ;
} catch ( e ) {
// could not parse result. Consider that snode as invalid
throw new window . textsecure . InvalidateSwarm ( ) ;
}
// The snode isn't associated with the given public key anymore
window . log . warn ( 'Wrong swarm, now looking at snodes' , json . snodes ) ;
if ( json . snodes ? . length ) {
throw new window . textsecure . WrongSwarmError ( json . snodes ) ;
}
// remove this node from the swarm of this pubkey
throw new window . textsecure . InvalidateSwarm ( ) ;
}
}
@ -331,7 +397,8 @@ export async function storeOnNode(targetNode: Snode, params: SendParams): Promis
const { log , textsecure } = window ;
let successiveFailures = 0 ;
while ( successiveFailures < MAX_ACCEPTABLE_FAILURES ) {
while ( successiveFailures < maxAcceptableFailuresStoreOnNode ) {
// the higher this is, the longer the user delay is
// we don't want to burn through all our retries quickly
// we need to give the node a chance to heal
@ -343,17 +410,17 @@ export async function storeOnNode(targetNode: Snode, params: SendParams): Promis
const result = await snodeRpc ( 'store' , params , targetNode ) ;
// do not return true if we get false here...
if ( result === false ) {
if ( ! result ) {
// this means the node we asked for is likely down
log . warn (
` loki_message:::storeOnNode - Try # ${ successiveFailures } / ${ MAX_ACCEPTABLE_FAILURES } ${ targetNode . ip } : ${ targetNode . port } failed `
` loki_message:::storeOnNode - Try # ${ successiveFailures } / ${ maxAcceptableFailuresStoreOnNode } ${ targetNode . ip } : ${ targetNode . port } failed `
) ;
successiveFailures += 1 ;
// eslint-disable-next-line no-continue
continue ;
}
const snodeRes = result as SnodeResponse ;
const snodeRes = result ;
checkResponse ( snodeRes ) ;
@ -382,6 +449,16 @@ export async function storeOnNode(targetNode: Snode, params: SendParams): Promis
// TODO: Handle working connection but error response
const body = await e . response . text ( ) ;
log . warn ( 'loki_message:::storeOnNode - HTTPError body:' , body ) ;
} else if ( e instanceof window . textsecure . InvalidateSwarm ) {
window . log . warn (
'Got an `InvalidateSwarm` error, removing this node from this swarm of this pubkey'
) ;
const existingSwarm = await getSwarm ( params . pubKey ) ;
const updatedSwarm = existingSwarm . filter (
node = > node . pubkey_ed25519 !== targetNode . pubkey_ed25519
) ;
await updateSnodesFor ( params . pubKey , updatedSwarm ) ;
}
successiveFailures += 1 ;
}
@ -394,7 +471,7 @@ export async function storeOnNode(targetNode: Snode, params: SendParams): Promis
}
export async function retrieveNextMessages (
nodeData : Snode ,
targetNode : Snode ,
lastHash : string ,
pubkey : string
) : Promise < Array < any > > {
@ -404,36 +481,42 @@ export async function retrieveNextMessages(
} ;
// let exceptions bubble up
const result = await snodeRpc ( 'retrieve' , params , nodeData ) ;
const result = await snodeRpc ( 'retrieve' , params , targetNode ) ;
if ( ! result ) {
window . log . warn (
` loki_message:::_retrieveNextMessages - lokiRpc could not talk to ${ nodeData. ip } : ${ nodeData . port } `
` loki_message:::_retrieveNextMessages - lokiRpc could not talk to ${ targetNode. ip } : ${ targetNode . port } `
) ;
return [ ] ;
}
const res = result as SnodeResponse ;
// NOTE: we call `checkResponse` to check for "wrong swarm"
try {
checkResponse ( res ) ;
checkResponse ( res ult ) ;
} catch ( e ) {
window . log . warn ( 'loki_message:::retrieveNextMessages - send error:' , e . code , e . message ) ;
if ( e instanceof window . textsecure . WrongSwarmError ) {
const { newSwarm } = e ;
await updateSnodesFor ( params . pubKey , newSwarm ) ;
return [ ] ;
} else if ( e instanceof window . textsecure . InvalidateSwarm ) {
const existingSwarm = await getSwarm ( params . pubKey ) ;
const updatedSwarm = existingSwarm . filter (
node = > node . pubkey_ed25519 !== targetNode . pubkey_ed25519
) ;
await updateSnodesFor ( params . pubKey , updatedSwarm ) ;
return [ ] ;
}
}
if ( res . status !== 200 ) {
if ( res ult . status !== 200 ) {
window . log ( 'retrieve result is not 200' ) ;
return [ ] ;
}
try {
const json = JSON . parse ( res . body ) ;
const json = JSON . parse ( res ult . body ) ;
return json . messages || [ ] ;
} catch ( e ) {
window . log . warn ( 'exception while parsing json of nextMessage:' , e ) ;
@ -441,5 +524,3 @@ export async function retrieveNextMessages(
return [ ] ;
}
}
const MAX_ACCEPTABLE_FAILURES = 10 ;