From 02675312c59ee83b3dec4d8c664a963fcb00af59 Mon Sep 17 00:00:00 2001 From: Scott Nonnenberg Date: Thu, 26 Jul 2018 19:19:34 -0700 Subject: [PATCH] Introduce mandatary migration on startup, to minimum version --- js/background.js | 80 ++++++++++++++++++++-------- js/modules/data.js | 7 ++- js/modules/messages_data_migrator.js | 66 ++++++++++++++++------- js/modules/signal.js | 10 ++-- js/modules/types/message.js | 8 ++- 5 files changed, 122 insertions(+), 49 deletions(-) diff --git a/js/background.js b/js/background.js index a8c29c618..24cce0aac 100644 --- a/js/background.js +++ b/js/background.js @@ -165,11 +165,64 @@ window.log.info('Storage fetch'); storage.fetch(); + const MINIMUM_VERSION = 7; + + async function upgradeMessages() { + const NUM_MESSAGES_PER_BATCH = 10; + window.log.info( + 'upgradeMessages: Mandatory message schema upgrade started.', + `Target version: ${MINIMUM_VERSION}` + ); + + let isMigrationWithoutIndexComplete = false; + while (!isMigrationWithoutIndexComplete) { + const database = Migrations0DatabaseWithAttachmentData.getDatabase(); + // eslint-disable-next-line no-await-in-loop + const batchWithoutIndex = await MessageDataMigrator.processNextBatchWithoutIndex( + { + databaseName: database.name, + minDatabaseVersion: database.version, + numMessagesPerBatch: NUM_MESSAGES_PER_BATCH, + upgradeMessageSchema, + maxVersion: MINIMUM_VERSION, + BackboneMessage: Whisper.Message, + saveMessage: window.Signal.Data.saveMessage, + } + ); + window.log.info( + 'upgradeMessages: upgrade without index', + batchWithoutIndex + ); + isMigrationWithoutIndexComplete = batchWithoutIndex.done; + } + window.log.info('upgradeMessages: upgrade without index complete!'); + + let isMigrationWithIndexComplete = false; + while (!isMigrationWithIndexComplete) { + // eslint-disable-next-line no-await-in-loop + const batchWithIndex = await MessageDataMigrator.processNext({ + BackboneMessage: Whisper.Message, + BackboneMessageCollection: Whisper.MessageCollection, + numMessagesPerBatch: NUM_MESSAGES_PER_BATCH, + upgradeMessageSchema, + getMessagesNeedingUpgrade: window.Signal.Data.getMessagesNeedingUpgrade, + saveMessage: window.Signal.Data.saveMessage, + maxVersion: MINIMUM_VERSION, + }); + window.log.info('upgradeMessages: upgrade with index', batchWithIndex); + isMigrationWithIndexComplete = batchWithIndex.done; + } + window.log.info('upgradeMessages: upgrade with index complete!'); + + window.log.info('upgradeMessages: Message schema upgrade complete'); + } + + await upgradeMessages(); + const idleDetector = new IdleDetector(); let isMigrationWithIndexComplete = false; - let isMigrationWithoutIndexComplete = false; + window.log.info('Starting background data migration. Target version: latest'); idleDetector.on('idle', async () => { - window.log.info('Idle processing started'); const NUM_MESSAGES_PER_BATCH = 1; if (!isMigrationWithIndexComplete) { @@ -185,27 +238,8 @@ isMigrationWithIndexComplete = batchWithIndex.done; } - if (!isMigrationWithoutIndexComplete) { - const database = Migrations0DatabaseWithAttachmentData.getDatabase(); - const batchWithoutIndex = await MessageDataMigrator.processNextBatchWithoutIndex( - { - databaseName: database.name, - minDatabaseVersion: database.version, - numMessagesPerBatch: NUM_MESSAGES_PER_BATCH, - upgradeMessageSchema, - } - ); - window.log.info( - 'Upgrade message schema (without index):', - batchWithoutIndex - ); - isMigrationWithoutIndexComplete = batchWithoutIndex.done; - } - - const areAllMigrationsComplete = - isMigrationWithIndexComplete && isMigrationWithoutIndexComplete; - if (areAllMigrationsComplete) { - window.log.info('All migrations are complete. Stopping idle detector.'); + if (isMigrationWithIndexComplete) { + window.log.info('Background migration complete. Stopping idle detector.'); idleDetector.stop(); } }); diff --git a/js/modules/data.js b/js/modules/data.js index 540d99f0b..cdf967ac5 100644 --- a/js/modules/data.js +++ b/js/modules/data.js @@ -256,7 +256,10 @@ async function removeAll() { // erase everything in the database } -async function getMessagesNeedingUpgrade(limit, { MessageCollection }) { +async function getMessagesNeedingUpgrade( + limit, + { MessageCollection, maxVersion = MessageType.CURRENT_SCHEMA_VERSION } +) { const messages = new MessageCollection(); await deferredToPromise( @@ -264,7 +267,7 @@ async function getMessagesNeedingUpgrade(limit, { MessageCollection }) { limit, index: { name: 'schemaVersion', - upper: MessageType.CURRENT_SCHEMA_VERSION, + upper: maxVersion, excludeUpper: true, order: 'desc', }, diff --git a/js/modules/messages_data_migrator.js b/js/modules/messages_data_migrator.js index 13eca80d6..6b9581ba7 100644 --- a/js/modules/messages_data_migrator.js +++ b/js/modules/messages_data_migrator.js @@ -21,6 +21,7 @@ exports.processNext = async ({ upgradeMessageSchema, getMessagesNeedingUpgrade, saveMessage, + maxVersion = Message.CURRENT_SCHEMA_VERSION, } = {}) => { if (!isFunction(BackboneMessage)) { throw new TypeError( @@ -49,6 +50,7 @@ exports.processNext = async ({ const messagesRequiringSchemaUpgrade = await getMessagesNeedingUpgrade( numMessagesPerBatch, { + maxVersion, MessageCollection: BackboneMessageCollection, } ); @@ -56,7 +58,9 @@ exports.processNext = async ({ const upgradeStartTime = Date.now(); const upgradedMessages = await Promise.all( - messagesRequiringSchemaUpgrade.map(upgradeMessageSchema) + messagesRequiringSchemaUpgrade.map(message => + upgradeMessageSchema(message, { maxVersion }) + ) ); const upgradeDuration = Date.now() - upgradeStartTime; @@ -87,6 +91,9 @@ exports.dangerouslyProcessAllWithoutIndex = async ({ numMessagesPerBatch, upgradeMessageSchema, logger, + maxVersion = Message.CURRENT_SCHEMA_VERSION, + saveMessage, + BackboneMessage, } = {}) => { if (!isString(databaseName)) { throw new TypeError("'databaseName' must be a string"); @@ -99,10 +106,15 @@ exports.dangerouslyProcessAllWithoutIndex = async ({ if (!isNumber(numMessagesPerBatch)) { throw new TypeError("'numMessagesPerBatch' must be a number"); } - if (!isFunction(upgradeMessageSchema)) { throw new TypeError("'upgradeMessageSchema' is required"); } + if (!isFunction(BackboneMessage)) { + throw new TypeError("'upgradeMessageSchema' is required"); + } + if (!isFunction(saveMessage)) { + throw new TypeError("'upgradeMessageSchema' is required"); + } const connection = await database.open(databaseName); const databaseVersion = connection.version; @@ -133,6 +145,9 @@ exports.dangerouslyProcessAllWithoutIndex = async ({ connection, numMessagesPerBatch, upgradeMessageSchema, + maxVersion, + saveMessage, + BackboneMessage, }); if (status.done) { break; @@ -162,6 +177,9 @@ exports.processNextBatchWithoutIndex = async ({ minDatabaseVersion, numMessagesPerBatch, upgradeMessageSchema, + maxVersion, + BackboneMessage, + saveMessage, } = {}) => { if (!isFunction(upgradeMessageSchema)) { throw new TypeError("'upgradeMessageSchema' is required"); @@ -172,6 +190,9 @@ exports.processNextBatchWithoutIndex = async ({ connection, numMessagesPerBatch, upgradeMessageSchema, + maxVersion, + BackboneMessage, + saveMessage, }); return batch; }; @@ -203,17 +224,29 @@ const _processBatch = async ({ connection, numMessagesPerBatch, upgradeMessageSchema, + maxVersion, + BackboneMessage, + saveMessage, } = {}) => { if (!isObject(connection)) { - throw new TypeError("'connection' must be a string"); + throw new TypeError('_processBatch: connection must be a string'); } if (!isFunction(upgradeMessageSchema)) { - throw new TypeError("'upgradeMessageSchema' is required"); + throw new TypeError('_processBatch: upgradeMessageSchema is required'); } if (!isNumber(numMessagesPerBatch)) { - throw new TypeError("'numMessagesPerBatch' is required"); + throw new TypeError('_processBatch: numMessagesPerBatch is required'); + } + if (!isNumber(maxVersion)) { + throw new TypeError('_processBatch: maxVersion is required'); + } + if (!isFunction(BackboneMessage)) { + throw new TypeError('_processBatch: BackboneMessage is required'); + } + if (!isFunction(saveMessage)) { + throw new TypeError('_processBatch: saveMessage is required'); } const isAttachmentMigrationComplete = await settings.isAttachmentMigrationComplete( @@ -241,14 +274,20 @@ const _processBatch = async ({ const upgradeStartTime = Date.now(); const upgradedMessages = await Promise.all( - unprocessedMessages.map(upgradeMessageSchema) + unprocessedMessages.map(message => + upgradeMessageSchema(message, { maxVersion }) + ) ); const upgradeDuration = Date.now() - upgradeStartTime; const saveMessagesStartTime = Date.now(); const transaction = connection.transaction(MESSAGES_STORE_NAME, 'readwrite'); const transactionCompletion = database.completeTransaction(transaction); - await Promise.all(upgradedMessages.map(_saveMessage({ transaction }))); + await Promise.all( + upgradedMessages.map(message => + saveMessage(message, { Message: BackboneMessage }) + ) + ); await transactionCompletion; const saveDuration = Date.now() - saveMessagesStartTime; @@ -281,19 +320,6 @@ const _processBatch = async ({ }; }; -const _saveMessage = ({ transaction } = {}) => message => { - if (!isObject(transaction)) { - throw new TypeError("'transaction' is required"); - } - - const messagesStore = transaction.objectStore(MESSAGES_STORE_NAME); - const request = messagesStore.put(message, message.id); - return new Promise((resolve, reject) => { - request.onsuccess = () => resolve(); - request.onerror = event => reject(event.target.error); - }); -}; - // NOTE: Named ‘dangerous’ because it is not as efficient as using our // `messages` `schemaVersion` index: const _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex = ({ diff --git a/js/modules/signal.js b/js/modules/signal.js index fcbcf3664..bfd61b736 100644 --- a/js/modules/signal.js +++ b/js/modules/signal.js @@ -125,8 +125,10 @@ function initializeMigrations({ loadMessage: MessageType.createAttachmentLoader(loadAttachmentData), Migrations0DatabaseWithAttachmentData, Migrations1DatabaseWithoutAttachmentData, - upgradeMessageSchema: message => - MessageType.upgradeSchema(message, { + upgradeMessageSchema: (message, options = {}) => { + const { maxVersion } = options; + + return MessageType.upgradeSchema(message, { writeNewAttachmentData: createWriterForNew(attachmentsPath), getRegionCode, getAbsoluteAttachmentPath, @@ -136,7 +138,9 @@ function initializeMigrations({ makeImageThumbnail, makeVideoScreenshot, logger, - }), + maxVersion, + }); + }, writeMessageAttachments: MessageType.createAttachmentDataWriter({ writeExistingAttachmentData: createWriterForExisting(attachmentsPath), logger, diff --git a/js/modules/types/message.js b/js/modules/types/message.js index 9f0bf3fb2..cd0609083 100644 --- a/js/modules/types/message.js +++ b/js/modules/types/message.js @@ -296,6 +296,7 @@ exports.upgradeSchema = async ( makeImageThumbnail, makeVideoScreenshot, logger, + maxVersion = exports.CURRENT_SCHEMA_VERSION, } = {} ) => { if (!isFunction(writeNewAttachmentData)) { @@ -328,7 +329,12 @@ exports.upgradeSchema = async ( let message = rawMessage; // eslint-disable-next-line no-restricted-syntax - for (const currentVersion of VERSIONS) { + for (let index = 0, max = VERSIONS.length; index < max; index += 1) { + if (maxVersion < index) { + break; + } + + const currentVersion = VERSIONS[index]; // We really do want this intra-loop await because this is a chained async action, // each step dependent on the previous // eslint-disable-next-line no-await-in-loop