import { isNumber, omit } from 'lodash'; // tslint:disable-next-line: no-submodule-imports import { default as getGuid } from 'uuid/v4'; import { getMessageById, getNextAttachmentDownloadJobs, removeAttachmentDownloadJob, resetAttachmentDownloadPending, saveAttachmentDownloadJob, saveMessage, setAttachmentDownloadJobPending, } from '../../../ts/data/data'; import { MessageModel } from '../../models/message'; import { downloadAttachment, downloadAttachmentOpenGroupV2 } from '../../receiver/attachments'; import { getMessageController } from '../messages'; const MAX_ATTACHMENT_JOB_PARALLELISM = 3; const SECOND = 1000; const MINUTE = SECOND * 60; const HOUR = MINUTE * 60; const TICK_INTERVAL = MINUTE; const RETRY_BACKOFF = { 1: SECOND * 30, 2: MINUTE * 30, 3: HOUR * 6, }; let enabled = false; let timeout: any; let logger: any; const _activeAttachmentDownloadJobs: any = {}; export async function start(options: any = {}) { ({ logger } = options); if (!logger) { throw new Error('attachment_downloads/start: logger must be provided!'); } enabled = true; await resetAttachmentDownloadPending(); void _tick(); } export function stop() { enabled = false; if (timeout) { clearTimeout(timeout); timeout = null; } } export async function addJob(attachment: any, job: any = {}) { if (!attachment) { throw new Error('attachments_download/addJob: attachment is required'); } const { messageId, type, index } = job; if (!messageId) { throw new Error('attachments_download/addJob: job.messageId is required'); } if (!type) { throw new Error('attachments_download/addJob: job.type is required'); } if (!isNumber(index)) { throw new Error('attachments_download/addJob: index must be a number'); } const id = getGuid(); const timestamp = Date.now(); const toSave = { ...job, id, attachment, timestamp, pending: 0, attempts: 0, }; await saveAttachmentDownloadJob(toSave); void _maybeStartJob(); return { ...attachment, pending: true, downloadJobId: id, }; } async function _tick() { await _maybeStartJob(); timeout = setTimeout(_tick, TICK_INTERVAL); } async function _maybeStartJob() { if (!enabled) { return; } const jobCount = getActiveJobCount(); const limit = MAX_ATTACHMENT_JOB_PARALLELISM - jobCount; if (limit <= 0) { return; } const nextJobs = await getNextAttachmentDownloadJobs(limit); if (nextJobs.length <= 0) { return; } // To prevent the race condition caused by two parallel database calls, eached kicked // off because the jobCount wasn't at the max. const secondJobCount = getActiveJobCount(); const needed = MAX_ATTACHMENT_JOB_PARALLELISM - secondJobCount; if (needed <= 0) { return; } const jobs = nextJobs.slice(0, Math.min(needed, nextJobs.length)); // tslint:disable: one-variable-per-declaration for (let i = 0, max = jobs.length; i < max; i += 1) { const job = jobs[i]; _activeAttachmentDownloadJobs[job.id] = _runJob(job); } } // tslint:disable-next-line: cyclomatic-complexity async function _runJob(job: any) { const { id, messageId, attachment, type, index, attempts, isOpenGroupV2, openGroupV2Details } = job || {}; let message; try { if (!job || !attachment || !messageId) { throw new Error(`_runJob: Key information required for job was missing. Job id: ${id}`); } const found = await getMessageById(messageId); if (!found) { logger.error('_runJob: Source message not found, deleting job'); await _finishJob(null, id); return; } const isTrusted = found.isTrustedForAttachmentDownload(); if (!isTrusted) { logger.info('_runJob: sender conversation not trusted yet, deleting job'); await _finishJob(null, id); return; } if (isOpenGroupV2 && (!openGroupV2Details?.serverUrl || !openGroupV2Details.roomId)) { window?.log?.warn( 'isOpenGroupV2 download attachment, but no valid openGroupV2Details given:', openGroupV2Details ); await _finishJob(null, id); return; } message = getMessageController().register(found.id, found); const pending = true; await setAttachmentDownloadJobPending(id, pending); let downloaded; try { if (isOpenGroupV2) { downloaded = await downloadAttachmentOpenGroupV2(attachment, openGroupV2Details); } else { downloaded = await downloadAttachment(attachment); } } catch (error) { // Attachments on the server expire after 60 days, then start returning 404 if (error && error.code === 404) { logger.warn( `_runJob: Got 404 from server, marking attachment ${ attachment.id } from message ${message.idForLogging()} as permanent error` ); await _finishJob(message, id); await _addAttachmentToMessage(message, _markAttachmentAsError(attachment), { type, index }); return; } throw error; } const upgradedAttachment = await window.Signal.Migrations.processNewAttachment(downloaded); await _addAttachmentToMessage(message, upgradedAttachment, { type, index }); await _finishJob(message, id); } catch (error) { // tslint:disable: restrict-plus-operands const currentAttempt: 1 | 2 | 3 = (attempts || 0) + 1; if (currentAttempt >= 3) { logger.error( `_runJob: ${currentAttempt} failed attempts, marking attachment ${id} from message ${message?.idForLogging()} as permament error:`, error && error.stack ? error.stack : error ); await _finishJob(message || null, id); await _addAttachmentToMessage(message, _markAttachmentAsError(attachment), { type, index }); return; } logger.error( `_runJob: Failed to download attachment type ${type} for message ${message?.idForLogging()}, attempt ${currentAttempt}:`, error && error.stack ? error.stack : error ); const failedJob = { ...job, pending: 0, attempts: currentAttempt, timestamp: Date.now() + RETRY_BACKOFF[currentAttempt], }; await saveAttachmentDownloadJob(failedJob); // tslint:disable-next-line: no-dynamic-delete delete _activeAttachmentDownloadJobs[id]; void _maybeStartJob(); } } async function _finishJob(message: MessageModel | null, id: string) { if (message) { await saveMessage(message.attributes); const conversation = message.getConversation(); if (conversation) { await message.commit(); } } await removeAttachmentDownloadJob(id); // tslint:disable-next-line: no-dynamic-delete delete _activeAttachmentDownloadJobs[id]; await _maybeStartJob(); } function getActiveJobCount() { return Object.keys(_activeAttachmentDownloadJobs).length; } function _markAttachmentAsError(attachment: any) { return { ...omit(attachment, ['key', 'digest', 'id']), error: true, }; } // tslint:disable-next-line: cyclomatic-complexity async function _addAttachmentToMessage(message: any, attachment: any, { type, index }: any) { if (!message) { return; } const logPrefix = `${message.idForLogging()} (type: ${type}, index: ${index})`; if (type === 'attachment') { const attachments = message.get('attachments'); if (!attachments || attachments.length <= index) { throw new Error( `_addAttachmentToMessage: attachments didn't exist or ${index} was too large` ); } _replaceAttachment(attachments, index, attachment, logPrefix); return; } if (type === 'preview') { const preview = message.get('preview'); if (!preview || preview.length <= index) { throw new Error(`_addAttachmentToMessage: preview didn't exist or ${index} was too large`); } const item = preview[index]; if (!item) { throw new Error(`_addAttachmentToMessage: preview ${index} was falsey`); } _replaceAttachment(item, 'image', attachment, logPrefix); return; } if (type === 'contact') { const contact = message.get('contact'); if (!contact || contact.length <= index) { throw new Error(`_addAttachmentToMessage: contact didn't exist or ${index} was too large`); } const item = contact[index]; if (item && item.avatar && item.avatar.avatar) { _replaceAttachment(item.avatar, 'avatar', attachment, logPrefix); } else { logger.warn( `_addAttachmentToMessage: Couldn't update contact with avatar attachment for message ${message.idForLogging()}` ); } return; } if (type === 'quote') { const quote = message.get('quote'); if (!quote) { throw new Error("_addAttachmentToMessage: quote didn't exist"); } const { attachments } = quote; if (!attachments || attachments.length <= index) { throw new Error( `_addAttachmentToMessage: quote attachments didn't exist or ${index} was too large` ); } const item = attachments[index]; if (!item) { throw new Error(`_addAttachmentToMessage: attachment ${index} was falsey`); } _replaceAttachment(item, 'thumbnail', attachment, logPrefix); return; } if (type === 'group-avatar') { const group = message.get('group'); if (!group) { throw new Error("_addAttachmentToMessage: group didn't exist"); } const existingAvatar = group.avatar; if (existingAvatar && existingAvatar.path) { await window.Signal.Migrations.deleteAttachmentData(existingAvatar.path); } _replaceAttachment(group, 'avatar', attachment, logPrefix); return; } throw new Error( `_addAttachmentToMessage: Unknown job type ${type} for message ${message.idForLogging()}` ); } function _replaceAttachment(object: any, key: any, newAttachment: any, logPrefix: any) { const oldAttachment = object[key]; if (oldAttachment && oldAttachment.path) { logger.warn( `_replaceAttachment: ${logPrefix} - old attachment already had path, not replacing` ); } // eslint-disable-next-line no-param-reassign object[key] = newAttachment; }