move SwarmPolling from window to require singleton

pull/1592/head
Audric Ackermann 4 years ago
parent 51e95bb16a
commit 7ec663df71
No known key found for this signature in database
GPG Key ID: 999F434D76324AD4

@ -747,11 +747,6 @@
Whisper.Notifications.enable(); Whisper.Notifications.enable();
}, window.CONSTANTS.NOTIFICATION_ENABLE_TIMEOUT_SECONDS * 1000); }, window.CONSTANTS.NOTIFICATION_ENABLE_TIMEOUT_SECONDS * 1000);
// TODO: Investigate the case where we reconnect
const ourKey = window.libsession.Utils.UserUtils.getOurPubKeyStrFromCache();
window.SwarmPolling.addPubkey(ourKey);
window.SwarmPolling.start();
window.NewReceiver.queueAllCached(); window.NewReceiver.queueAllCached();
initAPIs(); initAPIs();

@ -410,14 +410,6 @@ window.DataMessageReceiver = require('./ts/receiver/dataMessage');
window.NewSnodeAPI = require('./ts/session/snode_api/serviceNodeAPI'); window.NewSnodeAPI = require('./ts/session/snode_api/serviceNodeAPI');
window.SnodePool = require('./ts/session/snode_api/snodePool'); window.SnodePool = require('./ts/session/snode_api/snodePool');
if (process.env.USE_STUBBED_NETWORK) {
const { SwarmPollingStub } = require('./ts/session/snode_api/swarmPollingStub');
window.SwarmPolling = new SwarmPollingStub();
} else {
const { SwarmPolling } = require('./ts/session/snode_api/swarmPolling');
window.SwarmPolling = new SwarmPolling();
}
// eslint-disable-next-line no-extend-native,func-names // eslint-disable-next-line no-extend-native,func-names
Promise.prototype.ignore = function() { Promise.prototype.ignore = function() {
// eslint-disable-next-line more/no-then // eslint-disable-next-line more/no-then

@ -33,6 +33,7 @@ import { cleanUpOldDecryptedMedias } from '../../session/crypto/DecryptedAttachm
import { OpenGroupManagerV2 } from '../../opengroup/opengroupV2/OpenGroupManagerV2'; import { OpenGroupManagerV2 } from '../../opengroup/opengroupV2/OpenGroupManagerV2';
import { loadDefaultRooms } from '../../opengroup/opengroupV2/ApiUtil'; import { loadDefaultRooms } from '../../opengroup/opengroupV2/ApiUtil';
import { forceRefreshRandomSnodePool } from '../../session/snode_api/snodePool'; import { forceRefreshRandomSnodePool } from '../../session/snode_api/snodePool';
import { SwarmPolling } from '../../session/snode_api/swarmPolling';
// tslint:disable-next-line: no-import-side-effect no-submodule-imports // tslint:disable-next-line: no-import-side-effect no-submodule-imports
export enum SectionType { export enum SectionType {
@ -176,6 +177,11 @@ const doAppStartUp = (dispatch: Dispatch<any>) => {
void triggerSyncIfIfNeeded(); void triggerSyncIfIfNeeded();
void loadDefaultRooms(); void loadDefaultRooms();
// TODO: Investigate the case where we reconnect
const ourKey = UserUtils.getOurPubKeyStrFromCache();
SwarmPolling.getInstance().addPubkey(ourKey);
SwarmPolling.getInstance().start();
}; };
/** /**

@ -33,6 +33,7 @@ import { MessageController } from '../session/messages';
import { ClosedGroupEncryptionPairReplyMessage } from '../session/messages/outgoing/controlMessage/group/ClosedGroupEncryptionPairReplyMessage'; import { ClosedGroupEncryptionPairReplyMessage } from '../session/messages/outgoing/controlMessage/group/ClosedGroupEncryptionPairReplyMessage';
import { queueAllCachedFromSource } from './receiver'; import { queueAllCachedFromSource } from './receiver';
import { actions as conversationActions } from '../state/ducks/conversations'; import { actions as conversationActions } from '../state/ducks/conversations';
import { SwarmPolling } from '../session/snode_api/swarmPolling';
export const distributingClosedGroupEncryptionKeyPairs = new Map<string, ECKeyPair>(); export const distributingClosedGroupEncryptionKeyPairs = new Map<string, ECKeyPair>();
@ -237,13 +238,32 @@ export async function handleNewClosedGroup(
await addClosedGroupEncryptionKeyPair(groupId, ecKeyPair.toHexKeyPair()); await addClosedGroupEncryptionKeyPair(groupId, ecKeyPair.toHexKeyPair());
// start polling for this new group // start polling for this new group
window.SwarmPolling.addGroupId(PubKey.cast(groupId)); SwarmPolling.getInstance().addGroupId(PubKey.cast(groupId));
await removeFromCache(envelope); await removeFromCache(envelope);
// trigger decrypting of all this group messages we did not decrypt successfully yet. // trigger decrypting of all this group messages we did not decrypt successfully yet.
await queueAllCachedFromSource(groupId); await queueAllCachedFromSource(groupId);
} }
/**
*
* @param isKicked if true, we mark the reason for leaving as a we got kicked
*/
export async function markGroupAsLeftOrKicked(
groupPublicKey: string,
groupConvo: ConversationModel,
isKicked: boolean
) {
await removeAllClosedGroupEncryptionKeyPairs(groupPublicKey);
if (isKicked) {
groupConvo.set('isKickedFromGroup', true);
} else {
groupConvo.set('left', true);
}
SwarmPolling.getInstance().removePubkey(groupPublicKey);
}
async function handleUpdateClosedGroup( async function handleUpdateClosedGroup(
envelope: EnvelopePlus, envelope: EnvelopePlus,
groupUpdate: SignalService.DataMessage.ClosedGroupControlMessage, groupUpdate: SignalService.DataMessage.ClosedGroupControlMessage,
@ -273,17 +293,14 @@ async function handleUpdateClosedGroup(
await removeFromCache(envelope); await removeFromCache(envelope);
return; return;
} }
await removeAllClosedGroupEncryptionKeyPairs(groupPublicKey); await markGroupAsLeftOrKicked(groupPublicKey, convo, true);
// Disable typing:
convo.set('isKickedFromGroup', true);
window.SwarmPolling.removePubkey(groupPublicKey);
} else { } else {
if (convo.get('isKickedFromGroup')) { if (convo.get('isKickedFromGroup')) {
// Enable typing: // Enable typing:
convo.set('isKickedFromGroup', false); convo.set('isKickedFromGroup', false);
convo.set('left', false); convo.set('left', false);
// Subscribe to this group id // Subscribe to this group id
window.SwarmPolling.addGroupId(new PubKey(groupPublicKey)); SwarmPolling.getInstance().addGroupId(new PubKey(groupPublicKey));
} }
} }
@ -492,7 +509,7 @@ async function performIfValid(
} else if (groupUpdate.type === Type.MEMBERS_REMOVED) { } else if (groupUpdate.type === Type.MEMBERS_REMOVED) {
await handleClosedGroupMembersRemoved(envelope, groupUpdate, convo); await handleClosedGroupMembersRemoved(envelope, groupUpdate, convo);
} else if (groupUpdate.type === Type.MEMBER_LEFT) { } else if (groupUpdate.type === Type.MEMBER_LEFT) {
await handleClosedGroupMemberLeft(envelope, groupUpdate, convo); await handleClosedGroupMemberLeft(envelope, convo);
} else if (groupUpdate.type === Type.ENCRYPTION_KEY_PAIR_REQUEST) { } else if (groupUpdate.type === Type.ENCRYPTION_KEY_PAIR_REQUEST) {
if (window.lokiFeatureFlags.useRequestEncryptionKeyPair) { if (window.lokiFeatureFlags.useRequestEncryptionKeyPair) {
await handleClosedGroupEncryptionKeyPairRequest(envelope, groupUpdate, convo); await handleClosedGroupEncryptionKeyPairRequest(envelope, groupUpdate, convo);
@ -555,7 +572,7 @@ async function handleClosedGroupMembersAdded(
} }
if (await areWeAdmin(convo)) { if (await areWeAdmin(convo)) {
await sendLatestKeyPairToUsers(envelope, convo, convo.id, membersNotAlreadyPresent); await sendLatestKeyPairToUsers(convo, convo.id, membersNotAlreadyPresent);
} }
const members = [...oldMembers, ...membersNotAlreadyPresent]; const members = [...oldMembers, ...membersNotAlreadyPresent];
@ -616,10 +633,7 @@ async function handleClosedGroupMembersRemoved(
const ourPubKey = UserUtils.getOurPubKeyFromCache(); const ourPubKey = UserUtils.getOurPubKeyFromCache();
const wasCurrentUserRemoved = !membersAfterUpdate.includes(ourPubKey.key); const wasCurrentUserRemoved = !membersAfterUpdate.includes(ourPubKey.key);
if (wasCurrentUserRemoved) { if (wasCurrentUserRemoved) {
await removeAllClosedGroupEncryptionKeyPairs(groupPubKey); await markGroupAsLeftOrKicked(groupPublicKey, convo, true);
// Disable typing:
convo.set('isKickedFromGroup', true);
window.SwarmPolling.removePubkey(groupPubKey);
} }
// Generate and distribute a new encryption key pair if needed // Generate and distribute a new encryption key pair if needed
if (await areWeAdmin(convo)) { if (await areWeAdmin(convo)) {
@ -651,19 +665,103 @@ async function handleClosedGroupMembersRemoved(
await removeFromCache(envelope); await removeFromCache(envelope);
} }
async function handleClosedGroupMemberLeft( function isUserAZombie(convo: ConversationModel, user: PubKey) {
return convo.get('zombies').includes(user.key);
}
/**
* Returns true if the user was not a zombie and so was added to the zombies.
* No commit() are called
*/
function addMemberToZombies(
envelope: EnvelopePlus, envelope: EnvelopePlus,
groupUpdate: SignalService.DataMessage.ClosedGroupControlMessage, userToAdd: PubKey,
convo: ConversationModel convo: ConversationModel
): boolean {
const zombies = convo.get('zombies');
const isAlreadyZombie = isUserAZombie(convo, userToAdd);
if (isAlreadyZombie) {
return false;
}
console.warn('Marking user ', userToAdd.key, ' as a zombie');
convo.set('zombies', [...zombies, userToAdd.key]);
return true;
}
/**
*
* Returns true if the user was not a zombie and so was not removed from the zombies.
* Note: no commit() are made
*/
function removeMemberFromZombies(
envelope: EnvelopePlus,
userToAdd: PubKey,
convo: ConversationModel
): boolean {
const zombies = convo.get('zombies');
const isAlreadyAZombie = isUserAZombie(convo, userToAdd);
if (!isAlreadyAZombie) {
return false;
}
convo.set(
'zombies',
zombies.filter(z => z !== userToAdd.key)
);
return true;
}
async function handleClosedGroupAdminMemberLeft(
groupPublicKey: string,
isCurrentUserAdmin: boolean,
convo: ConversationModel,
envelope: EnvelopePlus
) {
// if the admin was remove and we are the admin, it can only be voluntary
await markGroupAsLeftOrKicked(groupPublicKey, convo, !isCurrentUserAdmin);
convo.set('members', []);
// everybody left ! this is how we disable a group when the admin left
const groupDiff: ClosedGroup.GroupDiff = {
leavingMembers: convo.get('members'),
};
await ClosedGroup.addUpdateMessage(convo, groupDiff, 'incoming', _.toNumber(envelope.timestamp));
convo.updateLastMessage();
await convo.commit();
await removeFromCache(envelope);
}
async function handleClosedGroupLeftOurself(
groupPublicKey: string,
convo: ConversationModel,
envelope: EnvelopePlus
) { ) {
await markGroupAsLeftOrKicked(groupPublicKey, convo, false);
const groupDiff: ClosedGroup.GroupDiff = {
leavingMembers: [envelope.senderIdentity],
};
await ClosedGroup.addUpdateMessage(convo, groupDiff, 'incoming', _.toNumber(envelope.timestamp));
convo.updateLastMessage();
// remove ourself from the list of members
convo.set(
'members',
convo.get('members').filter(m => !UserUtils.isUsFromCache(m))
);
await convo.commit();
await removeFromCache(envelope);
}
async function handleClosedGroupMemberLeft(envelope: EnvelopePlus, convo: ConversationModel) {
const sender = envelope.senderIdentity; const sender = envelope.senderIdentity;
const groupPublicKey = envelope.source; const groupPublicKey = envelope.source;
const didAdminLeave = convo.get('groupAdmins')?.includes(sender) || false; const didAdminLeave = convo.get('groupAdmins')?.includes(sender) || false;
// If the admin leaves the group is disbanded // If the admin leaves the group is disbanded
// otherwise, we remove the sender from the list of current members in this group // otherwise, we remove the sender from the list of current members in this group
const oldMembers = convo.get('members') || []; const oldMembers = convo.get('members') || [];
const leftMemberWasPresent = oldMembers.includes(sender); const newMembers = oldMembers.filter(s => s !== sender);
const members = didAdminLeave ? [] : oldMembers.filter(s => s !== sender);
// Show log if we sent this message ourself (from another device or not) // Show log if we sent this message ourself (from another device or not)
if (UserUtils.isUsFromCache(sender)) { if (UserUtils.isUsFromCache(sender)) {
@ -671,56 +769,42 @@ async function handleClosedGroupMemberLeft(
} }
const ourPubkey = UserUtils.getOurPubKeyStrFromCache(); const ourPubkey = UserUtils.getOurPubKeyStrFromCache();
// Generate and distribute a new encryption key pair if needed // if the admin leaves, the group is disabled for every members
const isCurrentUserAdmin = convo.get('groupAdmins')?.includes(ourPubkey) || false; const isCurrentUserAdmin = convo.get('groupAdmins')?.includes(ourPubkey) || false;
if (isCurrentUserAdmin && !!members.length) {
await ClosedGroup.generateAndSendNewEncryptionKeyPair(groupPublicKey, members);
}
if (didAdminLeave) { if (didAdminLeave) {
window.SwarmPolling.removePubkey(groupPublicKey); await handleClosedGroupAdminMemberLeft(groupPublicKey, isCurrentUserAdmin, convo, envelope);
return;
await removeAllClosedGroupEncryptionKeyPairs(groupPublicKey);
// Disable typing
// if the admin was remove and we are the admin, it can only be voluntary
if (isCurrentUserAdmin) {
convo.set('left', true);
} else {
convo.set('isKickedFromGroup', true);
} }
// if we are no longer a member, we LEFT from another device
if (!newMembers.includes(ourPubkey)) {
// stop polling, remove all stored pubkeys and make sure the UI does not let us write messages
await handleClosedGroupLeftOurself(groupPublicKey, convo, envelope);
return;
} }
const didWeLeaveFromAnotherDevice = !members.includes(ourPubkey);
if (didWeLeaveFromAnotherDevice) { // if we are the admin, and there are still some members after the member left, we send a new keypair
await removeAllClosedGroupEncryptionKeyPairs(groupPublicKey); // to the remaining members
// Disable typing: if (isCurrentUserAdmin && !!newMembers.length) {
convo.set('left', true); await ClosedGroup.generateAndSendNewEncryptionKeyPair(groupPublicKey, newMembers);
window.SwarmPolling.removePubkey(groupPublicKey);
} }
// Only add update message if we have something to show // Another member left, not us, not the admin, just another member.
if (leftMemberWasPresent) { // But this member was in the list of members (as performIfValid checks for that)
const groupDiff: ClosedGroup.GroupDiff = { const groupDiff: ClosedGroup.GroupDiff = {
leavingMembers: didAdminLeave ? oldMembers : [sender], leavingMembers: [sender],
}; };
await ClosedGroup.addUpdateMessage( await ClosedGroup.addUpdateMessage(convo, groupDiff, 'incoming', _.toNumber(envelope.timestamp));
convo,
groupDiff,
'incoming',
_.toNumber(envelope.timestamp)
);
convo.updateLastMessage(); convo.updateLastMessage();
} await addMemberToZombies(envelope, PubKey.cast(sender), convo);
convo.set('members', newMembers);
convo.set('members', members);
await convo.commit(); await convo.commit();
await removeFromCache(envelope); await removeFromCache(envelope);
} }
async function sendLatestKeyPairToUsers( async function sendLatestKeyPairToUsers(
envelope: EnvelopePlus,
groupConvo: ConversationModel, groupConvo: ConversationModel,
groupPubKey: string, groupPubKey: string,
targetUsers: Array<string> targetUsers: Array<string>
@ -778,7 +862,7 @@ async function handleClosedGroupEncryptionKeyPairRequest(
await removeFromCache(envelope); await removeFromCache(envelope);
return; return;
} }
await sendLatestKeyPairToUsers(envelope, groupConvo, groupPublicKey, [sender]); await sendLatestKeyPairToUsers(groupConvo, groupPublicKey, [sender]);
return removeFromCache(envelope); return removeFromCache(envelope);
} }
@ -850,7 +934,7 @@ export async function createClosedGroup(groupName: string, members: Array<string
}); });
// Subscribe to this group id // Subscribe to this group id
window.SwarmPolling.addGroupId(new PubKey(groupPublicKey)); SwarmPolling.getInstance().addGroupId(new PubKey(groupPublicKey));
await Promise.all(promises); await Promise.all(promises);

@ -22,13 +22,17 @@ import { ConversationModel, ConversationTypeEnum } from '../../models/conversati
import { MessageModel } from '../../models/message'; import { MessageModel } from '../../models/message';
import { MessageModelType } from '../../models/messageType'; import { MessageModelType } from '../../models/messageType';
import { MessageController } from '../messages'; import { MessageController } from '../messages';
import { distributingClosedGroupEncryptionKeyPairs } from '../../receiver/closedGroups'; import {
distributingClosedGroupEncryptionKeyPairs,
markGroupAsLeftOrKicked,
} from '../../receiver/closedGroups';
import { getMessageQueue } from '..'; import { getMessageQueue } from '..';
import { ClosedGroupAddedMembersMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupAddedMembersMessage'; import { ClosedGroupAddedMembersMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupAddedMembersMessage';
import { ClosedGroupEncryptionPairMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupEncryptionPairMessage'; import { ClosedGroupEncryptionPairMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupEncryptionPairMessage';
import { ClosedGroupEncryptionPairRequestMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupEncryptionPairRequestMessage'; import { ClosedGroupEncryptionPairRequestMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupEncryptionPairRequestMessage';
import { ClosedGroupNameChangeMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupNameChangeMessage'; import { ClosedGroupNameChangeMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupNameChangeMessage';
import { ClosedGroupNewMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupNewMessage'; import { ClosedGroupNewMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupNewMessage';
import { SwarmPolling } from '../snode_api/swarmPolling';
import { ClosedGroupRemovedMembersMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupRemovedMembersMessage'; import { ClosedGroupRemovedMembersMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupRemovedMembersMessage';
import { updateOpenGroupV1 } from '../../opengroup/opengroupV1/OpenGroup'; import { updateOpenGroupV1 } from '../../opengroup/opengroupV1/OpenGroup';
import { updateOpenGroupV2 } from '../../opengroup/opengroupV2/OpenGroupUpdate'; import { updateOpenGroupV2 } from '../../opengroup/opengroupV2/OpenGroupUpdate';
@ -337,10 +341,10 @@ export async function leaveClosedGroup(groupId: string) {
window.log.info(`We are leaving the group ${groupId}. Sending our leaving message.`); window.log.info(`We are leaving the group ${groupId}. Sending our leaving message.`);
// sent the message to the group and once done, remove everything related to this group // sent the message to the group and once done, remove everything related to this group
window.SwarmPolling.removePubkey(groupId); SwarmPolling.getInstance().removePubkey(groupId);
await getMessageQueue().sendToGroup(ourLeavingMessage, async () => { await getMessageQueue().sendToGroup(ourLeavingMessage, async () => {
window.log.info(`Leaving message sent ${groupId}. Removing everything related to this group.`); window.log.info(`Leaving message sent ${groupId}. Removing everything related to this group.`);
await removeAllClosedGroupEncryptionKeyPairs(groupId); await markGroupAsLeftOrKicked(groupId, convo, false);
}); });
} }

@ -41,16 +41,24 @@ export function processMessage(message: string, options: any = {}) {
} }
export class SwarmPolling { export class SwarmPolling {
private static instance: SwarmPolling;
private pubkeys: Array<PubKey>; private pubkeys: Array<PubKey>;
private groupPubkeys: Array<PubKey>; private groupPubkeys: Array<PubKey>;
private readonly lastHashes: { [key: string]: PubkeyToHash }; private readonly lastHashes: { [key: string]: PubkeyToHash };
constructor() { private constructor() {
this.pubkeys = []; this.pubkeys = [];
this.groupPubkeys = []; this.groupPubkeys = [];
this.lastHashes = {}; this.lastHashes = {};
} }
public static getInstance() {
if (!SwarmPolling.instance) {
SwarmPolling.instance = new SwarmPolling();
}
return SwarmPolling.instance;
}
public start(): void { public start(): void {
this.loadGroupIds(); this.loadGroupIds();
void this.pollForAllKeys(); void this.pollForAllKeys();

@ -1,30 +0,0 @@
import { processMessage, SwarmPolling } from './swarmPolling';
import { default as insecureNodeFetch } from 'node-fetch';
import { PubKey } from '../types';
export class SwarmPollingStub extends SwarmPolling {
private readonly baseUrl = 'http://localhost:3000';
protected async pollOnceForKey(pubkey: PubKey, isGroup: boolean) {
const pubkeyStr = pubkey.key ? pubkey.key : pubkey;
const get = {
method: 'GET',
};
// insecureNodeFetch but this is a stub
const res = await insecureNodeFetch(`${this.baseUrl}/messages?pubkey=${pubkeyStr}`, get);
try {
const json = await res.json();
const options = isGroup ? { conversationId: pubkeyStr } : {};
json.messages.forEach((m: any) => {
processMessage(m.data, options);
});
} catch (e) {
window.log.error('invalid json: ', e);
}
}
}

2
ts/window.d.ts vendored

@ -4,7 +4,6 @@ import { SignalInterface } from '../../js/modules/signal';
import { Libloki } from '../libloki'; import { Libloki } from '../libloki';
import { LokiPublicChatFactoryInterface } from '../js/modules/loki_public_chat_api'; import { LokiPublicChatFactoryInterface } from '../js/modules/loki_public_chat_api';
import { LokiAppDotNetServerInterface } from '../js/modules/loki_app_dot_net_api'; import { LokiAppDotNetServerInterface } from '../js/modules/loki_app_dot_net_api';
import { SwarmPolling } from './session/snode_api/swarmPolling';
import { LibTextsecure } from '../libtextsecure'; import { LibTextsecure } from '../libtextsecure';
import { ConfirmationDialogParams } from '../background'; import { ConfirmationDialogParams } from '../background';
@ -83,7 +82,6 @@ declare global {
versionInfo: any; versionInfo: any;
getStoragePubKey: (key: string) => string; getStoragePubKey: (key: string) => string;
getConversations: () => ConversationCollection; getConversations: () => ConversationCollection;
SwarmPolling: SwarmPolling;
SnodePool: { SnodePool: {
getSnodesFor: (string) => any; getSnodesFor: (string) => any;
}; };

Loading…
Cancel
Save