You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
session-desktop/ts/session/utils/job_runners/PersistedJob.ts

199 lines
6.0 KiB
TypeScript

import { cloneDeep, flatten, isEmpty, isNil, uniq } from 'lodash';
export type PersistedJobType =
| 'ConfigurationSyncJobType'
| 'AvatarDownloadJobType'
| 'FetchMsgExpirySwarmJobType'
| 'UpdateMsgExpirySwarmJobType'
| 'FakeSleepForJobType'
| 'FakeSleepForJobMultiType';
interface PersistedJobData {
jobType: PersistedJobType;
identifier: string;
nextAttemptTimestamp: number;
delayBetweenRetries: number;
maxAttempts: number; // to try to run this job twice, set this to 2.
currentRetry: number;
}
export interface FakeSleepJobData extends PersistedJobData {
jobType: 'FakeSleepForJobType';
returnResult: boolean;
sleepDuration: number;
}
export interface FakeSleepForMultiJobData extends PersistedJobData {
jobType: 'FakeSleepForJobMultiType';
returnResult: boolean;
sleepDuration: number;
}
export interface AvatarDownloadPersistedData extends PersistedJobData {
jobType: 'AvatarDownloadJobType';
conversationId: string;
}
interface PersitedDataWithMsgIds extends PersistedJobData {
msgIds: Array<string>;
}
export interface ConfigurationSyncPersistedData extends PersistedJobData {
jobType: 'ConfigurationSyncJobType';
}
export interface FetchMsgExpirySwarmPersistedData extends PersitedDataWithMsgIds {
jobType: 'FetchMsgExpirySwarmJobType';
}
export interface UpdateMsgExpirySwarmPersistedData extends PersitedDataWithMsgIds {
jobType: 'UpdateMsgExpirySwarmJobType';
}
export type TypeOfPersistedData =
| ConfigurationSyncPersistedData
| AvatarDownloadPersistedData
| FetchMsgExpirySwarmPersistedData
| UpdateMsgExpirySwarmPersistedData
| FakeSleepJobData
| FakeSleepForMultiJobData;
export type AddJobCheckReturn = 'skipAddSameJobPresent' | null;
export enum RunJobResult {
Success = 1,
RetryJobIfPossible = 2,
PermanentFailure = 3,
}
function isDataWithMsgIds(data: PersistedJobData): data is PersitedDataWithMsgIds {
return !isNil((data as PersitedDataWithMsgIds)?.msgIds);
}
/**
* This class can be used to save and run jobs from the database.
* Every child class must take the minimum amount of arguments, and make sure they are unlikely to change.
* For instance, don't have the attachments to downloads as arguments, just the messageId and the index.
* Don't have the new profileImage url for an avatar download job, just the conversationId.
*
* It is the role of the job to fetch the latest data, and decide if a process is needed or not
* If the job throws or returns false, it will be retried by the corresponding job runner.
*/
export abstract class PersistedJob<T extends PersistedJobData> {
public persistedData: T;
private runningPromise: Promise<RunJobResult> | null = null;
public constructor(data: T) {
if (data.maxAttempts < 1) {
throw new Error('maxAttempts must be >= 1');
}
if (isEmpty(data.identifier)) {
throw new Error('identifier must be not empty');
}
if (isEmpty(data.jobType)) {
throw new Error('jobType must be not empty');
}
if (data.delayBetweenRetries <= 0) {
throw new Error('delayBetweenRetries must be at least > 0');
}
if (data.nextAttemptTimestamp <= 0) {
throw new Error('nextAttemptTimestamp must be set and > 0');
}
this.persistedData = data;
}
public async runJob() {
if (!this.runningPromise) {
// eslint-disable-next-line more/no-then
this.runningPromise = this.run()
.then(jobResult => {
this.runningPromise = null;
return jobResult;
})
.catch(e => {
window.log.warn(
'runJob() threw. this cannot happen, but rehtrowing as this should be handled in each jobs run()',
e
);
throw e;
});
}
return this.runningPromise;
}
/**
* If that job is running, wait for its completion (success or failure) before returning.
* Can be used to wait for the task to be done before exiting the JobRunner
*/
public async waitForCurrentTry() {
try {
return this.runningPromise || Promise.resolve(true);
} catch (e) {
window.log.warn('waitForCurrentTry got an error: ', e.message);
return Promise.resolve(true);
}
}
/**
* This one must be reimplemented in the child class, and must first call `super.serializeBase()`
*/
public abstract serializeJob(): T;
public abstract nonRunningJobsToRemove(jobs: Array<T>): Array<T>;
public abstract addJobCheck(jobs: Array<T>): AddJobCheckReturn;
public addJobCheckSameTypePresent(jobs: Array<T>): 'skipAddSameJobPresent' | null {
return jobs.some(j => j.jobType === this.persistedData.jobType)
? 'skipAddSameJobPresent'
: null;
}
public addJobCheckEveryMsgIdsAlreadyPresent(jobs: Array<T>): 'skipAddSameJobPresent' | null {
if (!jobs.length) {
return null;
}
if (!isDataWithMsgIds(this.persistedData)) {
throw new Error(`${this.persistedData.jobType} does not have a msgIds field`);
}
const allIdsAlreadyScheduled = uniq(
flatten(
jobs.map(m => {
if (!isDataWithMsgIds(m)) {
throw new Error(`${this.persistedData.jobType} does not have a msgIds field`);
}
return m.msgIds;
})
)
);
// if all ids we are trying to add are already tracked as other jobs in the job runner,
// there is no need to add this job at all.
if (this.persistedData.msgIds.every(m => allIdsAlreadyScheduled.includes(m))) {
return 'skipAddSameJobPresent';
}
return null;
}
public abstract getJobTimeoutMs(): number;
/**
* This function will be called by the runner do run the logic of that job.
* It **must** return true if that job is a success and doesn't need to be retried.
* If it returns false, or throws, it will be retried (if not reach the retries limit yet).
*
* Note: you should check the this.isAborted() to know if you should cancel the current processing of your logic.
*/
protected abstract run(): Promise<RunJobResult>;
protected serializeBase(): T {
return cloneDeep(this.persistedData);
}
}