import * as _ from 'lodash';
import * as Data from '../../../js/modules/data';
import { ConversationController } from '../../window';
import { EventEmitter } from 'events';
import {
  MessageQueueInterface,
  MessageQueueInterfaceEvents,
} from './MessageQueueInterface';
import {
  ClosedGroupMessage,
  ContentMessage,
  OpenGroupMessage,
  SessionRequestMessage,
} from '../messages/outgoing';
import { PendingMessageCache } from './PendingMessageCache';
import { JobQueue, SyncMessageUtils, TypedEventEmitter } from '../utils';
import { PubKey } from '../types';
import { MessageSender } from '.';
import { SessionProtocol } from '../protocols';

/**
 * Queues outgoing messages per device.
 *
 * Every queued message is first stored in a `PendingMessageCache` and then
 * handed to a per-device `JobQueue`. A message is removed from the cache only
 * once `MessageSender.send` resolves; failed sends stay in the cache so that a
 * later `processPending` call can pick them up again.
 */
export class MessageQueue implements MessageQueueInterface {
  public readonly events: TypedEventEmitter<MessageQueueInterfaceEvents>;
  // Keyed by the device's public key STRING. Keying by the PubKey object
  // would compare by identity, and callers create fresh PubKey instances for
  // every send, so a lookup would never find the existing queue.
  private readonly jobQueues: Map<string, JobQueue> = new Map();
  private readonly pendingMessageCache: PendingMessageCache;

  constructor() {
    this.events = new EventEmitter();
    this.pendingMessageCache = new PendingMessageCache();
    // Fire-and-forget: start processing whatever the cache already holds.
    void this.processAllPending();
  }

  /**
   * Sends `message` to every device returned by
   * `Data.getPairedDevicesFor(user.key)`.
   *
   * @param user The user whose paired devices should receive the message.
   * @param message The message to send.
   */
  public async sendUsingMultiDevice(user: PubKey, message: ContentMessage) {
    const userLinked = await Data.getPairedDevicesFor(user.key);
    const userDevices = userLinked.map(d => new PubKey(d));

    await this.sendMessageToDevices(userDevices, message);
  }

  /**
   * Sends `message` to a single device.
   *
   * @param device The recipient device.
   * @param message The message to send.
   */
  public async send(device: PubKey, message: ContentMessage) {
    await this.sendMessageToDevices([device], message);
  }

  /**
   * Queues `message` for each of `devices`.
   *
   * When `SyncMessageUtils.canSync(message)` is true, a sync message is queued
   * for each of our own paired devices and those devices are removed from the
   * recipient list so they do not also receive the plain message.
   *
   * @param devices The recipient devices. The array is not mutated.
   * @param message The message to send.
   * @returns A promise that settles once every recipient has been queued.
   */
  public async sendMessageToDevices(
    devices: Array<PubKey>,
    message: ContentMessage
  ) {
    let currentDevices = [...devices];

    // Sync to our devices if syncable
    if (SyncMessageUtils.canSync(message)) {
      const ourDevices = await SyncMessageUtils.getOurPairedDevices();
      await this.sendSyncMessage(message, ourDevices);

      // Remove our devices from currentDevices. Compare by key string:
      // distinct PubKey instances for the same device are never `===`.
      const ourKeys = ourDevices.map(device => device.key);
      currentDevices = currentDevices.filter(
        device => !_.includes(ourKeys, device.key)
      );
    }

    const promises = currentDevices.map(async device => {
      await this.queue(device, message);
    });

    return Promise.all(promises);
  }

