|
|
|
@ -65,6 +65,82 @@ export class SwarmPolling {
|
|
|
|
|
this.groupPubkeys = this.groupPubkeys.filter(key => !pubkey.isEqual(key));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected async pollOnceForKey(pubkey: PubKey, isGroup: boolean) {
|
|
|
|
|
// NOTE: sometimes pubkey is string, sometimes it is object, so
|
|
|
|
|
// accept both until this is fixed:
|
|
|
|
|
const pk = (pubkey.key ? pubkey.key : pubkey) as string;
|
|
|
|
|
|
|
|
|
|
const snodes = await getSnodesFor(pk);
|
|
|
|
|
|
|
|
|
|
// Select nodes for which we already have lastHashes
|
|
|
|
|
const alreadyPolled = snodes.filter(
|
|
|
|
|
(n: Snode) => this.lastHashes[n.pubkey_ed25519]
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// If we need more nodes, select randomly from the remaining nodes:
|
|
|
|
|
|
|
|
|
|
// Use 1 node for now:
|
|
|
|
|
const COUNT = 1;
|
|
|
|
|
|
|
|
|
|
let nodesToPoll = _.sampleSize(alreadyPolled, COUNT);
|
|
|
|
|
|
|
|
|
|
if (nodesToPoll.length < COUNT) {
|
|
|
|
|
const notPolled = _.difference(snodes, alreadyPolled);
|
|
|
|
|
|
|
|
|
|
const newNeeded = COUNT - alreadyPolled.length;
|
|
|
|
|
|
|
|
|
|
const newNodes = _.sampleSize(notPolled, newNeeded);
|
|
|
|
|
|
|
|
|
|
nodesToPoll = _.concat(nodesToPoll, newNodes);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const results = await Promise.all(
|
|
|
|
|
nodesToPoll.map(async (n: Snode) => {
|
|
|
|
|
return this.pollNodeForKey(n, pubkey);
|
|
|
|
|
})
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// Merge results into one list of unique messages
|
|
|
|
|
const messages = _.uniqBy(_.flatten(results), (x: any) => x.hash);
|
|
|
|
|
|
|
|
|
|
const newMessages = await this.handleSeenMessages(messages);
|
|
|
|
|
|
|
|
|
|
newMessages.forEach((m: Message) => {
|
|
|
|
|
const options = isGroup ? { conversationId: pk } : {};
|
|
|
|
|
processMessage(m.data, options);
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Fetches messages for `pubkey` from `node` potentially updating
|
|
|
|
|
// the lash hash record
|
|
|
|
|
protected async pollNodeForKey(
|
|
|
|
|
node: Snode,
|
|
|
|
|
pubkey: PubKey
|
|
|
|
|
): Promise<Array<any>> {
|
|
|
|
|
const edkey = node.pubkey_ed25519;
|
|
|
|
|
|
|
|
|
|
const pkStr = pubkey.key ? pubkey.key : pubkey;
|
|
|
|
|
|
|
|
|
|
const prevHash = await this.getLastHash(edkey, pkStr as string);
|
|
|
|
|
|
|
|
|
|
const messages = await retrieveNextMessages(node, prevHash, pubkey);
|
|
|
|
|
|
|
|
|
|
if (!messages.length) {
|
|
|
|
|
return [];
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const lastMessage = _.last(messages);
|
|
|
|
|
|
|
|
|
|
this.updateLastHash(
|
|
|
|
|
edkey,
|
|
|
|
|
pubkey,
|
|
|
|
|
lastMessage.hash,
|
|
|
|
|
lastMessage.expiration
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
return messages;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private loadGroupIds() {
|
|
|
|
|
// Start polling for medium size groups as well (they might be in different swarms)
|
|
|
|
|
const convos = window
|
|
|
|
@ -103,11 +179,11 @@ export class SwarmPolling {
|
|
|
|
|
|
|
|
|
|
private async pollForAllKeys() {
|
|
|
|
|
const directPromises = this.pubkeys.map(async pk => {
|
|
|
|
|
return this.pollOnceForKey(pk);
|
|
|
|
|
return this.pollOnceForKey(pk, false);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
const groupPromises = this.groupPubkeys.map(async pk => {
|
|
|
|
|
return this.pollOnceForKey(pk);
|
|
|
|
|
return this.pollOnceForKey(pk, true);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
await Promise.all(_.concat(directPromises, groupPromises));
|
|
|
|
@ -154,81 +230,4 @@ export class SwarmPolling {
|
|
|
|
|
return nodeRecords[pubkey];
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Fetches messages for `pubkey` from `node` potentially updating
|
|
|
|
|
// the lash hash record
|
|
|
|
|
private async pollNodeForKey(
|
|
|
|
|
node: Snode,
|
|
|
|
|
pubkey: PubKey
|
|
|
|
|
): Promise<Array<any>> {
|
|
|
|
|
const edkey = node.pubkey_ed25519;
|
|
|
|
|
|
|
|
|
|
const pkStr = pubkey.key ? pubkey.key : pubkey;
|
|
|
|
|
|
|
|
|
|
const prevHash = await this.getLastHash(edkey, pkStr as string);
|
|
|
|
|
|
|
|
|
|
const messages = await retrieveNextMessages(node, prevHash, pubkey);
|
|
|
|
|
|
|
|
|
|
if (!messages.length) {
|
|
|
|
|
return [];
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const lastMessage = _.last(messages);
|
|
|
|
|
|
|
|
|
|
this.updateLastHash(
|
|
|
|
|
edkey,
|
|
|
|
|
pubkey,
|
|
|
|
|
lastMessage.hash,
|
|
|
|
|
lastMessage.expiration
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
return messages;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private async pollOnceForKey(pubkey: PubKey) {
|
|
|
|
|
// NOTE: sometimes pubkey is string, sometimes it is object, so
|
|
|
|
|
// accept both until this is fixed:
|
|
|
|
|
const pk = pubkey.key ? pubkey.key : pubkey;
|
|
|
|
|
|
|
|
|
|
const snodes = await getSnodesFor(pk as string);
|
|
|
|
|
|
|
|
|
|
// Select nodes for which we already have lastHashes
|
|
|
|
|
const alreadyPolled = snodes.filter(
|
|
|
|
|
(n: Snode) => this.lastHashes[n.pubkey_ed25519]
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// If we need more nodes, select randomly from the remaining nodes:
|
|
|
|
|
|
|
|
|
|
// Use 1 node for now:
|
|
|
|
|
const COUNT = 1;
|
|
|
|
|
|
|
|
|
|
let nodesToPoll = _.sampleSize(alreadyPolled, COUNT);
|
|
|
|
|
|
|
|
|
|
if (nodesToPoll.length < COUNT) {
|
|
|
|
|
const notPolled = _.difference(snodes, alreadyPolled);
|
|
|
|
|
|
|
|
|
|
const newNeeded = COUNT - alreadyPolled.length;
|
|
|
|
|
|
|
|
|
|
const newNodes = _.sampleSize(notPolled, newNeeded);
|
|
|
|
|
|
|
|
|
|
nodesToPoll = _.concat(nodesToPoll, newNodes);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const results = await Promise.all(
|
|
|
|
|
nodesToPoll.map(async (n: Snode) => {
|
|
|
|
|
return this.pollNodeForKey(n, pubkey);
|
|
|
|
|
})
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// Merge results into one list of unique messages
|
|
|
|
|
const messages = _.uniqBy(_.flatten(results), (x: any) => x.hash);
|
|
|
|
|
|
|
|
|
|
const newMessages = await this.handleSeenMessages(messages);
|
|
|
|
|
|
|
|
|
|
newMessages.forEach((m: Message) => {
|
|
|
|
|
processMessage(m.data, { conversationId: pubkey.key });
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
// TODO: `onMessages`
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|