From cc4168a8580d20c3285f9db6880af0714fdec237 Mon Sep 17 00:00:00 2001 From: audric Date: Thu, 12 Aug 2021 11:48:55 +1000 Subject: [PATCH] do not update last fetch timestamp when an exception for retrieve --- ts/session/snode_api/SNodeAPI.ts | 56 +++++++++++--------------- ts/session/snode_api/swarmPolling.ts | 59 ++++++++++++++++++++-------- 2 files changed, 66 insertions(+), 49 deletions(-) diff --git a/ts/session/snode_api/SNodeAPI.ts b/ts/session/snode_api/SNodeAPI.ts index 8771a53a3..b756a1ad4 100644 --- a/ts/session/snode_api/SNodeAPI.ts +++ b/ts/session/snode_api/SNodeAPI.ts @@ -561,44 +561,36 @@ export async function retrieveNextMessages( }; // let exceptions bubble up - try { - // no retry for this one as this a call we do every few seconds while polling for messages - const result = await snodeRpc('retrieve', params, targetNode, associatedWith); - - if (!result) { - window?.log?.warn( - `loki_message:::_retrieveNextMessages - lokiRpc could not talk to ${targetNode.ip}:${targetNode.port}` - ); - return []; - } + // no retry for this one as this a call we do every few seconds while polling for messages + const result = await snodeRpc('retrieve', params, targetNode, associatedWith); - if (result.status !== 200) { - window?.log?.warn('retrieve result is not 200'); - return []; - } + if (!result) { + window?.log?.warn( + `loki_message:::_retrieveNextMessages - lokiRpc could not talk to ${targetNode.ip}:${targetNode.port}` + ); + throw new Error( + `loki_message:::_retrieveNextMessages - lokiRpc could not talk to ${targetNode.ip}:${targetNode.port}` + ); + } - try { - const json = JSON.parse(result.body); - window.inboxStore?.dispatch(updateIsOnline(true)); + if (result.status !== 200) { + window?.log?.warn('retrieve result is not 200'); + throw new Error( + `loki_message:::_retrieveNextMessages - retrieve result is not 200 with ${targetNode.ip}:${targetNode.port}` + ); + } - return json.messages || []; - } catch (e) { - window?.log?.warn('exception while parsing json of nextMessage:', e); - window.inboxStore?.dispatch(updateIsOnline(true)); + try { + const json = JSON.parse(result.body); + window.inboxStore?.dispatch(updateIsOnline(true)); - return []; - } + return json.messages || []; } catch (e) { - window?.log?.warn( - 'Got an error while retrieving next messages. Not retrying as we trigger fetch often:', - e.message + window?.log?.warn('exception while parsing json of nextMessage:', e); + window.inboxStore?.dispatch(updateIsOnline(true)); + throw new Error( + `loki_message:::_retrieveNextMessages - exception while parsing json of nextMessage ${targetNode.ip}:${targetNode.port}: ${e?.message}` ); - if (e.message === ERROR_CODE_NO_CONNECT) { - window.inboxStore?.dispatch(updateIsOnline(false)); - } else { - window.inboxStore?.dispatch(updateIsOnline(true)); - } - return []; } } diff --git a/ts/session/snode_api/swarmPolling.ts b/ts/session/snode_api/swarmPolling.ts index 8fafd555a..8e232cbd1 100644 --- a/ts/session/snode_api/swarmPolling.ts +++ b/ts/session/snode_api/swarmPolling.ts @@ -1,6 +1,6 @@ import { PubKey } from '../types'; import * as snodePool from './snodePool'; -import { retrieveNextMessages } from './SNodeAPI'; +import { ERROR_CODE_NO_CONNECT, retrieveNextMessages } from './SNodeAPI'; import { SignalService } from '../../protobuf'; import * as Receiver from '../../receiver/receiver'; import _ from 'lodash'; @@ -18,6 +18,8 @@ import { DURATION, SWARM_POLLING_TIMEOUT } from '../constants'; import { getConversationController } from '../conversations'; import { perfEnd, perfStart } from '../utils/Performance'; import { ed25519Str } from '../onions/onionPath'; +import { updateIsOnline } from '../../state/ducks/onion'; +import pRetry from 'p-retry'; type PubkeyToHash = { [key: string]: string }; @@ -193,20 +195,23 @@ export class SwarmPolling { nodesToPoll = _.concat(nodesToPoll, newNodes); } - const results = await Promise.all( + const resultsWithNull = await Promise.all( nodesToPoll.map(async (n: Snode) => { + // this returns null if an exception occurs return this.pollNodeForKey(n, pubkey); }) ); + // filter out null (exception thrown) + const results = _.compact(resultsWithNull); + // Merge results into one list of unique messages const messages = _.uniqBy(_.flatten(results), (x: any) => x.hash); - if (isGroup) { + // if all snodes returned an error (null), no need to update the lastPolledTimestamp + if (isGroup && results?.length) { window?.log?.info( - `Polled for group(${ed25519Str(pubkey.key)}): group.pubkey, got ${ - messages.length - } messages back.` + `Polled for group(${ed25519Str(pubkey.key)}):, got ${messages.length} messages back.` ); // update the last fetched timestamp this.groupPolling = this.groupPolling.map(group => { @@ -234,24 +239,44 @@ export class SwarmPolling { // Fetches messages for `pubkey` from `node` potentially updating // the lash hash record - private async pollNodeForKey(node: Snode, pubkey: PubKey): Promise> { + private async pollNodeForKey(node: Snode, pubkey: PubKey): Promise | null> { const edkey = node.pubkey_ed25519; const pkStr = pubkey.key; const prevHash = await this.getLastHash(edkey, pkStr); - const messages = await retrieveNextMessages(node, prevHash, pkStr); - - if (!messages.length) { - return []; + try { + return await pRetry( + async () => { + const messages = await retrieveNextMessages(node, prevHash, pkStr); + if (!messages.length) { + return []; + } + + const lastMessage = _.last(messages); + + await this.updateLastHash(edkey, pubkey, lastMessage.hash, lastMessage.expiration); + return messages; + }, + { + minTimeout: 100, + retries: 2, + onFailedAttempt: e => { + window?.log?.warn( + `retrieveNextMessages attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left...` + ); + }, + } + ); + } catch (e) { + if (e.message === ERROR_CODE_NO_CONNECT) { + window.inboxStore?.dispatch(updateIsOnline(false)); + } else { + window.inboxStore?.dispatch(updateIsOnline(true)); + } + return null; } - - const lastMessage = _.last(messages); - - await this.updateLastHash(edkey, pubkey, lastMessage.hash, lastMessage.expiration); - - return messages; } private loadGroupIds() {