@ -1,4 +1,4 @@
import { omit } from 'lodash' ;
import { isArray, omit } from 'lodash' ;
import { Snode } from '../../../data/data' ;
import { updateIsOnline } from '../../../state/ducks/onion' ;
import { doSnodeBatchRequest } from './batchRequest' ;
@ -7,8 +7,8 @@ import { SnodeNamespace, SnodeNamespaces } from './namespaces';
import { TTL_DEFAULT } from '../../constants' ;
import { UserUtils } from '../../utils' ;
import { sleepFor } from '../../utils/Promise' ;
import {
NotEmptyArrayOfBatchResults ,
RetrieveLegacyClosedGroupSubRequestType ,
RetrieveSubRequestType ,
UpdateExpiryOnNodeSubRequest ,
@ -103,40 +103,6 @@ async function buildRetrieveRequest(
return retrieveRequestsParams ;
}
function verifyBatchRequestResults (
targetNode : Snode ,
namespaces : Array < SnodeNamespaces > ,
results : NotEmptyArrayOfBatchResults
) {
if ( ! results || ! results . length ) {
window ? . log ? . warn (
` _retrieveNextMessages - sessionRpc could not talk to ${ targetNode . ip } : ${ targetNode . port } `
) ;
throw new Error (
` _retrieveNextMessages - sessionRpc could not talk to ${ targetNode . ip } : ${ targetNode . port } `
) ;
}
// the +1 is to take care of the extra `expire` method added once user config is released
if ( results . length !== namespaces . length && results . length !== namespaces . length + 1 ) {
throw new Error (
` We asked for updates about ${ namespaces . length } messages but got results of length ${ results . length } `
) ;
}
// do a basic check to know if we have something kind of looking right (status 200 should always be there for a retrieve)
const firstResult = results [ 0 ] ;
if ( firstResult . code !== 200 ) {
window ? . log ? . warn ( ` _retrieveNextMessages result is not 200 but ${ firstResult . code } ` ) ;
throw new Error (
` _retrieveNextMessages - retrieve result is not 200 with ${ targetNode . ip } : ${ targetNode . port } but ${ firstResult . code } `
) ;
}
return firstResult ;
}
async function retrieveNextMessages (
targetNode : Snode ,
lastHashes : Array < string > ,
@ -158,17 +124,41 @@ async function retrieveNextMessages(
) ;
// let exceptions bubble up
// no retry for this one as this a call we do every few seconds while polling for messages
const timeOutMs = 4 * 1000 ;
const timeoutPromise = async ( ) = > sleepFor ( timeOutMs ) ;
const fetchPromise = async ( ) = >
doSnodeBatchRequest ( retrieveRequestsParams , targetNode , timeOutMs , associatedWith ) ;
const results = await doSnodeBatchRequest (
retrieveRequestsParams ,
targetNode ,
4000 ,
associatedWith
) ;
// just to make sure that we don't hang for more than timeOutMs
const results = await Promise . race ( [ timeoutPromise ( ) , fetchPromise ( ) ] ) ;
try {
if ( ! results || ! isArray ( results ) || ! results . length ) {
window ? . log ? . warn (
` _retrieveNextMessages - sessionRpc could not talk to ${ targetNode . ip } : ${ targetNode . port } `
) ;
throw new Error (
` _retrieveNextMessages - sessionRpc could not talk to ${ targetNode . ip } : ${ targetNode . port } `
) ;
}
// the +1 is to take care of the extra `expire` method added once user config is released
if ( results . length !== namespaces . length && results . length !== namespaces . length + 1 ) {
throw new Error (
` We asked for updates about ${ namespaces . length } messages but got results of length ${ results . length } `
) ;
}
// do a basic check to know if we have something kind of looking right (status 200 should always be there for a retrieve)
const firstResult = results [ 0 ] ;
if ( firstResult . code !== 200 ) {
window ? . log ? . warn ( ` _retrieveNextMessages result is not 200 but ${ firstResult . code } ` ) ;
throw new Error (
` _retrieveNextMessages - retrieve result is not 200 with ${ targetNode . ip } : ${ targetNode . port } but ${ firstResult . code } `
) ;
}
// we rely on the code of the first one to check for online status
const firstResult = verifyBatchRequestResults ( targetNode , namespaces , results ) ;
const bodyFirstResult = firstResult . body ;
if ( ! window . inboxStore ? . getState ( ) . onionPaths . isOnline ) {
window . inboxStore ? . dispatch ( updateIsOnline ( true ) ) ;