You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
189 lines
7.4 KiB
TypeScript
189 lines
7.4 KiB
TypeScript
import { isArray, omit } from 'lodash';
|
|
import { Snode } from '../../../data/types';
|
|
import { updateIsOnline } from '../../../state/ducks/onion';
|
|
import { doSnodeBatchRequest } from './batchRequest';
|
|
import { GetNetworkTime } from './getNetworkTime';
|
|
import { SnodeNamespace, SnodeNamespaces } from './namespaces';
|
|
|
|
import { DURATION, TTL_DEFAULT } from '../../constants';
|
|
import { UserUtils } from '../../utils';
|
|
import { sleepFor } from '../../utils/Promise';
|
|
import { SnodeResponseError } from '../../utils/errors';
|
|
import {
|
|
RetrieveLegacyClosedGroupSubRequestType,
|
|
RetrieveSubRequestType,
|
|
UpdateExpiryOnNodeSubRequest,
|
|
} from './SnodeRequestTypes';
|
|
import { SnodeSignature } from './snodeSignatures';
|
|
import { RetrieveMessagesResultsBatched, RetrieveMessagesResultsContent } from './types';
|
|
|
|
async function buildRetrieveRequest(
|
|
lastHashes: Array<string>,
|
|
pubkey: string,
|
|
namespaces: Array<SnodeNamespaces>,
|
|
ourPubkey: string,
|
|
configHashesToBump: Array<string> | null
|
|
): Promise<Array<RetrieveSubRequestType>> {
|
|
const maxSizeMap = SnodeNamespace.maxSizeMap(namespaces);
|
|
const retrieveRequestsParams: Array<RetrieveSubRequestType> = await Promise.all(
|
|
namespaces.map(async (namespace, index) => {
|
|
const foundMaxSize = maxSizeMap.find(m => m.namespace === namespace)?.maxSize;
|
|
const retrieveParam = {
|
|
pubkey,
|
|
last_hash: lastHashes.at(index) || '',
|
|
namespace,
|
|
timestamp: GetNetworkTime.getNowWithNetworkOffset(),
|
|
max_size: foundMaxSize,
|
|
};
|
|
|
|
if (namespace === SnodeNamespaces.ClosedGroupMessage) {
|
|
if (pubkey === ourPubkey || !pubkey.startsWith('05')) {
|
|
throw new Error(
|
|
'namespace -10 can only be used to retrieve messages from a legacy closed group (prefix 05)'
|
|
);
|
|
}
|
|
const retrieveLegacyClosedGroup = {
|
|
...retrieveParam,
|
|
namespace,
|
|
};
|
|
const retrieveParamsLegacy: RetrieveLegacyClosedGroupSubRequestType = {
|
|
method: 'retrieve',
|
|
params: omit(retrieveLegacyClosedGroup, 'timestamp'), // if we give a timestamp, a signature will be required by the service node, and we don't want to provide one as this is an unauthenticated namespace
|
|
};
|
|
|
|
return retrieveParamsLegacy;
|
|
}
|
|
|
|
// all legacy closed group retrieves are unauthenticated and run above.
|
|
// if we get here, this can only be a retrieve for our own swarm, which must be authenticated
|
|
if (
|
|
!SnodeNamespace.isUserConfigNamespace(namespace) &&
|
|
namespace !== SnodeNamespaces.UserMessages
|
|
) {
|
|
throw new Error(`not a legacy closed group. namespace can only be 0 and was ${namespace}`);
|
|
}
|
|
if (pubkey !== ourPubkey) {
|
|
throw new Error('not a legacy closed group. pubkey can only be ours');
|
|
}
|
|
const signatureArgs = { ...retrieveParam, method: 'retrieve' as const, ourPubkey };
|
|
const signatureBuilt = await SnodeSignature.getSnodeSignatureParams(signatureArgs);
|
|
const retrieve: RetrieveSubRequestType = {
|
|
method: 'retrieve',
|
|
params: { ...retrieveParam, ...signatureBuilt },
|
|
};
|
|
return retrieve;
|
|
})
|
|
);
|
|
|
|
if (configHashesToBump?.length) {
|
|
const expiry = GetNetworkTime.getNowWithNetworkOffset() + TTL_DEFAULT.CONFIG_MESSAGE;
|
|
const signResult = await SnodeSignature.generateUpdateExpirySignature({
|
|
shortenOrExtend: '',
|
|
timestamp: expiry,
|
|
messageHashes: configHashesToBump,
|
|
});
|
|
if (!signResult) {
|
|
window.log.warn(
|
|
`SnodeSignature.generateUpdateExpirySignature returned result empty for hashes ${configHashesToBump}`
|
|
);
|
|
} else {
|
|
const expireParams: UpdateExpiryOnNodeSubRequest = {
|
|
method: 'expire',
|
|
params: {
|
|
messages: configHashesToBump,
|
|
pubkey: UserUtils.getOurPubKeyStrFromCache(),
|
|
expiry,
|
|
signature: signResult.signature,
|
|
pubkey_ed25519: signResult.pubkey_ed25519,
|
|
},
|
|
};
|
|
|
|
retrieveRequestsParams.push(expireParams);
|
|
}
|
|
}
|
|
return retrieveRequestsParams;
|
|
}
|
|
|
|
async function retrieveNextMessages(
|
|
targetNode: Snode,
|
|
lastHashes: Array<string>,
|
|
associatedWith: string,
|
|
namespaces: Array<SnodeNamespaces>,
|
|
ourPubkey: string,
|
|
configHashesToBump: Array<string> | null
|
|
): Promise<RetrieveMessagesResultsBatched> {
|
|
if (namespaces.length !== lastHashes.length) {
|
|
throw new Error('namespaces and last hashes do not match');
|
|
}
|
|
|
|
const retrieveRequestsParams = await buildRetrieveRequest(
|
|
lastHashes,
|
|
associatedWith,
|
|
namespaces,
|
|
ourPubkey,
|
|
configHashesToBump
|
|
);
|
|
// let exceptions bubble up
|
|
// no retry for this one as this a call we do every few seconds while polling for messages
|
|
const timeOutMs = 10 * DURATION.SECONDS; // yes this is a long timeout for just messages, but 4s timeouts way to often...
|
|
const timeoutPromise = async () => sleepFor(timeOutMs);
|
|
const fetchPromise = async () =>
|
|
doSnodeBatchRequest(retrieveRequestsParams, targetNode, timeOutMs, associatedWith);
|
|
|
|
// just to make sure that we don't hang for more than timeOutMs
|
|
const results = await Promise.race([timeoutPromise(), fetchPromise()]);
|
|
try {
|
|
if (!results || !isArray(results) || !results.length) {
|
|
window?.log?.warn(
|
|
`_retrieveNextMessages - sessionRpc could not talk to ${targetNode.ip}:${targetNode.port}`
|
|
);
|
|
throw new SnodeResponseError(
|
|
`_retrieveNextMessages - sessionRpc could not talk to ${targetNode.ip}:${targetNode.port}`
|
|
);
|
|
}
|
|
|
|
// the +1 is to take care of the extra `expire` method added once user config is released
|
|
if (results.length !== namespaces.length && results.length !== namespaces.length + 1) {
|
|
throw new Error(
|
|
`We asked for updates about ${namespaces.length} messages but got results of length ${results.length}`
|
|
);
|
|
}
|
|
|
|
// do a basic check to know if we have something kind of looking right (status 200 should always be there for a retrieve)
|
|
const firstResult = results[0];
|
|
|
|
if (firstResult.code !== 200) {
|
|
window?.log?.warn(`_retrieveNextMessages result is not 200 but ${firstResult.code}`);
|
|
throw new Error(
|
|
`_retrieveNextMessages - retrieve result is not 200 with ${targetNode.ip}:${targetNode.port} but ${firstResult.code}`
|
|
);
|
|
}
|
|
|
|
// we rely on the code of the first one to check for online status
|
|
const bodyFirstResult = firstResult.body;
|
|
if (!window.inboxStore?.getState().onionPaths.isOnline) {
|
|
window.inboxStore?.dispatch(updateIsOnline(true));
|
|
}
|
|
|
|
GetNetworkTime.handleTimestampOffsetFromNetwork('retrieve', bodyFirstResult.t);
|
|
|
|
// NOTE: We don't want to sort messages here because the ordering depends on the snode and when it received each message.
|
|
// The last_hash for that snode has to be the last one we've received from that same snode, othwerwise we end up fetching the same messages over and over again.
|
|
return results.map((result, index) => ({
|
|
code: result.code,
|
|
messages: result.body as RetrieveMessagesResultsContent,
|
|
namespace: namespaces[index],
|
|
}));
|
|
} catch (e) {
|
|
window?.log?.warn('exception while parsing json of nextMessage:', e);
|
|
if (!window.inboxStore?.getState().onionPaths.isOnline) {
|
|
window.inboxStore?.dispatch(updateIsOnline(true));
|
|
}
|
|
throw new Error(
|
|
`_retrieveNextMessages - exception while parsing json of nextMessage ${targetNode.ip}:${targetNode.port}: ${e?.message}`
|
|
);
|
|
}
|
|
}
|
|
|
|
export const SnodeAPIRetrieve = { retrieveNextMessages };
|