You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
session-desktop/ts/session/sending/MessageQueue.ts

181 lines
5.2 KiB
TypeScript

5 years ago
import * as _ from 'lodash';
5 years ago
import { getPairedDevicesFor } from '../../../js/modules/data';
5 years ago
import { ConversationController } from '../../window';
import { EventEmitter } from 'events';
import {
MessageQueueInterface,
MessageQueueInterfaceEvents,
} from './MessageQueueInterface';
5 years ago
import {
ClosedGroupMessage,
ContentMessage,
OpenGroupMessage,
SessionRequestMessage,
} from '../messages/outgoing';
import { PendingMessageCache } from './PendingMessageCache';
5 years ago
import { JobQueue, SyncMessageUtils, TypedEventEmitter } from '../utils';
5 years ago
import { PubKey } from '../types';
import { MessageSender } from '.';
import { SessionProtocol } from '../protocols';
5 years ago
import * as UserUtils from '../../util/user';
/**
 * Queues outgoing messages per recipient device.
 *
 * Every message is first stored in a `PendingMessageCache`, then handed to a
 * per-device `JobQueue` that runs `MessageSender.send`. The outcome of each
 * send is reported through `events`.
 */
export class MessageQueue implements MessageQueueInterface {
  /**
   * Emits `'success'` with the message once its send job resolves and
   * `'fail'` with the message and the error when the job rejects.
   */
  public readonly events: TypedEventEmitter<MessageQueueInterfaceEvents>;
  // Keyed by the device's string key, not by the PubKey instance: Map compares
  // object keys by identity and callers build a fresh PubKey for the same
  // device on every call, which would otherwise yield a new queue each time.
  private readonly jobQueues: Map<string, JobQueue> = new Map();
  private readonly pendingMessageCache: PendingMessageCache;

  constructor() {
    this.events = new EventEmitter();
    this.pendingMessageCache = new PendingMessageCache();
    // Not awaited: a constructor cannot be async, so whatever is already in
    // the cache is flushed in the background.
    void this.processAllPending();
  }

  /**
   * Sends `message` to every device that `getPairedDevicesFor` returns for
   * `user`.
   *
   * @param user Public key whose paired devices are looked up.
   * @param message Message to deliver.
   */
  public async sendUsingMultiDevice(user: PubKey, message: ContentMessage) {
    const userLinked = await getPairedDevicesFor(user.key);
    const userDevices = userLinked.map(d => new PubKey(d));

    await this.sendMessageToDevices(userDevices, message);
  }

  /**
   * Sends `message` to a single device.
   *
   * @param device Recipient device.
   * @param message Message to deliver.
   */
  public async send(device: PubKey, message: ContentMessage) {
    await this.sendMessageToDevices([device], message);
  }

  /**
   * Queues `message` for each of `devices`.
   *
   * When `SyncMessageUtils.canSync(message)` is true and the current device
   * key is known, a sync message is first queued for the current device and
   * its paired devices, and those devices are then dropped from the recipient
   * list so they do not also receive the original message.
   *
   * @param devices Recipient devices.
   * @param message Message to deliver.
   * @returns A promise that settles once every recipient has been processed.
   */
  public async sendMessageToDevices(
    devices: Array<PubKey>,
    message: ContentMessage
  ) {
    let currentDevices = [...devices];

    // Sync to our devices if syncable
    if (SyncMessageUtils.canSync(message)) {
      const currentDevice = await UserUtils.getCurrentDevicePubKey();

      if (currentDevice) {
        const otherDevices = await getPairedDevicesFor(currentDevice);
        const ourDevices = [currentDevice, ...otherDevices].map(
          device => new PubKey(device)
        );

        await this.sendSyncMessage(message, ourDevices);

        // Remove our devices from currentDevices. The comparison is done on
        // the string key because two PubKey objects for the same device are
        // distinct instances.
        currentDevices = _.differenceBy(
          currentDevices,
          ourDevices,
          (device: PubKey) => device.key
        );
      }
    }

    const promises = currentDevices.map(async device => {
      await this.process(device, message);
    });

    return Promise.all(promises);
  }

