fix: add a way to empty all seenHashes & lastHashes for a pukbey

pull/3281/head
Audric Ackermann 5 months ago
parent 760aac3723
commit 315b4b4e4c
No known key found for this signature in database

@ -18,6 +18,7 @@ import {
AsyncWrapper,
MsgDuplicateSearchOpenGroup,
SaveConversationReturn,
SaveSeenMessageHash,
UnprocessedDataNode,
UpdateLastHashType,
} from '../types/sqlSharedTypes';
@ -215,17 +216,16 @@ async function cleanLastHashes(): Promise<void> {
await channels.cleanLastHashes();
}
export type SeenMessageHashes = {
expiresAt: number;
hash: string;
};
async function saveSeenMessageHashes(data: Array<SeenMessageHashes>): Promise<void> {
async function saveSeenMessageHashes(data: Array<SaveSeenMessageHash>): Promise<void> {
await channels.saveSeenMessageHashes(cleanData(data));
}
async function clearLastHashesForConvoId(data: { convoId: string }): Promise<void> {
await channels.clearLastHashesForConvoId(cleanData(data));
async function clearLastHashesForConvoId(conversationId: string): Promise<void> {
await channels.clearLastHashesForConvoId(conversationId);
}
async function emptySeenMessageHashesForConversation(conversationId: string): Promise<void> {
await channels.emptySeenMessageHashesForConversation(conversationId);
}
async function updateLastHash(data: UpdateLastHashType): Promise<void> {
@ -867,6 +867,7 @@ export const Data = {
cleanLastHashes,
clearLastHashesForConvoId,
saveSeenMessageHashes,
emptySeenMessageHashesForConversation,
updateLastHash,
saveMessage,
saveMessages,

@ -40,6 +40,7 @@ const channelsToMake = new Set([
'updateLastHash',
'clearLastHashesForConvoId',
'saveSeenMessageHashes',
'emptySeenMessageHashesForConversation',
'saveMessages',
'removeMessage',
'removeMessagesByIds',

@ -242,6 +242,7 @@ export async function declineConversationWithoutConfirm({
forceDestroyForAllMembers: false,
fromSyncMessage: false,
sendLeaveMessage: false,
clearFetchedHashes: false,
});
}
@ -435,6 +436,7 @@ async function leaveGroupOrCommunityByConvoId({
deleteAllMessagesOnSwarm: false,
deletionType: 'doNotKeep',
forceDestroyForAllMembers: false,
clearFetchedHashes: true,
});
}
await clearConversationInteractionState({ conversationId });

@ -17,6 +17,7 @@ export const ITEMS_TABLE = 'items';
export const ATTACHMENT_DOWNLOADS_TABLE = 'attachment_downloads';
export const CLOSED_GROUP_V2_KEY_PAIRS_TABLE = 'encryptionKeyPairsForClosedGroupV2';
export const LAST_HASHES_TABLE = 'lastHashes';
export const SEEN_MESSAGE_TABLE = 'seenMessages';
export const HEX_KEY = /[^0-9A-Fa-f]/;

@ -19,6 +19,7 @@ import {
MESSAGES_TABLE,
NODES_FOR_PUBKEY_TABLE,
OPEN_GROUP_ROOMS_V2_TABLE,
SEEN_MESSAGE_TABLE,
dropFtsAndTriggers,
objectToJSON,
rebuildFtsTable,
@ -106,6 +107,7 @@ const LOKI_SCHEMA_VERSIONS = [
updateToSessionSchemaVersion36,
updateToSessionSchemaVersion37,
updateToSessionSchemaVersion38,
updateToSessionSchemaVersion39,
];
function updateToSessionSchemaVersion1(currentVersion: number, db: BetterSqlite3.Database) {
@ -2003,6 +2005,26 @@ function updateToSessionSchemaVersion38(currentVersion: number, db: BetterSqlite
console.log(`updateToSessionSchemaVersion${targetVersion}: success!`);
}
function updateToSessionSchemaVersion39(currentVersion: number, db: BetterSqlite3.Database) {
const targetVersion = 39;
if (currentVersion >= targetVersion) {
return;
}
console.log(`updateToSessionSchemaVersion${targetVersion}: starting...`);
db.transaction(() => {
db.exec(`ALTER TABLE ${SEEN_MESSAGE_TABLE} ADD COLUMN conversationId TEXT;`);
db.exec(`CREATE INDEX seen_hashes_per_pubkey ON ${SEEN_MESSAGE_TABLE} (
conversationId
);`);
writeSessionSchemaVersion(targetVersion, db);
})();
console.log(`updateToSessionSchemaVersion${targetVersion}: success!`);
}
export function printTableColumns(table: string, db: BetterSqlite3.Database) {
console.info(db.pragma(`table_info('${table}');`));
}

@ -11,6 +11,7 @@ import {
LAST_HASHES_TABLE,
MESSAGES_FTS_TABLE,
MESSAGES_TABLE,
SEEN_MESSAGE_TABLE,
} from '../database_utility';
import { getAppRootPath } from '../getRootPath';
import { updateSessionSchema } from './sessionMigrations';
@ -245,7 +246,7 @@ function updateToSchemaVersion6(currentVersion: number, db: BetterSqlite3.Databa
expiresAt INTEGER
);
CREATE TABLE seenMessages(
CREATE TABLE ${SEEN_MESSAGE_TABLE}(
hash TEXT PRIMARY KEY,
expiresAt INTEGER
);

@ -48,6 +48,7 @@ import {
NODES_FOR_PUBKEY_TABLE,
objectToJSON,
OPEN_GROUP_ROOMS_V2_TABLE,
SEEN_MESSAGE_TABLE,
toSqliteBoolean,
} from './database_utility';
import type { SetupI18nReturnType } from '../types/localizer'; // checked - only node
@ -58,6 +59,7 @@ import {
MsgDuplicateSearchOpenGroup,
roomHasBlindEnabled,
SaveConversationReturn,
SaveSeenMessageHash,
UnprocessedDataNode,
UnprocessedParameter,
UpdateLastHashType,
@ -937,12 +939,21 @@ function saveMessage(data: MessageAttributes) {
return id;
}
function saveSeenMessageHashes(arrayOfHashes: Array<string>) {
function saveSeenMessageHashes(arrayOfHashes: Array<SaveSeenMessageHash>) {
assertGlobalInstance().transaction(() => {
map(arrayOfHashes, saveSeenMessageHash);
})();
}
function emptySeenMessageHashesForConversation(conversationId: string) {
if (!isString(conversationId) || isEmpty(conversationId)) {
throw new Error('emptySeenMessageHashesForConversation: conversationId is not a string');
}
assertGlobalInstance()
.prepare(`DELETE FROM ${SEEN_MESSAGE_TABLE} WHERE conversationId=$conversationId`)
.run({ conversationId });
}
function updateLastHash(data: UpdateLastHashType) {
const { convoId, snode, hash, expiresAt, namespace } = data;
if (!isNumber(namespace)) {
@ -973,32 +984,43 @@ function updateLastHash(data: UpdateLastHashType) {
});
}
function clearLastHashesForConvoId(data: { convoId: string }) {
const { convoId } = data;
if (!isString(convoId)) {
throw new Error('clearLastHashesForPubkey: convoId not a string');
function clearLastHashesForConvoId(conversationId: string) {
if (!isString(conversationId) || isEmpty(conversationId)) {
throw new Error('clearLastHashesForConvoId: conversationId is not a string');
}
assertGlobalInstance().prepare(`DELETE FROM ${LAST_HASHES_TABLE} WHERE pubkey=$convoId;`).run({
convoId,
});
assertGlobalInstance()
.prepare(`DELETE FROM ${LAST_HASHES_TABLE} WHERE id=$conversationId`)
.run({ conversationId });
}
function saveSeenMessageHash(data: any) {
const { expiresAt, hash } = data;
function saveSeenMessageHash(data: SaveSeenMessageHash) {
const { expiresAt, hash, conversationId } = data;
if (!isString(conversationId)) {
throw new Error('saveSeenMessageHash conversationId must be a string');
}
if (!isString(hash)) {
throw new Error('saveSeenMessageHash hash must be a string');
}
if (!isNumber(expiresAt)) {
throw new Error('saveSeenMessageHash expiresAt must be a number');
}
try {
assertGlobalInstance()
.prepare(
`INSERT OR REPLACE INTO seenMessages (
`INSERT OR REPLACE INTO ${SEEN_MESSAGE_TABLE} (
expiresAt,
hash
hash,
conversationId
) values (
$expiresAt,
$hash
$hash,
$conversationId
);`
)
.run({
expiresAt,
hash,
conversationId,
});
} catch (e) {
console.error('saveSeenMessageHash failed:', e.message);
@ -1012,7 +1034,7 @@ function cleanLastHashes() {
}
function cleanSeenMessages() {
assertGlobalInstance().prepare('DELETE FROM seenMessages WHERE expiresAt <= $now;').run({
assertGlobalInstance().prepare(`DELETE FROM ${SEEN_MESSAGE_TABLE} WHERE expiresAt <= $now;`).run({
now: Date.now(),
});
}
@ -1748,7 +1770,9 @@ function getLastHashBySnode(convoId: string, snode: string, namespace: number) {
function getSeenMessagesByHashList(hashes: Array<string>) {
const rows = assertGlobalInstance()
.prepare(`SELECT * FROM seenMessages WHERE hash IN ( ${hashes.map(() => '?').join(', ')} );`)
.prepare(
`SELECT * FROM ${SEEN_MESSAGE_TABLE} WHERE hash IN ( ${hashes.map(() => '?').join(', ')} );`
)
.all(hashes);
return map(rows, row => row.hash);
@ -2008,7 +2032,7 @@ function removeAll() {
DELETE FROM ${LAST_HASHES_TABLE};
DELETE FROM ${NODES_FOR_PUBKEY_TABLE};
DELETE FROM ${CLOSED_GROUP_V2_KEY_PAIRS_TABLE};
DELETE FROM seenMessages;
DELETE FROM ${SEEN_MESSAGE_TABLE};
DELETE FROM ${CONVERSATIONS_TABLE};
DELETE FROM ${MESSAGES_TABLE};
DELETE FROM ${ATTACHMENT_DOWNLOADS_TABLE};
@ -2657,7 +2681,7 @@ export const sqlNode = {
cleanLastHashes,
clearLastHashesForConvoId,
saveSeenMessageHashes,
saveSeenMessageHash,
emptySeenMessageHashesForConversation,
updateLastHash,
saveMessages,
removeMessage,

@ -759,6 +759,7 @@ async function handleSingleGroupUpdateToLeave(toLeave: GroupPubkeyType) {
deletionType: 'doNotKeep',
deleteAllMessagesOnSwarm: false,
forceDestroyForAllMembers: false,
clearFetchedHashes: true,
});
} catch (e) {
window.log.info('Failed to deleteClosedGroup with: ', e.message);

@ -57,6 +57,7 @@ async function handleLibSessionKickedMessage({
deletionType: inviteWasPending ? 'doNotKeep' : 'keepAsKicked',
deleteAllMessagesOnSwarm: false,
forceDestroyForAllMembers: false,
clearFetchedHashes: true,
});
}

@ -489,7 +489,7 @@ export class SwarmPolling {
const newMessages = await this.handleSeenMessages(uniqOtherMsgs);
window.log.info(
`SwarmPolling: handleSeenMessages: ${newMessages.length} out of ${uniqOtherMsgs.length} are not seen yet. snode: ${toPollFrom ? ed25519Str(toPollFrom.pubkey_ed25519) : 'undefined'}`
`SwarmPolling: handleSeenMessages: ${newMessages.length} out of ${uniqOtherMsgs.length} are not seen yet about pk:${ed25519Str(pubkey)} snode: ${toPollFrom ? ed25519Str(toPollFrom.pubkey_ed25519) : 'undefined'}`
);
if (type === ConversationTypeEnum.GROUPV2) {
if (!PubKey.is03Pubkey(pubkey)) {
@ -510,7 +510,7 @@ export class SwarmPolling {
// private and legacy groups are cached, so we can mark them as seen right away, they are still in the cache until processed correctly.
// at some point we should get rid of the cache completely, and do the same logic as for groupv2 above
await this.updateSeenMessages(newMessages);
await this.updateSeenMessages(newMessages, pubkey);
// trigger the handling of all the other messages, not shared config related and not groupv2 encrypted
newMessages.forEach(m => {
const extracted = extractWebSocketContent(m.data, m.hash);
@ -696,6 +696,7 @@ export class SwarmPolling {
deletionType: 'doNotKeep',
deleteAllMessagesOnSwarm: false,
forceDestroyForAllMembers: false,
clearFetchedHashes: true,
});
}
}
@ -722,19 +723,22 @@ export class SwarmPolling {
}
const incomingHashes = messages.map((m: RetrieveMessageItem) => m.hash);
const dupHashes = await Data.getSeenMessagesByHashList(incomingHashes);
const newMessages = messages.filter((m: RetrieveMessageItem) => !dupHashes.includes(m.hash));
return newMessages;
}
private async updateSeenMessages(processedMessages: Array<RetrieveMessageItem>) {
private async updateSeenMessages(
processedMessages: Array<RetrieveMessageItem>,
conversationId: string
) {
if (processedMessages.length) {
const newHashes = processedMessages.map((m: RetrieveMessageItem) => ({
// NOTE setting expiresAt will trigger the global function destroyExpiredMessages() on it's next interval
expiresAt: m.expiration,
hash: m.hash,
conversationId,
}));
await Data.saveSeenMessageHashes(newHashes);
}
@ -822,6 +826,17 @@ export class SwarmPolling {
return this.lastHashes[nodeEdKey][pubkey][namespace];
}
public async resetLastHashesForConversation(conversationId: string) {
await Data.clearLastHashesForConvoId(conversationId);
const snodeKeys = Object.keys(this.lastHashes);
for (let index = 0; index < snodeKeys.length; index++) {
const snodeKey = snodeKeys[index];
if (!isEmpty(this.lastHashes[snodeKey][conversationId])) {
this.lastHashes[snodeKey][conversationId] = {};
}
}
}
public async pollOnceForOurDisplayName(abortSignal?: AbortSignal): Promise<string> {
if (abortSignal?.aborted) {
throw new NotFoundError('[pollOnceForOurDisplayName] aborted right away');
@ -1089,6 +1104,7 @@ async function handleMessagesForGroupV2(
{
hash: msg.hash,
expiresAt: msg.expiration,
conversationId: groupPk,
},
]);
} catch (e) {
@ -1096,9 +1112,4 @@ async function handleMessagesForGroupV2(
}
}
}
// make sure that all the message above are indeed seen (extra check as everything should already be marked as seen in the loop above)
await Data.saveSeenMessageHashes(
newMessages.map(m => ({ hash: m.hash, expiresAt: m.expiration }))
);
}

@ -48,6 +48,7 @@ async function handleMetaMergeResults(groupPk: GroupPubkeyType) {
deletionType: 'keepAsDestroyed', // we just got something from the group's swarm, so it is not pendingInvite
deleteAllMessagesOnSwarm: false,
forceDestroyForAllMembers: false,
clearFetchedHashes: true,
});
} else {
if (

@ -259,11 +259,13 @@ class ConvoController {
deletionType,
deleteAllMessagesOnSwarm,
forceDestroyForAllMembers,
clearFetchedHashes,
}: DeleteOptions & {
sendLeaveMessage: boolean;
deletionType: 'doNotKeep' | 'keepAsKicked' | 'keepAsDestroyed';
deleteAllMessagesOnSwarm: boolean;
forceDestroyForAllMembers: boolean;
clearFetchedHashes: boolean;
}
) {
if (!PubKey.is03Pubkey(groupPk)) {
@ -271,7 +273,7 @@ class ConvoController {
}
window.log.info(
`deleteGroup: ${ed25519Str(groupPk)}, sendLeaveMessage:${sendLeaveMessage}, fromSyncMessage:${fromSyncMessage}, deletionType:${deletionType}, deleteAllMessagesOnSwarm:${deleteAllMessagesOnSwarm}, forceDestroyForAllMembers:${forceDestroyForAllMembers}`
`deleteGroup: ${ed25519Str(groupPk)}, sendLeaveMessage:${sendLeaveMessage}, fromSyncMessage:${fromSyncMessage}, deletionType:${deletionType}, deleteAllMessagesOnSwarm:${deleteAllMessagesOnSwarm}, forceDestroyForAllMembers:${forceDestroyForAllMembers}, clearFetchedHashes:${clearFetchedHashes}`
);
// this deletes all messages in the conversation
@ -373,6 +375,14 @@ class ConvoController {
await this.removeGroupOrCommunityFromDBAndRedux(groupPk);
}
// We want to clear the lastHash and the seenHashes of the corresponding group.
// We do this so that if we get reinvited to the group, we will
// fetch and display all the messages from the group's swarm again.
if (clearFetchedHashes) {
await getSwarmPollingInstance().resetLastHashesForConversation(groupPk);
await Data.emptySeenMessageHashesForConversation(groupPk);
}
await SessionUtilConvoInfoVolatile.removeGroupFromWrapper(groupPk);
// release the memory (and the current meta-dumps in memory for that group)
window.log.info(`freeing meta group wrapper: ${ed25519Str(groupPk)}`);

@ -4,7 +4,7 @@ import { AbortController } from 'abort-controller';
import { GroupPubkeyType, PubkeyType } from 'libsession_util_nodejs';
import { isArray, isEmpty, isNumber, isString } from 'lodash';
import pRetry from 'p-retry';
import { Data, SeenMessageHashes } from '../../data/data';
import { Data } from '../../data/data';
import { UserGroupsWrapperActions } from '../../webworker/workers/browser/libsession_worker_interface';
import { OpenGroupMessageV2 } from '../apis/open_group_api/opengroupV2/OpenGroupMessageV2';
import {
@ -53,7 +53,7 @@ import { UserUtils } from '../utils';
import { ed25519Str, fromUInt8ArrayToBase64 } from '../utils/String';
import { MessageSentHandler } from './MessageSentHandler';
import { EncryptAndWrapMessageResults, MessageWrapper } from './MessageWrapper';
import { stringify } from '../../types/sqlSharedTypes';
import { SaveSeenMessageHash, stringify } from '../../types/sqlSharedTypes';
import { OpenGroupRequestCommonType } from '../../data/types';
import { NetworkTime } from '../../util/NetworkTime';
import { MergedAbortSignal } from '../apis/snode_api/requestWith';
@ -632,7 +632,7 @@ async function handleBatchResultWithSubRequests({
return;
}
const seenHashes: Array<SeenMessageHashes> = [];
const seenHashes: Array<SaveSeenMessageHash> = [];
for (let index = 0; index < subRequests.length; index++) {
const subRequest = subRequests[index];
@ -658,6 +658,7 @@ async function handleBatchResultWithSubRequests({
seenHashes.push({
expiresAt: NetworkTime.now() + TTL_DEFAULT.CONTENT_MESSAGE, // non config msg expire at CONTENT_MESSAGE at most
hash: storedHash,
conversationId: destination,
});
// We need to store the hash of our synced message for a 1o1. (as this is the one stored on our swarm)

@ -268,6 +268,7 @@ const initNewGroupInWrapper = createAsyncThunk(
deletionType: 'doNotKeep',
deleteAllMessagesOnSwarm: false,
forceDestroyForAllMembers: false,
clearFetchedHashes: true,
});
}
throw e;

@ -319,3 +319,5 @@ export function stringify(obj: unknown) {
2
);
}
export type SaveSeenMessageHash = { expiresAt: number; hash: string; conversationId: string };

Loading…
Cancel
Save