diff --git a/js/background.js b/js/background.js index 1248aeadf..a36ef16c8 100644 --- a/js/background.js +++ b/js/background.js @@ -773,24 +773,6 @@ } }); - Whisper.events.on( - 'publicMessageSent', - ({ identifier, pubKey, timestamp, serverId, serverTimestamp }) => { - try { - const conversation = window.getConversationController().get(pubKey); - conversation.onPublicMessageSent({ - identifier, - pubKey, - timestamp, - serverId, - serverTimestamp, - }); - } catch (e) { - window.log.error('Error setting public on message'); - } - } - ); - Whisper.events.on('password-updated', () => { if (appView && appView.inboxView) { appView.inboxView.trigger('password-updated'); diff --git a/ts/components/session/ActionsPanel.tsx b/ts/components/session/ActionsPanel.tsx index 4a035c987..e5b23b4ff 100644 --- a/ts/components/session/ActionsPanel.tsx +++ b/ts/components/session/ActionsPanel.tsx @@ -13,7 +13,10 @@ import { getFocusedSection } from '../../state/selectors/section'; import { getTheme } from '../../state/selectors/theme'; import { getOurNumber } from '../../state/selectors/user'; import { UserUtils } from '../../session/utils'; -import { syncConfigurationIfNeeded } from '../../session/utils/syncUtils'; +import { + forceSyncConfigurationNowIfNeeded, + syncConfigurationIfNeeded, +} from '../../session/utils/syncUtils'; import { DAYS } from '../../session/utils/Number'; import { removeItemById } from '../../data/data'; // tslint:disable-next-line: no-import-side-effect no-submodule-imports diff --git a/ts/components/session/SessionInboxView.tsx b/ts/components/session/SessionInboxView.tsx index 9948e553f..923821458 100644 --- a/ts/components/session/SessionInboxView.tsx +++ b/ts/components/session/SessionInboxView.tsx @@ -46,11 +46,6 @@ export class SessionInboxView extends React.Component { isExpired: false, }; - this.fetchHandleMessageSentData = this.fetchHandleMessageSentData.bind( - this - ); - this.handleMessageSentFailure = this.handleMessageSentFailure.bind(this); - this.handleMessageSentSuccess = this.handleMessageSentSuccess.bind(this); this.showSessionSettingsCategory = this.showSessionSettingsCategory.bind( this ); @@ -117,51 +112,6 @@ export class SessionInboxView extends React.Component { ); } - private async fetchHandleMessageSentData(m: RawMessage | OpenGroupMessage) { - // if a message was sent and this message was created after the last app restart, - // this message is still in memory in the MessageController - const msg = MessageController.getInstance().get(m.identifier); - - if (!msg || !msg.message) { - // otherwise, look for it in the database - // nobody is listening to this freshly fetched message .trigger calls - const dbMessage = await getMessageById(m.identifier); - - if (!dbMessage) { - return null; - } - return { msg: dbMessage }; - } - - return { msg: msg.message }; - } - - private async handleMessageSentSuccess( - sentMessage: RawMessage | OpenGroupMessage, - wrappedEnvelope: any - ) { - const fetchedData = await this.fetchHandleMessageSentData(sentMessage); - if (!fetchedData) { - return; - } - const { msg } = fetchedData; - - void msg.handleMessageSentSuccess(sentMessage, wrappedEnvelope); - } - - private async handleMessageSentFailure( - sentMessage: RawMessage | OpenGroupMessage, - error: any - ) { - const fetchedData = await this.fetchHandleMessageSentData(sentMessage); - if (!fetchedData) { - return; - } - const { msg } = fetchedData; - - await msg.handleMessageSentFailure(sentMessage, error); - } - private async setupLeftPane() { // Here we set up a full redux store with initial state for our LeftPane Root const convoCollection = ConversationController.getInstance().getConversations(); @@ -206,22 +156,6 @@ export class SessionInboxView extends React.Component { this.store.dispatch ); - this.fetchHandleMessageSentData = this.fetchHandleMessageSentData.bind( - this - ); - this.handleMessageSentFailure = this.handleMessageSentFailure.bind(this); - this.handleMessageSentSuccess = this.handleMessageSentSuccess.bind(this); - - getMessageQueue().events.addListener( - 'sendSuccess', - this.handleMessageSentSuccess - ); - - getMessageQueue().events.addListener( - 'sendFail', - this.handleMessageSentFailure - ); - window.Whisper.events.on('messageExpired', messageExpired); window.Whisper.events.on('messageChanged', messageChanged); window.Whisper.events.on('messageAdded', messageAdded); diff --git a/ts/models/conversation.ts b/ts/models/conversation.ts index e1cf5ad70..36c06bcb0 100644 --- a/ts/models/conversation.ts +++ b/ts/models/conversation.ts @@ -228,6 +228,15 @@ export class ConversationModel extends Backbone.Model { public isMediumGroup() { return this.get('is_medium_group'); } + /** + * Returns true if this conversation is active + * i.e. the conversation is visibie on the left pane. (Either we or another user created this convo). + * This is useful because we do not want bumpTyping on the first message typing to a new convo to + * send a message. + */ + public isActive() { + return Boolean(this.get('active_at')); + } public async block() { if (!this.id || this.isPublic()) { return; @@ -251,13 +260,15 @@ export class ConversationModel extends Backbone.Model { await this.commit(); } public async bumpTyping() { - if (this.isPublic() || this.isMediumGroup()) { - return; - } // We don't send typing messages if the setting is disabled // or we blocked that user - - if (!window.storage.get('typing-indicators-setting') || this.isBlocked()) { + if ( + this.isPublic() || + this.isMediumGroup() || + !this.isActive() || + !window.storage.get('typing-indicators-setting') || + this.isBlocked() + ) { return; } @@ -408,27 +419,6 @@ export class ConversationModel extends Backbone.Model { await Promise.all(messages.map((m: any) => m.setCalculatingPoW())); } - public async onPublicMessageSent({ - identifier, - serverId, - serverTimestamp, - }: { - identifier: string; - serverId: number; - serverTimestamp: number; - }) { - const registeredMessage = MessageController.getInstance().get(identifier); - - if (!registeredMessage || !registeredMessage.message) { - return null; - } - const model = registeredMessage.message; - await model.setIsPublic(true); - await model.setServerId(serverId); - await model.setServerTimestamp(serverTimestamp); - return undefined; - } - public format() { return this.cachedProps; } @@ -679,7 +669,7 @@ export class ConversationModel extends Backbone.Model { }; const openGroupMessage = new OpenGroupMessage(openGroupParams); // we need the return await so that errors are caught in the catch {} - await getMessageQueue().sendToGroup(openGroupMessage); + await getMessageQueue().sendToOpenGroup(openGroupMessage); return; } const chatMessageParams: ChatMessageParams = { @@ -800,19 +790,17 @@ export class ConversationModel extends Backbone.Model { messageWithSchema.source = UserUtils.getOurPubKeyStrFromCache(); messageWithSchema.sourceDevice = 1; + // set the serverTimestamp only if this conversation is a public one. const attributes: MessageAttributesOptionals = { ...messageWithSchema, groupInvitation, conversationId: this.id, destination: isPrivate ? destination : undefined, + serverTimestamp: this.isPublic() ? new Date().getTime() : undefined, }; const model = await this.addSingleMessage(attributes); - if (this.isPublic()) { - await model.setServerTimestamp(new Date().getTime()); - } - this.set({ lastMessage: model.getNotificationText(), lastMessageStatus: 'sending', diff --git a/ts/models/message.ts b/ts/models/message.ts index 2e0252be1..bca525922 100644 --- a/ts/models/message.ts +++ b/ts/models/message.ts @@ -872,7 +872,7 @@ export class MessageModel extends Backbone.Model { ...uploaded, }; const openGroupMessage = new OpenGroupMessage(openGroupParams); - return getMessageQueue().sendToGroup(openGroupMessage); + return getMessageQueue().sendToOpenGroup(openGroupMessage); } const { body, attachments, preview, quote } = await this.uploadData(); @@ -1148,42 +1148,6 @@ export class MessageModel extends Backbone.Model { await this.commit(); } - public async setServerId(serverId: number) { - if (_.isEqual(this.get('serverId'), serverId)) { - return; - } - - this.set({ - serverId, - }); - - await this.commit(); - } - - public async setServerTimestamp(serverTimestamp?: number) { - if (_.isEqual(this.get('serverTimestamp'), serverTimestamp)) { - return; - } - - this.set({ - serverTimestamp, - }); - - await this.commit(); - } - - public async setIsPublic(isPublic: boolean) { - if (_.isEqual(this.get('isPublic'), isPublic)) { - return; - } - - this.set({ - isPublic: !!isPublic, - }); - - await this.commit(); - } - public async sendSyncMessageOnly(dataMessage: any) { const now = Date.now(); this.set({ diff --git a/ts/session/group/index.ts b/ts/session/group/index.ts index 4e46c5efa..152b2059a 100644 --- a/ts/session/group/index.ts +++ b/ts/session/group/index.ts @@ -6,7 +6,6 @@ import { fromHex, fromHexToArray, toHex } from '../utils/String'; import { BlockedNumberController } from '../../util/blockedNumberController'; import { ConversationController } from '../conversations'; import { updateOpenGroup } from '../../receiver/openGroups'; -import { getMessageQueue } from '../instance'; import { addClosedGroupEncryptionKeyPair, getIdentityKeyById, @@ -33,6 +32,7 @@ import { MessageModel } from '../../models/message'; import { MessageModelType } from '../../models/messageType'; import { MessageController } from '../messages'; import { distributingClosedGroupEncryptionKeyPairs } from '../../receiver/closedGroups'; +import { getMessageQueue } from '..'; export interface GroupInfo { id: string; diff --git a/ts/session/index.ts b/ts/session/index.ts index 639c2eb17..37486ad13 100644 --- a/ts/session/index.ts +++ b/ts/session/index.ts @@ -6,7 +6,7 @@ import * as Sending from './sending'; import * as Constants from './constants'; import * as ClosedGroup from './group'; -export * from './instance'; +const getMessageQueue = Sending.getMessageQueue; export { Conversations, @@ -16,4 +16,5 @@ export { Sending, Constants, ClosedGroup, + getMessageQueue, }; diff --git a/ts/session/instance.ts b/ts/session/instance.ts deleted file mode 100644 index cf7dc55a3..000000000 --- a/ts/session/instance.ts +++ /dev/null @@ -1,12 +0,0 @@ -import { MessageQueue } from './sending/'; - -let messageQueue: MessageQueue; - -function getMessageQueue(): MessageQueue { - if (!messageQueue) { - messageQueue = new MessageQueue(); - } - return messageQueue; -} - -export { getMessageQueue }; diff --git a/ts/session/messages/MessageController.ts b/ts/session/messages/MessageController.ts index 26b40baf5..66a84e494 100644 --- a/ts/session/messages/MessageController.ts +++ b/ts/session/messages/MessageController.ts @@ -30,6 +30,11 @@ export class MessageController { } public register(id: string, message: MessageModel) { + if (!(message instanceof MessageModel)) { + throw new Error( + 'Only MessageModels can be registered to the MessageController.' + ); + } const existing = this.messageLookup.get(id); if (existing) { this.messageLookup.set(id, { diff --git a/ts/session/sending/MessageQueue.ts b/ts/session/sending/MessageQueue.ts index d4475426e..f742678d8 100644 --- a/ts/session/sending/MessageQueue.ts +++ b/ts/session/sending/MessageQueue.ts @@ -1,15 +1,12 @@ -import { EventEmitter } from 'events'; import { - ChatMessage, ClosedGroupChatMessage, ClosedGroupNewMessage, ContentMessage, - DataMessage, ExpirationTimerUpdateMessage, OpenGroupMessage, } from '../messages/outgoing'; import { PendingMessageCache } from './PendingMessageCache'; -import { JobQueue, TypedEventEmitter, UserUtils } from '../utils'; +import { JobQueue, UserUtils } from '../utils'; import { PubKey, RawMessage } from '../types'; import { MessageSender } from '.'; import { ClosedGroupMessage } from '../messages/outgoing/content/data/group/ClosedGroupMessage'; @@ -23,9 +20,9 @@ import { ClosedGroupUpdateMessage, } from '../messages/outgoing/content/data/group'; import { ClosedGroupMemberLeftMessage } from '../messages/outgoing/content/data/group/ClosedGroupMemberLeftMessage'; +import { MessageSentHandler } from './MessageSentHandler'; -export type GroupMessageType = - | OpenGroupMessage +type ClosedGroupMessageType = | ClosedGroupChatMessage | ClosedGroupAddedMembersMessage | ClosedGroupRemovedMembersMessage @@ -37,21 +34,12 @@ export type GroupMessageType = | ClosedGroupEncryptionPairRequestMessage; // ClosedGroupEncryptionPairReplyMessage must be sent to a user pubkey. Not a group. -export interface MessageQueueInterfaceEvents { - sendSuccess: ( - message: RawMessage | OpenGroupMessage, - wrappedEnvelope?: Uint8Array - ) => void; - sendFail: (message: RawMessage | OpenGroupMessage, error: Error) => void; -} export class MessageQueue { - public readonly events: TypedEventEmitter; private readonly jobQueues: Map = new Map(); private readonly pendingMessageCache: PendingMessageCache; constructor(cache?: PendingMessageCache) { - this.events = new EventEmitter(); this.pendingMessageCache = cache ?? new PendingMessageCache(); void this.processAllPending(); } @@ -70,18 +58,37 @@ export class MessageQueue { await this.process(user, message, sentCb); } - public async send( - device: PubKey, - message: ContentMessage, - sentCb?: (message: RawMessage) => Promise - ): Promise { - if ( - message instanceof ConfigurationMessage || - !!(message as any).syncTarget - ) { - throw new Error('SyncMessage needs to be sent with sendSyncMessage'); + /** + * This function is synced. It will wait for the message to be delivered to the open + * group to return. + * So there is no need for a sendCb callback + * + */ + public async sendToOpenGroup(message: OpenGroupMessage) { + // Open groups + if (!(message instanceof OpenGroupMessage)) { + throw new Error('sendToOpenGroup can only be used with OpenGroupMessage'); + } + // No queue needed for Open Groups; send directly + const error = new Error('Failed to send message to open group.'); + + // This is absolutely yucky ... we need to make it not use Promise + try { + const result = await MessageSender.sendToOpenGroup(message); + // sendToOpenGroup returns -1 if failed or an id if succeeded + if (result.serverId < 0) { + void MessageSentHandler.handleMessageSentFailure(message, error); + } else { + void MessageSentHandler.handleMessageSentSuccess(message); + void MessageSentHandler.handlePublicMessageSentSuccess(message, result); + } + } catch (e) { + window?.log?.warn( + `Failed to send message to open group: ${message.group.server}`, + e + ); + void MessageSentHandler.handleMessageSentFailure(message, error); } - await this.process(device, message, sentCb); } /** @@ -89,43 +96,9 @@ export class MessageQueue { * @param sentCb currently only called for medium groups sent message */ public async sendToGroup( - message: GroupMessageType, + message: ClosedGroupMessageType, sentCb?: (message: RawMessage) => Promise ): Promise { - // Open groups - if (message instanceof OpenGroupMessage) { - // No queue needed for Open Groups; send directly - const error = new Error('Failed to send message to open group.'); - - // This is absolutely yucky ... we need to make it not use Promise - try { - const result = await MessageSender.sendToOpenGroup(message); - // sendToOpenGroup returns -1 if failed or an id if succeeded - if (result.serverId < 0) { - this.events.emit('sendFail', message, error); - } else { - const messageEventData = { - identifier: message.identifier, - pubKey: message.group.groupId, - timestamp: message.timestamp, - serverId: result.serverId, - serverTimestamp: result.serverTimestamp, - }; - this.events.emit('sendSuccess', message); - - window.Whisper.events.trigger('publicMessageSent', messageEventData); - } - } catch (e) { - window?.log?.warn( - `Failed to send message to open group: ${message.group.server}`, - e - ); - this.events.emit('sendFail', message, error); - } - - return; - } - let groupId: PubKey | undefined; if ( message instanceof ExpirationTimerUpdateMessage || @@ -138,7 +111,7 @@ export class MessageQueue { throw new Error('Invalid group message passed in sendToGroup.'); } // if groupId is set here, it means it's for a medium group. So send it as it - return this.send(PubKey.cast(groupId), message, sentCb); + return this.sendToPubKey(PubKey.cast(groupId), message, sentCb); } public async sendSyncMessage( @@ -157,10 +130,6 @@ export class MessageQueue { const ourPubKey = UserUtils.getOurPubKeyStrFromCache(); - if (!ourPubKey) { - throw new Error('ourNumber is not set'); - } - await this.process(PubKey.cast(ourPubKey), message, sentCb); } @@ -176,7 +145,11 @@ export class MessageQueue { const job = async () => { try { const wrappedEnvelope = await MessageSender.send(message); - this.events.emit('sendSuccess', message, wrappedEnvelope); + void MessageSentHandler.handleMessageSentSuccess( + message, + wrappedEnvelope + ); + const cb = this.pendingMessageCache.callbacks.get( message.identifier ); @@ -185,8 +158,8 @@ export class MessageQueue { await cb(message); } this.pendingMessageCache.callbacks.delete(message.identifier); - } catch (e) { - this.events.emit('sendFail', message, e); + } catch (error) { + void MessageSentHandler.handleMessageSentFailure(message, error); } finally { // Remove from the cache because retrying is done in the sender void this.pendingMessageCache.remove(message); @@ -243,3 +216,12 @@ export class MessageQueue { return queue; } } + +let messageQueue: MessageQueue; + +export function getMessageQueue(): MessageQueue { + if (!messageQueue) { + messageQueue = new MessageQueue(); + } + return messageQueue; +} diff --git a/ts/session/sending/MessageSentHandler.ts b/ts/session/sending/MessageSentHandler.ts new file mode 100644 index 000000000..dd30f5777 --- /dev/null +++ b/ts/session/sending/MessageSentHandler.ts @@ -0,0 +1,93 @@ +import { getMessageById } from '../../data/data'; +import { MessageController } from '../messages'; +import { OpenGroupMessage } from '../messages/outgoing'; +import { RawMessage } from '../types'; + +export class MessageSentHandler { + /** + * This function tries to find a message by messageId by first looking on the MessageController. + * The MessageController holds all messages being in memory. + * Those are the messages sent recently, recieved recently, or the one shown to the user. + * + * If the app restarted, it's very likely those messages won't be on the memory anymore. + * In this case, this function will look for it in the database and return it. + * If the message is found on the db, it will also register it to the MessageController so our subsequent calls are quicker. + */ + private static async fetchHandleMessageSentData( + m: RawMessage | OpenGroupMessage + ) { + // if a message was sent and this message was created after the last app restart, + // this message is still in memory in the MessageController + const msg = MessageController.getInstance().get(m.identifier); + + if (!msg || !msg.message) { + // otherwise, look for it in the database + // nobody is listening to this freshly fetched message .trigger calls + const dbMessage = await getMessageById(m.identifier); + + if (!dbMessage) { + return null; + } + MessageController.getInstance().register(m.identifier, dbMessage); + return dbMessage; + } + + return msg.message; + } + + public static async handlePublicMessageSentSuccess( + sentMessage: OpenGroupMessage, + result: { serverId: number; serverTimestamp: number } + ) { + const { serverId, serverTimestamp } = result; + try { + const foundMessage = await MessageSentHandler.fetchHandleMessageSentData( + sentMessage + ); + + if (!foundMessage) { + throw new Error( + 'handlePublicMessageSentSuccess(): The message should be in memory for an openGroup message' + ); + } + + foundMessage.set({ + serverTimestamp, + serverId, + isPublic: true, + }); + await foundMessage.commit(); + } catch (e) { + window.log.error('Error setting public on message'); + } + } + + public static async handleMessageSentSuccess( + sentMessage: RawMessage | OpenGroupMessage, + wrappedEnvelope?: Uint8Array + ) { + // The wrappedEnvelope will be set only if the message is not one of OpenGroupMessage type. + const fetchedMessage = await MessageSentHandler.fetchHandleMessageSentData( + sentMessage + ); + if (!fetchedMessage) { + return; + } + + void fetchedMessage.handleMessageSentSuccess(sentMessage, wrappedEnvelope); + } + + public static async handleMessageSentFailure( + sentMessage: RawMessage | OpenGroupMessage, + error: any + ) { + const fetchedMessage = await MessageSentHandler.fetchHandleMessageSentData( + sentMessage + ); + if (!fetchedMessage) { + return; + } + + await fetchedMessage.handleMessageSentFailure(sentMessage, error); + } +} diff --git a/ts/session/sending/PendingMessageCache.ts b/ts/session/sending/PendingMessageCache.ts index 28b9b8708..fd6742478 100644 --- a/ts/session/sending/PendingMessageCache.ts +++ b/ts/session/sending/PendingMessageCache.ts @@ -4,7 +4,6 @@ import { PartialRawMessage, RawMessage } from '../types/RawMessage'; import { ContentMessage } from '../messages/outgoing'; import { PubKey } from '../types'; import { MessageUtils } from '../utils'; -import { GroupMessageType } from '.'; // This is an abstraction for storing pending messages. // Ideally we want to store pending messages in the database so that diff --git a/ts/session/utils/syncUtils.ts b/ts/session/utils/syncUtils.ts index db1720390..d8cba68b6 100644 --- a/ts/session/utils/syncUtils.ts +++ b/ts/session/utils/syncUtils.ts @@ -45,36 +45,33 @@ export const forceSyncConfigurationNowIfNeeded = async ( ) => { const allConvos = ConversationController.getInstance().getConversations(); const configMessage = await getCurrentConfigurationMessage(allConvos); - window.log.info('forceSyncConfigurationNowIfNeeded with', configMessage); - const waitForMessageSentEvent = new Promise(resolve => { - const ourResolver = (message: any) => { + async function waitForMessageSentEvent(message: RawMessage) { + return new Promise(resolve => { if (message.identifier === configMessage.identifier) { - getMessageQueue().events.off('sendSuccess', ourResolver); - getMessageQueue().events.off('sendFail', ourResolver); resolve(true); } - }; - getMessageQueue().events.on('sendSuccess', ourResolver); - getMessageQueue().events.on('sendFail', ourResolver); - }); + }); + } try { - // this just adds the message to the sending queue. - // if waitForMessageSent is set, we need to effectively wait until then - await Promise.all([ - getMessageQueue().sendSyncMessage(configMessage), - waitForMessageSentEvent, - ]); + // passing the callback like that + if (waitForMessageSent) { + await getMessageQueue().sendSyncMessage( + configMessage, + waitForMessageSentEvent as any + ); + return Promise.resolve(); + } else { + await getMessageQueue().sendSyncMessage(configMessage); + return waitForMessageSentEvent; + } } catch (e) { window.log.warn( 'Caught an error while sending our ConfigurationMessage:', e ); } - if (!waitForMessageSent) { - return; - } - return waitForMessageSentEvent; + return Promise.resolve(); }; diff --git a/ts/test/session/unit/sending/MessageQueue_test.ts b/ts/test/session/unit/sending/MessageQueue_test.ts index 778125d4f..9ad6102ff 100644 --- a/ts/test/session/unit/sending/MessageQueue_test.ts +++ b/ts/test/session/unit/sending/MessageQueue_test.ts @@ -18,6 +18,7 @@ import { PendingMessageCacheStub } from '../../../test-utils/stubs'; import { ClosedGroupMessage } from '../../../../session/messages/outgoing/content/data/group/ClosedGroupMessage'; import chaiAsPromised from 'chai-as-promised'; +import { MessageSentHandler } from '../../../../session/sending/MessageSentHandler'; chai.use(chaiAsPromised as any); chai.should(); @@ -32,6 +33,9 @@ describe('MessageQueue', () => { // Initialize new stubbed queue let pendingMessageCache: PendingMessageCacheStub; + let messageSentHandlerFailedStub: sinon.SinonStub; + let messageSentHandlerSuccessStub: sinon.SinonStub; + let messageSentPublicHandlerSuccessStub: sinon.SinonStub; let messageQueueStub: MessageQueue; // Message Sender Stubs @@ -47,6 +51,15 @@ describe('MessageQueue', () => { // Message Sender Stubs sendStub = sandbox.stub(MessageSender, 'send').resolves(); + messageSentHandlerFailedStub = sandbox + .stub(MessageSentHandler as any, 'handleMessageSentFailure') + .resolves(); + messageSentHandlerSuccessStub = sandbox + .stub(MessageSentHandler as any, 'handleMessageSentSuccess') + .resolves(); + messageSentPublicHandlerSuccessStub = sandbox + .stub(MessageSentHandler as any, 'handlePublicMessageSentSuccess') + .resolves(); // Init Queue pendingMessageCache = new PendingMessageCacheStub(); @@ -59,16 +72,22 @@ describe('MessageQueue', () => { }); describe('processPending', () => { - it('will send messages', async () => { + it('will send messages', done => { const device = TestUtils.generateFakePubKey(); - await pendingMessageCache.add(device, TestUtils.generateChatMessage()); - const successPromise = PromiseUtils.waitForTask(done => { - messageQueueStub.events.once('sendSuccess', done); + const waitForMessageSentEvent = new Promise(resolve => { + resolve(true); + done(); }); - await messageQueueStub.processPending(device); - // tslint:disable-next-line: no-unused-expression - expect(successPromise).to.eventually.be.fulfilled; + + pendingMessageCache.add( + device, + TestUtils.generateChatMessage(), + waitForMessageSentEvent as any + ); + + messageQueueStub.processPending(device); + expect(waitForMessageSentEvent).to.be.fulfilled; }); it('should remove message from cache', async () => { @@ -96,46 +115,48 @@ describe('MessageQueue', () => { }).timeout(15000); describe('events', () => { - it('should send a success event if message was sent', async () => { + it('should send a success event if message was sent', done => { const device = TestUtils.generateFakePubKey(); const message = TestUtils.generateChatMessage(); - await pendingMessageCache.add(device, message); - - const eventPromise = PromiseUtils.waitForTask< - RawMessage | OpenGroupMessage - >(complete => { - messageQueueStub.events.once('sendSuccess', complete); + const waitForMessageSentEvent = new Promise(resolve => { + resolve(true); + done(); }); - await messageQueueStub.processPending(device); - - const rawMessage = await eventPromise; - expect(rawMessage.identifier).to.equal(message.identifier); + pendingMessageCache + .add(device, message, waitForMessageSentEvent as any) + .then(() => messageQueueStub.processPending(device)) + .then(() => { + expect(messageSentHandlerSuccessStub.callCount).to.be.equal(1); + expect( + messageSentHandlerSuccessStub.lastCall.args[0].identifier + ).to.be.equal(message.identifier); + }); }); - it('should send a fail event if something went wrong while sending', async () => { + it('should send a fail event if something went wrong while sending', done => { sendStub.throws(new Error('failure')); - const spy = sandbox.spy(); - messageQueueStub.events.on('sendFail', spy); - const device = TestUtils.generateFakePubKey(); const message = TestUtils.generateChatMessage(); - await pendingMessageCache.add(device, message); - - const eventPromise = PromiseUtils.waitForTask< - [RawMessage | OpenGroupMessage, Error] - >(complete => { - messageQueueStub.events.once('sendFail', (...args) => { - complete(args); - }); + const waitForMessageSentEvent = new Promise(resolve => { + resolve(true); + done(); }); - await messageQueueStub.processPending(device); - - const [rawMessage, error] = await eventPromise; - expect(rawMessage.identifier).to.equal(message.identifier); - expect(error.message).to.equal('failure'); + pendingMessageCache + .add(device, message, waitForMessageSentEvent as any) + .then(() => messageQueueStub.processPending(device)) + .then(() => { + expect(messageSentHandlerFailedStub.callCount).to.be.equal(1); + expect( + messageSentHandlerFailedStub.lastCall.args[0].identifier + ).to.be.equal(message.identifier); + expect( + messageSentHandlerFailedStub.lastCall.args[1].message + ).to.equal('failure'); + expect(waitForMessageSentEvent).to.be.eventually.fulfilled; + }); }); }); }); @@ -155,12 +176,11 @@ describe('MessageQueue', () => { }); describe('sendToGroup', () => { - it('should throw an error if invalid non-group message was passed', () => { - // const chatMessage = TestUtils.generateChatMessage(); - // await expect( - // messageQueueStub.sendToGroup(chatMessage) - // ).to.be.rejectedWith('Invalid group message passed in sendToGroup.'); - // Cannot happen with typescript as this function only accept group message now + it('should throw an error if invalid non-group message was passed', async () => { + const chatMessage = TestUtils.generateChatMessage(); + await expect( + messageQueueStub.sendToGroup(chatMessage as any) + ).to.be.rejectedWith('Invalid group message passed in sendToGroup.'); }); describe('closed groups', () => { @@ -170,7 +190,7 @@ describe('MessageQueue', () => { ); sandbox.stub(GroupUtils, 'getGroupMembers').resolves(members); - const send = sandbox.stub(messageQueueStub, 'send').resolves(); + const send = sandbox.stub(messageQueueStub, 'sendToPubKey').resolves(); const message = TestUtils.generateClosedGroupMessage(); await messageQueueStub.sendToGroup(message); @@ -196,34 +216,43 @@ describe('MessageQueue', () => { it('can send to open group', async () => { const message = TestUtils.generateOpenGroupMessage(); - await messageQueueStub.sendToGroup(message); + await messageQueueStub.sendToOpenGroup(message); expect(sendToOpenGroupStub.callCount).to.equal(1); }); it('should emit a success event when send was successful', async () => { sendToOpenGroupStub.resolves({ serverId: 5125, - serverTimestamp: 5125, + serverTimestamp: 5126, }); const message = TestUtils.generateOpenGroupMessage(); - const eventPromise = PromiseUtils.waitForTask(complete => { - messageQueueStub.events.once('sendSuccess', complete); - }, 2000); - - await messageQueueStub.sendToGroup(message); - return eventPromise.should.be.fulfilled; + await messageQueueStub.sendToOpenGroup(message); + expect(messageSentHandlerSuccessStub.callCount).to.equal(1); + expect( + messageSentHandlerSuccessStub.lastCall.args[0].identifier + ).to.equal(message.identifier); + expect(messageSentPublicHandlerSuccessStub.callCount).to.equal(1); + expect( + messageSentPublicHandlerSuccessStub.lastCall.args[0].identifier + ).to.equal(message.identifier); + expect( + messageSentPublicHandlerSuccessStub.lastCall.args[1].serverId + ).to.equal(5125); + expect( + messageSentPublicHandlerSuccessStub.lastCall.args[1].serverTimestamp + ).to.equal(5126); }); it('should emit a fail event if something went wrong', async () => { sendToOpenGroupStub.resolves({ serverId: -1, serverTimestamp: -1 }); const message = TestUtils.generateOpenGroupMessage(); - const eventPromise = PromiseUtils.waitForTask(complete => { - messageQueueStub.events.once('sendFail', complete); - }, 2000); - await messageQueueStub.sendToGroup(message); - return eventPromise.should.be.fulfilled; + await messageQueueStub.sendToOpenGroup(message); + expect(messageSentHandlerFailedStub.callCount).to.equal(1); + expect( + messageSentHandlerFailedStub.lastCall.args[0].identifier + ).to.equal(message.identifier); }); }); });