move events from MessageQueue to MessageSentHandler

pull/1495/head
Audric Ackermann 4 years ago
parent 8ea9f02cec
commit 370158951a

@ -773,24 +773,6 @@
} }
}); });
Whisper.events.on(
'publicMessageSent',
({ identifier, pubKey, timestamp, serverId, serverTimestamp }) => {
try {
const conversation = window.getConversationController().get(pubKey);
conversation.onPublicMessageSent({
identifier,
pubKey,
timestamp,
serverId,
serverTimestamp,
});
} catch (e) {
window.log.error('Error setting public on message');
}
}
);
Whisper.events.on('password-updated', () => { Whisper.events.on('password-updated', () => {
if (appView && appView.inboxView) { if (appView && appView.inboxView) {
appView.inboxView.trigger('password-updated'); appView.inboxView.trigger('password-updated');

@ -13,7 +13,10 @@ import { getFocusedSection } from '../../state/selectors/section';
import { getTheme } from '../../state/selectors/theme'; import { getTheme } from '../../state/selectors/theme';
import { getOurNumber } from '../../state/selectors/user'; import { getOurNumber } from '../../state/selectors/user';
import { UserUtils } from '../../session/utils'; import { UserUtils } from '../../session/utils';
import { syncConfigurationIfNeeded } from '../../session/utils/syncUtils'; import {
forceSyncConfigurationNowIfNeeded,
syncConfigurationIfNeeded,
} from '../../session/utils/syncUtils';
import { DAYS } from '../../session/utils/Number'; import { DAYS } from '../../session/utils/Number';
import { removeItemById } from '../../data/data'; import { removeItemById } from '../../data/data';
// tslint:disable-next-line: no-import-side-effect no-submodule-imports // tslint:disable-next-line: no-import-side-effect no-submodule-imports

@ -46,11 +46,6 @@ export class SessionInboxView extends React.Component<Props, State> {
isExpired: false, isExpired: false,
}; };
this.fetchHandleMessageSentData = this.fetchHandleMessageSentData.bind(
this
);
this.handleMessageSentFailure = this.handleMessageSentFailure.bind(this);
this.handleMessageSentSuccess = this.handleMessageSentSuccess.bind(this);
this.showSessionSettingsCategory = this.showSessionSettingsCategory.bind( this.showSessionSettingsCategory = this.showSessionSettingsCategory.bind(
this this
); );
@ -117,51 +112,6 @@ export class SessionInboxView extends React.Component<Props, State> {
); );
} }
private async fetchHandleMessageSentData(m: RawMessage | OpenGroupMessage) {
// if a message was sent and this message was created after the last app restart,
// this message is still in memory in the MessageController
const msg = MessageController.getInstance().get(m.identifier);
if (!msg || !msg.message) {
// otherwise, look for it in the database
// nobody is listening to this freshly fetched message .trigger calls
const dbMessage = await getMessageById(m.identifier);
if (!dbMessage) {
return null;
}
return { msg: dbMessage };
}
return { msg: msg.message };
}
private async handleMessageSentSuccess(
sentMessage: RawMessage | OpenGroupMessage,
wrappedEnvelope: any
) {
const fetchedData = await this.fetchHandleMessageSentData(sentMessage);
if (!fetchedData) {
return;
}
const { msg } = fetchedData;
void msg.handleMessageSentSuccess(sentMessage, wrappedEnvelope);
}
private async handleMessageSentFailure(
sentMessage: RawMessage | OpenGroupMessage,
error: any
) {
const fetchedData = await this.fetchHandleMessageSentData(sentMessage);
if (!fetchedData) {
return;
}
const { msg } = fetchedData;
await msg.handleMessageSentFailure(sentMessage, error);
}
private async setupLeftPane() { private async setupLeftPane() {
// Here we set up a full redux store with initial state for our LeftPane Root // Here we set up a full redux store with initial state for our LeftPane Root
const convoCollection = ConversationController.getInstance().getConversations(); const convoCollection = ConversationController.getInstance().getConversations();
@ -206,22 +156,6 @@ export class SessionInboxView extends React.Component<Props, State> {
this.store.dispatch this.store.dispatch
); );
this.fetchHandleMessageSentData = this.fetchHandleMessageSentData.bind(
this
);
this.handleMessageSentFailure = this.handleMessageSentFailure.bind(this);
this.handleMessageSentSuccess = this.handleMessageSentSuccess.bind(this);
getMessageQueue().events.addListener(
'sendSuccess',
this.handleMessageSentSuccess
);
getMessageQueue().events.addListener(
'sendFail',
this.handleMessageSentFailure
);
window.Whisper.events.on('messageExpired', messageExpired); window.Whisper.events.on('messageExpired', messageExpired);
window.Whisper.events.on('messageChanged', messageChanged); window.Whisper.events.on('messageChanged', messageChanged);
window.Whisper.events.on('messageAdded', messageAdded); window.Whisper.events.on('messageAdded', messageAdded);

@ -228,6 +228,15 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
public isMediumGroup() { public isMediumGroup() {
return this.get('is_medium_group'); return this.get('is_medium_group');
} }
/**
* Returns true if this conversation is active
* i.e. the conversation is visibie on the left pane. (Either we or another user created this convo).
* This is useful because we do not want bumpTyping on the first message typing to a new convo to
* send a message.
*/
public isActive() {
return Boolean(this.get('active_at'));
}
public async block() { public async block() {
if (!this.id || this.isPublic()) { if (!this.id || this.isPublic()) {
return; return;
@ -251,13 +260,15 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
await this.commit(); await this.commit();
} }
public async bumpTyping() { public async bumpTyping() {
if (this.isPublic() || this.isMediumGroup()) {
return;
}
// We don't send typing messages if the setting is disabled // We don't send typing messages if the setting is disabled
// or we blocked that user // or we blocked that user
if (
if (!window.storage.get('typing-indicators-setting') || this.isBlocked()) { this.isPublic() ||
this.isMediumGroup() ||
!this.isActive() ||
!window.storage.get('typing-indicators-setting') ||
this.isBlocked()
) {
return; return;
} }
@ -408,27 +419,6 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
await Promise.all(messages.map((m: any) => m.setCalculatingPoW())); await Promise.all(messages.map((m: any) => m.setCalculatingPoW()));
} }
public async onPublicMessageSent({
identifier,
serverId,
serverTimestamp,
}: {
identifier: string;
serverId: number;
serverTimestamp: number;
}) {
const registeredMessage = MessageController.getInstance().get(identifier);
if (!registeredMessage || !registeredMessage.message) {
return null;
}
const model = registeredMessage.message;
await model.setIsPublic(true);
await model.setServerId(serverId);
await model.setServerTimestamp(serverTimestamp);
return undefined;
}
public format() { public format() {
return this.cachedProps; return this.cachedProps;
} }
@ -679,7 +669,7 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
}; };
const openGroupMessage = new OpenGroupMessage(openGroupParams); const openGroupMessage = new OpenGroupMessage(openGroupParams);
// we need the return await so that errors are caught in the catch {} // we need the return await so that errors are caught in the catch {}
await getMessageQueue().sendToGroup(openGroupMessage); await getMessageQueue().sendToOpenGroup(openGroupMessage);
return; return;
} }
const chatMessageParams: ChatMessageParams = { const chatMessageParams: ChatMessageParams = {
@ -800,19 +790,17 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
messageWithSchema.source = UserUtils.getOurPubKeyStrFromCache(); messageWithSchema.source = UserUtils.getOurPubKeyStrFromCache();
messageWithSchema.sourceDevice = 1; messageWithSchema.sourceDevice = 1;
// set the serverTimestamp only if this conversation is a public one.
const attributes: MessageAttributesOptionals = { const attributes: MessageAttributesOptionals = {
...messageWithSchema, ...messageWithSchema,
groupInvitation, groupInvitation,
conversationId: this.id, conversationId: this.id,
destination: isPrivate ? destination : undefined, destination: isPrivate ? destination : undefined,
serverTimestamp: this.isPublic() ? new Date().getTime() : undefined,
}; };
const model = await this.addSingleMessage(attributes); const model = await this.addSingleMessage(attributes);
if (this.isPublic()) {
await model.setServerTimestamp(new Date().getTime());
}
this.set({ this.set({
lastMessage: model.getNotificationText(), lastMessage: model.getNotificationText(),
lastMessageStatus: 'sending', lastMessageStatus: 'sending',

@ -872,7 +872,7 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
...uploaded, ...uploaded,
}; };
const openGroupMessage = new OpenGroupMessage(openGroupParams); const openGroupMessage = new OpenGroupMessage(openGroupParams);
return getMessageQueue().sendToGroup(openGroupMessage); return getMessageQueue().sendToOpenGroup(openGroupMessage);
} }
const { body, attachments, preview, quote } = await this.uploadData(); const { body, attachments, preview, quote } = await this.uploadData();
@ -1148,42 +1148,6 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
await this.commit(); await this.commit();
} }
public async setServerId(serverId: number) {
if (_.isEqual(this.get('serverId'), serverId)) {
return;
}
this.set({
serverId,
});
await this.commit();
}
public async setServerTimestamp(serverTimestamp?: number) {
if (_.isEqual(this.get('serverTimestamp'), serverTimestamp)) {
return;
}
this.set({
serverTimestamp,
});
await this.commit();
}
public async setIsPublic(isPublic: boolean) {
if (_.isEqual(this.get('isPublic'), isPublic)) {
return;
}
this.set({
isPublic: !!isPublic,
});
await this.commit();
}
public async sendSyncMessageOnly(dataMessage: any) { public async sendSyncMessageOnly(dataMessage: any) {
const now = Date.now(); const now = Date.now();
this.set({ this.set({

@ -6,7 +6,6 @@ import { fromHex, fromHexToArray, toHex } from '../utils/String';
import { BlockedNumberController } from '../../util/blockedNumberController'; import { BlockedNumberController } from '../../util/blockedNumberController';
import { ConversationController } from '../conversations'; import { ConversationController } from '../conversations';
import { updateOpenGroup } from '../../receiver/openGroups'; import { updateOpenGroup } from '../../receiver/openGroups';
import { getMessageQueue } from '../instance';
import { import {
addClosedGroupEncryptionKeyPair, addClosedGroupEncryptionKeyPair,
getIdentityKeyById, getIdentityKeyById,
@ -33,6 +32,7 @@ import { MessageModel } from '../../models/message';
import { MessageModelType } from '../../models/messageType'; import { MessageModelType } from '../../models/messageType';
import { MessageController } from '../messages'; import { MessageController } from '../messages';
import { distributingClosedGroupEncryptionKeyPairs } from '../../receiver/closedGroups'; import { distributingClosedGroupEncryptionKeyPairs } from '../../receiver/closedGroups';
import { getMessageQueue } from '..';
export interface GroupInfo { export interface GroupInfo {
id: string; id: string;

@ -6,7 +6,7 @@ import * as Sending from './sending';
import * as Constants from './constants'; import * as Constants from './constants';
import * as ClosedGroup from './group'; import * as ClosedGroup from './group';
export * from './instance'; const getMessageQueue = Sending.getMessageQueue;
export { export {
Conversations, Conversations,
@ -16,4 +16,5 @@ export {
Sending, Sending,
Constants, Constants,
ClosedGroup, ClosedGroup,
getMessageQueue,
}; };

@ -1,12 +0,0 @@
import { MessageQueue } from './sending/';
let messageQueue: MessageQueue;
function getMessageQueue(): MessageQueue {
if (!messageQueue) {
messageQueue = new MessageQueue();
}
return messageQueue;
}
export { getMessageQueue };

@ -30,6 +30,11 @@ export class MessageController {
} }
public register(id: string, message: MessageModel) { public register(id: string, message: MessageModel) {
if (!(message instanceof MessageModel)) {
throw new Error(
'Only MessageModels can be registered to the MessageController.'
);
}
const existing = this.messageLookup.get(id); const existing = this.messageLookup.get(id);
if (existing) { if (existing) {
this.messageLookup.set(id, { this.messageLookup.set(id, {

@ -1,15 +1,12 @@
import { EventEmitter } from 'events';
import { import {
ChatMessage,
ClosedGroupChatMessage, ClosedGroupChatMessage,
ClosedGroupNewMessage, ClosedGroupNewMessage,
ContentMessage, ContentMessage,
DataMessage,
ExpirationTimerUpdateMessage, ExpirationTimerUpdateMessage,
OpenGroupMessage, OpenGroupMessage,
} from '../messages/outgoing'; } from '../messages/outgoing';
import { PendingMessageCache } from './PendingMessageCache'; import { PendingMessageCache } from './PendingMessageCache';
import { JobQueue, TypedEventEmitter, UserUtils } from '../utils'; import { JobQueue, UserUtils } from '../utils';
import { PubKey, RawMessage } from '../types'; import { PubKey, RawMessage } from '../types';
import { MessageSender } from '.'; import { MessageSender } from '.';
import { ClosedGroupMessage } from '../messages/outgoing/content/data/group/ClosedGroupMessage'; import { ClosedGroupMessage } from '../messages/outgoing/content/data/group/ClosedGroupMessage';
@ -23,9 +20,9 @@ import {
ClosedGroupUpdateMessage, ClosedGroupUpdateMessage,
} from '../messages/outgoing/content/data/group'; } from '../messages/outgoing/content/data/group';
import { ClosedGroupMemberLeftMessage } from '../messages/outgoing/content/data/group/ClosedGroupMemberLeftMessage'; import { ClosedGroupMemberLeftMessage } from '../messages/outgoing/content/data/group/ClosedGroupMemberLeftMessage';
import { MessageSentHandler } from './MessageSentHandler';
export type GroupMessageType = type ClosedGroupMessageType =
| OpenGroupMessage
| ClosedGroupChatMessage | ClosedGroupChatMessage
| ClosedGroupAddedMembersMessage | ClosedGroupAddedMembersMessage
| ClosedGroupRemovedMembersMessage | ClosedGroupRemovedMembersMessage
@ -37,21 +34,12 @@ export type GroupMessageType =
| ClosedGroupEncryptionPairRequestMessage; | ClosedGroupEncryptionPairRequestMessage;
// ClosedGroupEncryptionPairReplyMessage must be sent to a user pubkey. Not a group. // ClosedGroupEncryptionPairReplyMessage must be sent to a user pubkey. Not a group.
export interface MessageQueueInterfaceEvents {
sendSuccess: (
message: RawMessage | OpenGroupMessage,
wrappedEnvelope?: Uint8Array
) => void;
sendFail: (message: RawMessage | OpenGroupMessage, error: Error) => void;
}
export class MessageQueue { export class MessageQueue {
public readonly events: TypedEventEmitter<MessageQueueInterfaceEvents>;
private readonly jobQueues: Map<string, JobQueue> = new Map(); private readonly jobQueues: Map<string, JobQueue> = new Map();
private readonly pendingMessageCache: PendingMessageCache; private readonly pendingMessageCache: PendingMessageCache;
constructor(cache?: PendingMessageCache) { constructor(cache?: PendingMessageCache) {
this.events = new EventEmitter();
this.pendingMessageCache = cache ?? new PendingMessageCache(); this.pendingMessageCache = cache ?? new PendingMessageCache();
void this.processAllPending(); void this.processAllPending();
} }
@ -70,18 +58,37 @@ export class MessageQueue {
await this.process(user, message, sentCb); await this.process(user, message, sentCb);
} }
public async send( /**
device: PubKey, * This function is synced. It will wait for the message to be delivered to the open
message: ContentMessage, * group to return.
sentCb?: (message: RawMessage) => Promise<void> * So there is no need for a sendCb callback
): Promise<void> { *
if ( */
message instanceof ConfigurationMessage || public async sendToOpenGroup(message: OpenGroupMessage) {
!!(message as any).syncTarget // Open groups
) { if (!(message instanceof OpenGroupMessage)) {
throw new Error('SyncMessage needs to be sent with sendSyncMessage'); throw new Error('sendToOpenGroup can only be used with OpenGroupMessage');
}
// No queue needed for Open Groups; send directly
const error = new Error('Failed to send message to open group.');
// This is absolutely yucky ... we need to make it not use Promise<boolean>
try {
const result = await MessageSender.sendToOpenGroup(message);
// sendToOpenGroup returns -1 if failed or an id if succeeded
if (result.serverId < 0) {
void MessageSentHandler.handleMessageSentFailure(message, error);
} else {
void MessageSentHandler.handleMessageSentSuccess(message);
void MessageSentHandler.handlePublicMessageSentSuccess(message, result);
}
} catch (e) {
window?.log?.warn(
`Failed to send message to open group: ${message.group.server}`,
e
);
void MessageSentHandler.handleMessageSentFailure(message, error);
} }
await this.process(device, message, sentCb);
} }
/** /**
@ -89,43 +96,9 @@ export class MessageQueue {
* @param sentCb currently only called for medium groups sent message * @param sentCb currently only called for medium groups sent message
*/ */
public async sendToGroup( public async sendToGroup(
message: GroupMessageType, message: ClosedGroupMessageType,
sentCb?: (message: RawMessage) => Promise<void> sentCb?: (message: RawMessage) => Promise<void>
): Promise<void> { ): Promise<void> {
// Open groups
if (message instanceof OpenGroupMessage) {
// No queue needed for Open Groups; send directly
const error = new Error('Failed to send message to open group.');
// This is absolutely yucky ... we need to make it not use Promise<boolean>
try {
const result = await MessageSender.sendToOpenGroup(message);
// sendToOpenGroup returns -1 if failed or an id if succeeded
if (result.serverId < 0) {
this.events.emit('sendFail', message, error);
} else {
const messageEventData = {
identifier: message.identifier,
pubKey: message.group.groupId,
timestamp: message.timestamp,
serverId: result.serverId,
serverTimestamp: result.serverTimestamp,
};
this.events.emit('sendSuccess', message);
window.Whisper.events.trigger('publicMessageSent', messageEventData);
}
} catch (e) {
window?.log?.warn(
`Failed to send message to open group: ${message.group.server}`,
e
);
this.events.emit('sendFail', message, error);
}
return;
}
let groupId: PubKey | undefined; let groupId: PubKey | undefined;
if ( if (
message instanceof ExpirationTimerUpdateMessage || message instanceof ExpirationTimerUpdateMessage ||
@ -138,7 +111,7 @@ export class MessageQueue {
throw new Error('Invalid group message passed in sendToGroup.'); throw new Error('Invalid group message passed in sendToGroup.');
} }
// if groupId is set here, it means it's for a medium group. So send it as it // if groupId is set here, it means it's for a medium group. So send it as it
return this.send(PubKey.cast(groupId), message, sentCb); return this.sendToPubKey(PubKey.cast(groupId), message, sentCb);
} }
public async sendSyncMessage( public async sendSyncMessage(
@ -157,10 +130,6 @@ export class MessageQueue {
const ourPubKey = UserUtils.getOurPubKeyStrFromCache(); const ourPubKey = UserUtils.getOurPubKeyStrFromCache();
if (!ourPubKey) {
throw new Error('ourNumber is not set');
}
await this.process(PubKey.cast(ourPubKey), message, sentCb); await this.process(PubKey.cast(ourPubKey), message, sentCb);
} }
@ -176,7 +145,11 @@ export class MessageQueue {
const job = async () => { const job = async () => {
try { try {
const wrappedEnvelope = await MessageSender.send(message); const wrappedEnvelope = await MessageSender.send(message);
this.events.emit('sendSuccess', message, wrappedEnvelope); void MessageSentHandler.handleMessageSentSuccess(
message,
wrappedEnvelope
);
const cb = this.pendingMessageCache.callbacks.get( const cb = this.pendingMessageCache.callbacks.get(
message.identifier message.identifier
); );
@ -185,8 +158,8 @@ export class MessageQueue {
await cb(message); await cb(message);
} }
this.pendingMessageCache.callbacks.delete(message.identifier); this.pendingMessageCache.callbacks.delete(message.identifier);
} catch (e) { } catch (error) {
this.events.emit('sendFail', message, e); void MessageSentHandler.handleMessageSentFailure(message, error);
} finally { } finally {
// Remove from the cache because retrying is done in the sender // Remove from the cache because retrying is done in the sender
void this.pendingMessageCache.remove(message); void this.pendingMessageCache.remove(message);
@ -243,3 +216,12 @@ export class MessageQueue {
return queue; return queue;
} }
} }
let messageQueue: MessageQueue;
export function getMessageQueue(): MessageQueue {
if (!messageQueue) {
messageQueue = new MessageQueue();
}
return messageQueue;
}

@ -0,0 +1,93 @@
import { getMessageById } from '../../data/data';
import { MessageController } from '../messages';
import { OpenGroupMessage } from '../messages/outgoing';
import { RawMessage } from '../types';
export class MessageSentHandler {
/**
* This function tries to find a message by messageId by first looking on the MessageController.
* The MessageController holds all messages being in memory.
* Those are the messages sent recently, recieved recently, or the one shown to the user.
*
* If the app restarted, it's very likely those messages won't be on the memory anymore.
* In this case, this function will look for it in the database and return it.
* If the message is found on the db, it will also register it to the MessageController so our subsequent calls are quicker.
*/
private static async fetchHandleMessageSentData(
m: RawMessage | OpenGroupMessage
) {
// if a message was sent and this message was created after the last app restart,
// this message is still in memory in the MessageController
const msg = MessageController.getInstance().get(m.identifier);
if (!msg || !msg.message) {
// otherwise, look for it in the database
// nobody is listening to this freshly fetched message .trigger calls
const dbMessage = await getMessageById(m.identifier);
if (!dbMessage) {
return null;
}
MessageController.getInstance().register(m.identifier, dbMessage);
return dbMessage;
}
return msg.message;
}
public static async handlePublicMessageSentSuccess(
sentMessage: OpenGroupMessage,
result: { serverId: number; serverTimestamp: number }
) {
const { serverId, serverTimestamp } = result;
try {
const foundMessage = await MessageSentHandler.fetchHandleMessageSentData(
sentMessage
);
if (!foundMessage) {
throw new Error(
'handlePublicMessageSentSuccess(): The message should be in memory for an openGroup message'
);
}
foundMessage.set({
serverTimestamp,
serverId,
isPublic: true,
});
await foundMessage.commit();
} catch (e) {
window.log.error('Error setting public on message');
}
}
public static async handleMessageSentSuccess(
sentMessage: RawMessage | OpenGroupMessage,
wrappedEnvelope?: Uint8Array
) {
// The wrappedEnvelope will be set only if the message is not one of OpenGroupMessage type.
const fetchedMessage = await MessageSentHandler.fetchHandleMessageSentData(
sentMessage
);
if (!fetchedMessage) {
return;
}
void fetchedMessage.handleMessageSentSuccess(sentMessage, wrappedEnvelope);
}
public static async handleMessageSentFailure(
sentMessage: RawMessage | OpenGroupMessage,
error: any
) {
const fetchedMessage = await MessageSentHandler.fetchHandleMessageSentData(
sentMessage
);
if (!fetchedMessage) {
return;
}
await fetchedMessage.handleMessageSentFailure(sentMessage, error);
}
}

@ -4,7 +4,6 @@ import { PartialRawMessage, RawMessage } from '../types/RawMessage';
import { ContentMessage } from '../messages/outgoing'; import { ContentMessage } from '../messages/outgoing';
import { PubKey } from '../types'; import { PubKey } from '../types';
import { MessageUtils } from '../utils'; import { MessageUtils } from '../utils';
import { GroupMessageType } from '.';
// This is an abstraction for storing pending messages. // This is an abstraction for storing pending messages.
// Ideally we want to store pending messages in the database so that // Ideally we want to store pending messages in the database so that

@ -45,36 +45,33 @@ export const forceSyncConfigurationNowIfNeeded = async (
) => { ) => {
const allConvos = ConversationController.getInstance().getConversations(); const allConvos = ConversationController.getInstance().getConversations();
const configMessage = await getCurrentConfigurationMessage(allConvos); const configMessage = await getCurrentConfigurationMessage(allConvos);
window.log.info('forceSyncConfigurationNowIfNeeded with', configMessage);
const waitForMessageSentEvent = new Promise(resolve => { async function waitForMessageSentEvent(message: RawMessage) {
const ourResolver = (message: any) => { return new Promise(resolve => {
if (message.identifier === configMessage.identifier) { if (message.identifier === configMessage.identifier) {
getMessageQueue().events.off('sendSuccess', ourResolver);
getMessageQueue().events.off('sendFail', ourResolver);
resolve(true); resolve(true);
} }
}; });
getMessageQueue().events.on('sendSuccess', ourResolver); }
getMessageQueue().events.on('sendFail', ourResolver);
});
try { try {
// this just adds the message to the sending queue. // passing the callback like that
// if waitForMessageSent is set, we need to effectively wait until then if (waitForMessageSent) {
await Promise.all([ await getMessageQueue().sendSyncMessage(
getMessageQueue().sendSyncMessage(configMessage), configMessage,
waitForMessageSentEvent, waitForMessageSentEvent as any
]); );
return Promise.resolve();
} else {
await getMessageQueue().sendSyncMessage(configMessage);
return waitForMessageSentEvent;
}
} catch (e) { } catch (e) {
window.log.warn( window.log.warn(
'Caught an error while sending our ConfigurationMessage:', 'Caught an error while sending our ConfigurationMessage:',
e e
); );
} }
if (!waitForMessageSent) {
return;
}
return waitForMessageSentEvent; return Promise.resolve();
}; };

@ -18,6 +18,7 @@ import { PendingMessageCacheStub } from '../../../test-utils/stubs';
import { ClosedGroupMessage } from '../../../../session/messages/outgoing/content/data/group/ClosedGroupMessage'; import { ClosedGroupMessage } from '../../../../session/messages/outgoing/content/data/group/ClosedGroupMessage';
import chaiAsPromised from 'chai-as-promised'; import chaiAsPromised from 'chai-as-promised';
import { MessageSentHandler } from '../../../../session/sending/MessageSentHandler';
chai.use(chaiAsPromised as any); chai.use(chaiAsPromised as any);
chai.should(); chai.should();
@ -32,6 +33,9 @@ describe('MessageQueue', () => {
// Initialize new stubbed queue // Initialize new stubbed queue
let pendingMessageCache: PendingMessageCacheStub; let pendingMessageCache: PendingMessageCacheStub;
let messageSentHandlerFailedStub: sinon.SinonStub;
let messageSentHandlerSuccessStub: sinon.SinonStub;
let messageSentPublicHandlerSuccessStub: sinon.SinonStub;
let messageQueueStub: MessageQueue; let messageQueueStub: MessageQueue;
// Message Sender Stubs // Message Sender Stubs
@ -47,6 +51,15 @@ describe('MessageQueue', () => {
// Message Sender Stubs // Message Sender Stubs
sendStub = sandbox.stub(MessageSender, 'send').resolves(); sendStub = sandbox.stub(MessageSender, 'send').resolves();
messageSentHandlerFailedStub = sandbox
.stub(MessageSentHandler as any, 'handleMessageSentFailure')
.resolves();
messageSentHandlerSuccessStub = sandbox
.stub(MessageSentHandler as any, 'handleMessageSentSuccess')
.resolves();
messageSentPublicHandlerSuccessStub = sandbox
.stub(MessageSentHandler as any, 'handlePublicMessageSentSuccess')
.resolves();
// Init Queue // Init Queue
pendingMessageCache = new PendingMessageCacheStub(); pendingMessageCache = new PendingMessageCacheStub();
@ -59,16 +72,22 @@ describe('MessageQueue', () => {
}); });
describe('processPending', () => { describe('processPending', () => {
it('will send messages', async () => { it('will send messages', done => {
const device = TestUtils.generateFakePubKey(); const device = TestUtils.generateFakePubKey();
await pendingMessageCache.add(device, TestUtils.generateChatMessage());
const successPromise = PromiseUtils.waitForTask(done => { const waitForMessageSentEvent = new Promise(resolve => {
messageQueueStub.events.once('sendSuccess', done); resolve(true);
done();
}); });
await messageQueueStub.processPending(device);
// tslint:disable-next-line: no-unused-expression pendingMessageCache.add(
expect(successPromise).to.eventually.be.fulfilled; device,
TestUtils.generateChatMessage(),
waitForMessageSentEvent as any
);
messageQueueStub.processPending(device);
expect(waitForMessageSentEvent).to.be.fulfilled;
}); });
it('should remove message from cache', async () => { it('should remove message from cache', async () => {
@ -96,46 +115,48 @@ describe('MessageQueue', () => {
}).timeout(15000); }).timeout(15000);
describe('events', () => { describe('events', () => {
it('should send a success event if message was sent', async () => { it('should send a success event if message was sent', done => {
const device = TestUtils.generateFakePubKey(); const device = TestUtils.generateFakePubKey();
const message = TestUtils.generateChatMessage(); const message = TestUtils.generateChatMessage();
await pendingMessageCache.add(device, message); const waitForMessageSentEvent = new Promise(resolve => {
resolve(true);
const eventPromise = PromiseUtils.waitForTask< done();
RawMessage | OpenGroupMessage
>(complete => {
messageQueueStub.events.once('sendSuccess', complete);
}); });
await messageQueueStub.processPending(device); pendingMessageCache
.add(device, message, waitForMessageSentEvent as any)
const rawMessage = await eventPromise; .then(() => messageQueueStub.processPending(device))
expect(rawMessage.identifier).to.equal(message.identifier); .then(() => {
expect(messageSentHandlerSuccessStub.callCount).to.be.equal(1);
expect(
messageSentHandlerSuccessStub.lastCall.args[0].identifier
).to.be.equal(message.identifier);
});
}); });
it('should send a fail event if something went wrong while sending', async () => { it('should send a fail event if something went wrong while sending', done => {
sendStub.throws(new Error('failure')); sendStub.throws(new Error('failure'));
const spy = sandbox.spy();
messageQueueStub.events.on('sendFail', spy);
const device = TestUtils.generateFakePubKey(); const device = TestUtils.generateFakePubKey();
const message = TestUtils.generateChatMessage(); const message = TestUtils.generateChatMessage();
await pendingMessageCache.add(device, message); const waitForMessageSentEvent = new Promise(resolve => {
resolve(true);
const eventPromise = PromiseUtils.waitForTask< done();
[RawMessage | OpenGroupMessage, Error]
>(complete => {
messageQueueStub.events.once('sendFail', (...args) => {
complete(args);
});
}); });
await messageQueueStub.processPending(device); pendingMessageCache
.add(device, message, waitForMessageSentEvent as any)
const [rawMessage, error] = await eventPromise; .then(() => messageQueueStub.processPending(device))
expect(rawMessage.identifier).to.equal(message.identifier); .then(() => {
expect(error.message).to.equal('failure'); expect(messageSentHandlerFailedStub.callCount).to.be.equal(1);
expect(
messageSentHandlerFailedStub.lastCall.args[0].identifier
).to.be.equal(message.identifier);
expect(
messageSentHandlerFailedStub.lastCall.args[1].message
).to.equal('failure');
expect(waitForMessageSentEvent).to.be.eventually.fulfilled;
});
}); });
}); });
}); });
@ -155,12 +176,11 @@ describe('MessageQueue', () => {
}); });
describe('sendToGroup', () => { describe('sendToGroup', () => {
it('should throw an error if invalid non-group message was passed', () => { it('should throw an error if invalid non-group message was passed', async () => {
// const chatMessage = TestUtils.generateChatMessage(); const chatMessage = TestUtils.generateChatMessage();
// await expect( await expect(
// messageQueueStub.sendToGroup(chatMessage) messageQueueStub.sendToGroup(chatMessage as any)
// ).to.be.rejectedWith('Invalid group message passed in sendToGroup.'); ).to.be.rejectedWith('Invalid group message passed in sendToGroup.');
// Cannot happen with typescript as this function only accept group message now
}); });
describe('closed groups', () => { describe('closed groups', () => {
@ -170,7 +190,7 @@ describe('MessageQueue', () => {
); );
sandbox.stub(GroupUtils, 'getGroupMembers').resolves(members); sandbox.stub(GroupUtils, 'getGroupMembers').resolves(members);
const send = sandbox.stub(messageQueueStub, 'send').resolves(); const send = sandbox.stub(messageQueueStub, 'sendToPubKey').resolves();
const message = TestUtils.generateClosedGroupMessage(); const message = TestUtils.generateClosedGroupMessage();
await messageQueueStub.sendToGroup(message); await messageQueueStub.sendToGroup(message);
@ -196,34 +216,43 @@ describe('MessageQueue', () => {
it('can send to open group', async () => { it('can send to open group', async () => {
const message = TestUtils.generateOpenGroupMessage(); const message = TestUtils.generateOpenGroupMessage();
await messageQueueStub.sendToGroup(message); await messageQueueStub.sendToOpenGroup(message);
expect(sendToOpenGroupStub.callCount).to.equal(1); expect(sendToOpenGroupStub.callCount).to.equal(1);
}); });
it('should emit a success event when send was successful', async () => { it('should emit a success event when send was successful', async () => {
sendToOpenGroupStub.resolves({ sendToOpenGroupStub.resolves({
serverId: 5125, serverId: 5125,
serverTimestamp: 5125, serverTimestamp: 5126,
}); });
const message = TestUtils.generateOpenGroupMessage(); const message = TestUtils.generateOpenGroupMessage();
const eventPromise = PromiseUtils.waitForTask(complete => { await messageQueueStub.sendToOpenGroup(message);
messageQueueStub.events.once('sendSuccess', complete); expect(messageSentHandlerSuccessStub.callCount).to.equal(1);
}, 2000); expect(
messageSentHandlerSuccessStub.lastCall.args[0].identifier
await messageQueueStub.sendToGroup(message); ).to.equal(message.identifier);
return eventPromise.should.be.fulfilled; expect(messageSentPublicHandlerSuccessStub.callCount).to.equal(1);
expect(
messageSentPublicHandlerSuccessStub.lastCall.args[0].identifier
).to.equal(message.identifier);
expect(
messageSentPublicHandlerSuccessStub.lastCall.args[1].serverId
).to.equal(5125);
expect(
messageSentPublicHandlerSuccessStub.lastCall.args[1].serverTimestamp
).to.equal(5126);
}); });
it('should emit a fail event if something went wrong', async () => { it('should emit a fail event if something went wrong', async () => {
sendToOpenGroupStub.resolves({ serverId: -1, serverTimestamp: -1 }); sendToOpenGroupStub.resolves({ serverId: -1, serverTimestamp: -1 });
const message = TestUtils.generateOpenGroupMessage(); const message = TestUtils.generateOpenGroupMessage();
const eventPromise = PromiseUtils.waitForTask(complete => {
messageQueueStub.events.once('sendFail', complete);
}, 2000);
await messageQueueStub.sendToGroup(message); await messageQueueStub.sendToOpenGroup(message);
return eventPromise.should.be.fulfilled; expect(messageSentHandlerFailedStub.callCount).to.equal(1);
expect(
messageSentHandlerFailedStub.lastCall.args[0].identifier
).to.equal(message.identifier);
}); });
}); });
}); });

Loading…
Cancel
Save