  /**
   * Sends a group message.
   *
   * Closed group messages are queued for every contact of the group's
   * conversation; open group messages bypass the queue and go straight to
   * `MessageSender.sendToOpenGroup`.
   *
   * @param message Message to deliver.
   * @returns `true` when the message was handed off, `false` when it is
   *   neither a closed nor an open group message, or when no conversation is
   *   found for the closed group.
   */
  public async sendToGroup(
    message: OpenGroupMessage | ContentMessage
  ): Promise<boolean> {
    // Closed groups
    if (message instanceof ClosedGroupMessage) {
      // Get devices in closed group
      const conversation = ConversationController.get(message.groupId);
      // NOTE(review): assumes ConversationController.get yields a falsy value
      // for an unknown id - confirm. Without this guard the property access
      // below throws instead of reporting the failure to the caller.
      if (!conversation) {
        return false;
      }

      const recipientsModels = conversation.contactCollection.models;
      const recipients: Array<PubKey> = recipientsModels.map(
        (recipient: any) => new PubKey(recipient.id)
      );

      await this.sendMessageToDevices(recipients, message);

      return true;
    }

    // Open groups
    if (message instanceof OpenGroupMessage) {
      // No queue needed for Open Groups; send directly
      await MessageSender.sendToOpenGroup(message);

      return true;
    }

    return false;
  }

  /**
   * Builds a sync message from `message` for each device in `sendTo` and
   * queues it for that device.
   *
   * @param message Message to derive the sync message from.
   * @param sendTo Devices that should receive the sync message.
   * @returns A promise that settles once every device has been processed.
   */
  public async sendSyncMessage(message: ContentMessage, sendTo: Array<PubKey>) {
    // Sync with our devices
    const promises = sendTo.map(async device => {
      const syncMessage = await SyncMessageUtils.from(message);

      return this.process(device, syncMessage);
    });

    return Promise.all(promises);
  }

  /**
   * Schedules every cached message for `device` on that device's job queue.
   *
   * If the device is not a medium group and has no session, a session request
   * is sent (if needed) and nothing is scheduled. Messages whose id (their
   * timestamp as a string) is already on the job queue are skipped.
   *
   * The returned promise resolves once the jobs are scheduled, not once they
   * have been sent; completion is reported through `events`.
   *
   * @param device Device whose pending messages should be sent.
   */
  public async processPending(device: PubKey) {
    const conversation = ConversationController.get(device.key);
    // NOTE(review): assumes a device may have no conversation yet - confirm.
    // Such a device is treated as "not a medium group".
    const isMediumGroup = Boolean(conversation && conversation.isMediumGroup());
    // NOTE(review): hasSession presumably returns a promise - confirm. Left
    // unawaited, a promise is always truthy and the session request below
    // could never be sent; awaiting a plain boolean is harmless.
    const hasSession = await SessionProtocol.hasSession(device);

    if (!isMediumGroup && !hasSession) {
      await SessionProtocol.sendSessionRequestIfNeeded(device);

      return;
    }

    const jobQueue = this.getJobQueue(device);
    // Read the cache only after the await above so the snapshot is current.
    const messages = this.pendingMessageCache.getForDevice(device);

    // The async callbacks are intentionally not awaited: each one handles its
    // own failure and reports the result via `events`.
    messages.forEach(async message => {
      const messageId = String(message.timestamp);
      if (jobQueue.has(messageId)) {
        return;
      }

      try {
        await jobQueue.addWithId(messageId, async () =>
          MessageSender.send(message)
        );
        void this.pendingMessageCache.remove(message);
        this.events.emit('success', message);
      } catch (e) {
        this.events.emit('fail', message, e);
      }
    });
  }

  /** Runs `processPending` for every device that has cached messages. */
  private async processAllPending() {
    const devices = this.pendingMessageCache.getDevices();
    const promises = devices.map(async device => this.processPending(device));

    return Promise.all(promises);
  }

  /**
   * Caches `message` for `device` and then processes that device's pending
   * messages. Does nothing when `message` is missing or is a
   * `SessionRequestMessage`.
   */
  private async process(device: PubKey, message?: ContentMessage) {
    if (!message || message instanceof SessionRequestMessage) {
      return;
    }

    await this.pendingMessageCache.add(device, message);
    await this.processPending(device);
  }

  /**
   * Returns the job queue for `device`, creating it on first use. Two PubKey
   * objects with the same `key` share one queue.
   */
  private getJobQueue(device: PubKey): JobQueue {
    let queue = this.jobQueues.get(device.key);
    if (!queue) {
      queue = new JobQueue();
      this.jobQueues.set(device.key, queue);
    }

    return queue;
  }
}