|
|
|
@ -1,6 +1,8 @@
|
|
|
|
|
const isNumber = require('lodash/isNumber');
|
|
|
|
|
const isFunction = require('lodash/isFunction');
|
|
|
|
|
|
|
|
|
|
const Message = require('./types/message');
|
|
|
|
|
const { deferredToPromise } = require('./deferred_to_promise');
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
exports.processNext = async ({
|
|
|
|
@ -8,7 +10,6 @@ exports.processNext = async ({
|
|
|
|
|
BackboneMessageCollection,
|
|
|
|
|
count,
|
|
|
|
|
upgradeMessageSchema,
|
|
|
|
|
wrapDeferred,
|
|
|
|
|
} = {}) => {
|
|
|
|
|
if (!isFunction(BackboneMessage)) {
|
|
|
|
|
throw new TypeError('`BackboneMessage` (Whisper.Message) constructor is required');
|
|
|
|
@ -27,10 +28,6 @@ exports.processNext = async ({
|
|
|
|
|
throw new TypeError('`upgradeMessageSchema` is required');
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!isFunction(wrapDeferred)) {
|
|
|
|
|
throw new TypeError('`wrapDeferred` is required');
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const startTime = Date.now();
|
|
|
|
|
|
|
|
|
|
const startFetchTime = Date.now();
|
|
|
|
@ -44,7 +41,7 @@ exports.processNext = async ({
|
|
|
|
|
const upgradeDuration = Date.now() - startUpgradeTime;
|
|
|
|
|
|
|
|
|
|
const startSaveTime = Date.now();
|
|
|
|
|
const saveMessage = _saveMessage({ BackboneMessage, wrapDeferred });
|
|
|
|
|
const saveMessage = _saveMessage({ BackboneMessage });
|
|
|
|
|
await Promise.all(upgradedMessages.map(saveMessage));
|
|
|
|
|
const saveDuration = Date.now() - startSaveTime;
|
|
|
|
|
|
|
|
|
@ -61,9 +58,9 @@ exports.processNext = async ({
|
|
|
|
|
};
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
const _saveMessage = ({ BackboneMessage, wrapDeferred } = {}) => (message) => {
|
|
|
|
|
const _saveMessage = ({ BackboneMessage } = {}) => (message) => {
|
|
|
|
|
const backboneMessage = new BackboneMessage(message);
|
|
|
|
|
return wrapDeferred(backboneMessage.save());
|
|
|
|
|
return deferredToPromise(backboneMessage.save());
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
const _fetchMessagesRequiringSchemaUpgrade =
|
|
|
|
|