Migrate attachments on startup (#2193)

- [x] Run initial Backbone migrations 12–17 upon startup.
- [x] ~~Run attachment migration (without Backbone references to avoid
      migrations): `MessageDataMigrator.processAll`.~~ Disabled in favor of
      background processing without index.
- [x] ~~Run new migrations that can cover entire database (18+).~~
      Not until we have such migrations.
- [x] Add `runMigrations` module that ensures migrations are only run once.
- [x] Add `settings` (`Signal.Settings`) module to interact with our app
      settings (`items` store) using a domain API, e.g.
      `isAttachmentMigrationComplete(…)` vs
      `storage.get('attachmentMigration_isComplete')`. Required to check
      attachment migration status without using Backbone.
- [x] Add `database` (`Signal.Database`) CommonJS module to provide
      `Promise`-based interface to IndexedDB.
- [x] Add `debug` (`Signal.Debug`) module for generating synthetic data to test
      performance of migration.
- [x] Minor: Add `sleep` module for doing promise based sleeps.
- [x] Minor: Extract `wrapDeferred` as CommonJS module named `deferredToPromise`.
pull/1/head
Daniel Gasienica 7 years ago committed by GitHub
commit 887bd83852
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -97,6 +97,7 @@ module.exports = function(grunt) {
files: [
'Gruntfile.js',
'js/**/*.js',
'!js/background.js',
'!js/jquery.js',
'!js/libtextsecure.js',
'!js/WebAudioRecorderMp3.js',

@ -1,9 +1,9 @@
const crypto = require('crypto');
const fse = require('fs-extra');
const isArrayBuffer = require('lodash/isArrayBuffer');
const isString = require('lodash/isString');
const path = require('path');
const fse = require('fs-extra');
const toArrayBuffer = require('to-arraybuffer');
const { isArrayBuffer, isString } = require('lodash');
const PATH = 'attachments.noindex';

@ -1,4 +1,4 @@
const isString = require('lodash/isString');
const { isString } = require('lodash');
exports.createTemplate = (options, messages) => {

@ -11,12 +11,16 @@
/* global Whisper: false */
/* global wrapDeferred: false */
;(function() {
;(async function() {
'use strict';
const { IdleDetector, MessageDataMigrator } = Signal.Workflow;
const { Errors, Message } = window.Signal.Types;
const { upgradeMessageSchema } = window.Signal.Migrations;
const {
Migrations0DatabaseWithAttachmentData,
// Migrations1DatabaseWithoutAttachmentData,
} = window.Signal.Migrations;
const { Views } = window.Signal;
// Implicitly used in `indexeddb-backbonejs-adapter`:
@ -75,22 +79,40 @@
return accountManager;
};
const cancelInitializationMessage = Views.Initialization.setMessage();
console.log('Start IndexedDB migrations');
storage.fetch();
/* eslint-enable */
const cancelInitializationMessage = Views.Initialization.setMessage();
console.log('Start IndexedDB migrations');
console.log('Migrate database with attachments');
await Migrations0DatabaseWithAttachmentData.run({ Backbone });
// console.log('Migrate attachments to disk');
// const database = Migrations0DatabaseWithAttachmentData.getDatabase();
// await MessageDataMigrator.processAll({
// Backbone,
// databaseName: database.name,
// minDatabaseVersion: database.version,
// upgradeMessageSchema,
// });
// console.log('Migrate database without attachments');
// await Migrations1DatabaseWithoutAttachmentData.run({
// Backbone,
// database: Whisper.Database,
// });
console.log('Storage fetch');
storage.fetch();
const idleDetector = new IdleDetector();
/* eslint-enable */
/* jshint ignore:start */
const NUM_MESSAGE_UPGRADES_PER_IDLE = 2;
const idleDetector = new IdleDetector();
idleDetector.on('idle', async () => {
const results = await MessageDataMigrator.processNext({
BackboneMessage: Whisper.Message,
BackboneMessageCollection: Whisper.MessageCollection,
count: NUM_MESSAGE_UPGRADES_PER_IDLE,
upgradeMessageSchema,
wrapDeferred,
});
console.log('Upgrade message schema:', results);
@ -98,7 +120,6 @@
idleDetector.stop();
}
});
/* jshint ignore:end */
/* eslint-disable */
// We need this 'first' check because we don't want to start the app up any other time
@ -558,7 +579,6 @@
}
/* eslint-enable */
/* jshint ignore:start */
// Descriptors
const getGroupDescriptor = group => ({
@ -667,7 +687,6 @@
getMessageDescriptor: getDescriptorForSent,
createMessage: createSentMessage,
});
/* jshint ignore:end */
/* eslint-disable */
function isMessageDuplicate(message) {

@ -2,13 +2,11 @@
/* global Backbone: false */
/* global _: false */
/* eslint-disable more/no-then */
// eslint-disable-next-line func-names
(function () {
'use strict';
const { Migrations } = window.Signal;
const { Migrations0DatabaseWithAttachmentData } = window.Signal.Migrations;
window.Whisper = window.Whisper || {};
window.Whisper.Database = window.Whisper.Database || {};
@ -125,132 +123,5 @@
request.onsuccess = resolve;
}));
Whisper.Database.migrations = [
{
version: '12.0',
migrate(transaction, next) {
console.log('migration 12.0');
console.log('creating object stores');
const messages = transaction.db.createObjectStore('messages');
messages.createIndex('conversation', ['conversationId', 'received_at'], {
unique: false,
});
messages.createIndex('receipt', 'sent_at', { unique: false });
messages.createIndex('unread', ['conversationId', 'unread'], { unique: false });
messages.createIndex('expires_at', 'expires_at', { unique: false });
const conversations = transaction.db.createObjectStore('conversations');
conversations.createIndex('inbox', 'active_at', { unique: false });
conversations.createIndex('group', 'members', {
unique: false,
multiEntry: true,
});
conversations.createIndex('type', 'type', {
unique: false,
});
conversations.createIndex('search', 'tokens', {
unique: false,
multiEntry: true,
});
transaction.db.createObjectStore('groups');
transaction.db.createObjectStore('sessions');
transaction.db.createObjectStore('identityKeys');
transaction.db.createObjectStore('preKeys');
transaction.db.createObjectStore('signedPreKeys');
transaction.db.createObjectStore('items');
console.log('creating debug log');
transaction.db.createObjectStore('debug');
next();
},
},
{
version: '13.0',
migrate(transaction, next) {
console.log('migration 13.0');
console.log('Adding fields to identity keys');
const identityKeys = transaction.objectStore('identityKeys');
const request = identityKeys.openCursor();
const promises = [];
request.onsuccess = (event) => {
const cursor = event.target.result;
if (cursor) {
const attributes = cursor.value;
attributes.timestamp = 0;
attributes.firstUse = false;
attributes.nonblockingApproval = false;
attributes.verified = 0;
promises.push(new Promise(((resolve, reject) => {
const putRequest = identityKeys.put(attributes, attributes.id);
putRequest.onsuccess = resolve;
putRequest.onerror = (e) => {
console.log(e);
reject(e);
};
})));
cursor.continue();
} else {
// no more results
Promise.all(promises).then(() => {
next();
});
}
};
request.onerror = (event) => {
console.log(event);
};
},
},
{
version: '14.0',
migrate(transaction, next) {
console.log('migration 14.0');
console.log('Adding unprocessed message store');
const unprocessed = transaction.db.createObjectStore('unprocessed');
unprocessed.createIndex('received', 'timestamp', { unique: false });
next();
},
},
{
version: '15.0',
migrate(transaction, next) {
console.log('migration 15.0');
console.log('Adding messages index for de-duplication');
const messages = transaction.objectStore('messages');
messages.createIndex('unique', ['source', 'sourceDevice', 'sent_at'], {
unique: true,
});
next();
},
},
{
version: '16.0',
migrate(transaction, next) {
console.log('migration 16.0');
console.log('Dropping log table, since we now log to disk');
transaction.db.deleteObjectStore('debug');
next();
},
},
{
version: 17,
async migrate(transaction, next) {
console.log('migration 17');
console.log('Start migration to database version 17');
const start = Date.now();
await Migrations.V17.run(transaction);
const duration = Date.now() - start;
console.log(
'Complete migration to database version 17.',
`Duration: ${duration}ms`
);
next();
},
},
];
Whisper.Database.migrations = Migrations0DatabaseWithAttachmentData.migrations;
}());

@ -0,0 +1,56 @@
/* global indexedDB */
// Module for interacting with IndexedDB without Backbone IndexedDB adapter
// and using promises. Revisit use of `idb` dependency as it might cover
// this functionality.
const { isObject } = require('lodash');
exports.open = (name, version) => {
const request = indexedDB.open(name, version);
return new Promise((resolve, reject) => {
request.onblocked = () =>
reject(new Error('Database blocked'));
request.onupgradeneeded = event =>
reject(new Error('Unexpected database upgrade required:' +
`oldVersion: ${event.oldVersion}, newVersion: ${event.newVersion}`));
request.onerror = event =>
reject(event.target.error);
request.onsuccess = (event) => {
const connection = event.target.result;
resolve(connection);
};
});
};
exports.completeTransaction = transaction =>
new Promise((resolve, reject) => {
transaction.addEventListener('abort', event => reject(event.target.error));
transaction.addEventListener('error', event => reject(event.target.error));
transaction.addEventListener('complete', () => resolve());
});
exports.getVersion = async (name) => {
const connection = await exports.open(name);
const { version } = connection;
connection.close();
return version;
};
exports.getCount = async ({ store } = {}) => {
if (!isObject(store)) {
throw new TypeError('"store" is required');
}
const request = store.count();
return new Promise((resolve, reject) => {
request.onerror = event =>
reject(event.target.error);
request.onsuccess = event =>
resolve(event.target.result);
});
};

@ -0,0 +1,135 @@
const {
isFunction,
isNumber,
isObject,
isString,
random,
range,
sample,
} = require('lodash');
const Message = require('./types/message');
const { deferredToPromise } = require('./deferred_to_promise');
const { sleep } = require('./sleep');
// See: https://en.wikipedia.org/wiki/Fictitious_telephone_number#North_American_Numbering_Plan
const SENDER_ID = '+12126647665';
exports.createConversation = async ({
ConversationController,
numMessages,
WhisperMessage,
} = {}) => {
if (!isObject(ConversationController) ||
!isFunction(ConversationController.getOrCreateAndWait)) {
throw new TypeError('"ConversationController" is required');
}
if (!isNumber(numMessages) || numMessages <= 0) {
throw new TypeError('"numMessages" must be a positive number');
}
if (!isFunction(WhisperMessage)) {
throw new TypeError('"WhisperMessage" is required');
}
const conversation =
await ConversationController.getOrCreateAndWait(SENDER_ID, 'private');
conversation.set({
active_at: Date.now(),
unread: numMessages,
});
await deferredToPromise(conversation.save());
const conversationId = conversation.get('id');
await Promise.all(range(0, numMessages).map(async (index) => {
await sleep(index * 100);
console.log(`Create message ${index + 1}`);
const message = new WhisperMessage(createRandomMessage({ conversationId }));
return deferredToPromise(message.save());
}));
};
const SAMPLE_MESSAGES = [
'Lorem ipsum dolor sit amet, consectetur adipiscing elit.',
'Integer et rutrum leo, eu ultrices ligula.',
'Nam vel aliquam quam.',
'Suspendisse posuere nunc vitae pulvinar lobortis.',
'Nunc et sapien ex.',
'Duis nec neque eu arcu ultrices ullamcorper in et mauris.',
'Praesent mi felis, hendrerit a nulla id, mattis consectetur est.',
'Duis venenatis posuere est sit amet congue.',
'Vestibulum vitae sapien ultricies, auctor purus vitae, laoreet lacus.',
'Fusce laoreet nisi dui, a bibendum metus consequat in.',
'Nulla sed iaculis odio, sed lobortis lacus.',
'Etiam massa felis, gravida at nibh viverra, tincidunt convallis justo.',
'Maecenas ut egestas urna.',
'Pellentesque consectetur mattis imperdiet.',
'Maecenas pulvinar efficitur justo a cursus.',
];
const ATTACHMENT_SAMPLE_RATE = 0.33;
const createRandomMessage = ({ conversationId } = {}) => {
if (!isString(conversationId)) {
throw new TypeError('"conversationId" must be a string');
}
const sentAt = Date.now() - random(100 * 24 * 60 * 60 * 1000);
const receivedAt = sentAt + random(30 * 1000);
const hasAttachment = Math.random() <= ATTACHMENT_SAMPLE_RATE;
const attachments = hasAttachment
? [createRandomInMemoryAttachment()] : [];
const type = sample(['incoming', 'outgoing']);
const commonProperties = {
attachments,
body: sample(SAMPLE_MESSAGES),
conversationId,
received_at: receivedAt,
sent_at: sentAt,
timestamp: receivedAt,
type,
};
const message = _createMessage({ commonProperties, conversationId, type });
return Message.initializeSchemaVersion(message);
};
const _createMessage = ({ commonProperties, conversationId, type } = {}) => {
switch (type) {
case 'incoming':
return Object.assign({}, commonProperties, {
flags: 0,
source: conversationId,
sourceDevice: 1,
});
case 'outgoing':
return Object.assign({}, commonProperties, {
delivered: 1,
delivered_to: [conversationId],
expireTimer: 0,
recipients: [conversationId],
sent_to: [conversationId],
synced: true,
});
default:
throw new TypeError(`Unknown message type: '${type}'`);
}
};
const MEGA_BYTE = 1e6;
const createRandomInMemoryAttachment = () => {
const numBytes = (1 + Math.ceil((Math.random() * 50))) * MEGA_BYTE;
const array = new Uint32Array(numBytes).fill(1);
const data = array.buffer;
const fileName = Math.random().toString().slice(2);
return {
contentType: 'application/octet-stream',
data,
fileName,
size: numBytes,
};
};

@ -0,0 +1,3 @@
exports.deferredToPromise = deferred =>
// eslint-disable-next-line more/no-then
new Promise((resolve, reject) => deferred.then(resolve, reject));

@ -1,52 +1,67 @@
const isNumber = require('lodash/isNumber');
const isFunction = require('lodash/isFunction');
// Module to upgrade the schema of messages, e.g. migrate attachments to disk.
// `processAll` purposely doesnt rely on our Backbone IndexedDB adapter to
// prevent automatic migrations. Rather, it uses direct IndexedDB access.
// This includes avoiding usage of `storage` module which uses Backbone under
// the hood.
/* global IDBKeyRange */
const {
isFunction,
isNumber,
isObject,
isString,
last,
} = require('lodash');
const database = require('./database');
const Message = require('./types/message');
const settings = require('./settings');
const { deferredToPromise } = require('./deferred_to_promise');
const MESSAGES_STORE_NAME = 'messages';
const NUM_MESSAGES_PER_BATCH = 1;
const processNext = async ({
exports.processNext = async ({
BackboneMessage,
BackboneMessageCollection,
count,
upgradeMessageSchema,
wrapDeferred,
} = {}) => {
if (!isFunction(BackboneMessage)) {
throw new TypeError('`BackboneMessage` (Whisper.Message) constructor is required');
throw new TypeError('"BackboneMessage" (Whisper.Message) constructor is required');
}
if (!isFunction(BackboneMessageCollection)) {
throw new TypeError('`BackboneMessageCollection` (Whisper.MessageCollection)' +
throw new TypeError('"BackboneMessageCollection" (Whisper.MessageCollection)' +
' constructor is required');
}
if (!isNumber(count)) {
throw new TypeError('`count` is required');
throw new TypeError('"count" is required');
}
if (!isFunction(upgradeMessageSchema)) {
throw new TypeError('`upgradeMessageSchema` is required');
}
if (!isFunction(wrapDeferred)) {
throw new TypeError('`wrapDeferred` is required');
throw new TypeError('"upgradeMessageSchema" is required');
}
const startTime = Date.now();
const startFetchTime = Date.now();
const fetchStartTime = Date.now();
const messagesRequiringSchemaUpgrade =
await _fetchMessagesRequiringSchemaUpgrade({ BackboneMessageCollection, count });
const fetchDuration = Date.now() - startFetchTime;
const fetchDuration = Date.now() - fetchStartTime;
const startUpgradeTime = Date.now();
const upgradeStartTime = Date.now();
const upgradedMessages =
await Promise.all(messagesRequiringSchemaUpgrade.map(upgradeMessageSchema));
const upgradeDuration = Date.now() - startUpgradeTime;
const upgradeDuration = Date.now() - upgradeStartTime;
const startSaveTime = Date.now();
const saveMessage = _saveMessage({ BackboneMessage, wrapDeferred });
const saveStartTime = Date.now();
const saveMessage = _saveMessageBackbone({ BackboneMessage });
await Promise.all(upgradedMessages.map(saveMessage));
const saveDuration = Date.now() - startSaveTime;
const saveDuration = Date.now() - saveStartTime;
const totalDuration = Date.now() - startTime;
const numProcessed = messagesRequiringSchemaUpgrade.length;
@ -61,20 +76,154 @@ const processNext = async ({
};
};
const _saveMessage = ({ BackboneMessage, wrapDeferred } = {}) => (message) => {
exports.processAll = async ({
databaseName,
minDatabaseVersion,
upgradeMessageSchema,
} = {}) => {
if (!isString(databaseName)) {
throw new TypeError('"databaseName" must be a string');
}
if (!isNumber(minDatabaseVersion)) {
throw new TypeError('"minDatabaseVersion" must be a number');
}
if (!isFunction(upgradeMessageSchema)) {
throw new TypeError('"upgradeMessageSchema" is required');
}
const connection = await database.open(databaseName);
const databaseVersion = connection.version;
const isValidDatabaseVersion = databaseVersion >= minDatabaseVersion;
console.log('Database status', {
databaseVersion,
isValidDatabaseVersion,
minDatabaseVersion,
});
if (!isValidDatabaseVersion) {
throw new Error(`Expected database version (${databaseVersion})` +
` to be at least ${minDatabaseVersion}`);
}
const isComplete = await settings.isAttachmentMigrationComplete(connection);
console.log('Attachment migration status:', isComplete ? 'complete' : 'incomplete');
if (isComplete) {
return;
}
let numTotalMessages = null;
// eslint-disable-next-line more/no-then
getNumMessages({ connection }).then((numMessages) => {
numTotalMessages = numMessages;
});
const migrationStartTime = Date.now();
let unprocessedMessages = [];
let totalMessagesProcessed = 0;
do {
const lastProcessedIndex =
// eslint-disable-next-line no-await-in-loop
await settings.getAttachmentMigrationLastProcessedIndex(connection);
const fetchUnprocessedMessagesStartTime = Date.now();
unprocessedMessages =
// eslint-disable-next-line no-await-in-loop
await _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex({
connection,
count: NUM_MESSAGES_PER_BATCH,
lastIndex: lastProcessedIndex,
});
const fetchDuration = Date.now() - fetchUnprocessedMessagesStartTime;
const numUnprocessedMessages = unprocessedMessages.length;
if (numUnprocessedMessages === 0) {
break;
}
const upgradeStartTime = Date.now();
const upgradedMessages =
// eslint-disable-next-line no-await-in-loop
await Promise.all(unprocessedMessages.map(upgradeMessageSchema));
const upgradeDuration = Date.now() - upgradeStartTime;
const saveMessagesStartTime = Date.now();
const transaction = connection.transaction(MESSAGES_STORE_NAME, 'readwrite');
const transactionCompletion = database.completeTransaction(transaction);
// eslint-disable-next-line no-await-in-loop
await Promise.all(upgradedMessages.map(_saveMessage({ transaction })));
// eslint-disable-next-line no-await-in-loop
await transactionCompletion;
const saveDuration = Date.now() - saveMessagesStartTime;
// TODO: Confirm transaction is complete
const lastMessage = last(upgradedMessages);
const newLastProcessedIndex = lastMessage ? lastMessage.id : null;
if (newLastProcessedIndex) {
// eslint-disable-next-line no-await-in-loop
await settings.setAttachmentMigrationLastProcessedIndex(
connection,
newLastProcessedIndex
);
}
totalMessagesProcessed += numUnprocessedMessages;
console.log('Upgrade message schema:', {
lastProcessedIndex,
numUnprocessedMessages,
numCumulativeMessagesProcessed: totalMessagesProcessed,
numTotalMessages,
fetchDuration,
saveDuration,
upgradeDuration,
newLastProcessedIndex,
targetSchemaVersion: Message.CURRENT_SCHEMA_VERSION,
});
} while (unprocessedMessages.length > 0);
await settings.markAttachmentMigrationComplete(connection);
await settings.deleteAttachmentMigrationLastProcessedIndex(connection);
console.log('Close database connection');
connection.close();
const totalDuration = Date.now() - migrationStartTime;
console.log('Attachment migration complete:', {
totalDuration,
totalMessagesProcessed,
});
};
const _saveMessageBackbone = ({ BackboneMessage } = {}) => (message) => {
const backboneMessage = new BackboneMessage(message);
return wrapDeferred(backboneMessage.save());
return deferredToPromise(backboneMessage.save());
};
const _saveMessage = ({ transaction } = {}) => (message) => {
if (!isObject(transaction)) {
throw new TypeError('"transaction" is required');
}
const messagesStore = transaction.objectStore(MESSAGES_STORE_NAME);
const request = messagesStore.put(message, message.id);
return new Promise((resolve, reject) => {
request.onsuccess = () =>
resolve();
request.onerror = event =>
reject(event.target.error);
});
};
const _fetchMessagesRequiringSchemaUpgrade =
async ({ BackboneMessageCollection, count } = {}) => {
if (!isFunction(BackboneMessageCollection)) {
throw new TypeError('`BackboneMessageCollection` (Whisper.MessageCollection)' +
throw new TypeError('"BackboneMessageCollection" (Whisper.MessageCollection)' +
' constructor is required');
}
if (!isNumber(count)) {
throw new TypeError('`count` is required');
throw new TypeError('"count" is required');
}
const collection = new BackboneMessageCollection();
@ -93,7 +242,49 @@ const _fetchMessagesRequiringSchemaUpgrade =
}));
};
// NOTE: Named dangerous because it is not as efficient as using our
// `messages` `schemaVersion` index:
const _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex =
({ connection, count, lastIndex } = {}) => {
if (!isObject(connection)) {
throw new TypeError('"connection" is required');
}
if (!isNumber(count)) {
throw new TypeError('"count" is required');
}
if (lastIndex && !isString(lastIndex)) {
throw new TypeError('"lastIndex" must be a string');
}
const hasLastIndex = Boolean(lastIndex);
const transaction = connection.transaction(MESSAGES_STORE_NAME, 'readonly');
const messagesStore = transaction.objectStore(MESSAGES_STORE_NAME);
const excludeLowerBound = true;
const query = hasLastIndex
? IDBKeyRange.lowerBound(lastIndex, excludeLowerBound)
: undefined;
const request = messagesStore.getAll(query, count);
return new Promise((resolve, reject) => {
request.onsuccess = event =>
resolve(event.target.result);
request.onerror = event =>
reject(event.target.error);
});
};
const getNumMessages = async ({ connection } = {}) => {
if (!isObject(connection)) {
throw new TypeError('"connection" is required');
}
const transaction = connection.transaction(MESSAGES_STORE_NAME, 'readonly');
const messagesStore = transaction.objectStore(MESSAGES_STORE_NAME);
const numTotalMessages = await database.getCount({ store: messagesStore });
await database.completeTransaction(transaction);
module.exports = {
processNext,
return numTotalMessages;
};

@ -1,55 +0,0 @@
const Message = require('../../types/message');
exports.run = async (transaction) => {
const messagesStore = transaction.objectStore('messages');
console.log('Initialize messages schema version');
const numUpgradedMessages = await _initializeMessageSchemaVersion(messagesStore);
console.log('Complete messages schema version initialization', { numUpgradedMessages });
console.log('Create index from attachment schema version to attachment');
messagesStore.createIndex('schemaVersion', 'schemaVersion', { unique: false });
};
const _initializeMessageSchemaVersion = messagesStore =>
new Promise((resolve, reject) => {
const messagePutOperations = [];
const cursorRequest = messagesStore.openCursor();
cursorRequest.onsuccess = async (event) => {
const cursor = event.target.result;
const hasMoreData = Boolean(cursor);
if (!hasMoreData) {
await Promise.all(messagePutOperations);
return resolve(messagePutOperations.length);
}
const message = cursor.value;
const messageWithSchemaVersion = Message.initializeSchemaVersion(message);
messagePutOperations.push(putItem(
messagesStore,
messageWithSchemaVersion,
messageWithSchemaVersion.id
));
return cursor.continue();
};
cursorRequest.onerror = event =>
reject(event.target.error);
});
// putItem :: IDBObjectStore -> Item -> Key -> Promise Item
const putItem = (store, item, key) =>
new Promise((resolve, reject) => {
try {
const request = store.put(item, key);
request.onsuccess = event =>
resolve(event.target.result);
request.onerror = event =>
reject(event.target.error);
} catch (error) {
reject(error);
}
});

@ -0,0 +1,156 @@
const { last } = require('lodash');
const { runMigrations } = require('./run_migrations');
// IMPORTANT: The migrations below are run on a database that may be very large
// due to attachments being directly stored inside the database. Please avoid
// any expensive operations, e.g. modifying all messages / attachments, etc., as
// it may cause out-of-memory errors for users with long histories:
// https://github.com/signalapp/Signal-Desktop/issues/2163
exports.migrations = [
{
version: '12.0',
migrate(transaction, next) {
console.log('Migration 12');
console.log('creating object stores');
const messages = transaction.db.createObjectStore('messages');
messages.createIndex('conversation', ['conversationId', 'received_at'], {
unique: false,
});
messages.createIndex('receipt', 'sent_at', { unique: false });
messages.createIndex('unread', ['conversationId', 'unread'], { unique: false });
messages.createIndex('expires_at', 'expires_at', { unique: false });
const conversations = transaction.db.createObjectStore('conversations');
conversations.createIndex('inbox', 'active_at', { unique: false });
conversations.createIndex('group', 'members', {
unique: false,
multiEntry: true,
});
conversations.createIndex('type', 'type', {
unique: false,
});
conversations.createIndex('search', 'tokens', {
unique: false,
multiEntry: true,
});
transaction.db.createObjectStore('groups');
transaction.db.createObjectStore('sessions');
transaction.db.createObjectStore('identityKeys');
transaction.db.createObjectStore('preKeys');
transaction.db.createObjectStore('signedPreKeys');
transaction.db.createObjectStore('items');
console.log('creating debug log');
transaction.db.createObjectStore('debug');
next();
},
},
{
version: '13.0',
migrate(transaction, next) {
console.log('Migration 13');
console.log('Adding fields to identity keys');
const identityKeys = transaction.objectStore('identityKeys');
const request = identityKeys.openCursor();
const promises = [];
request.onsuccess = (event) => {
const cursor = event.target.result;
if (cursor) {
const attributes = cursor.value;
attributes.timestamp = 0;
attributes.firstUse = false;
attributes.nonblockingApproval = false;
attributes.verified = 0;
promises.push(new Promise(((resolve, reject) => {
const putRequest = identityKeys.put(attributes, attributes.id);
putRequest.onsuccess = resolve;
putRequest.onerror = (e) => {
console.log(e);
reject(e);
};
})));
cursor.continue();
} else {
// no more results
// eslint-disable-next-line more/no-then
Promise.all(promises).then(() => {
next();
});
}
};
request.onerror = (event) => {
console.log(event);
};
},
},
{
version: '14.0',
migrate(transaction, next) {
console.log('Migration 14');
console.log('Adding unprocessed message store');
const unprocessed = transaction.db.createObjectStore('unprocessed');
unprocessed.createIndex('received', 'timestamp', { unique: false });
next();
},
},
{
version: '15.0',
migrate(transaction, next) {
console.log('Migration 15');
console.log('Adding messages index for de-duplication');
const messages = transaction.objectStore('messages');
messages.createIndex('unique', ['source', 'sourceDevice', 'sent_at'], {
unique: true,
});
next();
},
},
{
version: '16.0',
migrate(transaction, next) {
console.log('Migration 16');
console.log('Dropping log table, since we now log to disk');
transaction.db.deleteObjectStore('debug');
next();
},
},
{
version: 17,
async migrate(transaction, next) {
console.log('Migration 17');
const start = Date.now();
const messagesStore = transaction.objectStore('messages');
console.log('Create index from attachment schema version to attachment');
messagesStore.createIndex('schemaVersion', 'schemaVersion', { unique: false });
const duration = Date.now() - start;
console.log(
'Complete migration to database version 17.',
`Duration: ${duration}ms`
);
next();
},
},
];
const database = {
id: 'signal',
nolog: true,
migrations: exports.migrations,
};
exports.run = ({ Backbone } = {}) =>
runMigrations({ Backbone, database });
exports.getDatabase = () => ({
name: database.id,
version: last(exports.migrations).version,
});

@ -0,0 +1,15 @@
const { runMigrations } = require('./run_migrations');
exports.migrations = [
// {
// version: 18,
// async migrate(transaction, next) {
// console.log('Migration 18');
// console.log('Attachments stored on disk');
// next();
// },
// },
];
exports.run = runMigrations;

@ -0,0 +1,71 @@
/* eslint-env browser */
const {
head,
isFunction,
isObject,
isString,
last,
} = require('lodash');
const db = require('../database');
const { deferredToPromise } = require('../deferred_to_promise');
const closeDatabaseConnection = ({ Backbone } = {}) =>
deferredToPromise(Backbone.sync('closeall'));
exports.runMigrations = async ({ Backbone, database } = {}) => {
if (!isObject(Backbone) || !isObject(Backbone.Collection) ||
!isFunction(Backbone.Collection.extend)) {
throw new TypeError('"Backbone" is required');
}
if (!isObject(database) || !isString(database.id) ||
!Array.isArray(database.migrations)) {
throw new TypeError('"database" is required');
}
const {
firstVersion: firstMigrationVersion,
lastVersion: lastMigrationVersion,
} = getMigrationVersions(database);
const databaseVersion = await db.getVersion(database.id);
const isAlreadyUpgraded = databaseVersion >= lastMigrationVersion;
console.log('Database status', {
firstMigrationVersion,
lastMigrationVersion,
databaseVersion,
isAlreadyUpgraded,
});
if (isAlreadyUpgraded) {
return;
}
const migrationCollection = new (Backbone.Collection.extend({
database,
storeName: 'items',
}))();
await deferredToPromise(migrationCollection.fetch({ limit: 1 }));
console.log('Close database connection');
await closeDatabaseConnection({ Backbone });
};
const getMigrationVersions = (database) => {
if (!isObject(database) || !Array.isArray(database.migrations)) {
throw new TypeError('"database" is required');
}
const firstMigration = head(database.migrations);
const lastMigration = last(database.migrations);
const firstVersion = firstMigration ? parseInt(firstMigration.version, 10) : null;
const lastVersion = lastMigration ? parseInt(lastMigration.version, 10) : null;
return { firstVersion, lastVersion };
};

@ -2,10 +2,12 @@
const Path = require('path');
const compose = require('lodash/fp/compose');
const escapeRegExp = require('lodash/escapeRegExp');
const isRegExp = require('lodash/isRegExp');
const isString = require('lodash/isString');
const {
escapeRegExp,
isRegExp,
isString,
} = require('lodash');
const { compose } = require('lodash/fp');
const PHONE_NUMBER_PATTERN = /\+\d{7,12}(\d{3})/g;

@ -0,0 +1,86 @@
const { isObject, isString } = require('lodash');
const ITEMS_STORE_NAME = 'items';
const LAST_PROCESSED_INDEX_KEY = 'attachmentMigration_lastProcessedIndex';
const IS_MIGRATION_COMPLETE_KEY = 'attachmentMigration_isComplete';
// Public API
exports.getAttachmentMigrationLastProcessedIndex = connection =>
exports._getItem(connection, LAST_PROCESSED_INDEX_KEY);
exports.setAttachmentMigrationLastProcessedIndex = (connection, value) =>
exports._setItem(connection, LAST_PROCESSED_INDEX_KEY, value);
exports.deleteAttachmentMigrationLastProcessedIndex = connection =>
exports._deleteItem(connection, LAST_PROCESSED_INDEX_KEY);
exports.isAttachmentMigrationComplete = async connection =>
Boolean(await exports._getItem(connection, IS_MIGRATION_COMPLETE_KEY));
exports.markAttachmentMigrationComplete = connection =>
exports._setItem(connection, IS_MIGRATION_COMPLETE_KEY, true);
// Private API
exports._getItem = (connection, key) => {
if (!isObject(connection)) {
throw new TypeError('"connection" is required');
}
if (!isString(key)) {
throw new TypeError('"key" must be a string');
}
const transaction = connection.transaction(ITEMS_STORE_NAME, 'readonly');
const itemsStore = transaction.objectStore(ITEMS_STORE_NAME);
const request = itemsStore.get(key);
return new Promise((resolve, reject) => {
request.onerror = event =>
reject(event.target.error);
request.onsuccess = event =>
resolve(event.target.result ? event.target.result.value : null);
});
};
exports._setItem = (connection, key, value) => {
if (!isObject(connection)) {
throw new TypeError('"connection" is required');
}
if (!isString(key)) {
throw new TypeError('"key" must be a string');
}
const transaction = connection.transaction(ITEMS_STORE_NAME, 'readwrite');
const itemsStore = transaction.objectStore(ITEMS_STORE_NAME);
const request = itemsStore.put({ id: key, value }, key);
return new Promise((resolve, reject) => {
request.onerror = event =>
reject(event.target.error);
request.onsuccess = () =>
resolve();
});
};
exports._deleteItem = (connection, key) => {
if (!isObject(connection)) {
throw new TypeError('"connection" is required');
}
if (!isString(key)) {
throw new TypeError('"key" must be a string');
}
const transaction = connection.transaction(ITEMS_STORE_NAME, 'readwrite');
const itemsStore = transaction.objectStore(ITEMS_STORE_NAME);
const request = itemsStore.delete(key);
return new Promise((resolve, reject) => {
request.onerror = event =>
reject(event.target.error);
request.onsuccess = () =>
resolve();
});
};

@ -0,0 +1,4 @@
/* global setTimeout */
exports.sleep = ms =>
new Promise(resolve => setTimeout(resolve, ms));

@ -1,5 +1,4 @@
const isFunction = require('lodash/isFunction');
const isString = require('lodash/isString');
const { isFunction, isString } = require('lodash');
const MIME = require('./mime');
const { arrayBufferToBlob, blobToArrayBuffer, dataURLToBlob } = require('blob-util');
@ -17,7 +16,6 @@ const { migrateDataToFileSystem } = require('./attachment/migrate_data_to_file_s
// key: ArrayBuffer
// size: integer
// thumbnail: ArrayBuffer
// schemaVersion: integer
// }
// // Outgoing message attachment fields
@ -26,7 +24,6 @@ const { migrateDataToFileSystem } = require('./attachment/migrate_data_to_file_s
// data: ArrayBuffer
// fileName: string
// size: integer
// schemaVersion: integer
// }
// Returns true if `rawAttachment` is a valid attachment based on our current schema.

@ -1,7 +1,9 @@
const isArrayBuffer = require('lodash/isArrayBuffer');
const isFunction = require('lodash/isFunction');
const isUndefined = require('lodash/isUndefined');
const omit = require('lodash/omit');
const {
isArrayBuffer,
isFunction,
isUndefined,
omit,
} = require('lodash');
// type Context :: {

@ -1,4 +1,4 @@
const isFunction = require('lodash/isFunction');
const { isFunction } = require('lodash');
const Attachment = require('./attachment');
const Errors = require('./errors');

@ -1,4 +1,4 @@
const isNumber = require('lodash/isNumber');
const { isNumber } = require('lodash');
exports.isValid = value =>

@ -7,17 +7,14 @@
const Attachment = require('./js/modules/types/attachment');
const Attachments = require('./app/attachments');
const Message = require('./js/modules/types/message');
const { deferredToPromise } = require('./js/modules/deferred_to_promise');
const { app } = electron.remote;
window.PROTO_ROOT = 'protos';
window.config = require('url').parse(window.location.toString(), true).query;
window.wrapDeferred = function(deferred) {
return new Promise(function(resolve, reject) {
deferred.then(resolve, reject);
});
};
window.wrapDeferred = deferredToPromise;
const ipc = electron.ipcRenderer;
window.config.localeMessages = ipc.sendSync('locale-data');
@ -127,13 +124,19 @@
window.Signal = {};
window.Signal.Backup = require('./js/modules/backup');
window.Signal.Crypto = require('./js/modules/crypto');
window.Signal.Database = require('./js/modules/database');
window.Signal.Debug = require('./js/modules/debug');
window.Signal.Logs = require('./js/modules/logs');
window.Signal.Migrations = {};
window.Signal.Migrations.loadAttachmentData = Attachment.loadData(readAttachmentData);
window.Signal.Migrations.deleteAttachmentData = Attachment.deleteData(deleteAttachmentData);
window.Signal.Migrations.upgradeMessageSchema = upgradeMessageSchema;
window.Signal.Migrations.V17 = require('./js/modules/migrations/17');
window.Signal.Migrations.Migrations0DatabaseWithAttachmentData =
require('./js/modules/migrations/migrations_0_database_with_attachment_data');
window.Signal.Migrations.Migrations1DatabaseWithoutAttachmentData =
require('./js/modules/migrations/migrations_1_database_without_attachment_data');
window.Signal.OS = require('./js/modules/os');
window.Signal.Settings = require('./js/modules/settings');
window.Signal.Types = {};
window.Signal.Types.Attachment = Attachment;
window.Signal.Types.Errors = require('./js/modules/types/errors');

@ -0,0 +1,18 @@
#!/bin/bash
ROOT=$1
if [[ "$1" == "" ]]; then
echo "Usage: $(basename "$0") <signal-profile-path>"
exit 1
fi
while true
do
echo -n "$(date -u +"%Y-%m-%dT%H:%M:%SZ ")"
du -sm "$ROOT/attachments.noindex"
echo -n "$(date -u +"%Y-%m-%dT%H:%M:%SZ ")"
du -sm "$ROOT/IndexedDB"
sleep 1
done
Loading…
Cancel
Save