From 255c7ada68270d65496ffd4bd44f89074cc12d7b Mon Sep 17 00:00:00 2001 From: Maxim Shishmarev Date: Thu, 2 Jul 2020 12:32:13 +1000 Subject: [PATCH] More clean up in message_receiver.js --- js/background.js | 1 - js/modules/attachment_downloads.js | 12 +- libtextsecure/message_receiver.js | 118 +------------------- ts/receiver/contentMessage.ts | 2 +- ts/receiver/dataMessage.ts | 2 +- ts/receiver/queuedJob.ts | 4 +- ts/receiver/receiver.ts | 45 +++++++- ts/session/sending/MessageQueueInterface.ts | 1 + 8 files changed, 48 insertions(+), 137 deletions(-) diff --git a/js/background.js b/js/background.js index 0bf798b24..26e72a198 100644 --- a/js/background.js +++ b/js/background.js @@ -1580,7 +1580,6 @@ // messageReceiver.addEventListener('typing', onTyping); window.Signal.AttachmentDownloads.start({ - getMessageReceiver: () => messageReceiver, logger: window.log, }); diff --git a/js/modules/attachment_downloads.js b/js/modules/attachment_downloads.js index 64fcf1426..7518fc4ee 100644 --- a/js/modules/attachment_downloads.js +++ b/js/modules/attachment_downloads.js @@ -34,17 +34,11 @@ const RETRY_BACKOFF = { let enabled = false; let timeout; -let getMessageReceiver; let logger; const _activeAttachmentDownloadJobs = {}; async function start(options = {}) { - ({ getMessageReceiver, logger } = options); - if (!isFunction(getMessageReceiver)) { - throw new Error( - 'attachment_downloads/start: getMessageReceiver must be a function' - ); - } + ({ logger } = options); if (!logger) { throw new Error('attachment_downloads/start: logger must be provided!'); } @@ -162,10 +156,6 @@ async function _runJob(job) { await setAttachmentDownloadJobPending(id, pending); let downloaded; - const messageReceiver = getMessageReceiver(); - if (!messageReceiver) { - throw new Error('_runJob: messageReceiver not found'); - } try { downloaded = await NewReceiver.downloadAttachment(attachment); diff --git a/libtextsecure/message_receiver.js b/libtextsecure/message_receiver.js index 14fb81d76..4c0f198b6 100644 --- a/libtextsecure/message_receiver.js +++ b/libtextsecure/message_receiver.js @@ -38,7 +38,7 @@ function MessageReceiver(username, password, signalingKey) { // bind events lokiPublicChatAPI.on( 'publicMessage', - this.handleUnencryptedMessage.bind(this) + window.NewReceiver.handleUnencryptedMessage ); openGroupBound = true; } @@ -80,48 +80,13 @@ MessageReceiver.prototype.extend({ } // set up pollers for any RSS feeds feeds.forEach(feed => { - feed.on('rssMessage', this.handleUnencryptedMessage.bind(this)); + feed.on('rssMessage', window.NewReceiver.handleUnencryptedMessage); }); // Ensures that an immediate 'empty' event from the websocket will fire only after // all cached envelopes are processed. this.incoming = [this.pending]; }, - async handleUnencryptedMessage({ message }) { - const isMe = message.source === textsecure.storage.user.getNumber(); - if (!isMe && message.message.profile) { - const conversation = await window.ConversationController.getOrCreateAndWait( - message.source, - 'private' - ); - await window.NewReceiver.updateProfile( - conversation, - message.message.profile, - message.message.profileKey - ); - } - - const ourNumber = textsecure.storage.user.getNumber(); - const primaryDevice = window.storage.get('primaryDevicePubKey'); - const isOurDevice = - message.source && - (message.source === ourNumber || message.source === primaryDevice); - const isPublicChatMessage = - message.message.group && - message.message.group.id && - !!message.message.group.id.match(/^publicChat:/); - let ev; - - if (isPublicChatMessage && isOurDevice) { - // Public chat messages from ourselves should be outgoing - ev = new Event('sent'); - } else { - ev = new Event('message'); - } - ev.confirm = function confirmTerm() {}; - ev.data = message; - this.dispatchAndWait(ev); - }, stopProcessing() { window.log.info('MessageReceiver: stopProcessing requested'); this.stoppingProcessing = true; @@ -145,15 +110,6 @@ MessageReceiver.prototype.extend({ onerror() { window.log.error('websocket error'); }, - dispatchAndWait(event) { - const promise = this.appPromise || Promise.resolve(); - const appJobPromise = Promise.all(this.dispatchEvent(event)); - const job = () => appJobPromise; - - this.appPromise = promise.then(job, job); - - return Promise.resolve(); - }, onclose(ev) { window.log.info( 'websocket closed', @@ -163,40 +119,6 @@ MessageReceiver.prototype.extend({ this.calledClose ); }, - - onEmpty() { - const { incoming } = this; - this.incoming = []; - - const emitEmpty = () => { - window.log.info("MessageReceiver: emitting 'empty' event"); - const ev = new Event('empty'); - this.dispatchAndWait(ev); - }; - - const waitForApplication = async () => { - window.log.info( - "MessageReceiver: finished processing messages after 'empty', now waiting for application" - ); - const promise = this.appPromise || Promise.resolve(); - this.appPromise = Promise.resolve(); - - // We don't await here because we don't this to gate future message processing - promise.then(emitEmpty, emitEmpty); - }; - - const waitForEmptyQueue = () => { - // resetting count to zero so everything queued after this starts over again - this.count = 0; - - this.addToQueue(waitForApplication); - }; - - // We first wait for all recently-received messages (this.incoming) to be queued, - // then we queue a task to wait for the application to finish its processing, then - // finally we emit the 'empty' event to the queue. - Promise.all(incoming).then(waitForEmptyQueue, waitForEmptyQueue); - }, drain() { const { incoming } = this; this.incoming = []; @@ -219,42 +141,6 @@ MessageReceiver.prototype.extend({ } return -1; }, - unpad(paddedData) { - const paddedPlaintext = new Uint8Array(paddedData); - let plaintext; - - for (let i = paddedPlaintext.length - 1; i >= 0; i -= 1) { - if (paddedPlaintext[i] === 0x80) { - plaintext = new Uint8Array(i); - plaintext.set(paddedPlaintext.subarray(0, i)); - plaintext = plaintext.buffer; - break; - } else if (paddedPlaintext[i] !== 0x00) { - throw new Error('Invalid padding'); - } - } - - return plaintext; - }, - async decryptPreKeyWhisperMessage(ciphertext, sessionCipher, address) { - const padded = await sessionCipher.decryptPreKeyWhisperMessage(ciphertext); - - try { - return this.unpad(padded); - } catch (e) { - if (e.message === 'Unknown identity key') { - // create an error that the UI will pick up and ask the - // user if they want to re-negotiate - const buffer = dcodeIO.ByteBuffer.wrap(ciphertext); - throw new textsecure.IncomingIdentityKeyError( - address.toString(), - buffer.toArrayBuffer(), - e.identityKey - ); - } - throw e; - } - }, }); window.textsecure = window.textsecure || {}; diff --git a/ts/receiver/contentMessage.ts b/ts/receiver/contentMessage.ts index 77a086cb5..c17aebddc 100644 --- a/ts/receiver/contentMessage.ts +++ b/ts/receiver/contentMessage.ts @@ -34,7 +34,7 @@ async function decryptForMediumGroup( envelope: EnvelopePlus, ciphertextObj: ArrayBuffer ) { - const { textsecure, dcodeIO, libloki } = window; + const { dcodeIO, libloki } = window; const groupId = envelope.source; diff --git a/ts/receiver/dataMessage.ts b/ts/receiver/dataMessage.ts index 186381a09..893d0ca3d 100644 --- a/ts/receiver/dataMessage.ts +++ b/ts/receiver/dataMessage.ts @@ -185,7 +185,7 @@ export async function processDecrypted(envelope: EnvelopePlus, decrypted: any) { } if (decrypted.group) { - decrypted.group.id = decrypted.group.id?.toBinary(); + decrypted.group.id = new TextDecoder('utf-8').decode(decrypted.group.id); switch (decrypted.group.type) { case SignalService.GroupContext.Type.UPDATE: diff --git a/ts/receiver/queuedJob.ts b/ts/receiver/queuedJob.ts index 6d80899aa..0cdf57a9c 100644 --- a/ts/receiver/queuedJob.ts +++ b/ts/receiver/queuedJob.ts @@ -584,7 +584,9 @@ export async function handleMessageJob( conversation.notify(message); } - confirm(); + if (confirm) { + confirm(); + } } catch (error) { const errorForLog = error && error.stack ? error.stack : error; window.log.error( diff --git a/ts/receiver/receiver.ts b/ts/receiver/receiver.ts index f76c34770..e26f5fd57 100644 --- a/ts/receiver/receiver.ts +++ b/ts/receiver/receiver.ts @@ -241,12 +241,6 @@ async function queueCached(item: any) { if (decrypted) { const payloadPlaintext = StringUtils.encode(decrypted, 'base64'); - // Convert preKeys to array buffer - if (typeof envelope.preKeyBundleMessage === 'string') { - // envelope.preKeyBundleMessage = await MessageReceiver.stringToArrayBuffer( - // envelope.preKeyBundleMessage - // ); - } await queueDecryptedEnvelope(envelope, payloadPlaintext); } else { queueEnvelope(envelope); @@ -306,3 +300,42 @@ async function handleDecryptedEnvelope( await removeFromCache(envelope); } } + + +export async function handleUnencryptedMessage({message : outerMessage} : any) { + + const { source } = outerMessage; + const { group, profile, profileKey } = outerMessage.message; + + const ourNumber = window.textsecure.storage.user.getNumber(); + const isMe = source === ourNumber; + + if (!isMe && profile) { + const conversation = await window.ConversationController.getOrCreateAndWait( + source, + 'private' + ); + await updateProfile( + conversation, + profile, + profileKey + ); + } + + const primaryDevice = window.storage.get('primaryDevicePubKey'); + const isOurDevice = source && + (source === ourNumber || source === primaryDevice); + const isPublicChatMessage = + group && + group.id && + !!group.id.match(/^publicChat:/); + + const ev = { + // Public chat messages from ourselves should be outgoing + type: (isPublicChatMessage && isOurDevice) ? 'sent' : 'message', + data: outerMessage, + }; + + await handleMessageEvent(ev); + +} diff --git a/ts/session/sending/MessageQueueInterface.ts b/ts/session/sending/MessageQueueInterface.ts index 3a66616c5..3fb6df28a 100644 --- a/ts/session/sending/MessageQueueInterface.ts +++ b/ts/session/sending/MessageQueueInterface.ts @@ -21,4 +21,5 @@ export interface MessageQueueInterface { send(device: PubKey, message: ContentMessage): Promise; sendToGroup(message: GroupMessageType): Promise; sendSyncMessage(message: SyncMessage | undefined): Promise; + processPending(device: PubKey): Promise; }