Create IndexedDB index from `schemaVersion` to `Message` (#2128)
commit
51d17a6dcb
@ -0,0 +1,55 @@
|
||||
const Message = require('../../types/message');
|
||||
|
||||
|
||||
exports.run = async (transaction) => {
|
||||
const messagesStore = transaction.objectStore('messages');
|
||||
|
||||
console.log('Initialize messages schema version');
|
||||
const numUpgradedMessages = await _initializeMessageSchemaVersion(messagesStore);
|
||||
console.log('Complete messages schema version initialization', { numUpgradedMessages });
|
||||
|
||||
console.log('Create index from attachment schema version to attachment');
|
||||
messagesStore.createIndex('schemaVersion', 'schemaVersion', { unique: false });
|
||||
};
|
||||
|
||||
const _initializeMessageSchemaVersion = messagesStore =>
|
||||
new Promise((resolve, reject) => {
|
||||
const messagePutOperations = [];
|
||||
|
||||
const cursorRequest = messagesStore.openCursor();
|
||||
cursorRequest.onsuccess = async (event) => {
|
||||
const cursor = event.target.result;
|
||||
const hasMoreData = Boolean(cursor);
|
||||
if (!hasMoreData) {
|
||||
await Promise.all(messagePutOperations);
|
||||
return resolve(messagePutOperations.length);
|
||||
}
|
||||
|
||||
const message = cursor.value;
|
||||
const messageWithSchemaVersion = Message.initializeSchemaVersion(message);
|
||||
messagePutOperations.push(putItem(
|
||||
messagesStore,
|
||||
messageWithSchemaVersion,
|
||||
messageWithSchemaVersion.id
|
||||
));
|
||||
|
||||
return cursor.continue();
|
||||
};
|
||||
|
||||
cursorRequest.onerror = event =>
|
||||
reject(event.target.error);
|
||||
});
|
||||
|
||||
// putItem :: IDBObjectStore -> Item -> Key -> Promise Item
|
||||
const putItem = (store, item, key) =>
|
||||
new Promise((resolve, reject) => {
|
||||
try {
|
||||
const request = store.put(item, key);
|
||||
request.onsuccess = event =>
|
||||
resolve(event.target.result);
|
||||
request.onerror = event =>
|
||||
reject(event.target.error);
|
||||
} catch (error) {
|
||||
reject(error);
|
||||
}
|
||||
});
|
@ -1,17 +1,165 @@
|
||||
const isFunction = require('lodash/isFunction');
|
||||
|
||||
const Attachment = require('./attachment');
|
||||
const Errors = require('./errors');
|
||||
const SchemaVersion = require('./schema_version');
|
||||
|
||||
|
||||
const GROUP = 'group';
|
||||
const PRIVATE = 'private';
|
||||
|
||||
// Schema version history
|
||||
//
|
||||
// Version 0
|
||||
// - Schema initialized
|
||||
// Version 1
|
||||
// - Attachments: Auto-orient JPEG attachments using EXIF `Orientation` data
|
||||
// Version 2
|
||||
// - Attachments: Sanitize Unicode order override characters
|
||||
const INITIAL_SCHEMA_VERSION = 0;
|
||||
|
||||
// Increment this version number every time we add a message schema upgrade
|
||||
// step. This will allow us to retroactively upgrade existing messages. As we
|
||||
// add more upgrade steps, we could design a pipeline that does this
|
||||
// incrementally, e.g. from version 0 / unknown -> 1, 1 --> 2, etc., similar to
|
||||
// how we do database migrations:
|
||||
exports.CURRENT_SCHEMA_VERSION = 2;
|
||||
|
||||
|
||||
// Public API
|
||||
exports.GROUP = GROUP;
|
||||
exports.PRIVATE = PRIVATE;
|
||||
|
||||
// Placeholder until we have stronger preconditions:
|
||||
exports.isValid = () =>
|
||||
true;
|
||||
|
||||
// Schema
|
||||
// Message -> Promise Message
|
||||
exports.initializeSchemaVersion = (message) => {
|
||||
const isInitialized = SchemaVersion.isValid(message.schemaVersion) &&
|
||||
message.schemaVersion >= 1;
|
||||
if (isInitialized) {
|
||||
return message;
|
||||
}
|
||||
|
||||
const numAttachments = Array.isArray(message.attachments)
|
||||
? message.attachments.length
|
||||
: 0;
|
||||
const hasAttachments = numAttachments > 0;
|
||||
if (!hasAttachments) {
|
||||
return Object.assign(
|
||||
{},
|
||||
message,
|
||||
{ schemaVersion: INITIAL_SCHEMA_VERSION }
|
||||
);
|
||||
}
|
||||
|
||||
// All attachments should have the same schema version, so we just pick
|
||||
// the first one:
|
||||
const firstAttachment = message.attachments[0];
|
||||
const inheritedSchemaVersion = SchemaVersion.isValid(firstAttachment.schemaVersion)
|
||||
? firstAttachment.schemaVersion
|
||||
: INITIAL_SCHEMA_VERSION;
|
||||
const messageWithInitialSchema = Object.assign(
|
||||
{},
|
||||
message,
|
||||
{
|
||||
schemaVersion: inheritedSchemaVersion,
|
||||
attachments: message.attachments.map(Attachment.removeSchemaVersion),
|
||||
}
|
||||
);
|
||||
|
||||
return messageWithInitialSchema;
|
||||
};
|
||||
|
||||
// Middleware
|
||||
// type UpgradeStep = Message -> Promise Message
|
||||
|
||||
// SchemaVersion -> UpgradeStep -> UpgradeStep
|
||||
exports._withSchemaVersion = (schemaVersion, upgrade) => {
|
||||
if (!SchemaVersion.isValid(schemaVersion)) {
|
||||
throw new TypeError('`schemaVersion` is invalid');
|
||||
}
|
||||
if (!isFunction(upgrade)) {
|
||||
throw new TypeError('`upgrade` must be a function');
|
||||
}
|
||||
|
||||
return async (message) => {
|
||||
if (!exports.isValid(message)) {
|
||||
console.log('Message._withSchemaVersion: Invalid input message:', message);
|
||||
return message;
|
||||
}
|
||||
|
||||
const isAlreadyUpgraded = message.schemaVersion >= schemaVersion;
|
||||
if (isAlreadyUpgraded) {
|
||||
return message;
|
||||
}
|
||||
|
||||
const expectedVersion = schemaVersion - 1;
|
||||
const hasExpectedVersion = message.schemaVersion === expectedVersion;
|
||||
if (!hasExpectedVersion) {
|
||||
console.log(
|
||||
'WARNING: Message._withSchemaVersion: Unexpected version:',
|
||||
`Expected message to have version ${expectedVersion},`,
|
||||
`but got ${message.schemaVersion}.`,
|
||||
message
|
||||
);
|
||||
return message;
|
||||
}
|
||||
|
||||
let upgradedMessage;
|
||||
try {
|
||||
upgradedMessage = await upgrade(message);
|
||||
} catch (error) {
|
||||
console.log(
|
||||
'Message._withSchemaVersion: error:',
|
||||
Errors.toLogFormat(error)
|
||||
);
|
||||
return message;
|
||||
}
|
||||
|
||||
if (!exports.isValid(upgradedMessage)) {
|
||||
console.log(
|
||||
'Message._withSchemaVersion: Invalid upgraded message:',
|
||||
upgradedMessage
|
||||
);
|
||||
return message;
|
||||
}
|
||||
|
||||
return Object.assign(
|
||||
{},
|
||||
upgradedMessage,
|
||||
{ schemaVersion }
|
||||
);
|
||||
};
|
||||
};
|
||||
|
||||
|
||||
// Public API
|
||||
// _mapAttachments :: (Attachment -> Promise Attachment) ->
|
||||
// Message ->
|
||||
// Promise Message
|
||||
exports._mapAttachments = upgradeAttachment => async message =>
|
||||
Object.assign(
|
||||
{},
|
||||
message,
|
||||
{
|
||||
attachments: await Promise.all(message.attachments.map(upgradeAttachment)),
|
||||
}
|
||||
);
|
||||
|
||||
const toVersion0 = async message =>
|
||||
exports.initializeSchemaVersion(message);
|
||||
|
||||
const toVersion1 = exports._withSchemaVersion(
|
||||
1,
|
||||
exports._mapAttachments(Attachment.autoOrientJPEG)
|
||||
);
|
||||
const toVersion2 = exports._withSchemaVersion(
|
||||
2,
|
||||
exports._mapAttachments(Attachment.replaceUnicodeOrderOverrides)
|
||||
);
|
||||
|
||||
// UpgradeStep
|
||||
exports.upgradeSchema = async message =>
|
||||
Object.assign({}, message, {
|
||||
attachments:
|
||||
await Promise.all(message.attachments.map(Attachment.upgradeSchema)),
|
||||
});
|
||||
toVersion2(await toVersion1(await toVersion0(message)));
|
||||
|
@ -0,0 +1,5 @@
|
||||
const isNumber = require('lodash/isNumber');
|
||||
|
||||
|
||||
exports.isValid = value =>
|
||||
isNumber(value) && value >= 0;
|
@ -0,0 +1,240 @@
|
||||
const { assert } = require('chai');
|
||||
|
||||
const Message = require('../../../js/modules/types/message');
|
||||
|
||||
|
||||
describe('Message', () => {
|
||||
describe('initializeSchemaVersion', () => {
|
||||
it('should ignore messages with previously inherited schema', () => {
|
||||
const input = {
|
||||
body: 'Imagine there is no heaven…',
|
||||
schemaVersion: 2,
|
||||
};
|
||||
const expected = {
|
||||
body: 'Imagine there is no heaven…',
|
||||
schemaVersion: 2,
|
||||
};
|
||||
|
||||
const actual = Message.initializeSchemaVersion(input);
|
||||
assert.deepEqual(actual, expected);
|
||||
});
|
||||
|
||||
context('for message without attachments', () => {
|
||||
it('should initialize schema version to zero', () => {
|
||||
const input = {
|
||||
body: 'Imagine there is no heaven…',
|
||||
attachments: [],
|
||||
};
|
||||
const expected = {
|
||||
body: 'Imagine there is no heaven…',
|
||||
attachments: [],
|
||||
schemaVersion: 0,
|
||||
};
|
||||
|
||||
const actual = Message.initializeSchemaVersion(input);
|
||||
assert.deepEqual(actual, expected);
|
||||
});
|
||||
});
|
||||
|
||||
context('for message with attachments', () => {
|
||||
it('should inherit existing attachment schema version', () => {
|
||||
const input = {
|
||||
body: 'Imagine there is no heaven…',
|
||||
attachments: [{
|
||||
contentType: 'image/jpeg',
|
||||
fileName: 'lennon.jpg',
|
||||
schemaVersion: 7,
|
||||
}],
|
||||
};
|
||||
const expected = {
|
||||
body: 'Imagine there is no heaven…',
|
||||
attachments: [{
|
||||
contentType: 'image/jpeg',
|
||||
fileName: 'lennon.jpg',
|
||||
}],
|
||||
schemaVersion: 7,
|
||||
};
|
||||
|
||||
const actual = Message.initializeSchemaVersion(input);
|
||||
assert.deepEqual(actual, expected);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('upgradeSchema', () => {
|
||||
it('should upgrade an unversioned message to the latest version', async () => {
|
||||
const input = {
|
||||
attachments: [{
|
||||
contentType: 'application/json',
|
||||
data: null,
|
||||
fileName: 'test\u202Dfig.exe',
|
||||
size: 1111,
|
||||
}],
|
||||
schemaVersion: 0,
|
||||
};
|
||||
const expected = {
|
||||
attachments: [{
|
||||
contentType: 'application/json',
|
||||
data: null,
|
||||
fileName: 'test\uFFFDfig.exe',
|
||||
size: 1111,
|
||||
}],
|
||||
schemaVersion: Message.CURRENT_SCHEMA_VERSION,
|
||||
};
|
||||
|
||||
const actual = await Message.upgradeSchema(input);
|
||||
assert.deepEqual(actual, expected);
|
||||
});
|
||||
|
||||
context('with multiple upgrade steps', () => {
|
||||
it('should return last valid message when any upgrade step fails', async () => {
|
||||
const input = {
|
||||
attachments: [{
|
||||
contentType: 'application/json',
|
||||
data: null,
|
||||
fileName: 'test\u202Dfig.exe',
|
||||
size: 1111,
|
||||
}],
|
||||
schemaVersion: 0,
|
||||
};
|
||||
const expected = {
|
||||
attachments: [{
|
||||
contentType: 'application/json',
|
||||
data: null,
|
||||
fileName: 'test\u202Dfig.exe',
|
||||
size: 1111,
|
||||
}],
|
||||
hasUpgradedToVersion1: true,
|
||||
schemaVersion: 1,
|
||||
};
|
||||
|
||||
const v1 = async message =>
|
||||
Object.assign({}, message, { hasUpgradedToVersion1: true });
|
||||
const v2 = async () => {
|
||||
throw new Error('boom');
|
||||
};
|
||||
const v3 = async message =>
|
||||
Object.assign({}, message, { hasUpgradedToVersion3: true });
|
||||
|
||||
const toVersion1 = Message._withSchemaVersion(1, v1);
|
||||
const toVersion2 = Message._withSchemaVersion(2, v2);
|
||||
const toVersion3 = Message._withSchemaVersion(3, v3);
|
||||
|
||||
const upgradeSchema = async message =>
|
||||
toVersion3(await toVersion2(await toVersion1(message)));
|
||||
|
||||
const actual = await upgradeSchema(input);
|
||||
assert.deepEqual(actual, expected);
|
||||
});
|
||||
|
||||
it('should skip out-of-order upgrade steps', async () => {
|
||||
const input = {
|
||||
attachments: [{
|
||||
contentType: 'application/json',
|
||||
data: null,
|
||||
fileName: 'test\u202Dfig.exe',
|
||||
size: 1111,
|
||||
}],
|
||||
schemaVersion: 0,
|
||||
};
|
||||
const expected = {
|
||||
attachments: [{
|
||||
contentType: 'application/json',
|
||||
data: null,
|
||||
fileName: 'test\u202Dfig.exe',
|
||||
size: 1111,
|
||||
}],
|
||||
schemaVersion: 2,
|
||||
hasUpgradedToVersion1: true,
|
||||
hasUpgradedToVersion2: true,
|
||||
};
|
||||
|
||||
const v1 = async attachment =>
|
||||
Object.assign({}, attachment, { hasUpgradedToVersion1: true });
|
||||
const v2 = async attachment =>
|
||||
Object.assign({}, attachment, { hasUpgradedToVersion2: true });
|
||||
const v3 = async attachment =>
|
||||
Object.assign({}, attachment, { hasUpgradedToVersion3: true });
|
||||
|
||||
const toVersion1 = Message._withSchemaVersion(1, v1);
|
||||
const toVersion2 = Message._withSchemaVersion(2, v2);
|
||||
const toVersion3 = Message._withSchemaVersion(3, v3);
|
||||
|
||||
// NOTE: We upgrade to 3 before 2, i.e. the pipeline should abort:
|
||||
const upgradeSchema = async attachment =>
|
||||
toVersion2(await toVersion3(await toVersion1(attachment)));
|
||||
|
||||
const actual = await upgradeSchema(input);
|
||||
assert.deepEqual(actual, expected);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('_withSchemaVersion', () => {
|
||||
it('should require a version number', () => {
|
||||
const toVersionX = () => {};
|
||||
assert.throws(
|
||||
() => Message._withSchemaVersion(toVersionX, 2),
|
||||
'`schemaVersion` is invalid'
|
||||
);
|
||||
});
|
||||
|
||||
it('should require an upgrade function', () => {
|
||||
assert.throws(
|
||||
() => Message._withSchemaVersion(2, 3),
|
||||
'`upgrade` must be a function'
|
||||
);
|
||||
});
|
||||
|
||||
it('should skip upgrading if message has already been upgraded', async () => {
|
||||
const upgrade = async message =>
|
||||
Object.assign({}, message, { foo: true });
|
||||
const upgradeWithVersion = Message._withSchemaVersion(3, upgrade);
|
||||
|
||||
const input = {
|
||||
id: 'guid-guid-guid-guid',
|
||||
schemaVersion: 4,
|
||||
};
|
||||
const expected = {
|
||||
id: 'guid-guid-guid-guid',
|
||||
schemaVersion: 4,
|
||||
};
|
||||
const actual = await upgradeWithVersion(input);
|
||||
assert.deepEqual(actual, expected);
|
||||
});
|
||||
|
||||
it('should return original message if upgrade function throws', async () => {
|
||||
const upgrade = async () => {
|
||||
throw new Error('boom!');
|
||||
};
|
||||
const upgradeWithVersion = Message._withSchemaVersion(3, upgrade);
|
||||
|
||||
const input = {
|
||||
id: 'guid-guid-guid-guid',
|
||||
schemaVersion: 0,
|
||||
};
|
||||
const expected = {
|
||||
id: 'guid-guid-guid-guid',
|
||||
schemaVersion: 0,
|
||||
};
|
||||
const actual = await upgradeWithVersion(input);
|
||||
assert.deepEqual(actual, expected);
|
||||
});
|
||||
|
||||
it('should return original message if upgrade function returns null', async () => {
|
||||
const upgrade = async () => null;
|
||||
const upgradeWithVersion = Message._withSchemaVersion(3, upgrade);
|
||||
|
||||
const input = {
|
||||
id: 'guid-guid-guid-guid',
|
||||
schemaVersion: 0,
|
||||
};
|
||||
const expected = {
|
||||
id: 'guid-guid-guid-guid',
|
||||
schemaVersion: 0,
|
||||
};
|
||||
const actual = await upgradeWithVersion(input);
|
||||
assert.deepEqual(actual, expected);
|
||||
});
|
||||
});
|
||||
});
|
@ -0,0 +1,25 @@
|
||||
require('mocha-testcheck').install();
|
||||
const { assert } = require('chai');
|
||||
|
||||
const SchemaVersion = require('../../../js/modules/types/schema_version');
|
||||
|
||||
|
||||
describe('SchemaVersion', () => {
|
||||
describe('isValid', () => {
|
||||
check.it(
|
||||
'should return true for positive integers',
|
||||
gen.posInt,
|
||||
(input) => {
|
||||
assert.isTrue(SchemaVersion.isValid(input));
|
||||
}
|
||||
);
|
||||
|
||||
check.it(
|
||||
'should return false for any other value',
|
||||
gen.primitive.suchThat(value => typeof value !== 'number' || value < 0),
|
||||
(input) => {
|
||||
assert.isFalse(SchemaVersion.isValid(input));
|
||||
}
|
||||
);
|
||||
});
|
||||
});
|
Loading…
Reference in New Issue