From a49a65c92b8075ebd09ab781b39f0c193047cf1d Mon Sep 17 00:00:00 2001 From: Audric Ackermann Date: Thu, 13 Jun 2024 11:11:23 +1000 Subject: [PATCH] chore: cleaned up the batch sender --- .../conversations/unsendingInteractions.ts | 7 ++-- ts/models/conversation.ts | 1 - ts/session/apis/snode_api/SNodeAPI.ts | 42 ++++++++++++------- .../apis/snode_api/SnodeRequestTypes.ts | 2 +- .../DeleteGroupHashesRequestFactory.ts | 16 +++---- .../DeleteUserHashesRequestFactory.ts | 13 ++++++ .../conversations/ConversationController.ts | 2 +- ts/session/sending/MessageSender.ts | 16 ++++--- ts/session/sending/MessageSentHandler.ts | 34 ++++++++------- .../jobs/GroupPendingRemovalsJob.ts | 1 - .../utils/job_runners/jobs/GroupSyncJob.ts | 12 +++--- .../utils/job_runners/jobs/UserSyncJob.ts | 2 +- ts/state/ducks/metaGroups.ts | 5 --- 13 files changed, 87 insertions(+), 66 deletions(-) create mode 100644 ts/session/apis/snode_api/factories/DeleteUserHashesRequestFactory.ts diff --git a/ts/interactions/conversations/unsendingInteractions.ts b/ts/interactions/conversations/unsendingInteractions.ts index 875f63ba3..e51a3e3fd 100644 --- a/ts/interactions/conversations/unsendingInteractions.ts +++ b/ts/interactions/conversations/unsendingInteractions.ts @@ -189,7 +189,7 @@ export async function deleteMessagesFromSwarmOnly( ) { const deletionMessageHashes = isStringArray(messages) ? messages : getMessageHashes(messages); try { - if (messages.length === 0) { + if (isEmpty(messages)) { return false; } @@ -199,10 +199,11 @@ export async function deleteMessagesFromSwarmOnly( ); return false; } + const hashesAsSet = new Set(deletionMessageHashes); if (PubKey.is03Pubkey(pubkey)) { - return await SnodeAPI.networkDeleteMessagesForGroup(deletionMessageHashes, pubkey); + return await SnodeAPI.networkDeleteMessagesForGroup(hashesAsSet, pubkey); } - return await SnodeAPI.networkDeleteMessageOurSwarm(deletionMessageHashes, pubkey); + return await SnodeAPI.networkDeleteMessageOurSwarm(hashesAsSet, pubkey); } catch (e) { window.log?.error( `deleteMessagesFromSwarmOnly: Error deleting message from swarm of ${ed25519Str(pubkey)}, hashes: ${deletionMessageHashes}`, diff --git a/ts/models/conversation.ts b/ts/models/conversation.ts index 9d4ddd777..cbc93446f 100644 --- a/ts/models/conversation.ts +++ b/ts/models/conversation.ts @@ -1130,7 +1130,6 @@ export class ConversationModel extends Backbone.Model { await GroupSync.pushChangesToGroupSwarmIfNeeded({ groupPk: this.id, - deleteAllMessagesSubRequest: null, supplementalKeysSubRequest: [], extraStoreRequests, }); diff --git a/ts/session/apis/snode_api/SNodeAPI.ts b/ts/session/apis/snode_api/SNodeAPI.ts index 47d053a76..81b38464e 100644 --- a/ts/session/apis/snode_api/SNodeAPI.ts +++ b/ts/session/apis/snode_api/SNodeAPI.ts @@ -3,18 +3,16 @@ import { GroupPubkeyType, PubkeyType } from 'libsession_util_nodejs'; import { compact, isEmpty } from 'lodash'; import pRetry from 'p-retry'; +import { UserGroupsWrapperActions } from '../../../webworker/workers/browser/libsession_worker_interface'; import { getSodiumRenderer } from '../../crypto'; import { PubKey } from '../../types'; -import { - DeleteAllFromUserNodeSubRequest, - DeleteHashesFromGroupNodeSubRequest, - DeleteHashesFromUserNodeSubRequest, -} from './SnodeRequestTypes'; -import { BatchRequests } from './batchRequest'; -import { SnodePool } from './snodePool'; import { StringUtils, UserUtils } from '../../utils'; import { ed25519Str, fromBase64ToArray, fromHexToArray } from '../../utils/String'; -import { UserGroupsWrapperActions } from '../../../webworker/workers/browser/libsession_worker_interface'; +import { DeleteAllFromUserNodeSubRequest } from './SnodeRequestTypes'; +import { BatchRequests } from './batchRequest'; +import { DeleteGroupHashesFactory } from './factories/DeleteGroupHashesRequestFactory'; +import { DeleteUserHashesFactory } from './factories/DeleteUserHashesRequestFactory'; +import { SnodePool } from './snodePool'; export const ERROR_CODE_NO_CONNECT = 'ENETUNREACH: No network connection.'; @@ -158,14 +156,22 @@ const TEST_getMinTimeout = () => 500; * Note: legacy group did not support removing messages from the swarm. */ const networkDeleteMessageOurSwarm = async ( - messagesHashes: Array, + messagesHashes: Set, pubkey: PubkeyType ): Promise => { const sodium = await getSodiumRenderer(); if (!PubKey.is05Pubkey(pubkey) || pubkey !== UserUtils.getOurPubKeyStrFromCache()) { throw new Error('networkDeleteMessageOurSwarm with 05 pk can only for our own swarm'); } - const request = new DeleteHashesFromUserNodeSubRequest({ messagesHashes }); + if (isEmpty(messagesHashes)) { + window.log.info('networkDeleteMessageOurSwarm: messageHashes is empty'); + return true; + } + const messageHashesArr = [...messagesHashes]; + const request = DeleteUserHashesFactory.makeUserHashesToDeleteSubRequest({ messagesHashes }); + if (!request) { + throw new Error('makeUserHashesToDeleteSubRequest returned invalid subrequest'); + } try { const success = await pRetry( @@ -237,7 +243,7 @@ const networkDeleteMessageOurSwarm = async ( const responseHashes = snodeJson.deleted as Array; const signatureSnode = snodeJson.signature as string; // The signature looks like ( PUBKEY_HEX || RMSG[0] || ... || RMSG[N] || DMSG[0] || ... || DMSG[M] ) - const dataToVerify = `${request.pubkey}${messagesHashes.join( + const dataToVerify = `${request.pubkey}${messageHashesArr.join( '' )}${responseHashes.join('')}`; const dataToVerifyUtf8 = StringUtils.encode(dataToVerify, 'utf8'); @@ -292,7 +298,7 @@ const networkDeleteMessageOurSwarm = async ( * - if the request failed too many times */ const networkDeleteMessagesForGroup = async ( - messagesHashes: Array, + messagesHashes: Set, groupPk: GroupPubkeyType ): Promise => { if (!PubKey.is03Pubkey(groupPk)) { @@ -301,17 +307,21 @@ const networkDeleteMessagesForGroup = async ( const group = await UserGroupsWrapperActions.getGroup(groupPk); if (!group || !group.secretKey || isEmpty(group.secretKey)) { window.log.warn( - `networkDeleteMessagesForGroup: not deleting from swarm of 03-group ${messagesHashes.length} hashes as we do not the adminKey` + `networkDeleteMessagesForGroup: not deleting from swarm of 03-group ${messagesHashes.size} hashes as we do not the adminKey` ); return false; } try { - const request = new DeleteHashesFromGroupNodeSubRequest({ + const request = DeleteGroupHashesFactory.makeGroupHashesToDeleteSubRequest({ messagesHashes, - groupPk, - secretKey: group.secretKey, + group, }); + if (!request) { + throw new Error( + 'DeleteGroupHashesFactory.makeGroupHashesToDeleteSubRequest failed to build a request ' + ); + } await pRetry( async () => { diff --git a/ts/session/apis/snode_api/SnodeRequestTypes.ts b/ts/session/apis/snode_api/SnodeRequestTypes.ts index caf06623e..317fe1f55 100644 --- a/ts/session/apis/snode_api/SnodeRequestTypes.ts +++ b/ts/session/apis/snode_api/SnodeRequestTypes.ts @@ -39,7 +39,7 @@ abstract class SnodeAPISubRequest { * Retrieve for legacy was not authenticated */ export class RetrieveLegacyClosedGroupSubRequest extends SnodeAPISubRequest { - public method = 'retrieve' as const; + method = 'retrieve' as const; public readonly legacyGroupPk: PubkeyType; public readonly last_hash: string; public readonly max_size: number | undefined; diff --git a/ts/session/apis/snode_api/factories/DeleteGroupHashesRequestFactory.ts b/ts/session/apis/snode_api/factories/DeleteGroupHashesRequestFactory.ts index 9c2b92165..033db8c48 100644 --- a/ts/session/apis/snode_api/factories/DeleteGroupHashesRequestFactory.ts +++ b/ts/session/apis/snode_api/factories/DeleteGroupHashesRequestFactory.ts @@ -4,32 +4,32 @@ import { ed25519Str } from '../../../utils/String'; import { DeleteHashesFromGroupNodeSubRequest } from '../SnodeRequestTypes'; function makeGroupHashesToDeleteSubRequest({ - allOldHashes, + messagesHashes, group, }: { group: Pick; - allOldHashes: Set; + messagesHashes: Set; }) { const groupPk = group.pubkeyHex; - const allOldHashesArray = [...allOldHashes]; - if (allOldHashesArray.length) { + const messagesHashesArr = [...messagesHashes]; + if (messagesHashesArr.length) { if (!group.secretKey || isEmpty(group.secretKey)) { window.log.debug( - `makeGroupHashesToDeleteSubRequest: ${ed25519Str(groupPk)}: allOldHashesArray not empty but we do not have the secretKey` + `makeGroupHashesToDeleteSubRequest: ${ed25519Str(groupPk)}: messagesHashesArr not empty but we do not have the secretKey` ); throw new Error( - 'makeGroupHashesToDeleteSubRequest: allOldHashesArray not empty but we do not have the secretKey' + 'makeGroupHashesToDeleteSubRequest: messagesHashesArr not empty but we do not have the secretKey' ); } return new DeleteHashesFromGroupNodeSubRequest({ - messagesHashes: [...allOldHashes], + messagesHashes: messagesHashesArr, groupPk, secretKey: group.secretKey, }); } - return null; + return undefined; } export const DeleteGroupHashesFactory = { makeGroupHashesToDeleteSubRequest }; diff --git a/ts/session/apis/snode_api/factories/DeleteUserHashesRequestFactory.ts b/ts/session/apis/snode_api/factories/DeleteUserHashesRequestFactory.ts new file mode 100644 index 000000000..441638aa4 --- /dev/null +++ b/ts/session/apis/snode_api/factories/DeleteUserHashesRequestFactory.ts @@ -0,0 +1,13 @@ +import { DeleteHashesFromUserNodeSubRequest } from '../SnodeRequestTypes'; + +function makeUserHashesToDeleteSubRequest({ messagesHashes }: { messagesHashes: Set }) { + const messagesHashesArr = [...messagesHashes]; + if (messagesHashesArr.length) { + return new DeleteHashesFromUserNodeSubRequest({ + messagesHashes: messagesHashesArr, + }); + } + return undefined; +} + +export const DeleteUserHashesFactory = { makeUserHashesToDeleteSubRequest }; diff --git a/ts/session/conversations/ConversationController.ts b/ts/session/conversations/ConversationController.ts index 407c68384..a908490d9 100644 --- a/ts/session/conversations/ConversationController.ts +++ b/ts/session/conversations/ConversationController.ts @@ -321,7 +321,7 @@ class ConvoController { groupPk, secretKey, }) - : null; + : undefined; // this marks the group info as deleted. We need to push those details await MetaGroupWrapperActions.infoDestroy(groupPk); diff --git a/ts/session/sending/MessageSender.ts b/ts/session/sending/MessageSender.ts index b3eab85c8..a0c2c5b79 100644 --- a/ts/session/sending/MessageSender.ts +++ b/ts/session/sending/MessageSender.ts @@ -429,8 +429,8 @@ async function sendMessagesDataToSnode( deleteHashesSubRequest, deleteAllMessagesSubRequest, }: WithRevokeSubRequest & { - deleteAllMessagesSubRequest?: DeleteAllFromGroupMsgNodeSubRequest | null; - deleteHashesSubRequest: DeleteHashesRequestPerPubkey | null; + deleteAllMessagesSubRequest?: DeleteAllFromGroupMsgNodeSubRequest; + deleteHashesSubRequest?: DeleteHashesRequestPerPubkey; }, method: MethodBatchType ): Promise { @@ -645,8 +645,8 @@ async function sendEncryptedDataToSnode( }: WithRevokeSubRequest & { storeRequests: StoreRequestsPerPubkey; // keeping those as an array because the order needs to be enforced for some (groupkeys for instance) destination: T; - deleteHashesSubRequest: DeleteHashesRequestPerPubkey | null; - deleteAllMessagesSubRequest?: DeleteAllFromGroupMsgNodeSubRequest | null; + deleteHashesSubRequest?: DeleteHashesRequestPerPubkey; + deleteAllMessagesSubRequest?: DeleteAllFromGroupMsgNodeSubRequest; }): Promise { try { const batchResults = await pRetry( @@ -736,7 +736,6 @@ async function sendUnencryptedDataToSnode & { + { + device: destination, + identifier, + isDestinationClosedGroup, + plainTextBuffer, + }: Pick & { /** * plainTextBuffer is only required when sending a message to a 1o1, * as we need it to encrypt it again for our linked devices (synced messages) */ plainTextBuffer: Uint8Array | null; + /** + * We must not sync a message when it was sent to a closed group + */ + isDestinationClosedGroup: boolean; }, effectiveTimestamp: number, storedHash: string | null ) { // The wrappedEnvelope will be set only if the message is not one of OpenGroupV2Message type. - let fetchedMessage = await fetchHandleMessageSentData(sentMessage.identifier); + let fetchedMessage = await fetchHandleMessageSentData(identifier); if (!fetchedMessage) { return; } let sentTo = fetchedMessage.get('sent_to') || []; - const isOurDevice = UserUtils.isUsFromCache(sentMessage.device); + const isOurDevice = UserUtils.isUsFromCache(destination); - // FIXME this is not correct and will cause issues with syncing - // At this point the only way to check for medium - // group is by comparing the encryption type - const isClosedGroupMessage = - sentMessage.encryption === SignalService.Envelope.Type.CLOSED_GROUP_MESSAGE || - PubKey.is03Pubkey(sentMessage.device); + const isClosedGroupMessage = isDestinationClosedGroup || PubKey.is03Pubkey(destination); // We trigger a sync message only when the message is not to one of our devices, AND - // the message is not for an open group (there is no sync for opengroups, each device pulls all messages), AND + // the message is not for a group (there is no sync for groups, each device pulls all messages), AND // if we did not sync or trigger a sync message for this specific message already const shouldTriggerSyncMessage = !isOurDevice && @@ -100,16 +104,16 @@ async function handleSwarmMessageSentSuccess( // A message is synced if we triggered a sync message (sentSync) // and the current message was sent to our device (so a sync message) const shouldMarkMessageAsSynced = - isOurDevice && fetchedMessage.get('sentSync') && isClosedGroupMessage; + (isOurDevice && fetchedMessage.get('sentSync')) || isClosedGroupMessage; // Handle the sync logic here - if (shouldTriggerSyncMessage && sentMessage && sentMessage.plainTextBuffer) { + if (shouldTriggerSyncMessage && plainTextBuffer) { try { - const contentDecoded = SignalService.Content.decode(sentMessage.plainTextBuffer); + const contentDecoded = SignalService.Content.decode(plainTextBuffer); if (contentDecoded && contentDecoded.dataMessage) { try { await fetchedMessage.sendSyncMessage(contentDecoded, effectiveTimestamp); - const tempFetchMessage = await fetchHandleMessageSentData(sentMessage.identifier); + const tempFetchMessage = await fetchHandleMessageSentData(identifier); if (!tempFetchMessage) { window?.log?.warn( 'Got an error while trying to sendSyncMessage(): fetchedMessage is null' @@ -130,7 +134,7 @@ async function handleSwarmMessageSentSuccess( fetchedMessage.set({ synced: true }); } - sentTo = union(sentTo, [sentMessage.device]); + sentTo = union(sentTo, [destination]); if (storedHash) { fetchedMessage.updateMessageHash(storedHash); } diff --git a/ts/session/utils/job_runners/jobs/GroupPendingRemovalsJob.ts b/ts/session/utils/job_runners/jobs/GroupPendingRemovalsJob.ts index 9fdf9f831..44535bfc8 100644 --- a/ts/session/utils/job_runners/jobs/GroupPendingRemovalsJob.ts +++ b/ts/session/utils/job_runners/jobs/GroupPendingRemovalsJob.ts @@ -161,7 +161,6 @@ class GroupPendingRemovalsJob extends PersistedJob; - deleteAllMessagesSubRequest?: DeleteAllFromGroupMsgNodeSubRequest | null; + deleteAllMessagesSubRequest?: DeleteAllFromGroupMsgNodeSubRequest; extraStoreRequests: Array; }): Promise { // save the dumps to DB even before trying to push them, so at least we have an up to date dumps in the DB in case of crash, no network etc await LibSessionUtil.saveDumpsToDb(groupPk); const { allOldHashes, messages: pendingConfigData } = await LibSessionUtil.pendingChangesForGroup(groupPk); - // If there are no pending changes then the job can just complete (next time something - // is updated we want to try and run immediately so don't schedule another run in this case) + // If there are no pending changes nor any requests to be made, + // then the job can just complete (next time something is updated we want + // to try and run immediately so don't schedule another run in this case) if ( isEmpty(pendingConfigData) && isEmpty(supplementalKeysSubRequest) && @@ -128,12 +129,13 @@ async function pushChangesToGroupSwarmIfNeeded({ const deleteHashesSubRequest = DeleteGroupHashesFactory.makeGroupHashesToDeleteSubRequest({ group, - allOldHashes, + messagesHashes: allOldHashes, }); const result = await MessageSender.sendEncryptedDataToSnode({ // Note: this is on purpose that supplementalKeysSubRequest is before pendingConfigRequests - // as this is to avoid a race condition where a device polls while we are posting the configs (already encrypted with the new keys) + // as this is to avoid a race condition where a device is polling right + // while we are posting the configs (already encrypted with the new keys) storeRequests: [...supplementalKeysSubRequest, ...pendingConfigRequests, ...extraStoreRequests], destination: groupPk, deleteHashesSubRequest, diff --git a/ts/session/utils/job_runners/jobs/UserSyncJob.ts b/ts/session/utils/job_runners/jobs/UserSyncJob.ts index c18234124..f1bc48e89 100644 --- a/ts/session/utils/job_runners/jobs/UserSyncJob.ts +++ b/ts/session/utils/job_runners/jobs/UserSyncJob.ts @@ -113,7 +113,7 @@ async function pushChangesToUserSwarmIfNeeded() { ? new DeleteHashesFromUserNodeSubRequest({ messagesHashes: [...changesToPush.allOldHashes], }) - : null; + : undefined; const result = await MessageSender.sendEncryptedDataToSnode({ storeRequests, diff --git a/ts/state/ducks/metaGroups.ts b/ts/state/ducks/metaGroups.ts index 4a1c018fc..74a5b9777 100644 --- a/ts/state/ducks/metaGroups.ts +++ b/ts/state/ducks/metaGroups.ts @@ -210,7 +210,6 @@ const initNewGroupInWrapper = createAsyncThunk( const result = await GroupSync.pushChangesToGroupSwarmIfNeeded({ groupPk, supplementalKeysSubRequest: [], - deleteAllMessagesSubRequest: null, extraStoreRequests, }); if (result !== RunJobResult.Success) { @@ -730,7 +729,6 @@ async function handleMemberAddedFromUI({ supplementalKeysSubRequest, revokeSubRequest, unrevokeSubRequest, - deleteAllMessagesSubRequest: null, extraStoreRequests, }); if (sequenceResult !== RunJobResult.Success) { @@ -834,7 +832,6 @@ async function handleMemberRemovedFromUI({ const sequenceResult = await GroupSync.pushChangesToGroupSwarmIfNeeded({ groupPk, supplementalKeysSubRequest: [], - deleteAllMessagesSubRequest: null, extraStoreRequests, }); if (sequenceResult !== RunJobResult.Success) { @@ -914,7 +911,6 @@ async function handleNameChangeFromUI({ const batchResult = await GroupSync.pushChangesToGroupSwarmIfNeeded({ groupPk, supplementalKeysSubRequest: [], - deleteAllMessagesSubRequest: null, extraStoreRequests, }); @@ -1035,7 +1031,6 @@ const triggerFakeAvatarUpdate = createAsyncThunk( const batchResult = await GroupSync.pushChangesToGroupSwarmIfNeeded({ groupPk, supplementalKeysSubRequest: [], - deleteAllMessagesSubRequest: null, extraStoreRequests, }); if (!batchResult) {