feat: track and delete configMessageHashes

pull/2620/head
Audric Ackermann 2 years ago
parent 7c56310e69
commit 6bbb16b46d

@@ -16,7 +16,7 @@ export const ConfigDumpData: AsyncObjectWrapper<ConfigDumpDataNode> = {
},
saveConfigDump: (dump: ConfigDumpRow) => {
console.warn('saveConfigDump', dump);
if (dump.combinedMessageHashes.some(m => m && m.length < 5)) {
if (dump.combinedMessageHashes.some(m => Boolean(m && m.length < 5))) {
throw new Error('saveConfigDump combinedMessageHashes have invalid size');
}
return channels.saveConfigDump(dump);
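
Note on the `Boolean()` wrapper: `m && m.length < 5` evaluates to `''` for empty strings, so wrapping it makes the `.some` predicate a strict boolean. A standalone sketch of the guard, with a hypothetical helper name:

```ts
// Hypothetical helper mirroring the guard above. Message hashes returned by
// service nodes are long base64 strings, so a non-empty hash shorter than
// 5 characters is treated as corrupt input.
function assertValidMessageHashes(combinedMessageHashes: Array<string>): void {
  if (combinedMessageHashes.some(m => Boolean(m && m.length < 5))) {
    throw new Error('saveConfigDump combinedMessageHashes have invalid size');
  }
}

assertValidMessageHashes(['fakeHashLongEnough1', 'fakeHashLongEnough2']); // ok
// assertValidMessageHashes(['abc']) would throw: 3 < 5
```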

@@ -1062,7 +1062,7 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
options: {
fromSync?: boolean;
} = {},
shouldCommit = true
shouldCommit = true,
): Promise<void> {
let expireTimer = providedExpireTimer;
let source = providedSource;

@@ -2,7 +2,8 @@
* Config dumps sql calls
*/
import { compact, flatten, isEmpty, uniq } from 'lodash';
import { compact, isEmpty, uniq } from 'lodash';
import { uniqFromListOfList } from '../../shared/string_utils';
import {
ConfigDumpDataNode,
ConfigDumpRow,
@@ -36,10 +37,17 @@ function parseRowNoData(
return toRet;
}
export function uniqCompacted<T extends string>(list: Array<T>): Array<T> {
if (!list || !list.length) {
return [];
}
return uniq(compact(list));
}
function parseRowMessageHashes(row: CombinedMessageHashes): Array<string> {
if (!isEmpty(row.combinedMessageHashes) && row.combinedMessageHashes) {
try {
return JSON.parse(row.combinedMessageHashes) || [];
return uniqCompacted(JSON.parse(row.combinedMessageHashes));
} catch (e) {
console.warn('parseRowMessageHashes row failed');
}
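
`uniqCompacted` is then used everywhere a hash list is read or written. A quick sketch of its behaviour, mirroring the definition above:

```ts
import { compact, uniq } from 'lodash';

// Mirrors uniqCompacted above: drop falsy entries (''), then dedupe.
function uniqCompacted<T extends string>(list: Array<T>): Array<T> {
  if (!list || !list.length) {
    return [];
  }
  return uniq(compact(list));
}

uniqCompacted(['hash1', 'hash1', '', 'hash2']); // ['hash1', 'hash2']
uniqCompacted([]); // []
```
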
@@ -65,7 +73,10 @@ export const configDumpData: ConfigDumpDataNode = {
return compact(rows.map(parseRow));
},
getMessageHashesByVariantAndPubkey: (variant: ConfigWrapperObjectTypes, publicKey: string) => {
getMessageHashesByVariantAndPubkey: (
variant: ConfigWrapperObjectTypes,
publicKey: string
): Array<string> => {
const rows = assertGlobalInstance()
.prepare(
'SELECT combinedMessageHashes from configDump WHERE variant = $variant AND publicKey = $publicKey;'
@@ -78,7 +89,10 @@ export const configDumpData: ConfigDumpDataNode = {
if (!rows) {
return [];
}
return uniq(flatten(rows.map(parseRowMessageHashes)));
const parsedRows: Array<Array<string>> = rows.map(parseRowMessageHashes);
const unique: Array<string> = uniqFromListOfList(parsedRows);
return unique;
},
getAllDumpsWithData: () => {
@@ -123,7 +137,7 @@ export const configDumpData: ConfigDumpDataNode = {
.run({
publicKey,
variant,
combinedMessageHashes: JSON.stringify(combinedMessageHashes || []),
combinedMessageHashes: JSON.stringify(uniqCompacted(combinedMessageHashes)),
data,
});
},
@@ -141,7 +155,7 @@ export const configDumpData: ConfigDumpDataNode = {
.run({
publicKey,
variant,
combinedMessageHashes: JSON.stringify(combinedMessageHashes || []),
combinedMessageHashes: JSON.stringify(uniqCompacted(combinedMessageHashes)),
});
},
@@ -156,17 +170,8 @@ export const configDumpData: ConfigDumpDataNode = {
});
if (!rows) {
return new Set();
return new Array<string>();
}
const asArrays = compact(
rows.map(t => {
try {
return JSON.parse(t.combinedMessageHashes);
} catch {
return null;
}
})
);
return new Set(asArrays.flat(1));
return uniqFromListOfList(rows.map(parseRowMessageHashes));
},
};
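
Taken together, every read path now funnels through the same defensive parse: `JSON.parse` inside a try/catch, then compact and dedupe. A reduced sketch with a hypothetical row shape:

```ts
import { compact, uniq } from 'lodash';

// Hypothetical row shape: combinedMessageHashes is stored as a JSON string.
type CombinedMessageHashes = { combinedMessageHashes?: string };

function parseRowMessageHashes(row: CombinedMessageHashes): Array<string> {
  if (row.combinedMessageHashes) {
    try {
      // drop falsy entries, then dedupe, as uniqCompacted does above
      return uniq(compact(JSON.parse(row.combinedMessageHashes)));
    } catch (e) {
      console.warn('parseRowMessageHashes row failed');
    }
  }
  return [];
}

parseRowMessageHashes({ combinedMessageHashes: '["h1","h1","h2"]' }); // ['h1', 'h2']
parseRowMessageHashes({ combinedMessageHashes: 'not json' }); // []
```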

@@ -161,16 +161,17 @@ async function processMergingResults(
const variant = LibSessionUtil.kindToVariant(kind);
// We need to get the existing message hashes and combine them with the latest from the
// service node to ensure the next push will properly clean up old messages
const oldMessagesHashesSet = await ConfigDumpData.getCombinedHashesByVariantAndPubkey(
const oldMessagesHashes = await ConfigDumpData.getCombinedHashesByVariantAndPubkey(
variant,
envelope.source
);
const allMessageHashes = new Set([...oldMessagesHashesSet, ...finalResult.messageHashes]);
const allMessageHashes = [...oldMessagesHashes, ...finalResult.messageHashes];
const finalResultsHashes = new Set(finalResult.messageHashes);
// lodash does deep compare of Sets
const messageHashesChanged = !isEqual(oldMessagesHashesSet, finalResultsHashes);
const messageHashesChanged = !isEqual(oldMessagesHashes, finalResultsHashes);
if (finalResult.needsDump) {
// The config data had changes so regenerate the dump and save it
@@ -180,7 +181,7 @@ async function processMergingResults(
data: dump,
publicKey: pubkey,
variant,
combinedMessageHashes: [...allMessageHashes],
combinedMessageHashes: allMessageHashes,
});
} else if (messageHashesChanged) {
// The config data didn't change but there were different messages on the service node
@@ -188,7 +189,7 @@ async function processMergingResults(
await ConfigDumpData.saveCombinedMessageHashesForMatching({
publicKey: pubkey,
variant,
combinedMessageHashes: [...allMessageHashes],
combinedMessageHashes: allMessageHashes,
});
}
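
One caveat worth flagging in this hunk: `oldMessagesHashes` is now an Array while `finalResultsHashes` is still a Set, and lodash's `isEqual` never reports an Array and a Set as equal, so `messageHashesChanged` appears to always come out true. A sketch of a content-based comparison (a suggestion, not what the commit does):

```ts
import { isEqual } from 'lodash';

// isEqual compares container types as well as contents:
isEqual(['h1'], new Set(['h1'])); // false — an Array never equals a Set

// Normalising both sides to sorted arrays compares contents only:
function sameHashes(a: Array<string>, b: Set<string>): boolean {
  return isEqual([...a].sort(), [...b].sort());
}

sameHashes(['h2', 'h1'], new Set(['h1', 'h2'])); // true
```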

@@ -1,3 +1,4 @@
import { omit } from 'lodash';
import { Snode } from '../../../data/data';
import { updateIsOnline } from '../../../state/ducks/onion';
import { doSnodeBatchRequest } from './batchRequest';
@@ -38,7 +39,7 @@ async function buildRetrieveRequest(
};
const retrieveParamsLegacy: RetrieveLegacyClosedGroupSubRequestType = {
method: 'retrieve',
params: { ...retrieveLegacyClosedGroup },
params: omit(retrieveLegacyClosedGroup, 'timestamp'), // if we give a timestamp, a signature will be required by the service node, and we don't want to provide one as this is an unauthenticated namespace
};
return retrieveParamsLegacy;
@@ -65,7 +66,6 @@ async function buildRetrieveRequest(
return retrieve;
})
);
return retrieveRequestsParams;
}
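
The `omit()` call is the heart of this change: keeping the timestamp in the legacy closed-group retrieve would make the service node demand a signature, which this unauthenticated namespace cannot supply. Illustrated with a hypothetical params object:

```ts
import { omit } from 'lodash';

// Hypothetical params: only the fields needed for the illustration.
const retrieveLegacyClosedGroup = {
  pubkey: '05aabb…', // truncated placeholder group pubkey
  last_hash: '',
  timestamp: Date.now(),
};

const params = omit(retrieveLegacyClosedGroup, 'timestamp');
// params: { pubkey: '05aabb…', last_hash: '' } — no timestamp, so no signature is required
```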

@@ -318,8 +318,8 @@ export class SwarmPolling {
namespaces: Array<SnodeNamespaces>
): Promise<RetrieveMessagesResultsBatched | null> {
const namespaceLength = namespaces.length;
if (namespaceLength > 3 || namespaceLength <= 0) {
throw new Error('pollNodeForKey needs 1 or 2 namespaces to be given at all times');
if (namespaceLength > 5 || namespaceLength <= 0) {
throw new Error(`invalid number of retrieve namespaces provided: ${namespaceLength}`);
}
const edkey = node.pubkey_ed25519;
const pkStr = pubkey.key;

@@ -183,7 +183,7 @@ async function send(
async function sendMessagesDataToSnode(
params: Array<StoreOnNodeParamsNoSig>,
destination: string,
oldMessageHashes: Array<string> | null
oldMessageHashes: Set<string> | null
): Promise<NotEmptyArrayOfBatchResults> {
const rightDestination = params.filter(m => m.pubkey === destination);
const swarm = await getSwarmFor(destination);
@@ -211,14 +211,14 @@ async function sendMessagesDataToSnode(
})
);
debugger;
const signedDeleteOldHashesRequest = oldMessageHashes?.length
? await SnodeSignature.getSnodeSignatureByHashesParams({
method: 'delete' as const,
messages: oldMessageHashes,
pubkey: destination,
})
: null;
const signedDeleteOldHashesRequest =
oldMessageHashes && oldMessageHashes.size
? await SnodeSignature.getSnodeSignatureByHashesParams({
method: 'delete' as const,
messages: [...oldMessageHashes],
pubkey: destination,
})
: null;
const snode = sample(swarm);
if (!snode) {
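
The delete request for the previous config messages is only built when the Set actually has entries, and it is spread back into an array for signing. A condensed sketch, where `signByHashes` stands in for `SnodeSignature.getSnodeSignatureByHashesParams`:

```ts
// Condensed sketch; signByHashes is a stand-in for
// SnodeSignature.getSnodeSignatureByHashesParams.
type DeleteByHashesParams = { method: 'delete'; messages: Array<string>; pubkey: string };

async function buildDeleteOldHashesRequest(
  oldMessageHashes: Set<string> | null,
  destination: string,
  signByHashes: (params: DeleteByHashesParams) => Promise<DeleteByHashesParams>
) {
  // Skip the delete sub request entirely when there is nothing to delete.
  return oldMessageHashes && oldMessageHashes.size
    ? signByHashes({
        method: 'delete' as const,
        messages: [...oldMessageHashes], // spread the Set back into an array
        pubkey: destination,
      })
    : null;
}
```
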
@@ -351,7 +351,7 @@ async function encryptMessagesAndWrap(
async function sendMessagesToSnode(
params: Array<StoreOnNodeMessage>,
destination: string,
oldMessageHashes: Array<string> | null
oldMessageHashes: Set<string> | null
): Promise<NotEmptyArrayOfBatchResults | null> {
try {
const recipient = PubKey.cast(destination);

@@ -297,6 +297,7 @@ export class PersistedJobRunner<T extends TypeOfPersistedData> {
this.deleteJobsByIdentifier([this.currentJob.persistedData.identifier]);
await this.writeJobsToDB();
} catch (e) {
window.log.warn(`JobRunner current ${this.jobRunnerType} failed with ${e.message}`);
if (
success === RunJobResult.PermanentFailure ||
nextJob.persistedData.currentRetry >= nextJob.persistedData.maxAttempts - 1

@@ -130,6 +130,8 @@ class AvatarDownloadJob extends PersistedJob<AvatarDownloadPersistedData> {
return RunJobResult.PermanentFailure;
}
debugger;
let conversation = getConversationController().get(convoId);
if (!conversation) {
// return true so we do not retry this task.

@@ -49,11 +49,10 @@ async function retrieveSingleDestinationChanges(): Promise<Array<SingleDestinati
const singleDestChanges: Array<SingleDestinationChanges> = Object.keys(groupedByDestination).map(
destination => {
const messages = groupedByDestination[destination];
const uniqHashes = compact(
uniq(messages.filter(m => m.oldMessageHashes).map(m => m.oldMessageHashes)).flat()
);
// the delete hashes sub request can be done across namespaces, so we can do a single one with all the hashes to remove (per pubkey)
const hashes = compact(messages.map(m => m.oldMessageHashes)).flat();
return { allOldHashes: uniqHashes, destination, messages };
return { allOldHashes: hashes, destination, messages };
}
);
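
Per the comment above, a single delete-by-hashes sub request covers every namespace for a given pubkey, so the per-message hash arrays are simply compacted and flattened. For example:

```ts
import { compact } from 'lodash';

// Hypothetical messages grouped for one destination; oldMessageHashes may be
// undefined when a namespace has nothing to delete.
const messages = [
  { oldMessageHashes: ['h1', 'h2'] },
  { oldMessageHashes: undefined },
  { oldMessageHashes: ['h3'] },
];

const hashes = compact(messages.map(m => m.oldMessageHashes)).flat();
// hashes: ['h1', 'h2', 'h3']
```
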
@@ -78,7 +77,6 @@ function resultsToSuccessfulChange(
* As it is a sequence, the delete might have failed but the new config message might still be posted.
* So we need to check which request failed, and if it is the delete by hashes, we need to add the hash of the posted message to the list of hashes
*/
debugger;
try {
for (let i = 0; i < allResults.length; i++) {
@@ -122,11 +120,12 @@ function resultsToSuccessfulChange(
`messagePostedHashes for j:${j}; didDeleteOldConfigMessages:${didDeleteOldConfigMessages}: `,
messagePostedHashes
);
const updatedHashes: Array<string> = didDeleteOldConfigMessages
? [messagePostedHashes]
: uniq(compact([...request.allOldHashes, messagePostedHashes]));
successfulChanges.push({
publicKey: request.destination,
updatedHash: didDeleteOldConfigMessages
? [messagePostedHashes]
: [...request.allOldHashes, messagePostedHashes],
updatedHash: updatedHashes,
message: request.messages?.[j].message,
});
}
@@ -134,7 +133,6 @@
}
} catch (e) {
console.warn('eeee', e);
debugger;
throw e;
}
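
The `updatedHashes` bookkeeping above keeps only the freshly posted hash when the old config messages were successfully deleted, and otherwise unions it with the surviving old hashes. A reduced sketch with a hypothetical function name:

```ts
import { compact, uniq } from 'lodash';

function computeUpdatedHashes(
  didDeleteOldConfigMessages: boolean,
  allOldHashes: Array<string>,
  messagePostedHashes: string
): Array<string> {
  // If the delete-by-hashes sub request succeeded, only the new message
  // remains on the swarm; otherwise the old hashes still need tracking.
  return didDeleteOldConfigMessages
    ? [messagePostedHashes]
    : uniq(compact([...allOldHashes, messagePostedHashes]));
}

computeUpdatedHashes(true, ['old1'], 'new1'); // ['new1']
computeUpdatedHashes(false, ['old1'], 'new1'); // ['old1', 'new1']
```
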
@@ -232,7 +230,8 @@ class ConfigurationSyncJob extends PersistedJob<ConfigurationSyncPersistedData>
message: item.message,
};
});
return MessageSender.sendMessagesToSnode(msgs, dest.destination, dest.allOldHashes);
const asSet = new Set(dest.allOldHashes);
return MessageSender.sendMessagesToSnode(msgs, dest.destination, asSet);
})
);
@@ -241,7 +240,6 @@ class ConfigurationSyncJob extends PersistedJob<ConfigurationSyncPersistedData>
);
// we do a sequence call here. If we do not have the right expected number of results, consider it
debugger;
if (!isArray(allResults) || allResults.length !== singleDestChanges.length) {
return RunJobResult.RetryJobIfPossible;
}

@@ -29,7 +29,7 @@ export type OutgoingConfResult = {
message: SharedConfigMessage;
namespace: SnodeNamespaces;
destination: string;
oldMessageHashes?: Array<string>;
oldMessageHashes: Array<string>;
};
async function insertUserProfileIntoWrapperIfChanged() {
@@ -149,13 +149,18 @@ async function pendingChangesForPubkey(pubkey: string): Promise<Array<OutgoingCo
if (pubkey === us) {
LibSessionUtil.requiredUserDumpVariants.forEach(requiredVariant => {
if (!dumps.find(m => m.publicKey === us && m.variant === requiredVariant)) {
dumps.push({ publicKey: us, variant: requiredVariant, combinedMessageHashes: [] });
dumps.push({
publicKey: us,
variant: requiredVariant,
combinedMessageHashes: [],
});
}
});
}
const results: Array<OutgoingConfResult> = [];
debugger;
for (let index = 0; index < dumps.length; index++) {
const dump = dumps[index];
const variant = dump.variant;

@@ -0,0 +1,8 @@
import { compact, uniq } from 'lodash';
/**
* Returns a compact list of all the items present in all those arrays, once each only.
*/
export function uniqFromListOfList<T extends string>(list: Array<Array<T>>): Array<T> {
return uniq(compact(list.flat()));
}
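
For example, given overlapping per-row hash lists:

```ts
uniqFromListOfList([
  ['h1', 'h2'],
  ['h2', ''],
  ['h3'],
]); // ['h1', 'h2', 'h3'] — flattened, empty strings dropped, duplicates removed
```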

@@ -2,7 +2,8 @@ import { expect } from 'chai';
import { from_hex, from_string } from 'libsodium-wrappers-sumo';
// tslint:disable: chai-vague-errors no-unused-expression no-http-string no-octal-literal whitespace
// tslint:disable: chai-vague-errors no-unused-expression no-http-string no-octal-literal whitespace no-require-imports variable-name
import * as SessionUtilWrapper from 'session_util_wrapper';
describe('libsession_wrapper_contacts ', () => {
// Note: To run this test, you need to compile the libsession wrapper for node (and not for electron).
@@ -16,10 +17,10 @@ describe('libsession_wrapper_contacts ', () => {
const edSecretKey = from_hex(
'0123456789abcdef0123456789abcdef000000000000000000000000000000004cb76fdc6d32278e3f83dbf608360ecc6b65727934b85d2fb86862ff98c46ab7'
);
const SessionUtilWrapper = require('session_util_wrapper');
// const SessionUtilWrapper = require('session_util_wrapper');
// Initialize a brand new, empty config because we have no dump data to deal with.
const contacts = new SessionUtilWrapper.ContactsConfigWrapper(edSecretKey, null);
const contacts = new SessionUtilWrapper.ContactsConfigWrapperInsideWorker(edSecretKey, null);
// We don't need to push anything, since this is an empty config
expect(contacts.needsPush()).to.be.eql(false);
@@ -40,6 +41,8 @@ describe('libsession_wrapper_contacts ', () => {
expect(created.approvedMe).to.be.eq(false);
expect(created.blocked).to.be.eq(false);
expect(created.id).to.be.eq(real_id);
expect(created.profilePicture?.url).to.be.eq(undefined);
expect(created.profilePicture?.key).to.be.eq(undefined);
expect(contacts.needsPush()).to.be.eql(false);
expect(contacts.needsDump()).to.be.eql(false);
@@ -62,14 +65,15 @@ describe('libsession_wrapper_contacts ', () => {
expect(updated?.approved).to.be.true;
expect(updated?.approvedMe).to.be.true;
expect(updated?.blocked).to.be.false;
expect(updated?.profilePicture).to.be.undefined;
created.profilePicture = { key: new Uint8Array([1, 2, 3]), url: 'fakeUrl' };
contacts.set(created);
updated = contacts.get(real_id);
const updated2 = contacts.get(real_id);
expect(updated?.profilePicture?.url).to.be.deep.eq('fakeUrl');
expect(updated?.profilePicture?.key).to.be.deep.eq(new Uint8Array([1, 2, 3]));
expect(updated2?.profilePicture?.url).to.be.deep.eq('fakeUrl');
expect(updated2?.profilePicture?.key).to.be.deep.eq(new Uint8Array([1, 2, 3]));
expect(contacts.needsPush()).to.be.eql(true);
expect(contacts.needsDump()).to.be.eql(true);
@@ -84,7 +88,7 @@ describe('libsession_wrapper_contacts ', () => {
const dump = contacts.dump();
const contacts2 = new SessionUtilWrapper.ContactsConfigWrapper(edSecretKey, dump);
const contacts2 = new SessionUtilWrapper.ContactsConfigWrapperInsideWorker(edSecretKey, dump);
expect(contacts2.needsPush()).to.be.eql(false);
expect(contacts2.needsDump()).to.be.eql(false);
@@ -101,10 +105,10 @@ describe('libsession_wrapper_contacts ', () => {
expect(x?.approvedMe).to.be.true;
expect(x?.blocked).to.be.false;
const another_id = '051111111111111111111111111111111111111111111111111111111111111111';
contacts2.getOrCreate(another_id);
const anotherId = '051111111111111111111111111111111111111111111111111111111111111111';
contacts2.getOrCreate(anotherId);
contacts2.set({
id: another_id,
id: anotherId,
});
// We're not setting any fields, but we should still keep a record of the session id
expect(contacts2.needsPush()).to.be.true;
@@ -125,7 +129,7 @@ describe('libsession_wrapper_contacts ', () => {
const nicknames = allContacts.map((m: any) => m.nickname || '(N/A)');
expect(session_ids.length).to.be.eq(2);
expect(session_ids).to.be.deep.eq([real_id, another_id]);
expect(session_ids).to.be.deep.eq([real_id, anotherId]);
expect(nicknames).to.be.deep.eq(['Joey', '(N/A)']);
// Conflict! Oh no!
@@ -173,12 +177,12 @@ describe('libsession_wrapper_contacts ', () => {
expect(contacts2.needsPush()).to.be.false;
const allContacts2 = contacts.getAll();
const session_ids2 = allContacts2.map((m: any) => m.id);
const sessionIds2 = allContacts2.map((m: any) => m.id);
const nicknames2 = allContacts2.map((m: any) => m.nickname || '(N/A)');
expect(session_ids2.length).to.be.eq(2);
expect(sessionIds2.length).to.be.eq(2);
expect(nicknames2.length).to.be.eq(2);
expect(session_ids2).to.be.deep.eq([another_id, third_id]);
expect(sessionIds2).to.be.deep.eq([anotherId, third_id]);
expect(nicknames2).to.be.deep.eq(['(N/A)', 'Nickname 3']);
});
@@ -189,13 +193,13 @@ describe('libsession_wrapper_contacts ', () => {
const SessionUtilWrapper = require('session_util_wrapper');
// Initialize a brand new, empty config because we have no dump data to deal with.
const contacts = new SessionUtilWrapper.ContactsConfigWrapper(edSecretKey, null);
const contacts = new SessionUtilWrapper.ContactsConfigWrapperInsideWorker(edSecretKey, null);
const real_id = '050000000000000000000000000000000000000000000000000000000000000000';
const realId = '050000000000000000000000000000000000000000000000000000000000000000';
expect(contacts.get(real_id)).to.be.null;
const c = contacts.getOrCreate(real_id);
expect(c.id).to.be.eq(real_id);
expect(contacts.get(realId)).to.be.null;
const c = contacts.getOrCreate(realId);
expect(c.id).to.be.eq(realId);
expect(c.name).to.be.null;
expect(c.nickname).to.be.null;
expect(c.approved).to.be.false;
@@ -214,7 +218,7 @@ describe('libsession_wrapper_contacts ', () => {
// contacts.setApproved(real_id, c.approved);
// contacts.setApprovedMe(real_id, c.approvedMe);
const c2 = contacts.getOrCreate(real_id);
const c2 = contacts.getOrCreate(realId);
expect(c2.name).to.be.eq('Joe');
expect(c2.nickname).to.be.eq('Joey');
expect(c2.approved).to.be.true;
@@ -229,14 +233,14 @@ describe('libsession_wrapper_contacts ', () => {
let push1 = contacts.push();
expect(push1.seqno).to.be.equal(1);
const contacts2 = new SessionUtilWrapper.ContactsConfigWrapper(edSecretKey, null);
const contacts2 = new SessionUtilWrapper.ContactsConfigWrapperInsideWorker(edSecretKey, null);
let accepted = contacts2.merge([push1.data]);
expect(accepted).to.be.equal(1);
contacts.confirmPushed(push1.seqno);
let c3 = contacts2.getOrCreate(real_id);
let c3 = contacts2.getOrCreate(realId);
expect(c3.name).to.be.eq('Joe');
expect(c3.nickname).to.be.eq('Joey');
expect(c3.approved).to.be.true;
@@ -268,7 +272,7 @@ describe('libsession_wrapper_contacts ', () => {
expect(session_ids2.length).to.be.eq(2);
expect(nicknames2.length).to.be.eq(2);
expect(session_ids2).to.be.deep.eq([real_id, another_id]);
expect(session_ids2).to.be.deep.eq([realId, another_id]);
expect(nicknames2).to.be.deep.eq(['Joey', '(N/A)']);
// Changing things while iterating:
@@ -278,7 +282,7 @@ describe('libsession_wrapper_contacts ', () => {
let deletions = 0;
let non_deletions = 0;
allContacts3.forEach((c: any) => {
if (c.id !== real_id) {
if (c.id !== realId) {
contacts.erase(c.id);
deletions++;
} else {
@@ -288,7 +292,7 @@ describe('libsession_wrapper_contacts ', () => {
expect(deletions).to.be.eq(1);
expect(non_deletions).to.be.eq(1);
expect(contacts.get(real_id)).to.exist;
expect(contacts.get(realId)).to.exist;
expect(contacts.get(another_id)).to.be.null;
});
});

@@ -21,7 +21,7 @@ describe('libsession_wrapper', () => {
const SessionUtilWrapper = require('session_util_wrapper');
// Initialize a brand new, empty config because we have no dump data to deal with.
const conf = new SessionUtilWrapper.UserConfigWrapper(edSecretKey, null);
const conf = new SessionUtilWrapper.UserConfigWrapperInsideWorker(edSecretKey, null);
// We don't need to push anything, since this is an empty config
expect(conf.needsPush()).to.be.eql(false);
@@ -116,9 +116,8 @@ describe('libsession_wrapper', () => {
// Now we're going to set up a second, competing config object (in the real world this would be
// another Session client somewhere).
// Start with an empty config, as above:
const conf2 = new SessionUtilWrapper.UserConfigWrapper(edSecretKey, null);
// Start with an empty config, as above:
const conf2 = new SessionUtilWrapper.UserConfigWrapperInsideWorker(edSecretKey, null);
expect(conf2.needsDump()).to.be.false;

@@ -34,7 +34,7 @@ export type ConfigDumpRow = {
variant: ConfigWrapperObjectTypes; // the variant this entry is about (user profile, contacts, ...)
publicKey: string; // either our pubkey if a dump for our own swarm or the closed group pubkey
data: Uint8Array; // the blob returned by libsession.dump() call
combinedMessageHashes: Array<string>; // array of lastHashes to keep track of
combinedMessageHashes: Array<string>; // set of lastHashes to keep track of
// we might need to add a `seqno` field here.
};
@@ -62,7 +62,7 @@ export type ConfigDumpDataNode = {
getCombinedHashesByVariantAndPubkey: (
variant: ConfigWrapperObjectTypes,
pubkey: string
) => Set<string>;
) => Array<string>;
};
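
A reduced sketch of the updated contract: both the stored row and the node-side getter now use plain string arrays rather than Sets, presumably because arrays serialise cleanly across the node/renderer boundary (an assumption; the diff itself doesn't state the motivation):

```ts
// Reduced sketch; ConfigWrapperObjectTypes is a stand-in for the real union type.
type ConfigWrapperObjectTypes = 'UserConfig' | 'ContactsConfig';

type ConfigDumpRow = {
  variant: ConfigWrapperObjectTypes;
  publicKey: string;
  data: Uint8Array;
  combinedMessageHashes: Array<string>; // conceptually a set of lastHashes, stored as an array
};

type ConfigDumpDataNode = {
  getCombinedHashesByVariantAndPubkey: (
    variant: ConfigWrapperObjectTypes,
    pubkey: string
  ) => Array<string>; // was Set<string> before this change
};
```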
// ========== unprocessed

@ -1,8 +0,0 @@
/**
* @license
* Lodash <https://lodash.com/>
* Copyright OpenJS Foundation and other contributors <https://openjsf.org/>
* Released under MIT license <https://lodash.com/license>
* Based on Underscore.js 1.8.3 <http://underscorejs.org/LICENSE>
* Copyright Jeremy Ashkenas, DocumentCloud and Investigative Reporters & Editors
*/