diff --git a/js/background.js b/js/background.js index 72966ea42..a8c29c618 100644 --- a/js/background.js +++ b/js/background.js @@ -178,6 +178,8 @@ BackboneMessageCollection: Whisper.MessageCollection, numMessagesPerBatch: NUM_MESSAGES_PER_BATCH, upgradeMessageSchema, + getMessagesNeedingUpgrade: window.Signal.Data.getMessagesNeedingUpgrade, + saveMessage: window.Signal.Data.saveMessage, }); window.log.info('Upgrade message schema (with index):', batchWithIndex); isMigrationWithIndexComplete = batchWithIndex.done; @@ -907,31 +909,18 @@ createMessage: createSentMessage, }); - function isMessageDuplicate(message) { - return new Promise(resolve => { - const fetcher = new Whisper.Message(); - const options = { - index: { - name: 'unique', - value: [ - message.get('source'), - message.get('sourceDevice'), - message.get('sent_at'), - ], - }, - }; - - fetcher.fetch(options).always(() => { - if (fetcher.get('id')) { - return resolve(true); - } - - return resolve(false); + async function isMessageDuplicate(message) { + try { + const { attributes } = message; + const result = await window.Signal.Data.getMessageBySender(attributes, { + Message: Whisper.Message, }); - }).catch(error => { + + return Boolean(result); + } catch (error) { window.log.error('isMessageDuplicate error:', Errors.toLogFormat(error)); return false; - }); + } } function initIncomingMessage(data) { diff --git a/js/delivery_receipts.js b/js/delivery_receipts.js index e28b489b1..0d16d15df 100644 --- a/js/delivery_receipts.js +++ b/js/delivery_receipts.js @@ -27,74 +27,79 @@ this.remove(receipts); return receipts; }, - onReceipt(receipt) { - const messages = new Whisper.MessageCollection(); - return messages - .fetchSentAt(receipt.get('timestamp')) - .then(() => { - if (messages.length === 0) { - return null; - } - const message = messages.find( - item => - !item.isIncoming() && - receipt.get('source') === item.get('conversationId') - ); - if (message) { - return message; - } + async getTargetMessage(source, messages) { + if (messages.length === 0) { + return null; + } + const message = messages.find( + item => !item.isIncoming() && source === item.get('conversationId') + ); + if (message) { + return message; + } + + const groups = new Whisper.GroupCollection(); + await groups.fetchGroups(source); - const groups = new Whisper.GroupCollection(); - return groups.fetchGroups(receipt.get('source')).then(() => { - const ids = groups.pluck('id'); - ids.push(receipt.get('source')); - return messages.find( - item => - !item.isIncoming() && - _.contains(ids, item.get('conversationId')) - ); - }); - }) - .then(message => { - if (message) { - const deliveries = message.get('delivered') || 0; - const deliveredTo = message.get('delivered_to') || []; - return new Promise((resolve, reject) => { - message - .save({ - delivered_to: _.union(deliveredTo, [receipt.get('source')]), - delivered: deliveries + 1, - }) - .then(() => { - // notify frontend listeners - const conversation = ConversationController.get( - message.get('conversationId') - ); - if (conversation) { - conversation.trigger('delivered', message); - } + const ids = groups.pluck('id'); + ids.push(source); - this.remove(receipt); - resolve(); - }, reject); - }); - // TODO: consider keeping a list of numbers we've - // successfully delivered to? + return messages.find( + item => + !item.isIncoming() && _.contains(ids, item.get('conversationId')) + ); + }, + async onReceipt(receipt) { + try { + const messages = await window.Signal.Data.getMessagesBySentAt( + receipt.get('timestamp'), + { + MessageCollection: Whisper.MessageCollection, } + ); + + const message = await this.getTargetMessage( + receipt.get('source'), + messages + ); + if (!message) { window.log.info( 'No message for delivery receipt', receipt.get('source'), receipt.get('timestamp') ); + return; + } - return null; - }) - .catch(error => { - window.log.error( - 'DeliveryReceipts.onReceipt error:', - error && error.stack ? error.stack : error - ); + const deliveries = message.get('delivered') || 0; + const deliveredTo = message.get('delivered_to') || []; + + message.set({ + delivered_to: _.union(deliveredTo, [receipt.get('source')]), + delivered: deliveries + 1, + }); + + await window.Signal.Data.saveMessage(message.attributes, { + Message: Whisper.Message, }); + // notify frontend listeners + const conversation = ConversationController.get( + message.get('conversationId') + ); + if (conversation) { + conversation.trigger('delivered', message); + } + + this.remove(receipt); + + // TODO: consider keeping a list of numbers we've + // successfully delivered to? + } catch (error) { + window.log.error( + 'DeliveryReceipts.onReceipt error:', + error && error.stack ? error.stack : error + ); + } }, }))(); })(); diff --git a/js/expiring_messages.js b/js/expiring_messages.js index d8b80e6db..34bab8f01 100644 --- a/js/expiring_messages.js +++ b/js/expiring_messages.js @@ -3,7 +3,6 @@ /* global i18n: false */ /* global moment: false */ /* global Whisper: false */ -/* global wrapDeferred: false */ // eslint-disable-next-line func-names (function() { @@ -11,57 +10,62 @@ window.Whisper = window.Whisper || {}; - function destroyExpiredMessages() { - // Load messages that have expired and destroy them - const expired = new Whisper.MessageCollection(); - expired.on('add', async message => { - window.log.info('Message expired', { - sentAt: message.get('sent_at'), - }); - const conversation = message.getConversation(); - if (conversation) { - conversation.trigger('expired', message); - } - - // We delete after the trigger to allow the conversation time to process - // the expiration before the message is removed from the database. - await wrapDeferred(message.destroy()); - if (conversation) { - conversation.updateLastMessage(); - } + async function destroyExpiredMessages() { + const messages = await window.Signal.Data.getExpiredMessages({ + MessageCollection: Whisper.MessageCollection, }); - expired.on('reset', throttledCheckExpiringMessages); - expired.fetchExpired(); + await Promise.all( + messages.map(async message => { + window.log.info('Message expired', { + sentAt: message.get('sent_at'), + }); + + // We delete after the trigger to allow the conversation time to process + // the expiration before the message is removed from the database. + await window.Signal.Data.removeMessage(message.id, { + Message: Whisper.Message, + }); + + const conversation = message.getConversation(); + if (conversation) { + conversation.trigger('expired', message); + } + }) + ); + + checkExpiringMessages(); } let timeout; - function checkExpiringMessages() { + async function checkExpiringMessages() { // Look up the next expiring message and set a timer to destroy it - const expiring = new Whisper.MessageCollection(); - expiring.once('add', next => { - const expiresAt = next.get('expires_at'); - window.log.info( - 'next message expires', - new Date(expiresAt).toISOString() - ); + const messages = await window.Signal.Data.getNextExpiringMessage({ + MessageCollection: Whisper.MessageCollection, + }); - let wait = expiresAt - Date.now(); + const next = messages.at(0); + if (!next) { + return; + } - // In the past - if (wait < 0) { - wait = 0; - } + const expiresAt = next.get('expires_at'); + window.log.info('next message expires', new Date(expiresAt).toISOString()); - // Too far in the future, since it's limited to a 32-bit value - if (wait > 2147483647) { - wait = 2147483647; - } + let wait = expiresAt - Date.now(); - clearTimeout(timeout); - timeout = setTimeout(destroyExpiredMessages, wait); - }); - expiring.fetchNextExpiring(); + // In the past + if (wait < 0) { + wait = 0; + } + + // Too far in the future, since it's limited to a 32-bit value + if (wait > 2147483647) { + wait = 2147483647; + } + + clearTimeout(timeout); + timeout = setTimeout(destroyExpiredMessages, wait); } const throttledCheckExpiringMessages = _.throttle( checkExpiringMessages, diff --git a/js/models/conversations.js b/js/models/conversations.js index c3e57e6e6..b5766a723 100644 --- a/js/models/conversations.js +++ b/js/models/conversations.js @@ -131,31 +131,23 @@ this.on('destroy', this.revokeAvatarUrl); // Listening for out-of-band data updates - this.on('newmessage', this.addSingleMessage); - this.on('delivered', this.updateMessage); - this.on('read', this.updateMessage); + this.on('delivered', this.updateLastMessage); + this.on('read', this.updateLastMessage); this.on('sent', this.updateLastMessage); this.on('expired', this.onExpired); - this.listenTo( - this.messageCollection, - 'expired', - this.onExpiredCollection - ); }, isMe() { return this.id === this.ourNumber; }, - onExpired(message) { - const mine = this.messageCollection.get(message.id); - if (mine && mine.cid !== message.cid) { - mine.trigger('expired', mine); - } - }, - - async onExpiredCollection(message) { + async onExpired(message) { const removeMessage = () => { + const existing = this.messageCollection.get(message.id); + if (!existing) { + return; + } + window.log.info('Remove expired message from collection', { sentAt: message.get('sent_at'), }); @@ -168,13 +160,8 @@ await this.inProgressFetch; removeMessage(); - }, - // Used to update existing messages when updated from out-of-band db access, - // like read and delivery receipts. - updateMessage(message) { this.updateLastMessage(); - this.messageCollection.add(message, { merge: true }); }, addSingleMessage(message) { @@ -420,10 +407,15 @@ messages.length, 'messages to process' ); - const safeDelete = message => - new Promise(resolve => { - message.destroy().always(resolve); - }); + const safeDelete = async message => { + try { + window.Signal.Data.removeMessage(message.id, { + Message: Whisper.Message, + }); + } catch (error) { + // nothing + } + }; const promise = this.getIdentityKeys(); return promise @@ -585,26 +577,37 @@ return this.setVerified(); }, - addKeyChange(id) { + async addKeyChange(keyChangedId) { window.log.info( 'adding key change advisory for', this.idForLogging(), - id, + keyChangedId, this.get('timestamp') ); const timestamp = Date.now(); - const message = new Whisper.Message({ + const message = { conversationId: this.id, type: 'keychange', sent_at: this.get('timestamp'), received_at: timestamp, - key_changed: id, + key_changed: keyChangedId, unread: 1, + }; + + const id = await window.Signal.Data.saveMessage(message, { + Message: Whisper.Message, }); - message.save().then(this.trigger.bind(this, 'newmessage', message)); + + this.trigger( + 'newmessage', + new Whisper.Message({ + ...message, + id, + }) + ); }, - addVerifiedChange(id, verified, providedOptions) { + async addVerifiedChange(verifiedChangeId, verified, providedOptions) { const options = providedOptions || {}; _.defaults(options, { local: true }); @@ -620,22 +623,33 @@ window.log.info( 'adding verified change advisory for', this.idForLogging(), - id, + verifiedChangeId, lastMessage ); const timestamp = Date.now(); - const message = new Whisper.Message({ + const message = { conversationId: this.id, type: 'verified-change', sent_at: lastMessage, received_at: timestamp, - verifiedChanged: id, + verifiedChanged: verifiedChangeId, verified, local: options.local, unread: 1, + }; + + const id = await window.Signal.Data.saveMessage(message, { + Message: Whisper.Message, }); - message.save().then(this.trigger.bind(this, 'newmessage', message)); + + this.trigger( + 'newmessage', + new Whisper.Message({ + ...message, + id, + }) + ); if (this.isPrivate()) { ConversationController.getAllGroupsInvolvingId(id).then(groups => { @@ -646,9 +660,13 @@ } }, - onReadMessage(message, readAt) { - if (this.messageCollection.get(message.id)) { - this.messageCollection.get(message.id).fetch(); + async onReadMessage(message, readAt) { + const existing = this.messageCollection.get(message.id); + if (existing) { + const fetched = await window.Signal.Data.getMessageById(existing.id, { + Message: Whisper.Message, + }); + existing.merge(fetched); } // We mark as read everything older than this message - to clean up old stuff @@ -671,22 +689,9 @@ }, getUnread() { - const conversationId = this.id; - const unreadMessages = new Whisper.MessageCollection(); - return new Promise(resolve => - unreadMessages - .fetch({ - index: { - // 'unread' index - name: 'unread', - lower: [conversationId], - upper: [conversationId, Number.MAX_VALUE], - }, - }) - .always(() => { - resolve(unreadMessages); - }) - ); + return window.Signal.Data.getUnreadByConversation(this.id, { + MessageCollection: Whisper.MessageCollection, + }); }, validate(attributes) { @@ -844,13 +849,8 @@ expireTimer, recipients, }); - const message = this.addSingleMessage(messageWithSchema); - - if (this.isPrivate()) { - message.set({ destination }); - } - message.save(); + const message = this.addSingleMessage(messageWithSchema); this.save({ active_at: now, timestamp: now, @@ -858,6 +858,15 @@ lastMessageStatus: 'sending', }); + if (this.isPrivate()) { + message.set({ destination }); + } + + const id = await window.Signal.Data.saveMessage(message.attributes, { + Message: Whisper.Message, + }); + message.set({ id }); + const conversationType = this.get('type'); const sendFunction = (() => { switch (conversationType) { @@ -890,9 +899,15 @@ }, async updateLastMessage() { - const collection = new Whisper.MessageCollection(); - await collection.fetchConversation(this.id, 1); - const lastMessage = collection.at(0); + const messages = await window.Signal.Data.getMessagesByConversation( + this.id, + { limit: 1, MessageCollection: Whisper.MessageCollection } + ); + if (!messages.length) { + return; + } + + const lastMessage = messages.at(0); const lastMessageJSON = lastMessage ? lastMessage.toJSON() : null; const lastMessageStatus = lastMessage @@ -972,7 +987,9 @@ } return Promise.all([ - wrapDeferred(message.save()), + window.Signal.Data.saveMessage(message.attributes, { + Message: Whisper.Message, + }), wrapDeferred(this.save({ expireTimer })), ]).then(() => { // if change was made remotely, don't send it to the number/group @@ -1347,8 +1364,9 @@ return false; } - const collection = new Whisper.MessageCollection(); - await collection.fetchSentAt(id); + const collection = await window.Signal.Data.getMessagesBySentAt(id, { + MessageCollection: Whisper.MessageCollection, + }); const queryMessage = collection.find(m => this.doesMessageMatch(id, author, m) ); @@ -1365,7 +1383,9 @@ queryMessage.attributes ); queryMessage.set(upgradedMessage); - await wrapDeferred(message.save()); + await window.Signal.Data.saveMessage(upgradedMessage, { + Message: Whisper.Message, + }); } } catch (error) { window.log.error( @@ -1525,7 +1545,9 @@ const upgradedMessage = await upgradeMessageSchema(attributes); message.set(upgradedMessage); // eslint-disable-next-line no-await-in-loop - await wrapDeferred(message.save()); + await window.Signal.Data.saveMessage(upgradedMessage, { + Message: Whisper.Message, + }); } } }, @@ -1541,7 +1563,7 @@ this.inProgressFetch = this.messageCollection.fetchConversation( this.id, - null, + undefined, this.get('unreadCount') ); @@ -1593,29 +1615,11 @@ }, async destroyMessages() { - let loaded; - do { - // Yes, we really want the await in the loop. We're deleting 100 at a - // time so we don't use too much memory. - // eslint-disable-next-line no-await-in-loop - await wrapDeferred( - this.messageCollection.fetch({ - limit: 100, - index: { - // 'conversation' index on [conversationId, received_at] - name: 'conversation', - lower: [this.id], - upper: [this.id, Number.MAX_VALUE], - }, - }) - ); + await window.Signal.Data.removeAllMessagesInConversation(this.id, { + MessageCollection: Whisper.MessageCollection, + }); - loaded = this.messageCollection.models; - this.messageCollection.reset([]); - _.each(loaded, message => { - message.destroy(); - }); - } while (loaded.length > 0); + this.messageCollection.reset([]); this.save({ lastMessage: null, @@ -1809,15 +1813,7 @@ destroyAll() { return Promise.all( - this.models.map( - m => - new Promise((resolve, reject) => { - m - .destroy() - .then(resolve) - .fail(reject); - }) - ) + this.models.map(conversation => wrapDeferred(conversation.destroy())) ); }, diff --git a/js/models/messages.js b/js/models/messages.js index 312a8db15..f67164451 100644 --- a/js/models/messages.js +++ b/js/models/messages.js @@ -112,30 +112,19 @@ isUnread() { return !!this.get('unread'); }, - // overriding this to allow for this.unset('unread'), save to db, then fetch() - // to propagate. We don't want the unset key in the db so our unread index stays - // small. - /* eslint-disable */ - fetch(options) { - options = options ? _.clone(options) : {}; - if (options.parse === void 0) options.parse = true; - const model = this; - const success = options.success; - options.success = function(resp) { - model.attributes = {}; // this is the only changed line - if (!model.set(model.parse(resp, options), options)) return false; - if (success) success(model, resp, options); - model.trigger('sync', model, resp, options); - }; - const error = options.error; - options.error = function(resp) { - if (error) error(model, resp, options); - model.trigger('error', model, resp, options); - }; - return this.sync('read', this, options); + // Important to allow for this.unset('unread'), save to db, then fetch() + // to propagate. We don't want the unset key in the db so our unread index + // stays small. + merge(model) { + const attributes = model.attributes || model; + + const { unread } = attributes; + if (typeof unread === 'undefined') { + this.unset('unread'); + } + + this.set(attributes); }, - /* eslint-enable */ - /* eslint-disable more/no-then */ getNameForNumber(number) { const conversation = ConversationController.get(number); if (!conversation) { @@ -728,18 +717,22 @@ send(promise) { this.trigger('pending'); return promise - .then(result => { + .then(async result => { const now = Date.now(); this.trigger('done'); if (result.dataMessage) { this.set({ dataMessage: result.dataMessage }); } const sentTo = this.get('sent_to') || []; - this.save({ + this.set({ sent_to: _.union(sentTo, result.successfulNumbers), sent: true, expirationStartTimestamp: now, }); + + await window.Signal.Data.saveMessage(this.attributes, { + Message: Whisper.Message, + }); this.trigger('sent', this); this.sendSyncMessage(); }) @@ -821,12 +814,18 @@ this.get('expirationStartTimestamp') ) .then(() => { - this.save({ synced: true, dataMessage: null }); + this.set({ + synced: true, + dataMessage: null, + }); + return window.Signal.Data.saveMessage(this.attributes, { + Message: Whisper.Message, + }); }); }); }, - saveErrors(providedErrors) { + async saveErrors(providedErrors) { let errors = providedErrors; if (!(errors instanceof Array)) { @@ -851,7 +850,10 @@ }); errors = errors.concat(this.get('errors') || []); - return this.save({ errors }); + this.set({ errors }); + await window.Signal.Data.saveMessage(this.attributes, { + Message: Whisper.Message, + }); }, hasNetworkError() { @@ -911,290 +913,283 @@ const GROUP_TYPES = textsecure.protobuf.GroupContext.Type; const conversation = ConversationController.get(conversationId); - return conversation.queueJob( - () => - new Promise(resolve => { - const now = new Date().getTime(); - let attributes = { type: 'private' }; - if (dataMessage.group) { - let groupUpdate = null; + return conversation.queueJob(async () => { + try { + const now = new Date().getTime(); + let attributes = { type: 'private' }; + if (dataMessage.group) { + let groupUpdate = null; + attributes = { + type: 'group', + groupId: dataMessage.group.id, + }; + if (dataMessage.group.type === GROUP_TYPES.UPDATE) { attributes = { type: 'group', groupId: dataMessage.group.id, - }; - if (dataMessage.group.type === GROUP_TYPES.UPDATE) { - attributes = { - type: 'group', - groupId: dataMessage.group.id, - name: dataMessage.group.name, - avatar: dataMessage.group.avatar, - members: _.union( - dataMessage.group.members, - conversation.get('members') - ), - }; - groupUpdate = - conversation.changedAttributes( - _.pick(dataMessage.group, 'name', 'avatar') - ) || {}; - const difference = _.difference( - attributes.members, + name: dataMessage.group.name, + avatar: dataMessage.group.avatar, + members: _.union( + dataMessage.group.members, conversation.get('members') - ); - if (difference.length > 0) { - groupUpdate.joined = difference; - } - if (conversation.get('left')) { - window.log.warn('re-added to a left group'); - attributes.left = false; - } - } else if (dataMessage.group.type === GROUP_TYPES.QUIT) { - if (source === textsecure.storage.user.getNumber()) { - attributes.left = true; - groupUpdate = { left: 'You' }; - } else { - groupUpdate = { left: source }; - } - attributes.members = _.without( - conversation.get('members'), - source - ); + ), + }; + groupUpdate = + conversation.changedAttributes( + _.pick(dataMessage.group, 'name', 'avatar') + ) || {}; + const difference = _.difference( + attributes.members, + conversation.get('members') + ); + if (difference.length > 0) { + groupUpdate.joined = difference; } - - if (groupUpdate !== null) { - message.set({ group_update: groupUpdate }); + if (conversation.get('left')) { + window.log.warn('re-added to a left group'); + attributes.left = false; } - } - message.set({ - attachments: dataMessage.attachments, - body: dataMessage.body, - contact: dataMessage.contact, - conversationId: conversation.id, - decrypted_at: now, - errors: [], - flags: dataMessage.flags, - hasAttachments: dataMessage.hasAttachments, - hasFileAttachments: dataMessage.hasFileAttachments, - hasVisualMediaAttachments: dataMessage.hasVisualMediaAttachments, - quote: dataMessage.quote, - schemaVersion: dataMessage.schemaVersion, - }); - if (type === 'outgoing') { - const receipts = Whisper.DeliveryReceipts.forMessage( - conversation, - message - ); - receipts.forEach(() => - message.set({ - delivered: (message.get('delivered') || 0) + 1, - }) + } else if (dataMessage.group.type === GROUP_TYPES.QUIT) { + if (source === textsecure.storage.user.getNumber()) { + attributes.left = true; + groupUpdate = { left: 'You' }; + } else { + groupUpdate = { left: source }; + } + attributes.members = _.without( + conversation.get('members'), + source ); } - attributes.active_at = now; - conversation.set(attributes); - if (message.isExpirationTimerUpdate()) { - message.set({ - expirationTimerUpdate: { - source, - expireTimer: dataMessage.expireTimer, - }, - }); - conversation.set({ expireTimer: dataMessage.expireTimer }); - } else if (dataMessage.expireTimer) { - message.set({ expireTimer: dataMessage.expireTimer }); + if (groupUpdate !== null) { + message.set({ group_update: groupUpdate }); } + } + message.set({ + attachments: dataMessage.attachments, + body: dataMessage.body, + contact: dataMessage.contact, + conversationId: conversation.id, + decrypted_at: now, + errors: [], + flags: dataMessage.flags, + hasAttachments: dataMessage.hasAttachments, + hasFileAttachments: dataMessage.hasFileAttachments, + hasVisualMediaAttachments: dataMessage.hasVisualMediaAttachments, + quote: dataMessage.quote, + schemaVersion: dataMessage.schemaVersion, + }); + if (type === 'outgoing') { + const receipts = Whisper.DeliveryReceipts.forMessage( + conversation, + message + ); + receipts.forEach(() => + message.set({ + delivered: (message.get('delivered') || 0) + 1, + }) + ); + } + attributes.active_at = now; + conversation.set(attributes); - // NOTE: Remove once the above uses - // `Conversation::updateExpirationTimer`: - const { expireTimer } = dataMessage; - const shouldLogExpireTimerChange = - message.isExpirationTimerUpdate() || expireTimer; - if (shouldLogExpireTimerChange) { - window.log.info("Update conversation 'expireTimer'", { - id: conversation.idForLogging(), - expireTimer, - source: 'handleDataMessage', - }); - } + if (message.isExpirationTimerUpdate()) { + message.set({ + expirationTimerUpdate: { + source, + expireTimer: dataMessage.expireTimer, + }, + }); + conversation.set({ expireTimer: dataMessage.expireTimer }); + } else if (dataMessage.expireTimer) { + message.set({ expireTimer: dataMessage.expireTimer }); + } - if (!message.isEndSession()) { - if (dataMessage.expireTimer) { - if ( - dataMessage.expireTimer !== conversation.get('expireTimer') - ) { - conversation.updateExpirationTimer( - dataMessage.expireTimer, - source, - message.get('received_at'), - { - fromGroupUpdate: message.isGroupUpdate(), - } - ); - } - } else if ( - conversation.get('expireTimer') && - // We only turn off timers if it's not a group update - !message.isGroupUpdate() - ) { + // NOTE: Remove once the above uses + // `Conversation::updateExpirationTimer`: + const { expireTimer } = dataMessage; + const shouldLogExpireTimerChange = + message.isExpirationTimerUpdate() || expireTimer; + if (shouldLogExpireTimerChange) { + window.log.info("Update conversation 'expireTimer'", { + id: conversation.idForLogging(), + expireTimer, + source: 'handleDataMessage', + }); + } + + if (!message.isEndSession()) { + if (dataMessage.expireTimer) { + if (dataMessage.expireTimer !== conversation.get('expireTimer')) { conversation.updateExpirationTimer( - null, + dataMessage.expireTimer, source, - message.get('received_at') + message.get('received_at'), + { + fromGroupUpdate: message.isGroupUpdate(), + } ); } + } else if ( + conversation.get('expireTimer') && + // We only turn off timers if it's not a group update + !message.isGroupUpdate() + ) { + conversation.updateExpirationTimer( + null, + source, + message.get('received_at') + ); } - if (type === 'incoming') { - const readSync = Whisper.ReadSyncs.forMessage(message); - if (readSync) { - if ( - message.get('expireTimer') && - !message.get('expirationStartTimestamp') - ) { - message.set( - 'expirationStartTimestamp', - Math.min(readSync.get('read_at'), Date.now()) - ); - } - } - if (readSync || message.isExpirationTimerUpdate()) { - message.unset('unread'); - // This is primarily to allow the conversation to mark all older - // messages as read, as is done when we receive a read sync for - // a message we already know about. - Whisper.ReadSyncs.notifyConversation(message); - } else { - conversation.set( - 'unreadCount', - conversation.get('unreadCount') + 1 + } + if (type === 'incoming') { + const readSync = Whisper.ReadSyncs.forMessage(message); + if (readSync) { + if ( + message.get('expireTimer') && + !message.get('expirationStartTimestamp') + ) { + message.set( + 'expirationStartTimestamp', + Math.min(readSync.get('read_at'), Date.now()) ); } } - - if (type === 'outgoing') { - const reads = Whisper.ReadReceipts.forMessage( - conversation, - message + if (readSync || message.isExpirationTimerUpdate()) { + message.unset('unread'); + // This is primarily to allow the conversation to mark all older + // messages as read, as is done when we receive a read sync for + // a message we already know about. + Whisper.ReadSyncs.notifyConversation(message); + } else { + conversation.set( + 'unreadCount', + conversation.get('unreadCount') + 1 ); - if (reads.length) { - const readBy = reads.map(receipt => receipt.get('reader')); - message.set({ - read_by: _.union(message.get('read_by'), readBy), - }); - } - - message.set({ recipients: conversation.getRecipients() }); } + } - const conversationTimestamp = conversation.get('timestamp'); - if ( - !conversationTimestamp || - message.get('sent_at') > conversationTimestamp - ) { - conversation.set({ - lastMessage: message.getNotificationText(), - timestamp: message.get('sent_at'), + if (type === 'outgoing') { + const reads = Whisper.ReadReceipts.forMessage( + conversation, + message + ); + if (reads.length) { + const readBy = reads.map(receipt => receipt.get('reader')); + message.set({ + read_by: _.union(message.get('read_by'), readBy), }); } - if (dataMessage.profileKey) { - const profileKey = dataMessage.profileKey.toArrayBuffer(); - if (source === textsecure.storage.user.getNumber()) { - conversation.set({ profileSharing: true }); - } else if (conversation.isPrivate()) { - conversation.set({ profileKey }); - } else { - ConversationController.getOrCreateAndWait( - source, - 'private' - ).then(sender => { + message.set({ recipients: conversation.getRecipients() }); + } + + const conversationTimestamp = conversation.get('timestamp'); + if ( + !conversationTimestamp || + message.get('sent_at') > conversationTimestamp + ) { + conversation.set({ + lastMessage: message.getNotificationText(), + timestamp: message.get('sent_at'), + }); + } + + if (dataMessage.profileKey) { + const profileKey = dataMessage.profileKey.toArrayBuffer(); + if (source === textsecure.storage.user.getNumber()) { + conversation.set({ profileSharing: true }); + } else if (conversation.isPrivate()) { + conversation.set({ profileKey }); + } else { + ConversationController.getOrCreateAndWait(source, 'private').then( + sender => { sender.setProfileKey(profileKey); - }); - } + } + ); } + } + + const id = await window.Signal.Data.saveMessage(message.attributes, { + Message: Whisper.Message, + }); + message.set({ id }); + + await wrapDeferred(conversation.save()); + + conversation.trigger('newmessage', message); + + try { + // We fetch() here because, between the message.save() above and + // the previous line's trigger() call, we might have marked all + // messages unread in the database. This message might already + // be read! + const fetched = await window.Signal.Data.getMessageById( + message.get('id'), + { + Message: Whisper.Message, + } + ); + const previousUnread = message.get('unread'); - const handleError = error => { - const errorForLog = error && error.stack ? error.stack : error; - window.log.error( - 'handleDataMessage', - message.idForLogging(), - 'error:', - errorForLog + // Important to update message with latest read state from database + message.merge(fetched); + + if (previousUnread !== message.get('unread')) { + window.log.warn( + 'Caught race condition on new message read state! ' + + 'Manually starting timers.' ); - return resolve(); - }; + // We call markRead() even though the message is already + // marked read because we need to start expiration + // timers, etc. + message.markRead(); + } + } catch (error) { + window.log.warn( + 'handleDataMessage: Message', + message.idForLogging(), + 'was deleted' + ); + } - message.save().then(() => { - conversation.save().then(() => { - try { - conversation.trigger('newmessage', message); - } catch (e) { - return handleError(e); - } - // We fetch() here because, between the message.save() above and - // the previous line's trigger() call, we might have marked all - // messages unread in the database. This message might already - // be read! - const previousUnread = message.get('unread'); - return message.fetch().then( - () => { - try { - if (previousUnread !== message.get('unread')) { - window.log.warn( - 'Caught race condition on new message read state! ' + - 'Manually starting timers.' - ); - // We call markRead() even though the message is already - // marked read because we need to start expiration - // timers, etc. - message.markRead(); - } - - if (message.get('unread')) { - return conversation.notify(message).then(() => { - confirm(); - return resolve(); - }, handleError); - } - - confirm(); - return resolve(); - } catch (e) { - return handleError(e); - } - }, - () => { - try { - window.log.warn( - 'handleDataMessage: Message', - message.idForLogging(), - 'was deleted' - ); - - confirm(); - return resolve(); - } catch (e) { - return handleError(e); - } - } - ); - }, handleError); - }, handleError); - }) - ); + if (message.get('unread')) { + await conversation.notify(message); + } + + confirm(); + } catch (error) { + const errorForLog = error && error.stack ? error.stack : error; + window.log.error( + 'handleDataMessage', + message.idForLogging(), + 'error:', + errorForLog + ); + } + }); }, async markRead(readAt) { this.unset('unread'); + if (this.get('expireTimer') && !this.get('expirationStartTimestamp')) { - const expireTimerStart = Math.min(Date.now(), readAt || Date.now()); - this.set('expirationStartTimestamp', expireTimerStart); + const expirationStartTimestamp = Math.min( + Date.now(), + readAt || Date.now() + ); + this.set({ expirationStartTimestamp }); } + Whisper.Notifications.remove( Whisper.Notifications.where({ messageId: this.id, }) ); - return wrapDeferred(this.save()); + + await window.Signal.Data.saveMessage(this.attributes, { + Message: Whisper.Message, + }); }, isExpiring() { return this.get('expireTimer') && this.get('expirationStartTimestamp'); @@ -1215,19 +1210,17 @@ } return msFromNow; }, - setToExpire() { + async setToExpire() { if (this.isExpiring() && !this.get('expires_at')) { const start = this.get('expirationStartTimestamp'); const delta = this.get('expireTimer') * 1000; const expiresAt = start + delta; - // This method can be called due to the expiration-related .set() calls in - // handleDataMessage(), but the .save() here would conflict with the - // same call at the end of handleDataMessage(). So we only call .save() - // here if we've previously saved this model. - if (!this.isNew()) { - this.save('expires_at', expiresAt); - } + this.set({ expires_at: expiresAt }); + const id = await window.Signal.Data.saveMessage(this.attributes, { + Message: Whisper.Message, + }); + this.set({ id }); Whisper.ExpiringMessagesListener.update(); window.log.info('Set message expiration', { @@ -1254,30 +1247,15 @@ this.conversation = options.conversation; } }, - destroyAll() { - return Promise.all( - this.models.map( - m => - new Promise((resolve, reject) => { - m - .destroy() - .then(resolve) - .fail(reject); - }) + async destroyAll() { + await Promise.all( + this.models.map(message => + window.Signal.Data.removeMessage(message.id, { + Message: Whisper.Message, + }) ) ); - }, - - fetchSentAt(timestamp) { - return new Promise(resolve => - this.fetch({ - index: { - // 'receipt' index on sent_at - name: 'receipt', - only: timestamp, - }, - }).always(resolve) - ); + this.reset([]); }, getLoadedUnreadCount() { @@ -1287,73 +1265,41 @@ }, 0); }, - fetchConversation(conversationId, providedLimit, providedUnreadCount) { - let limit = providedLimit; - let unreadCount = providedUnreadCount; - - if (typeof limit !== 'number') { - limit = 100; - } - if (typeof unreadCount !== 'number') { - unreadCount = 0; - } - - let startingLoadedUnread = 0; - if (unreadCount > 0) { - startingLoadedUnread = this.getLoadedUnreadCount(); - } - return new Promise(resolve => { - let upper; - if (this.length === 0) { - // fetch the most recent messages first - upper = Number.MAX_VALUE; - } else { - // not our first rodeo, fetch older messages. - upper = this.at(0).get('received_at'); - } - const options = { remove: false, limit }; - options.index = { - // 'conversation' index on [conversationId, received_at] - name: 'conversation', - lower: [conversationId], - upper: [conversationId, upper], - order: 'desc', - // SELECT messages WHERE conversationId = this.id ORDER - // received_at DESC - }; - this.fetch(options).always(resolve); - }).then(() => { - if (unreadCount <= 0) { - return Promise.resolve(); - } + async fetchConversation(conversationId, limit = 100, unreadCount = 0) { + const startingLoadedUnread = + unreadCount > 0 ? this.getLoadedUnreadCount() : 0; - const loadedUnread = this.getLoadedUnreadCount(); - if (loadedUnread >= unreadCount) { - return Promise.resolve(); - } + // We look for older messages if we've fetched once already + const receivedAt = + this.length === 0 ? Number.MAX_VALUE : this.at(0).get('received_at'); - if (startingLoadedUnread === loadedUnread) { - // that fetch didn't get us any more unread. stop fetching more. - return Promise.resolve(); + const messages = await window.Signal.Data.getMessagesByConversation( + conversationId, + { + limit, + receivedAt, + MessageCollection: Whisper.MessageCollection, } + ); - window.log.info( - 'fetchConversation: doing another fetch to get all unread' - ); - return this.fetchConversation(conversationId, limit, unreadCount); - }); - }, + this.add(messages.models); - fetchNextExpiring() { - this.fetch({ index: { name: 'expires_at' }, limit: 1 }); - }, + if (unreadCount <= 0) { + return; + } + const loadedUnread = this.getLoadedUnreadCount(); + if (loadedUnread >= unreadCount) { + return; + } + if (startingLoadedUnread === loadedUnread) { + // that fetch didn't get us any more unread. stop fetching more. + return; + } - fetchExpired() { - window.log.info('Load expired messages'); - this.fetch({ - conditions: { expires_at: { $lte: Date.now() } }, - addIndividually: true, - }); + window.log.info( + 'fetchConversation: doing another fetch to get all unread' + ); + await this.fetchConversation(conversationId, limit, unreadCount); }, }); })(); diff --git a/js/modules/backup.js b/js/modules/backup.js index 2037d3d3c..bc44b631e 100644 --- a/js/modules/backup.js +++ b/js/modules/backup.js @@ -674,6 +674,7 @@ async function exportConversation(db, conversation, options) { const writer = await createFileAndWriter(dir, 'messages.json'); return new Promise(async (resolve, reject) => { + // TODO: need to iterate through message ids, export using window.Signal.Data const transaction = db.transaction('messages', 'readwrite'); transaction.onerror = () => { Whisper.Database.handleDOMException( @@ -980,6 +981,8 @@ async function loadAttachments(dir, getName, options) { }) ); + // TODO: Handle video screenshots, and image/video thumbnails + window.log.info('loadAttachments', { message }); } @@ -989,63 +992,37 @@ function saveMessage(db, message) { async function saveAllMessages(db, rawMessages) { if (rawMessages.length === 0) { - return Promise.resolve(); + return; } - const { writeMessageAttachments, upgradeMessageSchema } = Signal.Migrations; - const importAndUpgrade = async message => - upgradeMessageSchema(await writeMessageAttachments(message)); - - const messages = await Promise.all(rawMessages.map(importAndUpgrade)); - - return new Promise((resolve, reject) => { - let finished = false; - const finish = via => { - window.log.info('messages done saving via', via); - if (finished) { - resolve(); - } - finished = true; - }; + try { + const { writeMessageAttachments, upgradeMessageSchema } = Signal.Migrations; + const importAndUpgrade = async message => + upgradeMessageSchema(await writeMessageAttachments(message)); - const transaction = db.transaction('messages', 'readwrite'); - transaction.onerror = () => { - Whisper.Database.handleDOMException( - 'saveAllMessages transaction error', - transaction.error, - reject - ); - }; - transaction.oncomplete = finish.bind(null, 'transaction complete'); + const messages = await Promise.all(rawMessages.map(importAndUpgrade)); - const store = transaction.objectStore('messages'); const { conversationId } = messages[0]; - let count = 0; - _.forEach(messages, message => { - const request = store.put(message, message.id); - request.onsuccess = () => { - count += 1; - if (count === messages.length) { - window.log.info( - 'Saved', - messages.length, - 'messages for conversation', - // Don't know if group or private conversation, so we blindly redact - `[REDACTED]${conversationId.slice(-3)}` - ); - finish('puts scheduled'); - } - }; - request.onerror = () => { - Whisper.Database.handleDOMException( - 'saveAllMessages request error', - request.error, - reject - ); - }; - }); - }); + for (let index = 0, max = messages.length; index < max; index += 1) { + // Yes, we really want to do these in order + // eslint-disable-next-line no-await-in-loop + await window.Signal.Data.saveMessage(messages[index]); + } + + window.log.info( + 'Saved', + messages.length, + 'messages for conversation', + // Don't know if group or private conversation, so we blindly redact + `[REDACTED]${conversationId.slice(-3)}` + ); + } catch (error) { + window.log.error( + 'saveAllMessages error', + error && error.message ? error.message : error + ); + } } // To reduce the memory impact of attachments, we make individual saves to the @@ -1095,8 +1072,9 @@ async function importConversation(db, dir, options) { message.quote && message.quote.attachments && message.quote.attachments.length > 0; + const hasContacts = message.contact && message.contact.length; - if (hasAttachments || hasQuotedAttachments) { + if (hasAttachments || hasQuotedAttachments || hasContacts) { const importMessage = async () => { const getName = attachmentsDir ? _getAnonymousAttachmentFileName @@ -1163,7 +1141,11 @@ function getMessageKey(message) { return `${source}.${sourceDevice} ${message.timestamp}`; } function loadMessagesLookup(db) { - return assembleLookup(db, 'messages', getMessageKey); + return window.Signal.Data.getAllMessageIds({ + db, + getMessageKey, + handleDOMException: Whisper.Database.handleDOMException, + }); } function getConversationKey(conversation) { diff --git a/js/modules/data.js b/js/modules/data.js new file mode 100644 index 000000000..540d99f0b --- /dev/null +++ b/js/modules/data.js @@ -0,0 +1,349 @@ +/* global window */ + +const { deferredToPromise } = require('./deferred_to_promise'); +const MessageType = require('./types/message'); + +// calls to search for: +// .fetch( +// .save( +// .destroy( + +async function saveMessage(data, { Message }) { + const message = new Message(data); + await deferredToPromise(message.save()); + return message.id; +} + +async function removeMessage(id, { Message }) { + const message = await getMessageById(id, { Message }); + // Note: It's important to have a fully database-hydrated model to delete here because + // it needs to delete all associated on-disk files along with the database delete. + if (message) { + await deferredToPromise(message.destroy()); + } +} + +async function getMessageById(id, { Message }) { + const message = new Message({ id }); + try { + await deferredToPromise(message.fetch()); + return message; + } catch (error) { + return null; + } +} + +async function getAllMessageIds({ db, handleDOMException, getMessageKey }) { + const lookup = Object.create(null); + const storeName = 'messages'; + + return new Promise((resolve, reject) => { + const transaction = db.transaction(storeName, 'readwrite'); + transaction.onerror = () => { + handleDOMException( + `assembleLookup(${storeName}) transaction error`, + transaction.error, + reject + ); + }; + transaction.oncomplete = () => { + // not really very useful - fires at unexpected times + }; + + const store = transaction.objectStore(storeName); + const request = store.openCursor(); + request.onerror = () => { + handleDOMException( + `assembleLookup(${storeName}) request error`, + request.error, + reject + ); + }; + request.onsuccess = event => { + const cursor = event.target.result; + if (cursor && cursor.value) { + lookup[getMessageKey(cursor.value)] = true; + cursor.continue(); + } else { + window.log.info(`Done creating ${storeName} lookup`); + resolve(lookup); + } + }; + }); +} + +async function getMessageBySender( + // eslint-disable-next-line camelcase + { source, sourceDevice, sent_at }, + { Message } +) { + const fetcher = new Message(); + const options = { + index: { + name: 'unique', + // eslint-disable-next-line camelcase + value: [source, sourceDevice, sent_at], + }, + }; + + try { + await deferredToPromise(fetcher.fetch(options)); + if (fetcher.get('id')) { + return fetcher; + } + + return null; + } catch (error) { + return null; + } +} + +async function getUnreadByConversation(conversationId, { MessageCollection }) { + const messages = new MessageCollection(); + + await deferredToPromise( + messages.fetch({ + index: { + // 'unread' index + name: 'unread', + lower: [conversationId], + upper: [conversationId, Number.MAX_VALUE], + }, + }) + ); + + return messages; +} + +async function getMessagesByConversation( + conversationId, + { limit = 100, receivedAt = Number.MAX_VALUE, MessageCollection } +) { + const messages = new MessageCollection(); + + const options = { + limit, + index: { + // 'conversation' index on [conversationId, received_at] + name: 'conversation', + lower: [conversationId], + upper: [conversationId, receivedAt], + order: 'desc', + // SELECT messages WHERE conversationId = this.id ORDER + // received_at DESC + }, + }; + await deferredToPromise(messages.fetch(options)); + + return messages; +} + +async function removeAllMessagesInConversation( + conversationId, + { MessageCollection } +) { + const messages = new MessageCollection(); + + let loaded; + do { + // Yes, we really want the await in the loop. We're deleting 100 at a + // time so we don't use too much memory. + // eslint-disable-next-line no-await-in-loop + await deferredToPromise( + messages.fetch({ + limit: 100, + index: { + // 'conversation' index on [conversationId, received_at] + name: 'conversation', + lower: [conversationId], + upper: [conversationId, Number.MAX_VALUE], + }, + }) + ); + + loaded = messages.models; + messages.reset([]); + + // Note: It's very important that these models are fully hydrated because + // we need to delete all associated on-disk files along with the database delete. + loaded.map(message => message.destroy()); + } while (loaded.length > 0); +} + +async function getMessagesBySentAt(sentAt, { MessageCollection }) { + const messages = new MessageCollection(); + + await deferredToPromise( + messages.fetch({ + index: { + // 'receipt' index on sent_at + name: 'receipt', + only: sentAt, + }, + }) + ); + + return messages; +} + +async function getExpiredMessages({ MessageCollection }) { + window.log.info('Load expired messages'); + const messages = new MessageCollection(); + + await deferredToPromise( + messages.fetch({ + conditions: { + expires_at: { + $lte: Date.now(), + }, + }, + }) + ); + + return messages; +} + +async function getNextExpiringMessage({ MessageCollection }) { + const messages = new MessageCollection(); + + await deferredToPromise( + messages.fetch({ + limit: 1, + index: { + name: 'expires_at', + }, + }) + ); + + return messages; +} + +async function saveUnprocessed(data, { Unprocessed }) { + const unprocessed = new Unprocessed(data); + return deferredToPromise(unprocessed.save()); +} + +async function getAllUnprocessed({ UnprocessedCollection }) { + const collection = new UnprocessedCollection(); + await deferredToPromise(collection.fetch()); + return collection.map(model => model.attributes); +} + +async function updateUnprocessed(id, updates, { Unprocessed }) { + const unprocessed = new Unprocessed({ + id, + }); + + await deferredToPromise(unprocessed.fetch()); + + unprocessed.set(updates); + await saveUnprocessed(unprocessed.attributes, { Unprocessed }); +} + +async function removeUnprocessed(id, { Unprocessed }) { + const unprocessed = new Unprocessed({ + id, + }); + + await deferredToPromise(unprocessed.destroy()); +} + +async function removeAllUnprocessed() { + // erase everything in unprocessed table +} + +async function removeAll() { + // erase everything in the database +} + +async function getMessagesNeedingUpgrade(limit, { MessageCollection }) { + const messages = new MessageCollection(); + + await deferredToPromise( + messages.fetch({ + limit, + index: { + name: 'schemaVersion', + upper: MessageType.CURRENT_SCHEMA_VERSION, + excludeUpper: true, + order: 'desc', + }, + }) + ); + + const models = messages.models || []; + return models.map(model => model.toJSON()); +} + +async function getMessagesWithVisualMediaAttachments( + conversationId, + { limit, MessageCollection } +) { + const messages = new MessageCollection(); + const lowerReceivedAt = 0; + const upperReceivedAt = Number.MAX_VALUE; + + await deferredToPromise( + messages.fetch({ + limit, + index: { + name: 'hasVisualMediaAttachments', + lower: [conversationId, lowerReceivedAt, 1], + upper: [conversationId, upperReceivedAt, 1], + order: 'desc', + }, + }) + ); + + return messages.models.map(model => model.toJSON()); +} + +async function getMessagesWithFileAttachments( + conversationId, + { limit, MessageCollection } +) { + const messages = new MessageCollection(); + const lowerReceivedAt = 0; + const upperReceivedAt = Number.MAX_VALUE; + + await deferredToPromise( + messages.fetch({ + limit, + index: { + name: 'hasFileAttachments', + lower: [conversationId, lowerReceivedAt, 1], + upper: [conversationId, upperReceivedAt, 1], + order: 'desc', + }, + }) + ); + + return messages.models.map(model => model.toJSON()); +} + +module.exports = { + saveMessage, + removeMessage, + getUnreadByConversation, + removeAllMessagesInConversation, + getMessageBySender, + getMessageById, + getAllMessageIds, + getMessagesBySentAt, + getExpiredMessages, + getNextExpiringMessage, + getMessagesByConversation, + + getAllUnprocessed, + saveUnprocessed, + updateUnprocessed, + removeUnprocessed, + removeAllUnprocessed, + + removeAll, + + // Returning plain JSON + getMessagesNeedingUpgrade, + getMessagesWithVisualMediaAttachments, + getMessagesWithFileAttachments, +}; diff --git a/js/modules/debug.js b/js/modules/debug.js index b5b302ff0..8328dee5c 100644 --- a/js/modules/debug.js +++ b/js/modules/debug.js @@ -1,5 +1,5 @@ /* eslint-env node */ -/* global log */ +/* global log, Signal */ const fs = require('fs-extra'); const path = require('path'); @@ -58,9 +58,8 @@ exports.createConversation = async ({ range(0, numMessages).map(async index => { await sleep(index * 100); log.info(`Create message ${index + 1}`); - const messageAttributes = await createRandomMessage({ conversationId }); - const message = new WhisperMessage(messageAttributes); - return deferredToPromise(message.save()); + const message = await createRandomMessage({ conversationId }); + return Signal.Data.saveMessage(message); }) ); }; diff --git a/js/modules/messages_data_migrator.js b/js/modules/messages_data_migrator.js index b5e0c4c41..13eca80d6 100644 --- a/js/modules/messages_data_migrator.js +++ b/js/modules/messages_data_migrator.js @@ -11,7 +11,6 @@ const { isFunction, isNumber, isObject, isString, last } = require('lodash'); const database = require('./database'); const Message = require('./types/message'); const settings = require('./settings'); -const { deferredToPromise } = require('./deferred_to_promise'); const MESSAGES_STORE_NAME = 'messages'; @@ -20,6 +19,8 @@ exports.processNext = async ({ BackboneMessageCollection, numMessagesPerBatch, upgradeMessageSchema, + getMessagesNeedingUpgrade, + saveMessage, } = {}) => { if (!isFunction(BackboneMessage)) { throw new TypeError( @@ -45,10 +46,10 @@ exports.processNext = async ({ const startTime = Date.now(); const fetchStartTime = Date.now(); - const messagesRequiringSchemaUpgrade = await _fetchMessagesRequiringSchemaUpgrade( + const messagesRequiringSchemaUpgrade = await getMessagesNeedingUpgrade( + numMessagesPerBatch, { - BackboneMessageCollection, - count: numMessagesPerBatch, + MessageCollection: BackboneMessageCollection, } ); const fetchDuration = Date.now() - fetchStartTime; @@ -60,8 +61,11 @@ exports.processNext = async ({ const upgradeDuration = Date.now() - upgradeStartTime; const saveStartTime = Date.now(); - const saveMessage = _saveMessageBackbone({ BackboneMessage }); - await Promise.all(upgradedMessages.map(saveMessage)); + await Promise.all( + upgradedMessages.map(message => + saveMessage(message, { Message: BackboneMessage }) + ) + ); const saveDuration = Date.now() - saveStartTime; const totalDuration = Date.now() - startTime; @@ -277,11 +281,6 @@ const _processBatch = async ({ }; }; -const _saveMessageBackbone = ({ BackboneMessage } = {}) => message => { - const backboneMessage = new BackboneMessage(message); - return deferredToPromise(backboneMessage.save()); -}; - const _saveMessage = ({ transaction } = {}) => message => { if (!isObject(transaction)) { throw new TypeError("'transaction' is required"); @@ -295,41 +294,6 @@ const _saveMessage = ({ transaction } = {}) => message => { }); }; -const _fetchMessagesRequiringSchemaUpgrade = async ({ - BackboneMessageCollection, - count, -} = {}) => { - if (!isFunction(BackboneMessageCollection)) { - throw new TypeError( - "'BackboneMessageCollection' (Whisper.MessageCollection)" + - ' constructor is required' - ); - } - - if (!isNumber(count)) { - throw new TypeError("'count' is required"); - } - - const collection = new BackboneMessageCollection(); - return new Promise(resolve => - collection - .fetch({ - limit: count, - index: { - name: 'schemaVersion', - upper: Message.CURRENT_SCHEMA_VERSION, - excludeUpper: true, - order: 'desc', - }, - }) - .always(() => { - const models = collection.models || []; - const messages = models.map(model => model.toJSON()); - resolve(messages); - }) - ); -}; - // NOTE: Named ‘dangerous’ because it is not as efficient as using our // `messages` `schemaVersion` index: const _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex = ({ diff --git a/js/modules/signal.js b/js/modules/signal.js index 3357baf6f..fcbcf3664 100644 --- a/js/modules/signal.js +++ b/js/modules/signal.js @@ -2,6 +2,7 @@ const Backbone = require('../../ts/backbone'); const Crypto = require('./crypto'); +const Data = require('./data'); const Database = require('./database'); const Emoji = require('../../ts/util/emoji'); const Notifications = require('../../ts/notifications'); @@ -205,6 +206,7 @@ exports.setup = (options = {}) => { Backbone, Components, Crypto, + Data, Database, Emoji, Migrations, diff --git a/js/modules/types/message.js b/js/modules/types/message.js index 711ec6c1a..9f0bf3fb2 100644 --- a/js/modules/types/message.js +++ b/js/modules/types/message.js @@ -498,6 +498,8 @@ exports.createAttachmentDataWriter = ({ }); }; + // TODO: need to handle attachment thumbnails and video screenshots + const messageWithoutAttachmentData = Object.assign( {}, await writeThumbnails(message, { logger }), diff --git a/js/read_receipts.js b/js/read_receipts.js index 8b9c370cd..458444515 100644 --- a/js/read_receipts.js +++ b/js/read_receipts.js @@ -1,4 +1,4 @@ -/* global Whisper, Backbone, _, ConversationController */ +/* global Whisper, Backbone, _, ConversationController, window */ /* eslint-disable more/no-then */ @@ -29,66 +29,74 @@ } return receipts; }, - onReceipt(receipt) { - const messages = new Whisper.MessageCollection(); - return messages - .fetchSentAt(receipt.get('timestamp')) - .then(() => { - if (messages.length === 0) { - return null; - } - const message = messages.find( - item => - item.isOutgoing() && - receipt.get('reader') === item.get('conversationId') - ); - if (message) { - return message; + async getTargetMessage(reader, messages) { + if (messages.length === 0) { + return null; + } + const message = messages.find( + item => item.isOutgoing() && reader === item.get('conversationId') + ); + if (message) { + return message; + } + + const groups = new Whisper.GroupCollection(); + return groups.fetchGroups(reader).then(() => { + const ids = groups.pluck('id'); + ids.push(reader); + return messages.find( + item => + item.isOutgoing() && _.contains(ids, item.get('conversationId')) + ); + }); + }, + async onReceipt(receipt) { + try { + const messages = await window.Signal.Data.getMessagesBySentAt( + receipt.get('timestamp'), + { + MessageCollection: Whisper.MessageCollection, } + ); - const groups = new Whisper.GroupCollection(); - return groups.fetchGroups(receipt.get('reader')).then(() => { - const ids = groups.pluck('id'); - ids.push(receipt.get('reader')); - return messages.find( - item => - item.isOutgoing() && _.contains(ids, item.get('conversationId')) - ); - }); - }) - .then(message => { - if (message) { - const readBy = message.get('read_by') || []; - readBy.push(receipt.get('reader')); - return new Promise((resolve, reject) => { - message.save({ read_by: readBy }).then(() => { - // notify frontend listeners - const conversation = ConversationController.get( - message.get('conversationId') - ); - if (conversation) { - conversation.trigger('read', message); - } + const message = await this.getTargetMessage( + receipt.get('reader'), + messages + ); - this.remove(receipt); - resolve(); - }, reject); - }); - } + if (!message) { window.log.info( 'No message for read receipt', receipt.get('reader'), receipt.get('timestamp') ); + return; + } - return null; - }) - .catch(error => { - window.log.error( - 'ReadReceipts.onReceipt error:', - error && error.stack ? error.stack : error - ); + const readBy = message.get('read_by') || []; + readBy.push(receipt.get('reader')); + + message.set({ read_by: readBy }); + + await window.Signal.Data.saveMessage(message.attributes, { + Message: Whisper.Message, }); + + // notify frontend listeners + const conversation = ConversationController.get( + message.get('conversationId') + ); + if (conversation) { + conversation.trigger('read', message); + } + + this.remove(receipt); + } catch (error) { + window.log.error( + 'ReadReceipts.onReceipt error:', + error && error.stack ? error.stack : error + ); + } }, }))(); })(); diff --git a/js/read_syncs.js b/js/read_syncs.js index 729aa51ea..81a0c0705 100644 --- a/js/read_syncs.js +++ b/js/read_syncs.js @@ -21,51 +21,56 @@ return null; }, - onReceipt(receipt) { - const messages = new Whisper.MessageCollection(); - return messages - .fetchSentAt(receipt.get('timestamp')) - .then(() => { - const message = messages.find( - item => - item.isIncoming() && - item.isUnread() && - item.get('source') === receipt.get('sender') - ); - const notificationForMessage = message - ? Whisper.Notifications.findWhere({ messageId: message.id }) - : null; - const removedNotification = Whisper.Notifications.remove( - notificationForMessage - ); - const receiptSender = receipt.get('sender'); - const receiptTimestamp = receipt.get('timestamp'); - const wasMessageFound = Boolean(message); - const wasNotificationFound = Boolean(notificationForMessage); - const wasNotificationRemoved = Boolean(removedNotification); - window.log.info('Receive read sync:', { - receiptSender, - receiptTimestamp, - wasMessageFound, - wasNotificationFound, - wasNotificationRemoved, - }); - return message - ? message.markRead(receipt.get('read_at')).then(() => { - // This notification may result in messages older than this one being - // marked read. We want those messages to have the same expire timer - // start time as this one, so we pass the read_at value through. - this.notifyConversation(message, receipt.get('read_at')); - this.remove(receipt); - }) - : Promise.resolve(); - }) - .catch(error => { - window.log.error( - 'ReadSyncs.onReceipt error:', - error && error.stack ? error.stack : error - ); + async onReceipt(receipt) { + try { + const messages = await window.Signal.Data.getMessagesBySentAt( + receipt.get('timestamp'), + { + MessageCollection: Whisper.MessageCollection, + } + ); + + const message = messages.find( + item => + item.isIncoming() && + item.isUnread() && + item.get('source') === receipt.get('sender') + ); + const notificationForMessage = message + ? Whisper.Notifications.findWhere({ messageId: message.id }) + : null; + const removedNotification = Whisper.Notifications.remove( + notificationForMessage + ); + const receiptSender = receipt.get('sender'); + const receiptTimestamp = receipt.get('timestamp'); + const wasMessageFound = Boolean(message); + const wasNotificationFound = Boolean(notificationForMessage); + const wasNotificationRemoved = Boolean(removedNotification); + window.log.info('Receive read sync:', { + receiptSender, + receiptTimestamp, + wasMessageFound, + wasNotificationFound, + wasNotificationRemoved, }); + + if (!message) { + return; + } + + await message.markRead(receipt.get('read_at')); + // This notification may result in messages older than this one being + // marked read. We want those messages to have the same expire timer + // start time as this one, so we pass the read_at value through. + this.notifyConversation(message, receipt.get('read_at')); + this.remove(receipt); + } catch (error) { + window.log.error( + 'ReadSyncs.onReceipt error:', + error && error.stack ? error.stack : error + ); + } }, notifyConversation(message, readAt) { const conversation = ConversationController.get({ diff --git a/js/signal_protocol_store.js b/js/signal_protocol_store.js index 2a1cb4bc6..e8a81afc4 100644 --- a/js/signal_protocol_store.js +++ b/js/signal_protocol_store.js @@ -946,53 +946,33 @@ // Not yet processed messages - for resiliency getAllUnprocessed() { - let collection; - return new Promise((resolve, reject) => { - collection = new UnprocessedCollection(); - return collection.fetch().then(resolve, reject); - }).then(() => - // Return a plain array of plain objects - collection.map(model => model.attributes) - ); + return window.Signal.Data.getAllUnprocessed({ UnprocessedCollection }); }, addUnprocessed(data) { - return new Promise((resolve, reject) => { - const unprocessed = new Unprocessed(data); - return unprocessed.save().then(resolve, reject); - }); + return window.Signal.Data.saveUnprocessed(data, { Unprocessed }); }, updateUnprocessed(id, updates) { - return new Promise((resolve, reject) => { - const unprocessed = new Unprocessed({ - id, - }); - return unprocessed - .fetch() - .then(() => unprocessed.save(updates).then(resolve, reject), reject); - }); + return window.Signal.Data.updateUnprocessed(id, updates, { Unprocessed }); }, removeUnprocessed(id) { - return new Promise((resolve, reject) => { - const unprocessed = new Unprocessed({ - id, - }); - return unprocessed.destroy().then(resolve, reject); - }); + return window.Signal.Data.removeUnprocessed(id, { Unprocessed }); }, - removeAllData() { + async removeAllData() { // First the in-memory caches: window.storage.reset(); // items store ConversationController.reset(); // conversations store // Then, the entire database: - return Whisper.Database.clear(); + await Whisper.Database.clear(); + + await window.Signal.Data.removeAll(); }, - removeAllConfiguration() { + async removeAllConfiguration() { // First the in-memory cache for the items store: window.storage.reset(); // Then anything in the database that isn't a message/conversation/group: - return Whisper.Database.clearStores([ + await Whisper.Database.clearStores([ 'items', 'identityKeys', 'sessions', @@ -1000,6 +980,8 @@ 'preKeys', 'unprocessed', ]); + + await window.Signal.Data.removeAllUnprocessed(); }, }; _.extend(SignalProtocolStore.prototype, Backbone.Events); diff --git a/js/views/conversation_view.js b/js/views/conversation_view.js index 3bb571a73..410ed1b4a 100644 --- a/js/views/conversation_view.js +++ b/js/views/conversation_view.js @@ -7,7 +7,6 @@ /* global Signal: false */ /* global storage: false */ /* global Whisper: false */ -/* global wrapDeferred: false */ // eslint-disable-next-line func-names (function() { @@ -592,19 +591,18 @@ const DEFAULT_DOCUMENTS_FETCH_COUNT = 150; const conversationId = this.model.get('id'); - const WhisperMessageCollection = Whisper.MessageCollection; - const rawMedia = await Signal.Backbone.Conversation.fetchVisualMediaAttachments( + const rawMedia = await Signal.Data.getMessagesWithVisualMediaAttachments( + conversationId, { - conversationId, - count: DEFAULT_MEDIA_FETCH_COUNT, - WhisperMessageCollection, + limit: DEFAULT_MEDIA_FETCH_COUNT, + MessageCollection: Whisper.MessageCollection, } ); - const documents = await Signal.Backbone.Conversation.fetchFileAttachments( + const documents = await Signal.Data.getMessagesWithFileAttachments( + conversationId, { - conversationId, - count: DEFAULT_DOCUMENTS_FETCH_COUNT, - WhisperMessageCollection, + limit: DEFAULT_DOCUMENTS_FETCH_COUNT, + MessageCollection: Whisper.MessageCollection, } ); @@ -617,9 +615,10 @@ // Yep, we really do want to wait for each of these // eslint-disable-next-line no-await-in-loop rawMedia[i] = await upgradeMessageSchema(message); - const model = new Whisper.Message(rawMedia[i]); // eslint-disable-next-line no-await-in-loop - await wrapDeferred(model.save()); + await window.Signal.Data.saveMessage(rawMedia[i], { + Message: Whisper.Message, + }); } } @@ -784,7 +783,6 @@ this.view.$el.scrollTop(newScrollPosition); }, 1); }, - fetchMessages() { window.log.info('fetchMessages'); this.$('.bar-container').show(); @@ -820,6 +818,11 @@ // This is debounced, so it won't hit the database too often. this.lazyUpdateVerified(); + // We do this here because we don't want convo.messageCollection to have + // anything in it unless it has an associated view. This is so, when we + // fetch on open, it's clean. + this.model.addSingleMessage(message); + if (message.isOutgoing()) { this.removeLastSeenIndicator(); } @@ -992,7 +995,10 @@ message: i18n('deleteWarning'), okText: i18n('delete'), resolve: () => { - message.destroy(); + window.Signal.Data.removeMessage(message.id, { + Message: Whisper.Message, + }); + message.trigger('unload'); this.resetPanel(); this.updateHeader(); }, diff --git a/test/backup_test.js b/test/backup_test.js index f36d094e8..2751ffe95 100644 --- a/test/backup_test.js +++ b/test/backup_test.js @@ -553,7 +553,13 @@ describe('Backup', () => { const message = await upgradeMessageSchema(messageWithAttachments); console.log({ message }); const messageModel = new Whisper.Message(message); - await window.wrapDeferred(messageModel.save()); + const id = await window.Signal.Data.saveMessage( + messageModel.attributes, + { + Message: Whisper.Message, + } + ); + messageModel.set({ id }); const conversation = { active_at: 1524185933350, diff --git a/test/fixtures.js b/test/fixtures.js index aafcdaae9..6d9909863 100644 --- a/test/fixtures.js +++ b/test/fixtures.js @@ -231,21 +231,20 @@ Whisper.Fixtures = function() { conversationCollection.saveAll = function() { return Promise.all( - this.map(function(convo) { - return new Promise(function(resolve) { - convo.save().then(resolve); - }).then(function() { - return Promise.all( - convo.messageCollection.map(function(message) { - return new Promise(function(resolve) { - message.save().then(resolve); - }); - }) - ); - }); + this.map(async (convo) => { + await wrapDeferred(convo.save()); + + await Promise.all( + convo.messageCollection.map(async (message) => { + const id = await window.Signal.Data.saveMessage(message.attributes, { + Message: Whisper.Message + }); + message.set({ id }); + }) + ); }) ); - }; + } function getImage1() { return ( diff --git a/test/models/conversations_test.js b/test/models/conversations_test.js index 0d6b30b68..98066877c 100644 --- a/test/models/conversations_test.js +++ b/test/models/conversations_test.js @@ -83,17 +83,19 @@ describe('Conversation', function() { var attributes = { type: 'private', id: '+18085555555' }; - before(function(done) { + before(async () => { var convo = new Whisper.ConversationCollection().add(attributes); - convo.save().then(function() { - var message = convo.messageCollection.add({ - body: 'hello world', - conversationId: convo.id, - type: 'outgoing', - sent_at: Date.now(), - received_at: Date.now(), - }); - message.save().then(done); + await wrapDeferred(convo.save()); + + var message = convo.messageCollection.add({ + body: 'hello world', + conversationId: convo.id, + type: 'outgoing', + sent_at: Date.now(), + received_at: Date.now(), + }); + await window.Signal.Data.saveMessage(message.attributes, { + Message: Whisper.Message, }); }); after(clearDatabase); diff --git a/test/models/messages_test.js b/test/models/messages_test.js index e6708fc86..67066c8e0 100644 --- a/test/models/messages_test.js +++ b/test/models/messages_test.js @@ -1,16 +1,6 @@ (function() { 'use strict'; - function deleteAllMessages() { - return new Promise(function(resolve, reject) { - var messages = new Whisper.MessageCollection(); - return messages.fetch().then(function() { - messages.destroyAll(); - resolve(); - }, reject); - }); - } - var attributes = { type: 'outgoing', body: 'hi', @@ -28,12 +18,12 @@ describe('MessageCollection', function() { before(async function() { - await deleteAllMessages(); + await clearDatabase(); ConversationController.reset(); await ConversationController.load(); }); after(function() { - return deleteAllMessages(); + return clearDatabase(); }); it('gets outgoing contact', function() { @@ -60,39 +50,6 @@ assert.strictEqual(messages.length, 0); }); - it('saves asynchronously', function(done) { - new Whisper.MessageCollection() - .add(attributes) - .save() - .then(done); - }); - - it('fetches persistent messages', function(done) { - var messages = new Whisper.MessageCollection(); - assert.strictEqual(messages.length, 0); - messages.fetch().then(function() { - assert.notEqual(messages.length, 0); - var m = messages.at(0).attributes; - _.each(attributes, function(val, key) { - assert.deepEqual(m[key], val); - }); - done(); - }); - }); - - it('destroys persistent messages', function(done) { - var messages = new Whisper.MessageCollection(); - messages.fetch().then(function() { - messages.destroyAll().then(function() { - var messages = new Whisper.MessageCollection(); - messages.fetch().then(function() { - assert.strictEqual(messages.length, 0); - done(); - }); - }); - }); - }); - it('should be ordered oldest to newest', function() { var messages = new Whisper.MessageCollection(); // Timestamps diff --git a/ts/backbone/Conversation.ts b/ts/backbone/Conversation.ts deleted file mode 100644 index e64b017a0..000000000 --- a/ts/backbone/Conversation.ts +++ /dev/null @@ -1,76 +0,0 @@ -import is from '@sindresorhus/is'; - -import { Collection as BackboneCollection } from '../types/backbone/Collection'; -import { deferredToPromise } from '../../js/modules/deferred_to_promise'; -import { IndexableBoolean } from '../types/IndexedDB'; -import { Message } from '../types/Message'; - -export const fetchVisualMediaAttachments = async ({ - conversationId, - count, - WhisperMessageCollection, -}: { - conversationId: string; - count: number; - WhisperMessageCollection: BackboneCollection; -}): Promise> => - fetchFromAttachmentsIndex({ - name: 'hasVisualMediaAttachments', - conversationId, - WhisperMessageCollection, - count, - }); - -export const fetchFileAttachments = async ({ - conversationId, - count, - WhisperMessageCollection, -}: { - conversationId: string; - count: number; - WhisperMessageCollection: BackboneCollection; -}): Promise> => - fetchFromAttachmentsIndex({ - name: 'hasFileAttachments', - conversationId, - WhisperMessageCollection, - count, - }); - -const fetchFromAttachmentsIndex = async ({ - name, - conversationId, - WhisperMessageCollection, - count, -}: { - name: 'hasVisualMediaAttachments' | 'hasFileAttachments'; - conversationId: string; - WhisperMessageCollection: BackboneCollection; - count: number; -}): Promise> => { - if (!is.string(conversationId)) { - throw new TypeError("'conversationId' is required"); - } - - if (!is.object(WhisperMessageCollection)) { - throw new TypeError("'WhisperMessageCollection' is required"); - } - - const collection = new WhisperMessageCollection(); - const lowerReceivedAt = 0; - const upperReceivedAt = Number.MAX_VALUE; - const condition: IndexableBoolean = 1; - await deferredToPromise( - collection.fetch({ - index: { - name, - lower: [conversationId, lowerReceivedAt, condition], - upper: [conversationId, upperReceivedAt, condition], - order: 'desc', - }, - limit: count, - }) - ); - - return collection.models.map(model => model.toJSON()); -}; diff --git a/ts/backbone/index.ts b/ts/backbone/index.ts index fc5fabadc..26dc5f620 100644 --- a/ts/backbone/index.ts +++ b/ts/backbone/index.ts @@ -1,4 +1,3 @@ -import * as Conversation from './Conversation'; import * as Views from './views'; -export { Conversation, Views }; +export { Views };