handle offline status for selectGuardNodes and swarm polling

pull/1839/head
audric 4 years ago
parent ac40143ad0
commit 8344c48d88

@ -409,9 +409,6 @@
// Clear timer, since we're only called when the timer is expired
disconnectTimer = null;
// FIXME audric stop polling opengroupv2 and swarm nodes
window.libsession.Utils.AttachmentDownloads.stop();
}

@ -290,6 +290,10 @@ async function testGuardNode(snode: Snode) {
if (e.type === 'request-timeout') {
window?.log?.warn('test timeout for node,', snode);
}
if (e.code === 'ENETUNREACH') {
window?.log?.warn('no network on node,', snode);
throw new pRetry.AbortError(ERROR_CODE_NO_CONNECT);
}
return false;
}
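For context, a minimal sketch of the pRetry contract this change relies on (illustrative names, not from the codebase): a plain rejection from the wrapped function is retried, while pRetry.AbortError stops retrying immediately, which is why ENETUNREACH is converted into an AbortError here.

import pRetry from 'p-retry';

// probe a single node; doRequest is a placeholder for the real network call
async function probeNode(doRequest: () => Promise<void>): Promise<boolean> {
  try {
    await doRequest();
    return true;
  } catch (e: any) {
    if (e.code === 'ENETUNREACH') {
      // no network at all: retrying other candidates is pointless, abort the whole pRetry call
      throw new pRetry.AbortError('no network');
    }
    throw e; // any other rejection is considered transient and is retried by pRetry
  }
}

// retries up to 3 times on a plain rejection, but rejects immediately on an AbortError
const probeWithRetries = async (doRequest: () => Promise<void>) =>
  pRetry(() => probeNode(doRequest), { retries: 3 });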
@ -310,20 +314,27 @@ export async function selectGuardNodes(): Promise<Array<Snode>> {
window.log.info('selectGuardNodes snodePool:', nodePool.length);
if (nodePool.length < desiredGuardCount) {
window?.log?.error(
'Could not select guard nodes. Not enough nodes in the pool: ',
nodePool.length
`Could not select guard nodes. Not enough nodes in the pool: ${nodePool.length}`
);
throw new Error(
`Could not select guard nodes. Not enough nodes in the pool: ${nodePool.length}`
);
return [];
}
const shuffled = _.shuffle(nodePool);
let selectedGuardNodes: Array<Snode> = [];
let attempts = 0;
// The use of await inside while is intentional:
// we only want to repeat if the await fails
// eslint-disable-next-line no-await-in-loop
while (selectedGuardNodes.length < desiredGuardCount) {
if (!window.globalOnlineStatus) {
window?.log?.error('selectGuardNodes: offline');
throw new Error('selectGuardNodes: offline');
}
if (shuffled.length < desiredGuardCount) {
window?.log?.error('Not enough nodes in the pool');
break;
@ -331,15 +342,25 @@ export async function selectGuardNodes(): Promise<Array<Snode>> {
const candidateNodes = shuffled.splice(0, desiredGuardCount);
// Test all three nodes at once
if (attempts > 10) {
// too many retries. something is wrong.
window.log.info(`selectGuardNodes stopping after attempts: ${attempts}`);
throw new Error(`selectGuardNodes stopping after attempts: ${attempts}`);
}
window.log.info(`selectGuardNodes attempts: ${attempts}`);
// Test all three nodes at once, wait for all to resolve or reject
// eslint-disable-next-line no-await-in-loop
const idxOk = await Promise.all(candidateNodes.map(testGuardNode));
const idxOk = (await Promise.allSettled(candidateNodes.map(testGuardNode))).flatMap(p =>
p.status === 'fulfilled' ? p.value : null
);
const goodNodes = _.zip(idxOk, candidateNodes)
.filter(x => x[0])
.map(x => x[1]) as Array<Snode>;
selectedGuardNodes = _.concat(selectedGuardNodes, goodNodes);
attempts++;
}
if (selectedGuardNodes.length < desiredGuardCount) {
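The Promise.all to Promise.allSettled switch above (and the matching change in SwarmPolling further down) follows one pattern: settle every probe, keep only the fulfilled values, and pair them back with their inputs so a single rejection no longer fails the whole batch. A self-contained sketch, with illustrative names not taken from the codebase:

import _ from 'lodash';

type Snode = { ip: string; port: number };

async function filterReachable(
  nodes: Array<Snode>,
  test: (n: Snode) => Promise<boolean>
): Promise<Array<Snode>> {
  const settled = await Promise.allSettled(nodes.map(test));
  // a rejected test counts the same as a test that resolved to false
  const okFlags = settled.map(r => (r.status === 'fulfilled' ? r.value : false));
  return _.zip(okFlags, nodes)
    .filter(pair => pair[0])
    .map(pair => pair[1]) as Array<Snode>;
}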
@ -379,8 +400,12 @@ async function buildNewOnionPathsWorker() {
}
// If guard nodes is still empty (the old nodes are now invalid), select new ones:
if (guardNodes.length < desiredGuardCount) {
// TODO: don't throw away potentially good guard nodes
guardNodes = await exports.selectGuardNodes();
try {
guardNodes = await exports.selectGuardNodes();
} catch (e) {
window.log.warn('selectGuardNodes threw an error. Not retrying.', e);
return;
}
}
// be sure to fetch again as that list might have been refreshed by selectGuardNodes
allNodes = await SnodePool.getRandomSnodePool();

@ -1,7 +1,11 @@
// tslint:disable: cyclomatic-complexity
import { OnionPaths } from '.';
import { FinalRelayOptions, sendOnionRequestLsrpcDest, SnodeResponse } from '../snode_api/onions';
import {
FinalRelayOptions,
sendOnionRequestHandlingSnodeEject,
SnodeResponse,
} from '../snode_api/onions';
import _, { toNumber } from 'lodash';
import { PROTOCOLS } from '../constants';
import { toHex } from '../utils/String';
@ -26,6 +30,12 @@ type OnionPayloadObj = {
headers: Record<string, any>;
};
export type FinalDestinationOptions = {
destination_ed25519_hex?: string;
headers?: Record<string, string>;
body?: string;
};
const buildSendViaOnionPayload = (url: URL, fetchOptions: OnionFetchOptions): OnionPayloadObj => {
let tempHeaders = fetchOptions.headers || {};
const payloadObj = {
@ -82,7 +92,7 @@ const initOptionsWithDefaults = (options: OnionFetchBasicOptions) => {
return _.defaults(options, defaultFetchBasicOptions);
};
const sendViaOnionRetryable = async ({
const sendViaOnionToNonSnodeRetryable = async ({
castedDestinationX25519Key,
finalRelayOptions,
payloadObj,
@ -99,15 +109,18 @@ const sendViaOnionRetryable = async ({
throw new Error('getOnionPathForSending is empty');
}
// this call throws a normal error (which will trigger a retry) if a retryable is got (bad path or whatever)
// it throws an AbortError in case the retry should not be retried again (aborted, or )
const result = await sendOnionRequestLsrpcDest(
pathNodes,
castedDestinationX25519Key,
/**
* This call handles ejecting a snode or a path if needed. If that happens, it throws a retryable error and the pRetry
* call above will call us again with the same params but a different path.
* If the error is not recoverable, it throws a pRetry.AbortError.
*/
const result: SnodeResponse = await sendOnionRequestHandlingSnodeEject({
nodePath: pathNodes,
destX25519Any: castedDestinationX25519Key,
finalDestOptions: payloadObj,
finalRelayOptions,
payloadObj,
abortSignal
);
abortSignal,
});
return result;
};
@ -160,7 +173,7 @@ export const sendViaOnionToNonSnode = async (
try {
result = await pRetry(
async () => {
return sendViaOnionRetryable({
return sendViaOnionToNonSnodeRetryable({
castedDestinationX25519Key,
finalRelayOptions,
payloadObj,
@ -174,13 +187,13 @@ export const sendViaOnionToNonSnode = async (
maxTimeout: 4000,
onFailedAttempt: e => {
window?.log?.warn(
`sendViaOnionRetryable attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left...`
`sendViaOnionToNonSnodeRetryable attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left...`
);
},
}
);
} catch (e) {
window?.log?.warn('sendViaOnionRetryable failed ', e);
window?.log?.warn('sendViaOnionToNonSnodeRetryable failed ', e);
return null;
}

@ -520,12 +520,6 @@ export type DestinationContext = {
ephemeralKey: ArrayBuffer;
};
export type FinalDestinationOptions = {
destination_ed25519_hex?: string;
headers?: Record<string, string>;
body?: string;
};
/**
* Handle a 421. The body is supposed to be the new swarm nodes for this publickey.
* @param snodeEd25519 the snode giving the reply
@ -651,7 +645,7 @@ export async function incrementBadSnodeCountOrDrop({
* This call tries to send the request via onion. If we get a bad path, it handles the snode removing of the swarm and snode pool.
* But the caller needs to handle the retry (and rebuild the path on his side if needed)
*/
const sendOnionRequestHandlingSnodeEject = async ({
export const sendOnionRequestHandlingSnodeEject = async ({
destX25519Any,
finalDestOptions,
nodePath,
@ -691,6 +685,9 @@ const sendOnionRequestHandlingSnodeEject = async ({
decodingSymmetricKey = result.decodingSymmetricKey;
} catch (e) {
window.log.warn('sendOnionRequest', e);
if (e.code === 'ENETUNREACH') {
throw e;
}
}
// this call will handle the common onion failure logic.
// if an error is not retryable, an AbortError is thrown; pRetry handles it and stops retrying
@ -844,26 +841,6 @@ async function sendOnionRequestSnodeDest(
});
}
/**
* This call tries to send the request via onion. If we get a bad path, it handles the snode removing of the swarm and snode pool.
* But the caller needs to handle the retry (and rebuild the path on his side if needed)
*/
export async function sendOnionRequestLsrpcDest(
onionPath: Array<Snode>,
destX25519Any: string,
finalRelayOptions: FinalRelayOptions,
payloadObj: FinalDestinationOptions,
abortSignal?: AbortSignal
): Promise<SnodeResponse> {
return sendOnionRequestHandlingSnodeEject({
nodePath: onionPath,
destX25519Any,
finalDestOptions: payloadObj,
finalRelayOptions,
abortSignal,
});
}
export function getPathString(pathObjArr: Array<{ ip: string; port: number }>): string {
return pathObjArr.map(node => `${node.ip}:${node.port}`).join(', ');
}

@ -128,6 +128,12 @@ export class SwarmPolling {
* Only public for testing
*/
public async TEST_pollForAllKeys() {
if (!window.globalOnlineStatus) {
window?.log?.error('pollForAllKeys: offline');
// Important: still schedule the next poll even while offline
setTimeout(this.TEST_pollForAllKeys.bind(this), SWARM_POLLING_TIMEOUT.ACTIVE);
return;
}
// we always poll as often as possible for our pubkey
const ourPubkey = UserUtils.getOurPubKeyFromCache();
const directPromise = this.TEST_pollOnceForKey(ourPubkey, false);
@ -158,7 +164,7 @@ export class SwarmPolling {
try {
await Promise.all(_.concat(directPromise, groupPromises));
} catch (e) {
(window?.log?.info || console.warn)('pollForAllKeys swallowing exception: ', e);
window?.log?.info('pollForAllKeys exception: ', e);
throw e;
} finally {
setTimeout(this.TEST_pollForAllKeys.bind(this), SWARM_POLLING_TIMEOUT.ACTIVE);
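The offline guard and the finally branch above implement the same scheduling rule: the poller must always re-arm its setTimeout, whether it skipped work while offline, succeeded, or failed, otherwise polling silently stops. A simplified sketch of that rule (not the actual SwarmPolling class; POLL_INTERVAL_MS is an assumption standing in for SWARM_POLLING_TIMEOUT.ACTIVE):

const POLL_INTERVAL_MS = 5000; // assumption: stands in for SWARM_POLLING_TIMEOUT.ACTIVE

class Poller {
  public async pollOnce(isOnline: () => boolean, doPoll: () => Promise<void>) {
    if (!isOnline()) {
      // offline: do nothing, but make sure the next tick is still scheduled
      setTimeout(() => void this.pollOnce(isOnline, doPoll), POLL_INTERVAL_MS);
      return;
    }
    try {
      await doPoll();
    } catch (e) {
      console.info('poll failed, will retry on the next tick', e);
    } finally {
      // re-arm whether the poll succeeded or failed
      setTimeout(() => void this.pollOnce(isOnline, doPoll), POLL_INTERVAL_MS);
    }
  }
}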
@ -195,21 +201,23 @@ export class SwarmPolling {
nodesToPoll = _.concat(nodesToPoll, newNodes);
}
const resultsWithNull = await Promise.all(
nodesToPoll.map(async (n: Snode) => {
// this returns null if an exception occurs
return this.pollNodeForKey(n, pubkey);
})
);
const arrayOfResultsWithNull = (
await Promise.allSettled(
nodesToPoll.map(async (n: Snode) => {
// this returns null if an exception occurs
return this.pollNodeForKey(n, pubkey);
})
)
).flatMap(entry => (entry.status === 'fulfilled' ? entry.value : null));
// filter out null (exception thrown)
const results = _.compact(resultsWithNull);
const arrayOfResults = _.compact(arrayOfResultsWithNull);
// Merge results into one list of unique messages
const messages = _.uniqBy(_.flatten(results), (x: any) => x.hash);
const messages = _.uniqBy(_.flatten(arrayOfResults), (x: any) => x.hash);
// if all snodes returned an error (null), no need to update the lastPolledTimestamp
if (isGroup && results?.length) {
if (isGroup && arrayOfResults?.length) {
window?.log?.info(
`Polled for group(${ed25519Str(pubkey.key)}), got ${messages.length} messages back.`
);
@ -223,6 +231,8 @@ export class SwarmPolling {
}
return group;
});
} else if (isGroup) {
window?.log?.info(`Polled for group(${ed25519Str(pubkey.key)}), but got no results.`);
}
perfStart(`handleSeenMessages-${pkStr}`);

@ -148,11 +148,7 @@ describe('OnionPaths', () => {
TestUtils.stubWindow('getSeedNodeList', () => ['seednode1']);
// tslint:disable: no-void-expression no-console
TestUtils.stubWindow('log', {
info: (args: any) => console.info(args),
warn: (args: any) => console.warn(args),
error: (args: any) => console.error(args),
});
TestUtils.stubWindowLog();
sandbox.stub(SNodeAPI.SnodePool, 'refreshRandomPoolDetail').resolves(fakeSnodePool);
SNodeAPI.Onions.resetSnodeFailureCount();

@ -49,6 +49,8 @@ describe('SwarmPolling', () => {
sandbox.stub(SnodePool, 'getSwarmFor').resolves([]);
TestUtils.stubWindow('profileImages', { removeImagesNotInArray: noop, hasImage: noop });
TestUtils.stubWindow('inboxStore', undefined);
TestUtils.stubWindowLog();
const convoController = getConversationController();
await convoController.load();
getConversationController().getOrCreate(ourPubkey.key, ConversationTypeEnum.PRIVATE);
@ -188,14 +190,18 @@ describe('SwarmPolling', () => {
ConversationTypeEnum.GROUP
);
convo.set('active_at', 1);
convo.set('active_at', 1); // really old
const groupConvoPubkey = PubKey.cast(convo.id as string);
swarmPolling.addGroupId(groupConvoPubkey);
// this calls the stub 2 times, one for our direct pubkey and one for the group
await swarmPolling.start(true);
// this should only call the stub one more time: for our direct pubkey but not for the group pubkey
await swarmPolling.TEST_pollForAllKeys();
expect(pollOnceForKeySpy.callCount).to.eq(3);
expect(pollOnceForKeySpy.firstCall.args).to.deep.eq([ourPubkey, false]);
expect(pollOnceForKeySpy.secondCall.args).to.deep.eq([groupConvoPubkey, true]);
expect(pollOnceForKeySpy.thirdCall.args).to.deep.eq([ourPubkey, false]);
});
@ -220,7 +226,7 @@ describe('SwarmPolling', () => {
expect(pollOnceForKeySpy.lastCall.args).to.deep.eq([groupConvoPubkey, true]);
});
it('does run once only if activeAt is inactive', async () => {
it('does run twice if activeAt is inactive and we tick longer than 2 minutes', async () => {
const convo = getConversationController().getOrCreate(
TestUtils.generateFakePubKeyStr(),
ConversationTypeEnum.GROUP
@ -229,19 +235,23 @@ describe('SwarmPolling', () => {
convo.set('active_at', Date.now());
const groupConvoPubkey = PubKey.cast(convo.id as string);
swarmPolling.addGroupId(groupConvoPubkey);
// this call the stub two times already
await swarmPolling.start(true);
// more than hour old, we should not tick after just 5 seconds
// more than a week old, we should tick only once for this group
convo.set('active_at', Date.now() - 7 * 25 * 3600 * 1000);
clock.tick(6000);
clock.tick(3 * 60 * 1000);
expect(pollOnceForKeySpy.callCount).to.eq(3);
// we should have two more calls here, so 4 total.
expect(pollOnceForKeySpy.callCount).to.eq(4);
expect(pollOnceForKeySpy.firstCall.args).to.deep.eq([ourPubkey, false]);
expect(pollOnceForKeySpy.secondCall.args).to.deep.eq([groupConvoPubkey, true]);
expect(pollOnceForKeySpy.thirdCall.args).to.deep.eq([ourPubkey, false]);
expect(pollOnceForKeySpy.getCalls()[3].args).to.deep.eq([groupConvoPubkey, true]);
});
it('does run once if activeAt is inactive ', async () => {
it('does run once only if group is inactive and we tick less than 2 minutes ', async () => {
const convo = getConversationController().getOrCreate(
TestUtils.generateFakePubKeyStr(),
ConversationTypeEnum.GROUP
@ -255,10 +265,11 @@ describe('SwarmPolling', () => {
// more than a week old, we should not tick after just 5 seconds
convo.set('active_at', Date.now() - 7 * 24 * 3600 * 1000 - 3600 * 1000);
clock.tick(6 * 1000); // active
clock.tick(1 * 60 * 1000);
// we should have only one more call here, the one for our direct pubkey fetch
expect(pollOnceForKeySpy.callCount).to.eq(3);
expect(pollOnceForKeySpy.secondCall.args).to.deep.eq([groupConvoPubkey, true]);
expect(pollOnceForKeySpy.secondCall.args).to.deep.eq([groupConvoPubkey, true]); // this one comes from the swarmPolling.start
expect(pollOnceForKeySpy.thirdCall.args).to.deep.eq([ourPubkey, false]);
});
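For context on the timing assertions in these tests, here is a minimal standalone sketch (not the real test setup; the 5 second interval is an assumption standing in for the active polling timeout) of how sinon fake timers drive timer-based code: clock.tick(ms) fires every timer that falls due within that window, so ticking past the interval yields exactly one extra call per scheduled poll.

import * as sinon from 'sinon';

const clock = sinon.useFakeTimers();
const poll = sinon.spy();

// schedule a poll every 5 seconds, roughly how the swarm poller treats active conversations
const interval = setInterval(poll, 5000);

clock.tick(6 * 1000); // one interval elapsed -> poll.callCount === 1
clock.tick(3 * 60 * 1000); // three more minutes -> 36 more calls, poll.callCount === 37

clearInterval(interval);
clock.restore();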
@ -278,34 +289,40 @@ describe('SwarmPolling', () => {
await swarmPolling.start(true);
});
it('does run twice if activeAt is more is medium active ', async () => {
it('does run twice if activeAt is less than 2 days', async () => {
pollOnceForKeySpy.resetHistory();
// medium active category
// less than 2 days old, this is an active group
convo.set('active_at', Date.now() - 2 * 24 * 3600 * 1000 - 3600 * 1000);
clock.tick(61 * 1000); // medium_active
// we tick more than 5 sec
clock.tick(6 * 1000);
await swarmPolling.TEST_pollForAllKeys();
expect(pollOnceForKeySpy.callCount).to.eq(3);
// we have 4 calls total: 2 for our direct pubkey (polled every 5 seconds) and 2 for the active group pubkey (also polled every 5 seconds)
expect(pollOnceForKeySpy.callCount).to.eq(4);
// first two calls are our pubkey
expect(pollOnceForKeySpy.firstCall.args).to.deep.eq([ourPubkey, false]);
expect(pollOnceForKeySpy.secondCall.args).to.deep.eq([ourPubkey, false]);
expect(pollOnceForKeySpy.secondCall.args).to.deep.eq([groupConvoPubkey, true]);
expect(pollOnceForKeySpy.thirdCall.args).to.deep.eq([groupConvoPubkey, true]);
expect(pollOnceForKeySpy.thirdCall.args).to.deep.eq([ourPubkey, false]);
expect(pollOnceForKeySpy.getCalls()[3].args).to.deep.eq([groupConvoPubkey, true]);
});
it('does run twice if activeAt is more than 2 days old and we tick more than one minute ', async () => {
pollOnceForKeySpy.resetHistory();
convo.set('active_at', Date.now() - 2 * 24 * 3600 * 1000);
convo.set('active_at', Date.now() - 2 * 25 * 3600 * 1000); // medium active
clock.tick(65 * 1000); // inactive
clock.tick(65 * 1000); // should tick twice more (one more for our direct pubkey and one for the group)
await swarmPolling.TEST_pollForAllKeys();
expect(pollOnceForKeySpy.callCount).to.eq(3);
expect(pollOnceForKeySpy.callCount).to.eq(4);
// first two calls are our pubkey
expect(pollOnceForKeySpy.firstCall.args).to.deep.eq([ourPubkey, false]);
expect(pollOnceForKeySpy.secondCall.args).to.deep.eq([ourPubkey, false]);
expect(pollOnceForKeySpy.thirdCall.args).to.deep.eq([groupConvoPubkey, true]);
expect(pollOnceForKeySpy.secondCall.args).to.deep.eq([groupConvoPubkey, true]);
expect(pollOnceForKeySpy.thirdCall.args).to.deep.eq([ourPubkey, false]);
expect(pollOnceForKeySpy.getCalls()[3].args).to.deep.eq([groupConvoPubkey, true]);
});
});
});

@ -70,3 +70,13 @@ export function restoreStubs() {
globalAny.window = undefined;
sandbox.restore();
}
export const stubWindowLog = () => {
stubWindow('log', {
// tslint:disable: no-void-expression
// tslint:disable: no-console
info: (args: any) => console.info(args),
warn: (args: any) => console.warn(args),
error: (args: any) => console.error(args),
});
};

@ -1,12 +1,12 @@
{
"compilerOptions": {
// Basic Options
"target": "es2017", // Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017','ES2018' or 'ESNEXT'.
"target": "es2020", // Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017','ES2018' or 'ESNEXT'.
"module": "commonjs", // Specify module code generation: 'none', 'commonjs', 'amd', 'system', 'umd', 'es2015', or 'ESNext'.
// Specify library files to be included in the compilation.
"lib": [
"dom", // Required to access `window`
"es2017" // Required by `@sindresorhus/is`
"es2020"
],
// "allowJs": true, // Allow javascript files to be compiled.
// "checkJs": true, // Report errors in .js files.
