do not update last fetch timestamp when an exception for retrieve

pull/1839/head
audric 4 years ago
parent 7a2d1c07a6
commit cc4168a858

@ -561,44 +561,36 @@ export async function retrieveNextMessages(
};
// let exceptions bubble up
try {
// no retry for this one as this a call we do every few seconds while polling for messages
const result = await snodeRpc('retrieve', params, targetNode, associatedWith);
if (!result) {
window?.log?.warn(
`loki_message:::_retrieveNextMessages - lokiRpc could not talk to ${targetNode.ip}:${targetNode.port}`
);
return [];
}
// no retry for this one as this a call we do every few seconds while polling for messages
const result = await snodeRpc('retrieve', params, targetNode, associatedWith);
if (result.status !== 200) {
window?.log?.warn('retrieve result is not 200');
return [];
}
if (!result) {
window?.log?.warn(
`loki_message:::_retrieveNextMessages - lokiRpc could not talk to ${targetNode.ip}:${targetNode.port}`
);
throw new Error(
`loki_message:::_retrieveNextMessages - lokiRpc could not talk to ${targetNode.ip}:${targetNode.port}`
);
}
try {
const json = JSON.parse(result.body);
window.inboxStore?.dispatch(updateIsOnline(true));
if (result.status !== 200) {
window?.log?.warn('retrieve result is not 200');
throw new Error(
`loki_message:::_retrieveNextMessages - retrieve result is not 200 with ${targetNode.ip}:${targetNode.port}`
);
}
return json.messages || [];
} catch (e) {
window?.log?.warn('exception while parsing json of nextMessage:', e);
window.inboxStore?.dispatch(updateIsOnline(true));
try {
const json = JSON.parse(result.body);
window.inboxStore?.dispatch(updateIsOnline(true));
return [];
}
return json.messages || [];
} catch (e) {
window?.log?.warn(
'Got an error while retrieving next messages. Not retrying as we trigger fetch often:',
e.message
window?.log?.warn('exception while parsing json of nextMessage:', e);
window.inboxStore?.dispatch(updateIsOnline(true));
throw new Error(
`loki_message:::_retrieveNextMessages - exception while parsing json of nextMessage ${targetNode.ip}:${targetNode.port}: ${e?.message}`
);
if (e.message === ERROR_CODE_NO_CONNECT) {
window.inboxStore?.dispatch(updateIsOnline(false));
} else {
window.inboxStore?.dispatch(updateIsOnline(true));
}
return [];
}
}

@ -1,6 +1,6 @@
import { PubKey } from '../types';
import * as snodePool from './snodePool';
import { retrieveNextMessages } from './SNodeAPI';
import { ERROR_CODE_NO_CONNECT, retrieveNextMessages } from './SNodeAPI';
import { SignalService } from '../../protobuf';
import * as Receiver from '../../receiver/receiver';
import _ from 'lodash';
@ -18,6 +18,8 @@ import { DURATION, SWARM_POLLING_TIMEOUT } from '../constants';
import { getConversationController } from '../conversations';
import { perfEnd, perfStart } from '../utils/Performance';
import { ed25519Str } from '../onions/onionPath';
import { updateIsOnline } from '../../state/ducks/onion';
import pRetry from 'p-retry';
type PubkeyToHash = { [key: string]: string };
@ -193,20 +195,23 @@ export class SwarmPolling {
nodesToPoll = _.concat(nodesToPoll, newNodes);
}
const results = await Promise.all(
const resultsWithNull = await Promise.all(
nodesToPoll.map(async (n: Snode) => {
// this returns null if an exception occurs
return this.pollNodeForKey(n, pubkey);
})
);
// filter out null (exception thrown)
const results = _.compact(resultsWithNull);
// Merge results into one list of unique messages
const messages = _.uniqBy(_.flatten(results), (x: any) => x.hash);
if (isGroup) {
// if all snodes returned an error (null), no need to update the lastPolledTimestamp
if (isGroup && results?.length) {
window?.log?.info(
`Polled for group(${ed25519Str(pubkey.key)}): group.pubkey, got ${
messages.length
} messages back.`
`Polled for group(${ed25519Str(pubkey.key)}):, got ${messages.length} messages back.`
);
// update the last fetched timestamp
this.groupPolling = this.groupPolling.map(group => {
@ -234,24 +239,44 @@ export class SwarmPolling {
// Fetches messages for `pubkey` from `node` potentially updating
// the lash hash record
private async pollNodeForKey(node: Snode, pubkey: PubKey): Promise<Array<any>> {
private async pollNodeForKey(node: Snode, pubkey: PubKey): Promise<Array<any> | null> {
const edkey = node.pubkey_ed25519;
const pkStr = pubkey.key;
const prevHash = await this.getLastHash(edkey, pkStr);
const messages = await retrieveNextMessages(node, prevHash, pkStr);
if (!messages.length) {
return [];
try {
return await pRetry(
async () => {
const messages = await retrieveNextMessages(node, prevHash, pkStr);
if (!messages.length) {
return [];
}
const lastMessage = _.last(messages);
await this.updateLastHash(edkey, pubkey, lastMessage.hash, lastMessage.expiration);
return messages;
},
{
minTimeout: 100,
retries: 2,
onFailedAttempt: e => {
window?.log?.warn(
`retrieveNextMessages attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left...`
);
},
}
);
} catch (e) {
if (e.message === ERROR_CODE_NO_CONNECT) {
window.inboxStore?.dispatch(updateIsOnline(false));
} else {
window.inboxStore?.dispatch(updateIsOnline(true));
}
return null;
}
const lastMessage = _.last(messages);
await this.updateLastHash(edkey, pubkey, lastMessage.hash, lastMessage.expiration);
return messages;
}
private loadGroupIds() {

Loading…
Cancel
Save