You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
246 lines
6.8 KiB
TypeScript
246 lines
6.8 KiB
TypeScript
import { PubKey } from '../types';
|
|
import { getSwarmFor } from './snodePool';
|
|
import { retrieveNextMessages } from './SNodeAPI';
|
|
import { SignalService } from '../../protobuf';
|
|
import * as Receiver from '../../receiver/receiver';
|
|
import _ from 'lodash';
|
|
import {
|
|
getLastHashBySnode,
|
|
getSeenMessagesByHashList,
|
|
saveSeenMessageHashes,
|
|
Snode,
|
|
updateLastHash,
|
|
} from '../../../ts/data/data';
|
|
|
|
import { StringUtils } from '../../session/utils';
|
|
import { getConversationController } from '../conversations';
|
|
import { ConversationModel } from '../../models/conversation';
|
|
|
|
type PubkeyToHash = { [key: string]: string };
|
|
|
|
interface Message {
|
|
hash: string;
|
|
expiration: number;
|
|
data: string;
|
|
}
|
|
|
|
// Some websocket nonsense
|
|
export function processMessage(message: string, options: any = {}) {
|
|
try {
|
|
const dataPlaintext = new Uint8Array(StringUtils.encode(message, 'base64'));
|
|
const messageBuf = SignalService.WebSocketMessage.decode(dataPlaintext);
|
|
if (messageBuf.type === SignalService.WebSocketMessage.Type.REQUEST) {
|
|
Receiver.handleRequest(messageBuf.request?.body, options);
|
|
}
|
|
} catch (error) {
|
|
const info = {
|
|
message,
|
|
error: error.message,
|
|
};
|
|
window?.log?.warn('HTTP-Resources Failed to handle message:', info);
|
|
}
|
|
}
|
|
|
|
let instance: SwarmPolling | undefined;
|
|
export const getSwarmPollingInstance = () => {
|
|
if (!instance) {
|
|
instance = new SwarmPolling();
|
|
}
|
|
return instance;
|
|
};
|
|
|
|
export class SwarmPolling {
|
|
private pubkeys: Array<PubKey>;
|
|
private groupPubkeys: Array<PubKey>;
|
|
private readonly lastHashes: { [key: string]: PubkeyToHash };
|
|
|
|
constructor() {
|
|
this.pubkeys = [];
|
|
this.groupPubkeys = [];
|
|
this.lastHashes = {};
|
|
}
|
|
|
|
public start(): void {
|
|
this.loadGroupIds();
|
|
void this.pollForAllKeys();
|
|
}
|
|
|
|
public addGroupId(pubkey: PubKey) {
|
|
if (this.groupPubkeys.findIndex(m => m.key === pubkey.key) === -1) {
|
|
window?.log?.info('Swarm addGroupId: adding pubkey to polling', pubkey.key);
|
|
this.groupPubkeys.push(pubkey);
|
|
}
|
|
}
|
|
|
|
public addPubkey(pk: PubKey | string) {
|
|
const pubkey = PubKey.cast(pk);
|
|
if (this.pubkeys.findIndex(m => m.key === pubkey.key) === -1) {
|
|
this.pubkeys.push(pubkey);
|
|
}
|
|
}
|
|
|
|
public removePubkey(pk: PubKey | string) {
|
|
const pubkey = PubKey.cast(pk);
|
|
window?.log?.info('Swarm removePubkey: removing pubkey from polling', pubkey.key);
|
|
|
|
this.pubkeys = this.pubkeys.filter(key => !pubkey.isEqual(key));
|
|
this.groupPubkeys = this.groupPubkeys.filter(key => !pubkey.isEqual(key));
|
|
}
|
|
|
|
protected async pollOnceForKey(pubkey: PubKey, isGroup: boolean) {
|
|
// NOTE: sometimes pubkey is string, sometimes it is object, so
|
|
// accept both until this is fixed:
|
|
const pkStr = pubkey.key;
|
|
|
|
const snodes = await getSwarmFor(pkStr);
|
|
|
|
// Select nodes for which we already have lastHashes
|
|
const alreadyPolled = snodes.filter((n: Snode) => this.lastHashes[n.pubkey_ed25519]);
|
|
|
|
// If we need more nodes, select randomly from the remaining nodes:
|
|
|
|
// Use 1 node for now:
|
|
const COUNT = 1;
|
|
|
|
let nodesToPoll = _.sampleSize(alreadyPolled, COUNT);
|
|
|
|
if (nodesToPoll.length < COUNT) {
|
|
const notPolled = _.difference(snodes, alreadyPolled);
|
|
|
|
const newNeeded = COUNT - alreadyPolled.length;
|
|
|
|
const newNodes = _.sampleSize(notPolled, newNeeded);
|
|
|
|
nodesToPoll = _.concat(nodesToPoll, newNodes);
|
|
}
|
|
|
|
const results = await Promise.all(
|
|
nodesToPoll.map(async (n: Snode) => {
|
|
return this.pollNodeForKey(n, pubkey);
|
|
})
|
|
);
|
|
|
|
// Merge results into one list of unique messages
|
|
const messages = _.uniqBy(_.flatten(results), (x: any) => x.hash);
|
|
|
|
const newMessages = await this.handleSeenMessages(messages);
|
|
|
|
newMessages.forEach((m: Message) => {
|
|
const options = isGroup ? { conversationId: pkStr } : {};
|
|
processMessage(m.data, options);
|
|
});
|
|
}
|
|
|
|
// Fetches messages for `pubkey` from `node` potentially updating
|
|
// the lash hash record
|
|
protected async pollNodeForKey(node: Snode, pubkey: PubKey): Promise<Array<any>> {
|
|
const edkey = node.pubkey_ed25519;
|
|
|
|
const pkStr = pubkey.key;
|
|
|
|
const prevHash = await this.getLastHash(edkey, pkStr);
|
|
|
|
const messages = await retrieveNextMessages(node, prevHash, pkStr);
|
|
|
|
if (!messages.length) {
|
|
return [];
|
|
}
|
|
|
|
const lastMessage = _.last(messages);
|
|
|
|
await this.updateLastHash(edkey, pubkey, lastMessage.hash, lastMessage.expiration);
|
|
|
|
return messages;
|
|
}
|
|
|
|
private loadGroupIds() {
|
|
// Start polling for medium size groups as well (they might be in different swarms)
|
|
const convos = getConversationController().getConversations();
|
|
|
|
const mediumGroupsOnly = convos.filter(
|
|
(c: ConversationModel) =>
|
|
c.isMediumGroup() && !c.isBlocked() && !c.get('isKickedFromGroup') && !c.get('left')
|
|
);
|
|
|
|
mediumGroupsOnly.forEach((c: any) => {
|
|
this.addGroupId(new PubKey(c.id));
|
|
// TODO: unsubscribe if the group is deleted
|
|
});
|
|
}
|
|
|
|
private async handleSeenMessages(messages: Array<Message>): Promise<Array<Message>> {
|
|
if (!messages.length) {
|
|
return [];
|
|
}
|
|
|
|
const incomingHashes = messages.map((m: Message) => m.hash);
|
|
|
|
const dupHashes = await getSeenMessagesByHashList(incomingHashes);
|
|
const newMessages = messages.filter((m: Message) => !dupHashes.includes(m.hash));
|
|
|
|
if (newMessages.length) {
|
|
const newHashes = newMessages.map((m: Message) => ({
|
|
expiresAt: m.expiration,
|
|
hash: m.hash,
|
|
}));
|
|
await saveSeenMessageHashes(newHashes);
|
|
}
|
|
return newMessages;
|
|
}
|
|
|
|
private async pollForAllKeys() {
|
|
const directPromises = this.pubkeys.map(async pk => {
|
|
return this.pollOnceForKey(pk, false);
|
|
});
|
|
|
|
const groupPromises = this.groupPubkeys.map(async pk => {
|
|
return this.pollOnceForKey(pk, true);
|
|
});
|
|
try {
|
|
await Promise.all(_.concat(directPromises, groupPromises));
|
|
} catch (e) {
|
|
window?.log?.warn('pollForAllKeys swallowing exception: ', e);
|
|
throw e;
|
|
} finally {
|
|
setTimeout(this.pollForAllKeys.bind(this), 2000);
|
|
}
|
|
}
|
|
|
|
private async updateLastHash(
|
|
edkey: string,
|
|
pubkey: PubKey,
|
|
hash: string,
|
|
expiration: number
|
|
): Promise<void> {
|
|
const pkStr = pubkey.key;
|
|
|
|
await updateLastHash({
|
|
convoId: pkStr,
|
|
snode: edkey,
|
|
hash,
|
|
expiresAt: expiration,
|
|
});
|
|
|
|
if (!this.lastHashes[edkey]) {
|
|
this.lastHashes[edkey] = {};
|
|
}
|
|
|
|
this.lastHashes[edkey][pkStr] = hash;
|
|
}
|
|
|
|
private async getLastHash(nodeEdKey: string, pubkey: string): Promise<string> {
|
|
// TODO: always retrieve from the database?
|
|
|
|
const nodeRecords = this.lastHashes[nodeEdKey];
|
|
|
|
if (!nodeRecords || !nodeRecords[pubkey]) {
|
|
const lastHash = await getLastHashBySnode(pubkey, nodeEdKey);
|
|
|
|
return lastHash || '';
|
|
} else {
|
|
// Don't need to go to the database every time:
|
|
return nodeRecords[pubkey];
|
|
}
|
|
}
|
|
}
|