fix: retrieve from 2 snodes on every call

to take care of a snode being out of sync
pull/3281/head
Audric Ackermann 3 months ago
parent 3e9c1e5060
commit 060175004f
No known key found for this signature in database

@ -13,6 +13,7 @@ import {
last,
omit,
sample,
sampleSize,
toNumber,
uniqBy,
} from 'lodash';
@ -53,12 +54,19 @@ import {
RetrieveMessageItem,
RetrieveMessageItemWithNamespace,
RetrieveMessagesResultsBatched,
RetrieveRequestResult,
type RetrieveMessagesResultsMergedBatched,
} from './types';
import { ConversationTypeEnum } from '../../../models/types';
import { Snode } from '../../../data/types';
const minMsgCountShouldRetry = 95;
/**
* We retrieve from multiple snodes at the same time, and merge their reported messages because it's easy
* for a snode to be out of sync.
* Sometimes, being out of sync means that we won't be able to retrieve a message at all (revoked_subaccount).
* We need a proper fix server side, but in the meantime, that's all we can do.
*/
const RETRIEVE_SNODES_COUNT = 2;
function extractWebSocketContent(
message: string,
@ -105,6 +113,33 @@ function entryToKey(entry: GroupPollingEntry) {
return entry.pubkey.key;
}
function mergeMultipleRetrieveResults(
results: RetrieveMessagesResultsBatched
): RetrieveMessagesResultsMergedBatched {
const mapped: Map<SnodeNamespaces, Map<string, RetrieveMessageItem>> = new Map();
for (let resultIndex = 0; resultIndex < results.length; resultIndex++) {
const result = results[resultIndex];
if (!mapped.has(result.namespace)) {
mapped.set(result.namespace, new Map());
}
if (result.messages.messages) {
for (let msgIndex = 0; msgIndex < result.messages.messages.length; msgIndex++) {
const msg = result.messages.messages[msgIndex];
if (!mapped.get(result.namespace)!.has(msg.hash)) {
mapped.get(result.namespace)!.set(msg.hash, msg);
}
}
}
}
// Convert the merged map back to an array
return Array.from(mapped.entries()).map(([namespace, messagesMap]) => ({
code: 200, // Assuming success code, adjust as needed
namespace,
messages: { messages: Array.from(messagesMap.values()) },
}));
}
export class SwarmPolling {
private groupPolling: Array<GroupPollingEntry>;
@ -414,27 +449,40 @@ export class SwarmPolling {
public async pollOnceForKey([pubkey, type]: PollForUs | PollForLegacy | PollForGroup) {
const namespaces = this.getNamespacesToPollFrom(type);
const swarmSnodes = await SnodePool.getSwarmFor(pubkey);
let resultsFromAllNamespaces: RetrieveMessagesResultsBatched | null;
let resultsFromAllNamespaces: RetrieveMessagesResultsMergedBatched | null;
let toPollFrom: Snode | undefined;
let toPollFrom: Array<Snode> = [];
try {
toPollFrom = sample(swarmSnodes);
toPollFrom = sampleSize(swarmSnodes, RETRIEVE_SNODES_COUNT);
if (!toPollFrom) {
if (toPollFrom.length !== RETRIEVE_SNODES_COUNT) {
throw new Error(
`SwarmPolling: pollOnceForKey: no snode in swarm for ${ed25519Str(pubkey)}`
`SwarmPolling: pollOnceForKey: not snodes in swarm for ${ed25519Str(pubkey)}. Expected to have at least ${RETRIEVE_SNODES_COUNT}.`
);
}
// Note: always print something so we know if the polling is hanging
window.log.info(
`SwarmPolling: about to pollNodeForKey of ${ed25519Str(pubkey)} from snode: ${ed25519Str(toPollFrom.pubkey_ed25519)} namespaces: ${namespaces} `
);
resultsFromAllNamespaces = await this.pollNodeForKey(toPollFrom, pubkey, namespaces, type);
// Note: always print something so we know if the polling is hanging
const resultsFromAllSnodesSettled = await Promise.allSettled(
toPollFrom.map(async snode => {
// Note: always print something so we know if the polling is hanging
window.log.info(
`SwarmPolling: about to pollNodeForKey of ${ed25519Str(pubkey)} from snode: ${ed25519Str(snode.pubkey_ed25519)} namespaces: ${namespaces} `
);
const thisSnodeResults = await this.pollNodeForKey(snode, pubkey, namespaces, type);
// Note: always print something so we know if the polling is hanging
window.log.info(
`SwarmPolling: pollNodeForKey of ${ed25519Str(pubkey)} from snode: ${ed25519Str(snode.pubkey_ed25519)} namespaces: ${namespaces} returned: ${thisSnodeResults?.length}`
);
return thisSnodeResults;
})
);
window.log.info(
`SwarmPolling: pollNodeForKey of ${ed25519Str(pubkey)} from snode: ${ed25519Str(toPollFrom.pubkey_ed25519)} namespaces: ${namespaces} returned: ${resultsFromAllNamespaces?.length}`
`SwarmPolling: pollNodeForKey of ${ed25519Str(pubkey)} namespaces: ${namespaces} returned ${resultsFromAllSnodesSettled.filter(m => m.status === 'fulfilled')}/${RETRIEVE_SNODES_COUNT} fulfilled promises`
);
resultsFromAllNamespaces = mergeMultipleRetrieveResults(
compact(
resultsFromAllSnodesSettled.filter(m => m.status === 'fulfilled').flatMap(m => m.value)
)
);
} catch (e) {
window.log.warn(
@ -490,7 +538,7 @@ export class SwarmPolling {
const newMessages = await this.handleSeenMessages(uniqOtherMsgs);
window.log.info(
`SwarmPolling: handleSeenMessages: ${newMessages.length} out of ${uniqOtherMsgs.length} are not seen yet about pk:${ed25519Str(pubkey)} snode: ${toPollFrom ? ed25519Str(toPollFrom.pubkey_ed25519) : 'undefined'}`
`SwarmPolling: handleSeenMessages: ${newMessages.length} out of ${uniqOtherMsgs.length} are not seen yet about pk:${ed25519Str(pubkey)} snode: ${JSON.stringify(toPollFrom.map(m => ed25519Str(m.pubkey_ed25519)))}`
);
if (type === ConversationTypeEnum.GROUPV2) {
if (!PubKey.is03Pubkey(pubkey)) {
@ -975,7 +1023,7 @@ const retrieveItemSchema = z.object({
});
function retrieveItemWithNamespace(
results: Array<RetrieveRequestResult>
results: RetrieveMessagesResultsMergedBatched
): Array<RetrieveMessageItemWithNamespace> {
return flatten(
compact(
@ -996,7 +1044,7 @@ function retrieveItemWithNamespace(
function filterMessagesPerTypeOfConvo<T extends ConversationTypeEnum>(
type: T,
retrieveResults: RetrieveMessagesResultsBatched
retrieveResults: RetrieveMessagesResultsMergedBatched
): {
confMessages: Array<RetrieveMessageItemWithNamespace> | null;
revokedMessages: Array<RetrieveMessageItemWithNamespace> | null;

@ -34,12 +34,21 @@ export type RetrieveMessagesResultsContent = {
t: number;
};
export type RetrieveRequestResult = {
type RetrieveMessagesResultsContentMerged = Pick<RetrieveMessagesResultsContent, 'messages'>;
type RetrieveRequestResult<
T extends RetrieveMessagesResultsContent | RetrieveMessagesResultsContentMerged,
> = {
code: number;
messages: RetrieveMessagesResultsContent;
messages: T;
namespace: SnodeNamespaces;
};
export type RetrieveMessagesResultsBatched = Array<RetrieveRequestResult>;
export type RetrieveMessagesResultsBatched = Array<
RetrieveRequestResult<RetrieveMessagesResultsContent>
>;
export type RetrieveMessagesResultsMergedBatched = Array<
RetrieveRequestResult<RetrieveMessagesResultsContentMerged>
>;
export type WithRevokeSubRequest = {
revokeSubRequest?: SubaccountRevokeSubRequest;

Loading…
Cancel
Save