  /**
   * Sends a message to a closed or open group.
   *
   * Closed group messages are queued for every member listed in the group
   * conversation's `contactCollection`. Open group messages bypass the queue
   * and go straight to `MessageSender.sendToOpenGroup`.
   *
   * @param message The group message to send.
   * @returns `true` when the message was handed off; `false` when `message`
   *   is neither a closed nor an open group message, or when no conversation
   *   is found for the closed group.
   */
  public async sendToGroup(
    message: OpenGroupMessage | ContentMessage
  ): Promise<boolean> {
    // Closed groups
    if (message instanceof ClosedGroupMessage) {
      // Get devices in closed group
      const conversation = ConversationController.get(message.groupId);
      if (!conversation) {
        // NOTE(review): assumes ConversationController.get yields a falsy
        // value for an unknown group id — confirm. Reporting failure here
        // avoids a TypeError on the property access below.
        return false;
      }

      const recipientsModels = conversation.contactCollection.models;
      const recipients: Array<PubKey> = recipientsModels.map(
        (recipient: any) => new PubKey(recipient.id)
      );

      await this.sendMessageToDevices(recipients, message);

      return true;
    }

    // Open groups
    if (message instanceof OpenGroupMessage) {
      // No queue needed for Open Groups; send directly
      await MessageSender.sendToOpenGroup(message);

      return true;
    }

    return false;
  }

  /**
   * Builds a sync message from `message` for each device in `sendTo` and
   * queues it.
   *
   * @param message The message to derive the sync messages from.
   * @param sendTo The devices to sync with.
   * @returns A promise that settles once every sync message has been queued.
   */
  public async sendSyncMessage(
    message: ContentMessage,
    sendTo: Array<PubKey>
  ) {
    // Sync with our devices
    const promises = sendTo.map(async device => {
      const syncMessage = await SyncMessageUtils.from(message, device);

      return this.queue(device, syncMessage);
    });

    return Promise.all(promises);
  }

  /**
   * Schedules every cached message for `device` on that device's job queue.
   *
   * If the device has no session and its conversation is not a medium group,
   * only `SessionProtocol.sendSessionRequestIfNeeded` is called and the cached
   * messages are left untouched for a later call.
   *
   * @param device The device whose pending messages should be processed.
   */
  public async processPending(device: PubKey) {
    const messages = this.pendingMessageCache.getForDevice(device);

    // NOTE(review): hasSession's signature is not visible here. If it returns
    // a promise, the un-awaited value would always be truthy and the session
    // request below could never run; `await` is a no-op on a plain boolean.
    const hasSession = await SessionProtocol.hasSession(device);
    const conversation = ConversationController.get(device.key);
    // Treat a device without a conversation as "not a medium group" instead
    // of throwing on the method call.
    const isMediumGroup = conversation ? conversation.isMediumGroup() : false;

    if (!isMediumGroup && !hasSession) {
      await SessionProtocol.sendSessionRequestIfNeeded(device);

      return;
    }

    const jobQueue = this.getJobQueue(device);
    messages.forEach(message => {
      // Skip messages that already have a job on this device's queue.
      if (!jobQueue.has(message.identifier)) {
        const promise = jobQueue.add(async () => MessageSender.send(message));

        promise
          .then(() => {
            // Message sent; remove from cache
            void this.pendingMessageCache.remove(message);
          })
          // Message failed to send; it stays in the cache.
          .catch(() => null);
      }
    });
  }

  /**
   * Runs `processPending` for every device the pending message cache reports.
   */
  private async processAllPending() {
    const devices = this.pendingMessageCache.getDevices();
    const promises = devices.map(async device => this.processPending(device));

    return Promise.all(promises);
  }

  /**
   * Adds `message` to the pending cache for `device`, then processes that
   * device's pending messages. `SessionRequestMessage`s are never queued.
   */
  private async queue(device: PubKey, message: ContentMessage) {
    if (message instanceof SessionRequestMessage) {
      return;
    }

    await this.pendingMessageCache.add(device, message);
    await this.processPending(device);
  }

  /**
   * Returns the job queue for `device`, creating it on first use.
   */
  private getJobQueue(device: PubKey): JobQueue {
    let queue = this.jobQueues.get(device.key);
    if (!queue) {
      queue = new JobQueue();
      this.jobQueues.set(device.key, queue);
    }

    return queue;
  }
}