From d8cc0c79eac57d819c7f4c129fb9ecca9a2350d0 Mon Sep 17 00:00:00 2001 From: William Grant Date: Tue, 2 Apr 2024 16:57:55 +1100 Subject: [PATCH] feat: refactored swarm polling to use only retrieveNextMessages again created verifyBatchRequestResults function --- ts/session/apis/snode_api/retrieveRequest.ts | 126 +++++-------------- ts/session/apis/snode_api/swarmPolling.ts | 51 ++++---- 2 files changed, 57 insertions(+), 120 deletions(-) diff --git a/ts/session/apis/snode_api/retrieveRequest.ts b/ts/session/apis/snode_api/retrieveRequest.ts index ba4bbd9f8..8640587cb 100644 --- a/ts/session/apis/snode_api/retrieveRequest.ts +++ b/ts/session/apis/snode_api/retrieveRequest.ts @@ -8,6 +8,7 @@ import { SnodeNamespace, SnodeNamespaces } from './namespaces'; import { TTL_DEFAULT } from '../../constants'; import { UserUtils } from '../../utils'; import { + NotEmptyArrayOfBatchResults, RetrieveLegacyClosedGroupSubRequestType, RetrieveSubRequestType, UpdateExpiryOnNodeSubRequest, @@ -102,34 +103,11 @@ async function buildRetrieveRequest( return retrieveRequestsParams; } -async function retrieveNextMessages( +function verifyBatchRequestResults( targetNode: Snode, - lastHashes: Array, - associatedWith: string, namespaces: Array, - ourPubkey: string, - configHashesToBump: Array | null -): Promise { - if (namespaces.length !== lastHashes.length) { - throw new Error('namespaces and last hashes do not match'); - } - - const retrieveRequestsParams = await buildRetrieveRequest( - lastHashes, - associatedWith, - namespaces, - ourPubkey, - configHashesToBump - ); - // let exceptions bubble up - // no retry for this one as this a call we do every few seconds while polling for messages - - const results = await doSnodeBatchRequest( - retrieveRequestsParams, - targetNode, - 4000, - associatedWith - ); + results: NotEmptyArrayOfBatchResults +) { if (!results || !results.length) { window?.log?.warn( `_retrieveNextMessages - sessionRpc could not talk to ${targetNode.ip}:${targetNode.port}` @@ -150,83 +128,47 @@ async function retrieveNextMessages( const firstResult = results[0]; if (firstResult.code !== 200) { - window?.log?.warn(`retrieveNextMessages result is not 200 but ${firstResult.code}`); + window?.log?.warn(`_retrieveNextMessages result is not 200 but ${firstResult.code}`); throw new Error( `_retrieveNextMessages - retrieve result is not 200 with ${targetNode.ip}:${targetNode.port} but ${firstResult.code}` ); } - try { - // we rely on the code of the first one to check for online status - const bodyFirstResult = firstResult.body; - if (!window.inboxStore?.getState().onionPaths.isOnline) { - window.inboxStore?.dispatch(updateIsOnline(true)); - } - - GetNetworkTime.handleTimestampOffsetFromNetwork('retrieve', bodyFirstResult.t); - - // merge results with their corresponding namespaces - return results.map((result, index) => ({ - code: result.code, - messages: result.body as RetrieveMessagesResultsContent, - namespace: namespaces[index], - })); - } catch (e) { - window?.log?.warn('exception while parsing json of nextMessage:', e); - if (!window.inboxStore?.getState().onionPaths.isOnline) { - window.inboxStore?.dispatch(updateIsOnline(true)); - } - throw new Error( - `_retrieveNextMessages - exception while parsing json of nextMessage ${targetNode.ip}:${targetNode.port}: ${e?.message}` - ); - } + return firstResult; } -async function retrieveDisplayName( +async function retrieveNextMessages( targetNode: Snode, - ourPubkey: string + lastHashes: Array, + associatedWith: string, + namespaces: Array, + ourPubkey: string, + configHashesToBump: Array | null ): Promise { + if (namespaces.length !== lastHashes.length) { + throw new Error('namespaces and last hashes do not match'); + } + const retrieveRequestsParams = await buildRetrieveRequest( - [], - ourPubkey, - [SnodeNamespaces.UserProfile], + lastHashes, + associatedWith, + namespaces, ourPubkey, - [] + configHashesToBump ); - // let exceptions bubble up - // no retry for this one as this a call we do every few seconds through polling - - const results = await doSnodeBatchRequest(retrieveRequestsParams, targetNode, 4000, ourPubkey); - - if (!results || !results.length) { - window?.log?.warn( - `retrieveDisplayName - sessionRpc could not talk to ${targetNode.ip}:${targetNode.port}` - ); - throw new Error( - `retrieveDisplayName - sessionRpc could not talk to ${targetNode.ip}:${targetNode.port}` - ); - } - - // the +1 is to take care of the extra `expire` method added once user config is released - if (results.length !== 1 && results.length !== 2) { - throw new Error( - `We asked for updates about a message but got results of length ${results.length}` - ); - } - - // do a basic check to know if we have something kind of looking right (status 200 should always be there for a retrieve) - const firstResult = results[0]; + // no retry for this one as this a call we do every few seconds while polling for messages - if (firstResult.code !== 200) { - window?.log?.warn(`retrieveDisplayName result is not 200 but ${firstResult.code}`); - throw new Error( - `retrieveDisplayName - retrieve result is not 200 with ${targetNode.ip}:${targetNode.port} but ${firstResult.code}` - ); - } + const results = await doSnodeBatchRequest( + retrieveRequestsParams, + targetNode, + 4000, + associatedWith + ); try { // we rely on the code of the first one to check for online status + const firstResult = verifyBatchRequestResults(targetNode, namespaces, results); const bodyFirstResult = firstResult.body; if (!window.inboxStore?.getState().onionPaths.isOnline) { window.inboxStore?.dispatch(updateIsOnline(true)); @@ -235,22 +177,20 @@ async function retrieveDisplayName( GetNetworkTime.handleTimestampOffsetFromNetwork('retrieve', bodyFirstResult.t); // merge results with their corresponding namespaces - const resultsWithNamespaces = results.map(result => ({ + return results.map((result, index) => ({ code: result.code, messages: result.body as RetrieveMessagesResultsContent, - namespace: SnodeNamespaces.UserProfile, + namespace: namespaces[index], })); - - return resultsWithNamespaces; } catch (e) { - window?.log?.warn('retrieveDisplayName:', e); + window?.log?.warn('exception while parsing json of nextMessage:', e); if (!window.inboxStore?.getState().onionPaths.isOnline) { window.inboxStore?.dispatch(updateIsOnline(true)); } throw new Error( - `retrieveDisplayName - exception while parsing json of nextMessage ${targetNode.ip}:${targetNode.port}: ${e?.message}` + `_retrieveNextMessages - exception while parsing json of nextMessage ${targetNode.ip}:${targetNode.port}: ${e?.message}` ); } } -export const SnodeAPIRetrieve = { retrieveNextMessages, retrieveDisplayName }; +export const SnodeAPIRetrieve = { retrieveNextMessages }; diff --git a/ts/session/apis/snode_api/swarmPolling.ts b/ts/session/apis/snode_api/swarmPolling.ts index 5981936a2..e90b30faa 100644 --- a/ts/session/apis/snode_api/swarmPolling.ts +++ b/ts/session/apis/snode_api/swarmPolling.ts @@ -225,18 +225,7 @@ export class SwarmPolling { namespaces: Array ) { const polledPubkey = pubkey.key; - - const swarmSnodes = await snodePool.getSwarmFor(polledPubkey); - - // Select nodes for which we already have lastHashes - const alreadyPolled = swarmSnodes.filter((n: Snode) => this.lastHashes[n.pubkey_ed25519]); - let toPollFrom = alreadyPolled.length ? alreadyPolled[0] : null; - - // If we need more nodes, select randomly from the remaining nodes: - if (!toPollFrom) { - const notPolled = difference(swarmSnodes, alreadyPolled); - toPollFrom = sample(notPolled) as Snode; - } + const toPollFrom = await this.getNodesToPollFrom(pubkey.key); let resultsFromAllNamespaces: RetrieveMessagesResultsBatched | null; try { @@ -558,6 +547,22 @@ export class SwarmPolling { } } + private async getNodesToPollFrom(polledPubkey: string) { + const swarmSnodes = await snodePool.getSwarmFor(polledPubkey); + + // Select nodes for which we already have lastHashes + const alreadyPolled = swarmSnodes.filter((n: Snode) => this.lastHashes[n.pubkey_ed25519]); + let toPollFrom = alreadyPolled.length ? alreadyPolled[0] : null; + + // If we need more nodes, select randomly from the remaining nodes: + if (!toPollFrom) { + const notPolled = difference(swarmSnodes, alreadyPolled); + toPollFrom = sample(notPolled) as Snode; + } + + return toPollFrom; + } + private loadGroupIds() { const convos = getConversationController().getConversations(); @@ -667,18 +672,7 @@ export class SwarmPolling { } const pubkey = UserUtils.getOurPubKeyFromCache(); - const polledPubkey = pubkey.key; - const swarmSnodes = await snodePool.getSwarmFor(polledPubkey); - - // Select nodes for which we already have lastHashes - const alreadyPolled = swarmSnodes.filter((n: Snode) => this.lastHashes[n.pubkey_ed25519]); - let toPollFrom = alreadyPolled.length ? alreadyPolled[0] : null; - - // If we need more nodes, select randomly from the remaining nodes: - if (!toPollFrom) { - const notPolled = difference(swarmSnodes, alreadyPolled); - toPollFrom = sample(notPolled) as Snode; - } + const toPollFrom = await this.getNodesToPollFrom(pubkey.key); if (abortSignal?.aborted) { throw new NotFoundError( @@ -686,9 +680,13 @@ export class SwarmPolling { ); } - const resultsFromUserProfile = await SnodeAPIRetrieve.retrieveDisplayName( + const resultsFromUserProfile = await SnodeAPIRetrieve.retrieveNextMessages( toPollFrom, - pubkey.key + [''], + pubkey.key, + [SnodeNamespaces.UserProfile], + pubkey.key, + null ); // check if we just fetched the details from the config namespaces. @@ -721,7 +719,6 @@ export class SwarmPolling { ); } - // window.log.debug(`[pollOnceForOurDisplayName] displayName found ${displayName}`); return displayName; } catch (e) { if (e.message === ERROR_CODE_NO_CONNECT) {