|
|
|
@ -1,8 +1,7 @@
|
|
|
|
|
/* global Signal, setTimeout, clearTimeout, getMessageController, NewReceiver */
|
|
|
|
|
|
|
|
|
|
const { isNumber, omit } = require('lodash');
|
|
|
|
|
const getGuid = require('uuid/v4');
|
|
|
|
|
const {
|
|
|
|
|
import { isNumber, omit } from 'lodash';
|
|
|
|
|
// tslint:disable-next-line: no-submodule-imports
|
|
|
|
|
import { default as getGuid } from 'uuid/v4';
|
|
|
|
|
import {
|
|
|
|
|
getMessageById,
|
|
|
|
|
getNextAttachmentDownloadJobs,
|
|
|
|
|
removeAttachmentDownloadJob,
|
|
|
|
@ -10,33 +9,30 @@ const {
|
|
|
|
|
saveAttachmentDownloadJob,
|
|
|
|
|
saveMessage,
|
|
|
|
|
setAttachmentDownloadJobPending,
|
|
|
|
|
} = require('../../ts/data/data');
|
|
|
|
|
|
|
|
|
|
module.exports = {
|
|
|
|
|
start,
|
|
|
|
|
stop,
|
|
|
|
|
addJob,
|
|
|
|
|
};
|
|
|
|
|
} from '../../../ts/data/data';
|
|
|
|
|
import { MessageModel } from '../../models/message';
|
|
|
|
|
import { downloadAttachment } from '../../receiver/attachments';
|
|
|
|
|
import { MessageController } from '../messages';
|
|
|
|
|
|
|
|
|
|
const MAX_ATTACHMENT_JOB_PARALLELISM = 3;
|
|
|
|
|
|
|
|
|
|
const SECOND = 1000;
|
|
|
|
|
const MINUTE = 60 * SECOND;
|
|
|
|
|
const HOUR = 60 * MINUTE;
|
|
|
|
|
const MINUTE = SECOND * 60;
|
|
|
|
|
const HOUR = MINUTE * 60;
|
|
|
|
|
const TICK_INTERVAL = MINUTE;
|
|
|
|
|
|
|
|
|
|
const RETRY_BACKOFF = {
|
|
|
|
|
1: 30 * SECOND,
|
|
|
|
|
2: 30 * MINUTE,
|
|
|
|
|
3: 6 * HOUR,
|
|
|
|
|
1: SECOND * 30,
|
|
|
|
|
2: MINUTE * 30,
|
|
|
|
|
3: HOUR * 6,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let enabled = false;
|
|
|
|
|
let timeout;
|
|
|
|
|
let logger;
|
|
|
|
|
const _activeAttachmentDownloadJobs = {};
|
|
|
|
|
let timeout: any;
|
|
|
|
|
let logger: any;
|
|
|
|
|
const _activeAttachmentDownloadJobs: any = {};
|
|
|
|
|
|
|
|
|
|
async function start(options = {}) {
|
|
|
|
|
export async function start(options: any = {}) {
|
|
|
|
|
({ logger } = options);
|
|
|
|
|
if (!logger) {
|
|
|
|
|
throw new Error('attachment_downloads/start: logger must be provided!');
|
|
|
|
@ -45,10 +41,10 @@ async function start(options = {}) {
|
|
|
|
|
enabled = true;
|
|
|
|
|
await resetAttachmentDownloadPending();
|
|
|
|
|
|
|
|
|
|
_tick();
|
|
|
|
|
void _tick();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async function stop() {
|
|
|
|
|
export function stop() {
|
|
|
|
|
enabled = false;
|
|
|
|
|
if (timeout) {
|
|
|
|
|
clearTimeout(timeout);
|
|
|
|
@ -56,7 +52,7 @@ async function stop() {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async function addJob(attachment, job = {}) {
|
|
|
|
|
export async function addJob(attachment: any, job: any = {}) {
|
|
|
|
|
if (!attachment) {
|
|
|
|
|
throw new Error('attachments_download/addJob: attachment is required');
|
|
|
|
|
}
|
|
|
|
@ -85,7 +81,7 @@ async function addJob(attachment, job = {}) {
|
|
|
|
|
|
|
|
|
|
await saveAttachmentDownloadJob(toSave);
|
|
|
|
|
|
|
|
|
|
_maybeStartJob();
|
|
|
|
|
void _maybeStartJob();
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
...attachment,
|
|
|
|
@ -94,8 +90,9 @@ async function addJob(attachment, job = {}) {
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// tslint:disable: function-name
|
|
|
|
|
async function _tick() {
|
|
|
|
|
_maybeStartJob();
|
|
|
|
|
await _maybeStartJob();
|
|
|
|
|
timeout = setTimeout(_tick, TICK_INTERVAL);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -124,13 +121,14 @@ async function _maybeStartJob() {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const jobs = nextJobs.slice(0, Math.min(needed, nextJobs.length));
|
|
|
|
|
// tslint:disable: one-variable-per-declaration
|
|
|
|
|
for (let i = 0, max = jobs.length; i < max; i += 1) {
|
|
|
|
|
const job = jobs[i];
|
|
|
|
|
_activeAttachmentDownloadJobs[job.id] = _runJob(job);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async function _runJob(job) {
|
|
|
|
|
async function _runJob(job: any) {
|
|
|
|
|
const { id, messageId, attachment, type, index, attempts, isOpenGroupV2 } = job || {};
|
|
|
|
|
let message;
|
|
|
|
|
|
|
|
|
@ -145,7 +143,7 @@ async function _runJob(job) {
|
|
|
|
|
await _finishJob(null, id);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
message = getMessageController().register(found.id, found);
|
|
|
|
|
message = MessageController.getInstance().register(found.id, found);
|
|
|
|
|
|
|
|
|
|
const pending = true;
|
|
|
|
|
await setAttachmentDownloadJobPending(id, pending);
|
|
|
|
@ -153,7 +151,7 @@ async function _runJob(job) {
|
|
|
|
|
let downloaded;
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
downloaded = await NewReceiver.downloadAttachment(attachment);
|
|
|
|
|
downloaded = await downloadAttachment(attachment);
|
|
|
|
|
} catch (error) {
|
|
|
|
|
// Attachments on the server expire after 60 days, then start returning 404
|
|
|
|
|
if (error && error.code === 404) {
|
|
|
|
@ -171,28 +169,29 @@ async function _runJob(job) {
|
|
|
|
|
throw error;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const upgradedAttachment = await Signal.Migrations.processNewAttachment(downloaded);
|
|
|
|
|
const upgradedAttachment = await window.Signal.Migrations.processNewAttachment(downloaded);
|
|
|
|
|
|
|
|
|
|
await _addAttachmentToMessage(message, upgradedAttachment, { type, index });
|
|
|
|
|
|
|
|
|
|
await _finishJob(message, id);
|
|
|
|
|
} catch (error) {
|
|
|
|
|
const currentAttempt = (attempts || 0) + 1;
|
|
|
|
|
// tslint:disable: restrict-plus-operands
|
|
|
|
|
const currentAttempt: 1 | 2 | 3 = (attempts || 0) + 1;
|
|
|
|
|
|
|
|
|
|
if (currentAttempt >= 3) {
|
|
|
|
|
logger.error(
|
|
|
|
|
`_runJob: ${currentAttempt} failed attempts, marking attachment ${id} from message ${message.idForLogging()} as permament error:`,
|
|
|
|
|
`_runJob: ${currentAttempt} failed attempts, marking attachment ${id} from message ${message?.idForLogging()} as permament error:`,
|
|
|
|
|
error && error.stack ? error.stack : error
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
await _finishJob(message, id);
|
|
|
|
|
await _finishJob(message || null, id);
|
|
|
|
|
await _addAttachmentToMessage(message, _markAttachmentAsError(attachment), { type, index });
|
|
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger.error(
|
|
|
|
|
`_runJob: Failed to download attachment type ${type} for message ${message.idForLogging()}, attempt ${currentAttempt}:`,
|
|
|
|
|
`_runJob: Failed to download attachment type ${type} for message ${message?.idForLogging()}, attempt ${currentAttempt}:`,
|
|
|
|
|
error && error.stack ? error.stack : error
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
@ -204,37 +203,40 @@ async function _runJob(job) {
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
await saveAttachmentDownloadJob(failedJob);
|
|
|
|
|
// tslint:disable-next-line: no-dynamic-delete
|
|
|
|
|
delete _activeAttachmentDownloadJobs[id];
|
|
|
|
|
_maybeStartJob();
|
|
|
|
|
void _maybeStartJob();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async function _finishJob(message, id) {
|
|
|
|
|
async function _finishJob(message: MessageModel | null, id: string) {
|
|
|
|
|
if (message) {
|
|
|
|
|
await saveMessage(message.attributes);
|
|
|
|
|
const conversation = message.getConversation();
|
|
|
|
|
if (conversation) {
|
|
|
|
|
message.commit();
|
|
|
|
|
await message.commit();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
await removeAttachmentDownloadJob(id);
|
|
|
|
|
// tslint:disable-next-line: no-dynamic-delete
|
|
|
|
|
delete _activeAttachmentDownloadJobs[id];
|
|
|
|
|
_maybeStartJob();
|
|
|
|
|
await _maybeStartJob();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
function getActiveJobCount() {
|
|
|
|
|
return Object.keys(_activeAttachmentDownloadJobs).length;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
function _markAttachmentAsError(attachment) {
|
|
|
|
|
function _markAttachmentAsError(attachment: any) {
|
|
|
|
|
return {
|
|
|
|
|
...omit(attachment, ['key', 'digest', 'id']),
|
|
|
|
|
error: true,
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async function _addAttachmentToMessage(message, attachment, { type, index }) {
|
|
|
|
|
// tslint:disable-next-line: cyclomatic-complexity
|
|
|
|
|
async function _addAttachmentToMessage(message: any, attachment: any, { type, index }: any) {
|
|
|
|
|
if (!message) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
@ -310,7 +312,7 @@ async function _addAttachmentToMessage(message, attachment, { type, index }) {
|
|
|
|
|
|
|
|
|
|
const existingAvatar = group.avatar;
|
|
|
|
|
if (existingAvatar && existingAvatar.path) {
|
|
|
|
|
await Signal.Migrations.deleteAttachmentData(existingAvatar.path);
|
|
|
|
|
await window.Signal.Migrations.deleteAttachmentData(existingAvatar.path);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_replaceAttachment(group, 'avatar', attachment, logPrefix);
|
|
|
|
@ -322,7 +324,7 @@ async function _addAttachmentToMessage(message, attachment, { type, index }) {
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
function _replaceAttachment(object, key, newAttachment, logPrefix) {
|
|
|
|
|
function _replaceAttachment(object: any, key: any, newAttachment: any, logPrefix: any) {
|
|
|
|
|
const oldAttachment = object[key];
|
|
|
|
|
if (oldAttachment && oldAttachment.path) {
|
|
|
|
|
logger.warn(
|