fix: add the maxSizeMap to have priority per retrieve namespaces

pull/2620/head
Audric Ackermann 2 years ago
parent 6f6620f622
commit 760ce5caa5

@ -324,7 +324,7 @@ export const CopyMenuItem = (): JSX.Element | null => {
return ( return (
<Item <Item
onClick={() => { onClick={() => {
copyPublicKeyByConvoId(convoId); void copyPublicKeyByConvoId(convoId);
}} }}
> >
{copyIdLabel} {copyIdLabel}

@ -289,12 +289,7 @@ export async function handleNewClosedGroup(
// ***** Creating a new group ***** // ***** Creating a new group *****
window?.log?.info('Received a new ClosedGroup of id:', groupId); window?.log?.info('Received a new ClosedGroup of id:', groupId);
await ClosedGroup.addUpdateMessage( // we don't want the initial "AAA,BBB and You joined the group"
convo,
{ newName: name, joiningMembers: members },
envelope.senderIdentity || envelope.source, // new group message are coming as session messages
envelopeTimestamp
);
// We only set group admins on group creation // We only set group admins on group creation
const groupDetails: ClosedGroup.GroupInfo = { const groupDetails: ClosedGroup.GroupInfo = {

@ -424,6 +424,7 @@ async function handleLegacyGroupUpdate(latestEnvelopeTimestamp: number) {
} }
if (changes) { if (changes) {
// this commit will grab the latest encryption keypair and add it to the user group wrapper if needed
await legacyGroupConvo.commit(); await legacyGroupConvo.commit();
} }

@ -209,6 +209,7 @@ export class OpenGroupManagerV2 {
// will need it and access it from the db // will need it and access it from the db
await OpenGroupData.saveV2OpenGroupRoom(room); await OpenGroupData.saveV2OpenGroupRoom(room);
// TODOLATER make the requests made here retry a few times (can fail when trying to join a group too soon after a restart)
const roomInfos = await openGroupV2GetRoomInfoViaOnionV4({ const roomInfos = await openGroupV2GetRoomInfoViaOnionV4({
serverPubkey: serverPublicKey, serverPubkey: serverPublicKey,
serverUrl, serverUrl,

@ -1,3 +1,4 @@
import { last, orderBy } from 'lodash';
import { assertUnreachable } from '../../../types/sqlSharedTypes'; import { assertUnreachable } from '../../../types/sqlSharedTypes';
export enum SnodeNamespaces { export enum SnodeNamespaces {
@ -32,7 +33,7 @@ export enum SnodeNamespaces {
/** /**
* This is the namespace used to sync the closed group details for each of the closed groups we are polling * This is the namespace used to sync the closed group details for each of the closed groups we are polling
*/ */
ClosedGroupInfo = 1, // ClosedGroupInfo = 1,
} }
type PickEnum<T, K extends T> = { type PickEnum<T, K extends T> = {
@ -41,7 +42,7 @@ type PickEnum<T, K extends T> = {
export type SnodeNamespacesGroup = PickEnum< export type SnodeNamespacesGroup = PickEnum<
SnodeNamespaces, SnodeNamespaces,
SnodeNamespaces.ClosedGroupInfo | SnodeNamespaces.ClosedGroupMessage SnodeNamespaces.ClosedGroupMessage // | SnodeNamespaces.ClosedGroupInfo
>; >;
export type SnodeNamespacesUser = PickEnum< export type SnodeNamespacesUser = PickEnum<
@ -62,7 +63,7 @@ function isUserConfigNamespace(namespace: SnodeNamespaces) {
case SnodeNamespaces.UserGroups: case SnodeNamespaces.UserGroups:
case SnodeNamespaces.ConvoInfoVolatile: case SnodeNamespaces.ConvoInfoVolatile:
return true; return true;
case SnodeNamespaces.ClosedGroupInfo: // case SnodeNamespaces.ClosedGroupInfo:
case SnodeNamespaces.ClosedGroupMessage: case SnodeNamespaces.ClosedGroupMessage:
return false; return false;
@ -76,6 +77,56 @@ function isUserConfigNamespace(namespace: SnodeNamespaces) {
} }
} }
function namespacePriority(namespace: SnodeNamespaces): number {
switch (namespace) {
case SnodeNamespaces.UserMessages:
return 10;
case SnodeNamespaces.UserContacts:
return 1;
case SnodeNamespaces.UserProfile:
return 1;
case SnodeNamespaces.UserGroups:
return 1;
case SnodeNamespaces.ConvoInfoVolatile:
return 1;
case SnodeNamespaces.ClosedGroupMessage:
return 10;
default:
try {
assertUnreachable(namespace, `isUserConfigNamespace case not handled: ${namespace}`);
} catch (e) {
window.log.warn(`isUserConfigNamespace case not handled: ${namespace}: ${e.message}`);
return 1;
}
}
}
function maxSizeMap(namespaces: Array<SnodeNamespaces>) {
let lastSplit = 1;
const withPriorities = namespaces.map(namespace => {
return { namespace, priority: namespacePriority(namespace) };
});
const groupedByPriorities: Array<{ priority: number; namespaces: Array<SnodeNamespaces> }> = [];
withPriorities.forEach(item => {
if (!groupedByPriorities.find(p => p.priority === item.priority)) {
groupedByPriorities.push({ priority: item.priority, namespaces: [] });
}
groupedByPriorities.find(p => p.priority === item.priority)?.namespaces.push(item.namespace);
});
const sortedDescPriorities = orderBy(groupedByPriorities, ['priority'], ['desc']);
const lowestPriority = last(sortedDescPriorities)?.priority || 1;
const sizeMap = sortedDescPriorities.flatMap(m => {
const paddingForLowerPriority = m.priority === lowestPriority ? 0 : 1;
const splitsForPriority = paddingForLowerPriority + m.namespaces.length;
lastSplit *= splitsForPriority;
return m.namespaces.map(namespace => ({ namespace, maxSize: -lastSplit }));
});
return sizeMap;
}
export const SnodeNamespace = { export const SnodeNamespace = {
isUserConfigNamespace, isUserConfigNamespace,
maxSizeMap,
}; };

@ -22,13 +22,16 @@ async function buildRetrieveRequest(
ourPubkey: string, ourPubkey: string,
configHashesToBump: Array<string> | null configHashesToBump: Array<string> | null
): Promise<Array<RetrieveSubRequestType>> { ): Promise<Array<RetrieveSubRequestType>> {
const maxSizeMap = SnodeNamespace.maxSizeMap(namespaces);
const retrieveRequestsParams: Array<RetrieveSubRequestType> = await Promise.all( const retrieveRequestsParams: Array<RetrieveSubRequestType> = await Promise.all(
namespaces.map(async (namespace, index) => { namespaces.map(async (namespace, index) => {
const foundMaxSize = maxSizeMap.find(m => m.namespace === namespace)?.maxSize;
const retrieveParam = { const retrieveParam = {
pubkey, pubkey,
last_hash: lastHashes.at(index) || '', last_hash: lastHashes.at(index) || '',
namespace, namespace,
timestamp: GetNetworkTime.getNowWithNetworkOffset(), timestamp: GetNetworkTime.getNowWithNetworkOffset(),
max_size: foundMaxSize,
}; };
if (namespace === SnodeNamespaces.ClosedGroupMessage) { if (namespace === SnodeNamespaces.ClosedGroupMessage) {

@ -319,7 +319,7 @@ export class SwarmPolling {
) { ) {
// that pubkey is not tracked in the wrapper anymore. Just discard those messages and make sure we are not polling // that pubkey is not tracked in the wrapper anymore. Just discard those messages and make sure we are not polling
// TODOLATER we might need to do something like this for the new closed groups once released // TODOLATER we might need to do something like this for the new closed groups once released
await getSwarmPollingInstance().removePubkey(polledPubkey); getSwarmPollingInstance().removePubkey(polledPubkey);
} else { } else {
// trigger the handling of all the other messages, not shared config related // trigger the handling of all the other messages, not shared config related
newMessages.forEach(m => { newMessages.forEach(m => {

@ -24,6 +24,7 @@ import { getSwarmPollingInstance } from '../apis/snode_api';
import { SnodeNamespaces } from '../apis/snode_api/namespaces'; import { SnodeNamespaces } from '../apis/snode_api/namespaces';
import { ClosedGroupMemberLeftMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupMemberLeftMessage'; import { ClosedGroupMemberLeftMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupMemberLeftMessage';
import { UserUtils } from '../utils'; import { UserUtils } from '../utils';
import { isEmpty } from 'lodash';
let instance: ConversationController | null; let instance: ConversationController | null;
@ -458,30 +459,42 @@ async function leaveClosedGroup(groupId: string, fromSyncMessage: boolean) {
getSwarmPollingInstance().removePubkey(groupId); getSwarmPollingInstance().removePubkey(groupId);
if (!fromSyncMessage) { if (fromSyncMessage) {
// Send the update to the group // no need to send our leave message as our other device should already have sent it.
const ourLeavingMessage = new ClosedGroupMemberLeftMessage({ await cleanUpFullyLeftLegacyGroup(groupId);
timestamp: networkTimestamp, return;
groupId, }
identifier: dbMessage.id as string,
});
window?.log?.info(`We are leaving the group ${groupId}. Sending our leaving message.`); const keypair = await Data.getLatestClosedGroupEncryptionKeyPair(groupId);
// sent the message to the group and once done, remove everything related to this group if (!keypair || isEmpty(keypair) || isEmpty(keypair.publicHex) || isEmpty(keypair.privateHex)) {
const wasSent = await getMessageQueue().sendToPubKeyNonDurably({ // if we do not have a keypair, we won't be able to send our leaving message neither, so just skip sending it.
message: ourLeavingMessage, // this can happen when getting a group from a broken libsession usergroup wrapper, but not only.
namespace: SnodeNamespaces.ClosedGroupMessage, await cleanUpFullyLeftLegacyGroup(groupId);
pubkey: PubKey.cast(groupId), return;
}); }
// Send the update to the group
const ourLeavingMessage = new ClosedGroupMemberLeftMessage({
timestamp: networkTimestamp,
groupId,
identifier: dbMessage.id as string,
});
window?.log?.info(`We are leaving the group ${groupId}. Sending our leaving message.`);
// if we do not have a keypair for that group, we can't send our leave message, so just skip the message sending part
const wasSent = await getMessageQueue().sendToPubKeyNonDurably({
message: ourLeavingMessage,
namespace: SnodeNamespaces.ClosedGroupMessage,
pubkey: PubKey.cast(groupId),
});
if (wasSent) {
window?.log?.info( window?.log?.info(
`Leaving message sent ${groupId}. Removing everything related to this group.` `Leaving message sent ${groupId}. Removing everything related to this group.`
); );
if (wasSent) {
await cleanUpFullyLeftLegacyGroup(groupId);
}
} else {
await cleanUpFullyLeftLegacyGroup(groupId); await cleanUpFullyLeftLegacyGroup(groupId);
} }
// if we failed to send our leaving message, don't remove everything yet as we might want to retry sending our leaving message later.
} }
async function cleanUpFullyLeftLegacyGroup(groupId: string) { async function cleanUpFullyLeftLegacyGroup(groupId: string) {

@ -1,7 +1,6 @@
import _ from 'lodash'; import _ from 'lodash';
import { ClosedGroup, getMessageQueue } from '..'; import { ClosedGroup, getMessageQueue } from '..';
import { ConversationTypeEnum } from '../../models/conversationAttributes'; import { ConversationTypeEnum } from '../../models/conversationAttributes';
import { MessageModel } from '../../models/message';
import { addKeyPairToCacheAndDBIfNeeded } from '../../receiver/closedGroups'; import { addKeyPairToCacheAndDBIfNeeded } from '../../receiver/closedGroups';
import { ECKeyPair } from '../../receiver/keypairs'; import { ECKeyPair } from '../../receiver/keypairs';
import { openConversationWithMessages } from '../../state/ducks/conversations'; import { openConversationWithMessages } from '../../state/ducks/conversations';
@ -64,13 +63,8 @@ export async function createClosedGroup(groupName: string, members: Array<string
expireTimer: existingExpireTimer, expireTimer: existingExpireTimer,
}; };
// used for UI only, adding of a message to remind who is in the group and the name of the group // we don't want the initial "AAA and You joined the group"
const groupDiff: ClosedGroup.GroupDiff = {
newName: groupName,
joiningMembers: listOfMembers,
};
const dbMessage = await ClosedGroup.addUpdateMessage(convo, groupDiff, us, Date.now());
// be sure to call this before sending the message. // be sure to call this before sending the message.
// the sending pipeline needs to know from GroupUtils when a message is for a medium group // the sending pipeline needs to know from GroupUtils when a message is for a medium group
await ClosedGroup.updateOrCreateClosedGroup(groupDetails); await ClosedGroup.updateOrCreateClosedGroup(groupDetails);
@ -89,18 +83,17 @@ export async function createClosedGroup(groupName: string, members: Array<string
groupName, groupName,
admins, admins,
encryptionKeyPair, encryptionKeyPair,
dbMessage,
existingExpireTimer existingExpireTimer
); );
if (allInvitesSent) { if (allInvitesSent) {
const newHexKeypair = encryptionKeyPair.toHexKeyPair(); const newHexKeypair = encryptionKeyPair.toHexKeyPair();
await addKeyPairToCacheAndDBIfNeeded(groupPublicKey, newHexKeypair); await addKeyPairToCacheAndDBIfNeeded(groupPublicKey, newHexKeypair);
// Subscribe to this group id // Subscribe to this group id
getSwarmPollingInstance().addGroupId(new PubKey(groupPublicKey)); getSwarmPollingInstance().addGroupId(new PubKey(groupPublicKey));
} }
// commit again as now the keypair is saved and can be added to the libsession wrapper UserGroup
await convo.commit();
await forceSyncConfigurationNowIfNeeded(); await forceSyncConfigurationNowIfNeeded();
@ -118,7 +111,6 @@ async function sendToGroupMembers(
groupName: string, groupName: string,
admins: Array<string>, admins: Array<string>,
encryptionKeyPair: ECKeyPair, encryptionKeyPair: ECKeyPair,
dbMessage: MessageModel,
existingExpireTimer: number, existingExpireTimer: number,
isRetry: boolean = false isRetry: boolean = false
): Promise<any> { ): Promise<any> {
@ -128,7 +120,6 @@ async function sendToGroupMembers(
groupName, groupName,
admins, admins,
encryptionKeyPair, encryptionKeyPair,
dbMessage,
existingExpireTimer existingExpireTimer
); );
window?.log?.info(`Sending invites for group ${groupPublicKey} to ${listOfMembers}`); window?.log?.info(`Sending invites for group ${groupPublicKey} to ${listOfMembers}`);
@ -184,7 +175,6 @@ async function sendToGroupMembers(
groupName, groupName,
admins, admins,
encryptionKeyPair, encryptionKeyPair,
dbMessage,
existingExpireTimer, existingExpireTimer,
isRetrySend isRetrySend
); );
@ -202,7 +192,6 @@ function createInvitePromises(
groupName: string, groupName: string,
admins: Array<string>, admins: Array<string>,
encryptionKeyPair: ECKeyPair, encryptionKeyPair: ECKeyPair,
dbMessage: MessageModel,
existingExpireTimer: number existingExpireTimer: number
) { ) {
return listOfMembers.map(async m => { return listOfMembers.map(async m => {
@ -213,7 +202,6 @@ function createInvitePromises(
admins, admins,
keypair: encryptionKeyPair, keypair: encryptionKeyPair,
timestamp: Date.now(), timestamp: Date.now(),
identifier: dbMessage.id,
expireTimer: existingExpireTimer, expireTimer: existingExpireTimer,
}; };
const message = new ClosedGroupNewMessage(messageParams); const message = new ClosedGroupNewMessage(messageParams);

@ -26,6 +26,7 @@ import { ClosedGroupNewMessage } from '../messages/outgoing/controlMessage/group
import { ClosedGroupRemovedMembersMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupRemovedMembersMessage'; import { ClosedGroupRemovedMembersMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupRemovedMembersMessage';
import { UserUtils } from '../utils'; import { UserUtils } from '../utils';
import { fromHexToArray, toHex } from '../utils/String'; import { fromHexToArray, toHex } from '../utils/String';
import { GetNetworkTime } from '../apis/snode_api/getNetworkTime';
export type GroupInfo = { export type GroupInfo = {
id: string; id: string;
@ -419,7 +420,7 @@ async function generateAndSendNewEncryptionKeyPair(
const keypairsMessage = new ClosedGroupEncryptionPairMessage({ const keypairsMessage = new ClosedGroupEncryptionPairMessage({
groupId: toHex(groupId), groupId: toHex(groupId),
timestamp: Date.now(), timestamp: GetNetworkTime.getNowWithNetworkOffset(),
encryptedKeyPairs: wrappers, encryptedKeyPairs: wrappers,
}); });
@ -433,7 +434,9 @@ async function generateAndSendNewEncryptionKeyPair(
distributingClosedGroupEncryptionKeyPairs.delete(toHex(groupId)); distributingClosedGroupEncryptionKeyPairs.delete(toHex(groupId));
await addKeyPairToCacheAndDBIfNeeded(toHex(groupId), newKeyPair.toHexKeyPair()); await addKeyPairToCacheAndDBIfNeeded(toHex(groupId), newKeyPair.toHexKeyPair());
await groupConvo?.commit(); // this makes sure to include the new encryption keypair in the libsession usergroup wrapper
}; };
// this is to be sent to the group pubkey adress // this is to be sent to the group pubkey adress
await getMessageQueue().sendToGroup({ await getMessageQueue().sendToGroup({
message: keypairsMessage, message: keypairsMessage,

@ -296,7 +296,7 @@ async function queueNewJobIfNeeded() {
// if we did run at t=100, and it is currently t=110, the difference is 10 // if we did run at t=100, and it is currently t=110, the difference is 10
const diff = Math.max(Date.now() - lastRunConfigSyncJobTimestamp, 0); const diff = Math.max(Date.now() - lastRunConfigSyncJobTimestamp, 0);
// but we want to run every 30, so what we need is actually `30-10` from now = 20 // but we want to run every 30, so what we need is actually `30-10` from now = 20
const leftBeforeNextTick = Math.max(defaultMsBetweenRetries - diff, 0); const leftBeforeNextTick = Math.max(defaultMsBetweenRetries - diff, 1000);
window.log.debug('Scheduling ConfSyncJob: LATER'); window.log.debug('Scheduling ConfSyncJob: LATER');
await runners.configurationSyncRunner.addJob( await runners.configurationSyncRunner.addJob(

@ -132,14 +132,18 @@ async function insertGroupsFromDBIntoWrapperAndRefresh(convoId: string): Promise
window.log.debug(`inserting into usergroup wrapper "${foundConvo.id}"... }`); window.log.debug(`inserting into usergroup wrapper "${foundConvo.id}"... }`);
// this does the create or the update of the matching existing legacy group // this does the create or the update of the matching existing legacy group
if ( if (
!isEmpty(wrapperLegacyGroup.name) && !isEmpty(wrapperLegacyGroup.name) &&
!isEmpty(wrapperLegacyGroup.encPubkey) && !isEmpty(wrapperLegacyGroup.encPubkey) &&
!isEmpty(wrapperLegacyGroup.encSeckey) !isEmpty(wrapperLegacyGroup.encSeckey)
) { ) {
console.warn('inserting into user wrapper', wrapperLegacyGroup); window.log.debug('inserting into user wrapper', wrapperLegacyGroup);
await UserGroupsWrapperActions.setLegacyGroup(wrapperLegacyGroup); await UserGroupsWrapperActions.setLegacyGroup(wrapperLegacyGroup);
} else {
window.log.debug(
'not inserting legacy group as name, or encryption keypair is empty',
foundConvo.id
);
} }
await refreshCachedUserGroup(convoId); await refreshCachedUserGroup(convoId);

@ -0,0 +1,30 @@
import { expect } from 'chai';
import Sinon from 'sinon';
import { SnodeNamespace } from '../../../../session/apis/snode_api/namespaces';
// tslint:disable-next-line: max-func-body-length
describe('Snode namespaces', () => {
describe('maxSizeMap', () => {
afterEach(() => {
Sinon.restore();
});
it('single namespace 0 returns -1', () => {
expect(SnodeNamespace.maxSizeMap([0])).to.be.deep.eq([{ namespace: 0, maxSize: -1 }]);
});
it('single namespace config 5 returns -1', () => {
expect(SnodeNamespace.maxSizeMap([5])).to.be.deep.eq([{ namespace: 5, maxSize: -1 }]);
});
it('multiple namespaces config 0,2,3,4,5 returns [-2,-8,-8,-8,-8]', () => {
expect(SnodeNamespace.maxSizeMap([0, 2, 3, 4, 5])).to.be.deep.eq([
{ namespace: 0, maxSize: -2 }, // 0 has a priority of 10 so takes its own bucket
{ namespace: 2, maxSize: -8 }, // the 4 other ones are sharing the next bucket
{ namespace: 3, maxSize: -8 },
{ namespace: 4, maxSize: -8 },
{ namespace: 5, maxSize: -8 },
]);
});
});
});
Loading…
Cancel
Save