From 52ebcfdbab836965077bfc1de0cab4868f5e2cca Mon Sep 17 00:00:00 2001 From: Audric Ackermann Date: Wed, 17 Apr 2024 14:56:07 +1000 Subject: [PATCH] fix: randomly pick a snode topollfrom until we build a better way --- ts/session/apis/snode_api/retrieveRequest.ts | 19 ++++++++---- ts/session/apis/snode_api/swarmPolling.ts | 27 +++++++---------- .../job_runners/jobs/ConfigurationSyncJob.ts | 30 +++++++++++++++++-- 3 files changed, 51 insertions(+), 25 deletions(-) diff --git a/ts/session/apis/snode_api/retrieveRequest.ts b/ts/session/apis/snode_api/retrieveRequest.ts index f8c9d9bde..e5ed42639 100644 --- a/ts/session/apis/snode_api/retrieveRequest.ts +++ b/ts/session/apis/snode_api/retrieveRequest.ts @@ -1,4 +1,4 @@ -import { isArray, omit } from 'lodash'; +import { isArray, omit, sortBy } from 'lodash'; import { Snode } from '../../../data/data'; import { updateIsOnline } from '../../../state/ducks/onion'; import { doSnodeBatchRequest } from './batchRequest'; @@ -167,11 +167,18 @@ async function retrieveNextMessages( GetNetworkTime.handleTimestampOffsetFromNetwork('retrieve', bodyFirstResult.t); // merge results with their corresponding namespaces - return results.map((result, index) => ({ - code: result.code, - messages: result.body as RetrieveMessagesResultsContent, - namespace: namespaces[index], - })); + return results.map((result, index) => { + const messages = result.body as RetrieveMessagesResultsContent; + // Not sure if that makes sense, but we probably want those messages sorted. + const sortedMessages = sortBy(messages.messages, m => m.timestamp); + messages.messages = sortedMessages; + + return { + code: result.code, + messages, + namespace: namespaces[index], + }; + }); } catch (e) { window?.log?.warn('exception while parsing json of nextMessage:', e); if (!window.inboxStore?.getState().onionPaths.isOnline) { diff --git a/ts/session/apis/snode_api/swarmPolling.ts b/ts/session/apis/snode_api/swarmPolling.ts index e8ddcc092..9b8f0ee7b 100644 --- a/ts/session/apis/snode_api/swarmPolling.ts +++ b/ts/session/apis/snode_api/swarmPolling.ts @@ -1,7 +1,7 @@ /* eslint-disable no-await-in-loop */ /* eslint-disable more/no-then */ /* eslint-disable @typescript-eslint/no-misused-promises */ -import { compact, concat, difference, flatten, last, sample, toNumber, uniqBy } from 'lodash'; +import { compact, concat, flatten, last, sample, toNumber, uniqBy } from 'lodash'; import { Data, Snode } from '../../../data/data'; import { SignalService } from '../../../protobuf'; import * as Receiver from '../../../receiver/receiver'; @@ -24,7 +24,6 @@ import { getConversationController } from '../../conversations'; import { IncomingMessage } from '../../messages/incoming/IncomingMessage'; import { ed25519Str } from '../../onions/onionPath'; import { StringUtils, UserUtils } from '../../utils'; -import { perfEnd, perfStart } from '../../utils/Performance'; import { LibSessionUtil } from '../../utils/libsession/libsession_utils'; import { SnodeNamespace, SnodeNamespaces } from './namespaces'; import { SnodeAPIRetrieve } from './retrieveRequest'; @@ -228,21 +227,16 @@ export class SwarmPolling { namespaces: Array ) { const polledPubkey = pubkey.key; + let resultsFromAllNamespaces: RetrieveMessagesResultsBatched | null; const swarmSnodes = await snodePool.getSwarmFor(polledPubkey); - - // Select nodes for which we already have lastHashes - const alreadyPolled = swarmSnodes.filter((n: Snode) => this.lastHashes[n.pubkey_ed25519]); - let toPollFrom = alreadyPolled.length ? alreadyPolled[0] : null; - - // If we need more nodes, select randomly from the remaining nodes: - if (!toPollFrom) { - const notPolled = difference(swarmSnodes, alreadyPolled); - toPollFrom = sample(notPolled) as Snode; - } - - let resultsFromAllNamespaces: RetrieveMessagesResultsBatched | null; + let toPollFrom: Snode | undefined; try { + toPollFrom = sample(swarmSnodes); + + if (!toPollFrom) { + throw new Error(`pollOnceForKey: no snode in swarm for ${ed25519Str(polledPubkey)}`); + } // Note: always print something so we know if the polling is hanging window.log.info( `about to pollNodeForKey of ${ed25519Str(pubkey.key)} from snode: ${ed25519Str(toPollFrom.pubkey_ed25519)} namespaces: ${namespaces} ` @@ -337,9 +331,10 @@ export class SwarmPolling { }); } - perfStart(`handleSeenMessages-${polledPubkey}`); const newMessages = await this.handleSeenMessages(messages); - perfEnd(`handleSeenMessages-${polledPubkey}`, 'handleSeenMessages'); + window.log.info( + `handleSeenMessages: ${newMessages.length} out of ${messages.length} are not seen yet. snode: ${toPollFrom ? ed25519Str(toPollFrom.pubkey_ed25519) : 'undefined'}` + ); // don't handle incoming messages from group swarms when using the userconfig and the group is not one of the tracked group const isUserConfigReleaseLive = await ReleasedFeatures.checkIsUserConfigFeatureReleased(); diff --git a/ts/session/utils/job_runners/jobs/ConfigurationSyncJob.ts b/ts/session/utils/job_runners/jobs/ConfigurationSyncJob.ts index aaae284d1..0e94040c0 100644 --- a/ts/session/utils/job_runners/jobs/ConfigurationSyncJob.ts +++ b/ts/session/utils/job_runners/jobs/ConfigurationSyncJob.ts @@ -1,14 +1,18 @@ /* eslint-disable no-await-in-loop */ +import { to_hex } from 'libsodium-wrappers-sumo'; import { compact, isArray, isEmpty, isNumber, isString } from 'lodash'; import { v4 } from 'uuid'; import { UserUtils } from '../..'; import { ConfigDumpData } from '../../../../data/configDump/configDump'; import { ConfigurationSyncJobDone } from '../../../../shims/events'; +import { ReleasedFeatures } from '../../../../util/releaseFeature'; +import { isSignInByLinking } from '../../../../util/storage'; import { GenericWrapperActions } from '../../../../webworker/workers/browser/libsession_worker_interface'; import { NotEmptyArrayOfBatchResults } from '../../../apis/snode_api/SnodeRequestTypes'; import { getConversationController } from '../../../conversations'; import { SharedConfigMessage } from '../../../messages/outgoing/controlMessage/SharedConfigMessage'; import { MessageSender } from '../../../sending/MessageSender'; +import { allowOnlyOneAtATime } from '../../Promise'; import { LibSessionUtil, OutgoingConfResult } from '../../libsession/libsession_utils'; import { runners } from '../JobRunner'; import { @@ -17,9 +21,6 @@ import { PersistedJob, RunJobResult, } from '../PersistedJob'; -import { ReleasedFeatures } from '../../../../util/releaseFeature'; -import { allowOnlyOneAtATime } from '../../Promise'; -import { isSignInByLinking } from '../../../../util/storage'; const defaultMsBetweenRetries = 15000; // a long time between retries, to avoid running multiple jobs at the same time, when one was postponed at the same time as one already planned (5s) const defaultMaxAttempts = 2; @@ -208,6 +209,29 @@ class ConfigurationSyncJob extends PersistedJob }; }); + if (window.sessionFeatureFlags.debug.debugLibsessionDumps) { + for (let index = 0; index < LibSessionUtil.requiredUserVariants.length; index++) { + const variant = LibSessionUtil.requiredUserVariants[index]; + + window.log.info( + `ConfigurationSyncJob: current dumps: ${variant}:`, + to_hex(await GenericWrapperActions.dump(variant)) + ); + } + window.log.info( + 'ConfigurationSyncJob: About to push changes: ', + msgs.map(m => { + return { + ...m, + message: { + ...m.message, + data: to_hex(m.message.data), + }, + }; + }) + ); + } + const result = await MessageSender.sendMessagesToSnode( msgs, thisJobDestination,