import { compact, concat, difference, flatten, last, sample, toNumber, uniqBy } from 'lodash'; import { Data, Snode } from '../../../data/data'; import { SignalService } from '../../../protobuf'; import * as Receiver from '../../../receiver/receiver'; import { PubKey } from '../../types'; import { ERROR_CODE_NO_CONNECT } from './SNodeAPI'; import * as snodePool from './snodePool'; import pRetry from 'p-retry'; import { ConversationModel } from '../../../models/conversation'; import { ConfigMessageHandler } from '../../../receiver/configMessage'; import { decryptEnvelopeWithOurKey } from '../../../receiver/contentMessage'; import { EnvelopePlus } from '../../../receiver/types'; import { updateIsOnline } from '../../../state/ducks/onion'; import { DURATION, SWARM_POLLING_TIMEOUT } from '../../constants'; import { getConversationController } from '../../conversations'; import { IncomingMessage } from '../../messages/incoming/IncomingMessage'; import { ed25519Str } from '../../onions/onionPath'; import { StringUtils, UserUtils } from '../../utils'; import { perfEnd, perfStart } from '../../utils/Performance'; import { SnodeNamespace, SnodeNamespaces } from './namespaces'; import { SnodeAPIRetrieve } from './retrieveRequest'; import { RetrieveMessageItem, RetrieveMessagesResultsBatched } from './types'; export function extractWebSocketContent( message: string, messageHash: string ): null | { body: Uint8Array; messageHash: string; } { try { const dataPlaintext = new Uint8Array(StringUtils.encode(message, 'base64')); const messageBuf = SignalService.WebSocketMessage.decode(dataPlaintext); if ( messageBuf.type === SignalService.WebSocketMessage.Type.REQUEST && messageBuf.request?.body?.length ) { return { body: messageBuf.request.body, messageHash, }; } return null; } catch (error) { window?.log?.warn('extractWebSocketContent from message failed with:', error.message); return null; } } let instance: SwarmPolling | undefined; export const getSwarmPollingInstance = () => { if (!instance) { instance = new SwarmPolling(); } return instance; }; export class SwarmPolling { private groupPolling: Array<{ pubkey: PubKey; lastPolledTimestamp: number }>; private readonly lastHashes: Record>>; constructor() { this.groupPolling = []; this.lastHashes = {}; } public async start(waitForFirstPoll = false): Promise { this.loadGroupIds(); if (waitForFirstPoll) { await this.pollForAllKeys(); } else { setTimeout(() => { void this.pollForAllKeys(); }, 4000); } } /** * Used fo testing only */ public resetSwarmPolling() { this.groupPolling = []; } public forcePolledTimestamp(pubkey: PubKey, lastPoll: number) { this.groupPolling = this.groupPolling.map(group => { if (PubKey.isEqual(pubkey, group.pubkey)) { return { ...group, lastPolledTimestamp: lastPoll, }; } return group; }); } public addGroupId(pubkey: PubKey) { if (this.groupPolling.findIndex(m => m.pubkey.key === pubkey.key) === -1) { window?.log?.info('Swarm addGroupId: adding pubkey to polling', pubkey.key); this.groupPolling.push({ pubkey, lastPolledTimestamp: 0 }); } } public removePubkey(pk: PubKey | string) { const pubkey = PubKey.cast(pk); window?.log?.info('Swarm removePubkey: removing pubkey from polling', pubkey.key); this.groupPolling = this.groupPolling.filter(group => !pubkey.isEqual(group.pubkey)); } /** * Only public for testing purpose. * * Currently, a group with an * -> an activeAt less than 2 days old is considered active and polled often (every 5 sec) * -> an activeAt less than 1 week old is considered medium_active and polled a bit less (every minute) * -> an activeAt more than a week old is considered inactive, and not polled much (every 2 minutes) */ public getPollingTimeout(convoId: PubKey) { const convo = getConversationController().get(convoId.key); if (!convo) { return SWARM_POLLING_TIMEOUT.INACTIVE; } const activeAt = convo.get('active_at'); if (!activeAt) { return SWARM_POLLING_TIMEOUT.INACTIVE; } const currentTimestamp = Date.now(); // consider that this is an active group if activeAt is less than two days old if (currentTimestamp - activeAt <= DURATION.DAYS * 2) { return SWARM_POLLING_TIMEOUT.ACTIVE; } if (currentTimestamp - activeAt <= DURATION.DAYS * 7) { return SWARM_POLLING_TIMEOUT.MEDIUM_ACTIVE; } return SWARM_POLLING_TIMEOUT.INACTIVE; } /** * Only public for testing */ public async pollForAllKeys() { if (!window.getGlobalOnlineStatus()) { window?.log?.error('pollForAllKeys: offline'); // Important to set up a new polling setTimeout(this.pollForAllKeys.bind(this), SWARM_POLLING_TIMEOUT.ACTIVE); return; } // we always poll as often as possible for our pubkey const ourPubkey = UserUtils.getOurPubKeyFromCache(); const directPromise = Promise.all([ this.pollOnceForKey(ourPubkey, false, this.getUserNamespacesPolled()), ]).then(() => undefined); const now = Date.now(); const groupPromises = this.groupPolling.map(async group => { const convoPollingTimeout = this.getPollingTimeout(group.pubkey); const diff = now - group.lastPolledTimestamp; const loggingId = getConversationController() .get(group.pubkey.key) ?.idForLogging() || group.pubkey.key; if (diff >= convoPollingTimeout) { window?.log?.info( `Polling for ${loggingId}; timeout: ${convoPollingTimeout}; diff: ${diff} ` ); return this.pollOnceForKey(group.pubkey, true, [SnodeNamespaces.ClosedGroupMessage]); } window?.log?.info( `Not polling for ${loggingId}; timeout: ${convoPollingTimeout} ; diff: ${diff}` ); return Promise.resolve(); }); try { await Promise.all(concat([directPromise], groupPromises)); } catch (e) { window?.log?.info('pollForAllKeys exception: ', e); throw e; } finally { setTimeout(this.pollForAllKeys.bind(this), SWARM_POLLING_TIMEOUT.ACTIVE); } } /** * Only exposed as public for testing */ public async pollOnceForKey( pubkey: PubKey, isGroup: boolean, namespaces: Array ) { const polledPubkey = pubkey.key; const swarmSnodes = await snodePool.getSwarmFor(polledPubkey); // Select nodes for which we already have lastHashes const alreadyPolled = swarmSnodes.filter((n: Snode) => this.lastHashes[n.pubkey_ed25519]); let toPollFrom = alreadyPolled.length ? alreadyPolled[0] : null; // If we need more nodes, select randomly from the remaining nodes: if (!toPollFrom) { const notPolled = difference(swarmSnodes, alreadyPolled); toPollFrom = sample(notPolled) as Snode; } let resultsFromAllNamespaces: RetrieveMessagesResultsBatched | null; try { resultsFromAllNamespaces = await this.pollNodeForKey(toPollFrom, pubkey, namespaces); } catch (e) { window.log.warn( `pollNodeForKey of ${pubkey} namespaces: ${namespaces} failed with: ${e.message}` ); resultsFromAllNamespaces = null; } let allNamespacesWithoutUserConfigIfNeeded: Array = []; // check if we just fetched the details from the config namespaces. // If yes, merge them together and exclude them from the rest of the messages. if (window.sessionFeatureFlags.useSharedUtilForUserConfig && resultsFromAllNamespaces) { const userConfigMessages = resultsFromAllNamespaces .filter(m => SnodeNamespace.isUserConfigNamespace(m.namespace)) .map(r => r.messages.messages); allNamespacesWithoutUserConfigIfNeeded = flatten( compact( resultsFromAllNamespaces .filter(m => !SnodeNamespace.isUserConfigNamespace(m.namespace)) .map(r => r.messages.messages) ) ); if (!isGroup) { const userConfigMessagesMerged = flatten(compact(userConfigMessages)); window.log.info( `received userConfigMessagesMerged: ${userConfigMessagesMerged.length} for key ${pubkey.key}` ); try { await this.handleSharedConfigMessages(userConfigMessagesMerged); } catch (e) { window.log.warn( `handleSharedConfigMessages of ${userConfigMessagesMerged.length} failed with ${e.message}` ); // not rethrowing } } // first make sure to handle the shared user config message first } else { allNamespacesWithoutUserConfigIfNeeded = flatten( compact(resultsFromAllNamespaces?.map(m => m.messages.messages)) ); } window.log.info( `received allNamespacesWithoutUserConfigIfNeeded: ${allNamespacesWithoutUserConfigIfNeeded.length}` ); // Merge results into one list of unique messages const messages = uniqBy(allNamespacesWithoutUserConfigIfNeeded, x => x.hash); // if all snodes returned an error (null), no need to update the lastPolledTimestamp if (isGroup && allNamespacesWithoutUserConfigIfNeeded?.length) { window?.log?.info( `Polled for group(${ed25519Str(pubkey.key)}):, got ${messages.length} messages back.` ); let lastPolledTimestamp = Date.now(); if (messages.length >= 95) { // if we get 95 messages or more back, it means there are probably more than this // so make sure to retry the polling in the next 5sec by marking the last polled timestamp way before that it is really // this is a kind of hack lastPolledTimestamp = Date.now() - SWARM_POLLING_TIMEOUT.INACTIVE - 5 * 1000; } // update the last fetched timestamp this.groupPolling = this.groupPolling.map(group => { if (PubKey.isEqual(pubkey, group.pubkey)) { return { ...group, lastPolledTimestamp, }; } return group; }); } perfStart(`handleSeenMessages-${polledPubkey}`); const newMessages = await this.handleSeenMessages(messages); perfEnd(`handleSeenMessages-${polledPubkey}`, 'handleSeenMessages'); // trigger the handling of all the other messages, not shared config related newMessages.forEach(m => { const content = extractWebSocketContent(m.data, m.hash); if (!content) { return; } Receiver.handleRequest(content.body, isGroup ? polledPubkey : null, content.messageHash); }); } private async handleSharedConfigMessages(userConfigMessagesMerged: Array) { const extractedUserConfigMessage = compact( userConfigMessagesMerged.map((m: RetrieveMessageItem) => { return extractWebSocketContent(m.data, m.hash); }) ); const allDecryptedConfigMessages: Array> = []; for (let index = 0; index < extractedUserConfigMessage.length; index++) { const userConfigMessage = extractedUserConfigMessage[index]; try { const envelope: EnvelopePlus = SignalService.Envelope.decode(userConfigMessage.body) as any; const decryptedEnvelope = await decryptEnvelopeWithOurKey(envelope); if (!decryptedEnvelope?.byteLength) { continue; } const content = SignalService.Content.decode(new Uint8Array(decryptedEnvelope)); if (content.sharedConfigMessage) { const asIncomingMsg: IncomingMessage = { envelopeTimestamp: toNumber(envelope.timestamp), message: content.sharedConfigMessage, messageHash: userConfigMessage.messageHash, authorOrGroupPubkey: envelope.source, authorInGroup: envelope.senderIdentity, }; allDecryptedConfigMessages.push(asIncomingMsg); } else { throw new Error( 'received a message to a namespace reserved for user config but not containign a sharedConfigMessage' ); } } catch (e) { window.log.warn( `failed to decrypt message with hash "${userConfigMessage.messageHash}": ${e.message}` ); } } try { window.log.info( `handleConfigMessagesViaLibSession of "${allDecryptedConfigMessages.length}" messages with libsession` ); await ConfigMessageHandler.handleConfigMessagesViaLibSession(allDecryptedConfigMessages); } catch (e) { const allMessageHases = allDecryptedConfigMessages.map(m => m.messageHash).join(','); window.log.warn( `failed to handle messages hashes "${allMessageHases}" with libsession. Error: "${e.message}"` ); } } // Fetches messages for `pubkey` from `node` potentially updating // the lash hash record private async pollNodeForKey( node: Snode, pubkey: PubKey, namespaces: Array ): Promise { const namespaceLength = namespaces.length; if (namespaceLength <= 0) { throw new Error(`invalid number of retrieve namespace provided: ${namespaceLength}`); } const edkey = node.pubkey_ed25519; const pkStr = pubkey.key; try { return await pRetry( async () => { const prevHashes = await Promise.all( namespaces.map(namespace => this.getLastHash(edkey, pkStr, namespace)) ); const results = await SnodeAPIRetrieve.retrieveNextMessages( node, prevHashes, pkStr, namespaces, UserUtils.getOurPubKeyStrFromCache() ); if (!results.length) { return []; } if (results.length !== namespaceLength) { window.log.error( `pollNodeForKey asked for ${namespaceLength} namespaces but received only messages about ${results.length} namespaces` ); throw new Error( `pollNodeForKey asked for ${namespaceLength} namespaces but received only messages about ${results.length} namespaces` ); } const lastMessages = results.map(r => { return last(r.messages.messages); }); await Promise.all( lastMessages.map(async (lastMessage, index) => { if (!lastMessage) { return; } return this.updateLastHash({ edkey: edkey, pubkey, namespace: namespaces[index], hash: lastMessage.hash, expiration: lastMessage.expiration, }); }) ); return results; }, { minTimeout: 100, retries: 1, onFailedAttempt: e => { window?.log?.warn( `retrieveNextMessages attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left... ${e.name}` ); }, } ); } catch (e) { if (e.message === ERROR_CODE_NO_CONNECT) { if (window.inboxStore?.getState().onionPaths.isOnline) { window.inboxStore?.dispatch(updateIsOnline(false)); } } else { if (!window.inboxStore?.getState().onionPaths.isOnline) { window.inboxStore?.dispatch(updateIsOnline(true)); } } window?.log?.info('pollNodeForKey failed with:', e.message); return null; } } private loadGroupIds() { const convos = getConversationController().getConversations(); const closedGroupsOnly = convos.filter( (c: ConversationModel) => (c.isMediumGroup() || PubKey.isClosedGroupV3(c.id)) && !c.isBlocked() && !c.get('isKickedFromGroup') && !c.get('left') ); closedGroupsOnly.forEach((c: any) => { this.addGroupId(new PubKey(c.id)); }); } private async handleSeenMessages( messages: Array ): Promise> { if (!messages.length) { return []; } const incomingHashes = messages.map((m: RetrieveMessageItem) => m.hash); const dupHashes = await Data.getSeenMessagesByHashList(incomingHashes); const newMessages = messages.filter((m: RetrieveMessageItem) => !dupHashes.includes(m.hash)); if (newMessages.length) { const newHashes = newMessages.map((m: RetrieveMessageItem) => ({ expiresAt: m.expiration, hash: m.hash, })); await Data.saveSeenMessageHashes(newHashes); } return newMessages; } private getUserNamespacesPolled() { return window.sessionFeatureFlags.useSharedUtilForUserConfig ? [ SnodeNamespaces.UserMessages, SnodeNamespaces.UserProfile, SnodeNamespaces.UserContacts, SnodeNamespaces.UserGroups, ] : [SnodeNamespaces.UserMessages]; } private async updateLastHash({ edkey, expiration, hash, namespace, pubkey, }: { edkey: string; pubkey: PubKey; namespace: number; hash: string; expiration: number; }): Promise { const pkStr = pubkey.key; const cached = await this.getLastHash(edkey, pubkey.key, namespace); if (!cached || cached !== hash) { await Data.updateLastHash({ convoId: pkStr, snode: edkey, hash, expiresAt: expiration, namespace, }); } if (!this.lastHashes[edkey]) { this.lastHashes[edkey] = {}; } if (!this.lastHashes[edkey][pkStr]) { this.lastHashes[edkey][pkStr] = {}; } this.lastHashes[edkey][pkStr][namespace] = hash; } private async getLastHash(nodeEdKey: string, pubkey: string, namespace: number): Promise { if (!this.lastHashes[nodeEdKey]?.[pubkey]?.[namespace]) { const lastHash = await Data.getLastHashBySnode(pubkey, nodeEdKey, namespace); if (!this.lastHashes[nodeEdKey]) { this.lastHashes[nodeEdKey] = {}; } if (!this.lastHashes[nodeEdKey][pubkey]) { this.lastHashes[nodeEdKey][pubkey] = {}; } this.lastHashes[nodeEdKey][pubkey][namespace] = lastHash || ''; return this.lastHashes[nodeEdKey][pubkey][namespace]; } // return the cached value return this.lastHashes[nodeEdKey][pubkey][namespace]; } }