import { PubKey } from '../types'; import { getSnodesFor, Snode } from './snodePool'; import { retrieveNextMessages } from './serviceNodeAPI'; import { SignalService } from '../../protobuf'; import * as Receiver from '../../receiver/receiver'; import _ from 'lodash'; import * as Data from '../../../js/modules/data'; import { StringUtils } from '../../session/utils'; type PubkeyToHash = { [key: string]: string }; interface Message { hash: string; expiration: number; data: string; } // Some websocket nonsence export function processMessage(message: string, options: any = {}) { try { const dataPlaintext = new Uint8Array(StringUtils.encode(message, 'base64')); const messageBuf = SignalService.WebSocketMessage.decode(dataPlaintext); if (messageBuf.type === SignalService.WebSocketMessage.Type.REQUEST) { // tslint:disable-next-line no-floating-promises Receiver.handleRequest(messageBuf.request?.body, options); } } catch (error) { const info = { message, error: error.message, }; window.log.warn('HTTP-Resources Failed to handle message:', info); } } export class SwarmPolling { private pubkeys: Array; private groupPubkeys: Array; private readonly lastHashes: { [key: string]: PubkeyToHash }; constructor() { this.pubkeys = []; this.groupPubkeys = []; this.lastHashes = {}; } public async start(): Promise { this.loadGroupIds(); // tslint:disable: no-floating-promises this.pollForAllKeys(); } public addGroupId(pubkey: PubKey) { this.groupPubkeys.push(pubkey); } public addPubkey(pk: PubKey | string) { const pubkey = PubKey.cast(pk); this.pubkeys.push(pubkey); } public removePubkey(pk: PubKey | string) { const pubkey = PubKey.cast(pk); this.pubkeys = this.pubkeys.filter(key => !pubkey.isEqual(key)); this.groupPubkeys = this.groupPubkeys.filter(key => !pubkey.isEqual(key)); } protected async pollOnceForKey(pubkey: PubKey, isGroup: boolean) { // NOTE: sometimes pubkey is string, sometimes it is object, so // accept both until this is fixed: const pkStr = pubkey.key; const snodes = await getSnodesFor(pkStr); // Select nodes for which we already have lastHashes const alreadyPolled = snodes.filter( (n: Snode) => this.lastHashes[n.pubkey_ed25519] ); // If we need more nodes, select randomly from the remaining nodes: // Use 1 node for now: const COUNT = 1; let nodesToPoll = _.sampleSize(alreadyPolled, COUNT); if (nodesToPoll.length < COUNT) { const notPolled = _.difference(snodes, alreadyPolled); const newNeeded = COUNT - alreadyPolled.length; const newNodes = _.sampleSize(notPolled, newNeeded); nodesToPoll = _.concat(nodesToPoll, newNodes); } const results = await Promise.all( nodesToPoll.map(async (n: Snode) => { return this.pollNodeForKey(n, pubkey); }) ); // Merge results into one list of unique messages const messages = _.uniqBy(_.flatten(results), (x: any) => x.hash); const newMessages = await this.handleSeenMessages(messages); newMessages.forEach((m: Message) => { const options = isGroup ? { conversationId: pkStr } : {}; processMessage(m.data, options); }); } // Fetches messages for `pubkey` from `node` potentially updating // the lash hash record protected async pollNodeForKey( node: Snode, pubkey: PubKey ): Promise> { const edkey = node.pubkey_ed25519; const pkStr = pubkey.key; const prevHash = await this.getLastHash(edkey, pkStr); const messages = await retrieveNextMessages(node, prevHash, pkStr); if (!messages.length) { return []; } const lastMessage = _.last(messages); this.updateLastHash( edkey, pubkey, lastMessage.hash, lastMessage.expiration ); return messages; } private loadGroupIds() { // Start polling for medium size groups as well (they might be in different swarms) const convos = window .getConversations() .filter((c: any) => c.isMediumGroup()); convos.forEach((c: any) => { this.addGroupId(new PubKey(c.id)); // TODO: unsubscribe if the group is deleted }); } private async handleSeenMessages( messages: Array ): Promise> { if (!messages.length) { return []; } const incomingHashes = messages.map((m: Message) => m.hash); const dupHashes = await Data.getSeenMessagesByHashList(incomingHashes); const newMessages = messages.filter( (m: Message) => !dupHashes.includes(m.hash) ); if (newMessages.length) { const newHashes = newMessages.map((m: Message) => ({ expiresAt: m.expiration, hash: m.hash, })); await Data.saveSeenMessageHashes(newHashes); } return newMessages; } private async pollForAllKeys() { const directPromises = this.pubkeys.map(async pk => { return this.pollOnceForKey(pk, false); }); const groupPromises = this.groupPubkeys.map(async pk => { return this.pollOnceForKey(pk, true); }); await Promise.all(_.concat(directPromises, groupPromises)); setTimeout(this.pollForAllKeys.bind(this), 2000); } private async updateLastHash( edkey: string, pubkey: PubKey, hash: string, expiration: number ): Promise { const pkStr = pubkey.key; await Data.updateLastHash({ convoId: pkStr, snode: edkey, hash, expiresAt: expiration, }); if (!this.lastHashes[edkey]) { this.lastHashes[edkey] = {}; } this.lastHashes[edkey][pkStr] = hash; } private async getLastHash( nodeEdKey: string, pubkey: string ): Promise { // TODO: always retrieve from the database? const nodeRecords = this.lastHashes[nodeEdKey]; if (!nodeRecords || !nodeRecords[pubkey]) { const lastHash = await Data.getLastHashBySnode(pubkey, nodeEdKey); return lastHash || ''; } else { // Don't need to go to the database every time: return nodeRecords[pubkey]; } } }