add sync of closed groups/open groups/ contacts with new pipeline

pull/1194/head
Audric Ackermann 5 years ago
parent aee7428282
commit 655cc0575e
No known key found for this signature in database
GPG Key ID: 999F434D76324AD4

@ -76,7 +76,7 @@
result.reset();
return result;
}
async function createContactSyncProtoMessage(sessionContacts) {
async function createContactSyncMessage(sessionContacts) {
if (sessionContacts.length === 0) {
return null;
}
@ -115,16 +115,13 @@
// Serialise array of byteBuffers into 1 byteBuffer
const byteBuffer = serialiseByteBuffers(contactDetails);
const data = new Uint8Array(byteBuffer.toArrayBuffer());
const contacts = new textsecure.protobuf.SyncMessage.Contacts({
return new libsession.Messages.Outgoing.ContactSyncMessage({
timestamp: Date.now(),
data,
});
const syncMessage = new textsecure.protobuf.SyncMessage({
contacts,
});
return syncMessage;
}
function createGroupSyncProtoMessage(sessionGroup) {
function createGroupSyncMessage(sessionGroup) {
// We are getting a single open group here
const rawGroup = {
@ -141,37 +138,12 @@
// Serialise array of byteBuffers into 1 byteBuffer
const byteBuffer = serialiseByteBuffers([groupDetail]);
const data = new Uint8Array(byteBuffer.toArrayBuffer());
const groups = new textsecure.protobuf.SyncMessage.Groups({
return new libsession.Messages.Outgoing.ClosedGroupSyncMessage({
timestamp: Date.now(),
data,
});
const syncMessage = new textsecure.protobuf.SyncMessage({
groups,
});
return syncMessage;
}
function createOpenGroupsSyncProtoMessage(conversations) {
// We only want to sync across open groups that we haven't left
const sessionOpenGroups = conversations.filter(
c => c.isPublic() && !c.isRss() && !c.get('left')
);
if (sessionOpenGroups.length === 0) {
return null;
}
const openGroups = sessionOpenGroups.map(
conversation =>
new textsecure.protobuf.SyncMessage.OpenGroupDetails({
url: conversation.id.split('@').pop(),
channelId: conversation.get('channelId'),
})
);
const syncMessage = new textsecure.protobuf.SyncMessage({
openGroups,
});
return syncMessage;
}
async function sendSessionRequestsToMembers(members = []) {
// For every member, trigger a session request if needed
members.forEach(async memberStr => {
@ -196,9 +168,8 @@
window.libloki.api = {
sendSessionEstablishedMessage,
sendSessionRequestsToMembers,
createContactSyncProtoMessage,
createGroupSyncProtoMessage,
createOpenGroupsSyncProtoMessage,
createContactSyncMessage,
createGroupSyncMessage,
debug,
};
})();

@ -101,12 +101,6 @@ function getStaleDeviceIdsForNumber(number) {
});
}
const DebugMessageType = {
CONTACT_SYNC_SEND: 'contact-sync-send',
CLOSED_GROUP_SYNC_SEND: 'closed-group-sync-send',
OPEN_GROUP_SYNC_SEND: 'open-group-sync-send',
};
function OutgoingMessage(
server,
timestamp,
@ -144,7 +138,6 @@ function OutgoingMessage(
isPublic,
isMediumGroup,
publicSendData,
debugMessageType,
autoSession,
} = options || {};
this.numberInfo = numberInfo;
@ -159,7 +152,6 @@ function OutgoingMessage(
this.senderCertificate = senderCertificate;
this.online = online;
this.messageType = messageType || 'outgoing';
this.debugMessageType = debugMessageType;
this.autoSession = autoSession || false;
}
@ -363,8 +355,6 @@ OutgoingMessage.prototype = {
message: this.message,
};
const messageTypeStr = this.debugMessageType;
const ourPubKey = textsecure.storage.user.getNumber();
const ourPrimaryPubkey = window.storage.get('primaryDevicePubKey');
const secondaryPubKeys =
@ -380,7 +370,7 @@ OutgoingMessage.prototype = {
aliasedPubkey = 'OUR SECONDARY PUBKEY';
}
libloki.api.debug.logSessionMessageSending(
`Sending ${messageTypeStr}:${this.messageType} message to ${aliasedPubkey} details:`,
`Sending :${this.messageType} message to ${aliasedPubkey} details:`,
logDetails
);
@ -638,7 +628,5 @@ OutgoingMessage.prototype = {
},
};
OutgoingMessage.DebugMessageType = DebugMessageType;
window.textsecure = window.textsecure || {};
window.textsecure.OutgoingMessage = OutgoingMessage;

@ -1,4 +1,4 @@
/* global textsecure, WebAPI, libsignal, window, OutgoingMessage, libloki, libsession */
/* global textsecure, WebAPI, libsignal, window, OutgoingMessage, libloki, _, libsession */
/* eslint-disable more/no-then, no-bitwise */
@ -450,173 +450,94 @@ MessageSender.prototype = {
},
async sendContactSyncMessage() {
return Promise.resolve();
const convosToSync = await libsession.Utils.SyncMessageUtils.getSyncContacts();
// // If we havn't got a primaryDeviceKey then we are in the middle of pairing
// // primaryDevicePubKey is set to our own number if we are the master device
// const primaryDeviceKey = window.storage.get('primaryDevicePubKey');
// if (!primaryDeviceKey) {
// return Promise.resolve();
// }
// // first get all friends with primary devices
// const sessionContactsPrimary =
// conversations.filter(
// c =>
// c.isPrivate() &&
// !c.isOurLocalDevice() &&
// !c.isBlocked() &&
// !c.get('secondaryStatus')
// ) || [];
// // then get all friends with secondary devices
// let sessionContactsSecondary = conversations.filter(
// c =>
// c.isPrivate() &&
// !c.isOurLocalDevice() &&
// !c.isBlocked() &&
// c.get('secondaryStatus')
// );
// // then morph all secondary conversation to their primary
// sessionContactsSecondary =
// (await Promise.all(
// // eslint-disable-next-line arrow-body-style
// sessionContactsSecondary.map(async c => {
// return window.ConversationController.getOrCreateAndWait(
// c.getPrimaryDevicePubKey(),
// 'private'
// );
// })
// )) || [];
// // filter out our primary pubkey if it was added.
// sessionContactsSecondary = sessionContactsSecondary.filter(
// c => c.id !== primaryDeviceKey
// );
// const contactsSet = new Set([
// ...sessionContactsPrimary,
// ...sessionContactsSecondary,
// ]);
// if (contactsSet.size === 0) {
// window.console.info('No contacts to sync.');
// return Promise.resolve();
// }
// libloki.api.debug.logContactSync('Triggering contact sync message with:', [
// ...contactsSet,
// ]);
// // We need to sync across 3 contacts at a time
// // This is to avoid hitting storage server limit
// const chunked = _.chunk([...contactsSet], 3);
// const syncMessages = await Promise.all(
// chunked.map(c => libloki.api.createContactSyncProtoMessage(c))
// );
// const syncPromises = syncMessages
// .filter(message => message != null)
// .map(syncMessage => {
// const contentMessage = new textsecure.protobuf.Content();
// contentMessage.syncMessage = syncMessage;
// const silent = true;
// const debugMessageType =
// window.textsecure.OutgoingMessage.DebugMessageType.CONTACT_SYNC_SEND;
// return this.sendIndividualProto(
// primaryDeviceKey,
// contentMessage,
// Date.now(),
// silent,
// { debugMessageType } // options
// );
// });
// return Promise.all(syncPromises);
},
if (convosToSync.size === 0) {
window.console.info('No contacts to sync.');
sendGroupSyncMessage() {
return Promise.resolve();
// // If we havn't got a primaryDeviceKey then we are in the middle of pairing
// // primaryDevicePubKey is set to our own number if we are the master device
// const primaryDeviceKey = window.storage.get('primaryDevicePubKey');
// if (!primaryDeviceKey) {
// window.console.debug('sendGroupSyncMessage: no primary device pubkey');
// return Promise.resolve();
// }
// // We only want to sync across closed groups that we haven't left
// const sessionGroups = conversations.filter(
// c =>
// c.isClosedGroup() &&
// !c.get('left') &&
// !c.isBlocked() &&
// !c.isMediumGroup()
// );
// if (sessionGroups.length === 0) {
// window.console.info('No closed group to sync.');
// return Promise.resolve();
// }
// // We need to sync across 1 group at a time
// // This is because we could hit the storage server limit with one group
// const syncPromises = sessionGroups
// .map(c => libloki.api.createGroupSyncProtoMessage(c))
// .filter(message => message != null)
// .map(syncMessage => {
// const contentMessage = new textsecure.protobuf.Content();
// contentMessage.syncMessage = syncMessage;
// const silent = true;
// const debugMessageType =
// window.textsecure.OutgoingMessage.DebugMessageType
// .CLOSED_GROUP_SYNC_SEND;
// return this.sendIndividualProto(
// primaryDeviceKey,
// contentMessage,
// Date.now(),
// silent,
// { debugMessageType } // options
// );
// });
// return Promise.all(syncPromises);
return Promise.resolve();
}
libloki.api.debug.logContactSync(
'Triggering contact sync message with:',
convosToSync
);
// We need to sync across 3 contacts at a time
// This is to avoid hitting storage server limit
const chunked = _.chunk(convosToSync, 3);
const syncMessages = await Promise.all(
chunked.map(c => libloki.api.createContactSyncMessage(c))
);
const syncPromises = syncMessages.map(syncMessage =>
libsession.getMessageQueue().sendSyncMessage(syncMessage)
);
return Promise.all(syncPromises);
},
sendOpenGroupsSyncMessage(conversations) {
sendGroupSyncMessage(conversations) {
// If we havn't got a primaryDeviceKey then we are in the middle of pairing
// primaryDevicePubKey is set to our own number if we are the master device
const primaryDeviceKey = window.storage.get('primaryDevicePubKey');
if (!primaryDeviceKey) {
window.console.debug('sendGroupSyncMessage: no primary device pubkey');
return Promise.resolve();
}
// We only want to sync across closed groups that we haven't left
const sessionGroups = conversations.filter(
c =>
c.isClosedGroup() &&
!c.get('left') &&
!c.isBlocked() &&
!c.isMediumGroup()
);
if (sessionGroups.length === 0) {
window.console.info('No closed group to sync.');
return Promise.resolve();
}
// Send the whole list of open groups in a single message
// We need to sync across 1 group at a time
// This is because we could hit the storage server limit with one group
const syncPromises = sessionGroups
.map(c => libloki.api.createGroupSyncMessage(c))
.map(syncMessage =>
libsession.getMessageQueue().sendSyncMessage(syncMessage)
);
return Promise.all(syncPromises);
},
async sendOpenGroupsSyncMessage(conversations) {
// If we havn't got a primaryDeviceKey then we are in the middle of pairing
// primaryDevicePubKey is set to our own number if we are the master device
const primaryDeviceKey = window.storage.get('primaryDevicePubKey');
if (!primaryDeviceKey) {
return Promise.resolve();
}
const openGroupsSyncMessage = libloki.api.createOpenGroupsSyncProtoMessage(
const openGroupsConvos = await libsession.Utils.SyncMessageUtils.filterOpenGroupsConvos(
conversations
);
if (!openGroupsSyncMessage) {
if (!openGroupsConvos.length) {
window.log.info('No open groups to sync');
return Promise.resolve();
}
const contentMessage = new textsecure.protobuf.Content();
contentMessage.syncMessage = openGroupsSyncMessage;
const silent = true;
const debugMessageType =
window.textsecure.OutgoingMessage.DebugMessageType.OPEN_GROUP_SYNC_SEND;
return this.sendIndividualProto(
primaryDeviceKey,
contentMessage,
Date.now(),
silent,
{ debugMessageType } // options
// Send the whole list of open groups in a single message
const openGroupsDetails = openGroupsConvos.map(conversation => ({
url: conversation.id,
channelId: conversation.get('channelId'),
}));
const openGroupsSyncParams = {
timestamp: Date.now(),
openGroupsDetails,
};
const openGroupsSyncMessage = new libsession.Messages.Outgoing.OpenGroupSyncMessage(
openGroupsSyncParams
);
return libsession.getMessageQueue().sendSyncMessage(openGroupsSyncMessage);
},
syncReadMessages(reads, options) {
const myNumber = textsecure.storage.user.getNumber();
@ -946,7 +867,6 @@ textsecure.MessageSender = function MessageSenderWrapper(username, password) {
this.sendMessageToGroup = sender.sendMessageToGroup.bind(sender);
this.updateMediumGroup = sender.updateMediumGroup.bind(sender);
this.requestSenderKeys = sender.requestSenderKeys.bind(sender);
this.sendSyncMessage = sender.sendSyncMessage.bind(sender);
this.uploadAvatar = sender.uploadAvatar.bind(sender);
this.syncReadMessages = sender.syncReadMessages.bind(sender);
this.syncVerification = sender.syncVerification.bind(sender);

@ -1,6 +1,8 @@
import * as Messages from './messages';
import * as Protocols from './protocols';
import * as Types from './types';
import * as Utils from './utils';
import { getMessageQueue } from './instance';
export { Messages, Protocols, Types, getMessageQueue };
export { Messages, Utils, Protocols, Types, getMessageQueue };

@ -0,0 +1,25 @@
import { SyncMessage } from './SyncMessage';
import { SignalService } from '../../../../../protobuf';
import { MessageParams } from '../../Message';
import { PubKey } from '../../../../types';
interface ClosedGroupSyncMessageParams extends MessageParams {
data: Uint8Array;
}
export abstract class ClosedGroupSyncMessage extends SyncMessage {
public readonly data: Uint8Array;
constructor(params: ClosedGroupSyncMessageParams) {
super({ timestamp: params.timestamp, identifier: params.identifier });
this.data = params.data;
}
protected syncProto(): SignalService.SyncMessage {
const syncMessage = super.syncProto();
syncMessage.groups = new SignalService.SyncMessage.Groups({
data: this.data,
});
return syncMessage;
}
}

@ -0,0 +1,24 @@
import { SyncMessage } from './SyncMessage';
import { SignalService } from '../../../../../protobuf';
import { MessageParams } from '../../Message';
interface ContactSyncMessageParams extends MessageParams {
data: Uint8Array;
}
export abstract class ContactSyncMessage extends SyncMessage {
public readonly data: Uint8Array;
constructor(params: ContactSyncMessageParams) {
super({ timestamp: params.timestamp, identifier: params.identifier });
this.data = params.data;
}
protected syncProto(): SignalService.SyncMessage {
const syncMessage = super.syncProto();
syncMessage.contacts = new SignalService.SyncMessage.Contacts({
data: this.data,
});
return syncMessage;
}
}

@ -0,0 +1,33 @@
import { SyncMessage } from './SyncMessage';
import { SignalService } from '../../../../../protobuf';
import { MessageParams } from '../../Message';
interface OpenGroupDetails {
url: string;
channelId: number;
}
interface OpenGroupSyncMessageParams extends MessageParams {
openGroupsDetails: [OpenGroupDetails];
}
export abstract class OpenGroupSyncMessage extends SyncMessage {
public readonly openGroupsDetails: [OpenGroupDetails];
constructor(params: OpenGroupSyncMessageParams) {
super({ timestamp: params.timestamp, identifier: params.identifier });
this.openGroupsDetails = params.openGroupsDetails;
}
protected syncProto(): SignalService.SyncMessage {
const syncMessage = super.syncProto();
syncMessage.openGroups = this.openGroupsDetails.map(openGroup => {
return new SignalService.SyncMessage.OpenGroupDetails({
url: openGroup.url.split('@').pop(),
channelId: openGroup.channelId,
});
});
return syncMessage;
}
}

@ -4,9 +4,6 @@ import { MessageParams } from '../../Message';
import { PubKey } from '../../../../types';
interface SentSyncMessageParams extends MessageParams {
// const dataMessage = textsecure.protobuf.DataMessage.decode(
// encodedDataMessage
// );
dataMessage: SignalService.DataMessage;
expirationStartTimestamp?: number;
sentTo?: [PubKey];

@ -1 +1,6 @@
export * from './RequestSyncMessage';
export * from './ContactSyncMessage';
export * from './ClosedGroupSyncMessage';
export * from './OpenGroupSyncMessage';
export * from './SyncMessage';
export * from './SentSyncMessage';

@ -68,3 +68,20 @@ export async function getSyncContacts(): Promise<Array<any> | undefined> {
device => !!device
);
}
export async function filterOpenGroupsConvos(
conversations: Array<any>
): Promise<Array<any> | undefined> {
// If we haven't got a primaryDeviceKey then we are in the middle of pairing
// primaryDevicePubKey is set to our own number if we are the master device
const thisDevice = await UserUtil.getCurrentDevicePubKey();
if (!thisDevice) {
return [];
}
// We only want to sync across open groups that we haven't left
return conversations.filter(
c => c.isPublic() && !c.isRss() && !c.get('left')
);
}

Loading…
Cancel
Save