You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
365 lines
12 KiB
TypeScript
365 lines
12 KiB
TypeScript
import { cloneDeep, compact, isArray, isString } from 'lodash';
|
|
import { Data } from '../../../data/data';
|
|
import { timeout } from '../Promise';
|
|
import { persistedJobFromData } from './JobDeserialization';
|
|
import { JobRunnerType } from './jobs/JobRunnerType';
|
|
import {
|
|
AvatarDownloadPersistedData,
|
|
ConfigurationSyncPersistedData,
|
|
PersistedJob,
|
|
RunJobResult,
|
|
TypeOfPersistedData,
|
|
} from './PersistedJob';
|
|
import { Storage } from '../../../util/storage';
|
|
|
|
/**
|
|
* 'job_in_progress' if there is already a job in progress
|
|
* 'job_deferred' if there is a next job, but too far in the future to start it now
|
|
* 'job_started' a job was pending to be started and we could start it, so we started it
|
|
* 'no_job' if there are no jobs to be run at all
|
|
*/
|
|
export type StartProcessingResult = 'job_in_progress' | 'job_deferred' | 'job_started' | 'no_job';
|
|
|
|
export type AddJobResult = 'job_deferred' | 'job_started';
|
|
|
|
function jobToLogId<T extends TypeOfPersistedData>(jobRunner: JobRunnerType, job: PersistedJob<T>) {
|
|
return `id: "${job.persistedData.identifier}" (type: "${jobRunner}")`;
|
|
}
|
|
|
|
/**
|
|
* This class is used to plan jobs and make sure they are retried until the success.
|
|
* By having a specific type, we can find the logic to be run by that type of job.
|
|
*
|
|
* There are different type of jobs which can be scheduled, but we currently only use the SyncConfigurationJob.
|
|
*
|
|
* SyncConfigurationJob is a job which can only be planned once until it is a success. So in the queue on jobs, there can only be one SyncConfigurationJob at all times.
|
|
*
|
|
*/
|
|
export class PersistedJobRunner<T extends TypeOfPersistedData> {
|
|
private isInit = false;
|
|
private jobsScheduled: Array<PersistedJob<T>> = [];
|
|
private isStarted = false;
|
|
private readonly jobRunnerType: JobRunnerType;
|
|
private nextJobStartTimer: NodeJS.Timeout | null = null;
|
|
private currentJob: PersistedJob<T> | null = null;
|
|
|
|
constructor(jobRunnerType: JobRunnerType, _jobEventsListener: null) {
|
|
this.jobRunnerType = jobRunnerType;
|
|
window?.log?.warn(`new runner of type ${jobRunnerType} built`);
|
|
}
|
|
|
|
public async loadJobsFromDb() {
|
|
if (this.isInit) {
|
|
return;
|
|
}
|
|
let jobsArray: Array<T> = [];
|
|
const found = await Data.getItemById(this.getJobRunnerItemId());
|
|
if (found && found.value && isString(found.value)) {
|
|
const asStr = found.value;
|
|
|
|
try {
|
|
const parsed = JSON.parse(asStr);
|
|
if (!isArray(parsed)) {
|
|
jobsArray = [];
|
|
} else {
|
|
jobsArray = parsed;
|
|
}
|
|
} catch (e) {
|
|
window.log.warn(`Failed to parse jobs of type ${this.jobRunnerType} from DB`);
|
|
jobsArray = [];
|
|
}
|
|
}
|
|
const jobs: Array<PersistedJob<T>> = compact(jobsArray.map(persistedJobFromData));
|
|
this.jobsScheduled = cloneDeep(jobs);
|
|
// make sure the list is sorted
|
|
this.sortJobsList();
|
|
this.isInit = true;
|
|
}
|
|
|
|
public async addJob(
|
|
job: PersistedJob<T>
|
|
): Promise<'type_exists' | 'identifier_exists' | AddJobResult> {
|
|
this.assertIsInitialized();
|
|
|
|
if (this.jobsScheduled.find(j => j.persistedData.identifier === job.persistedData.identifier)) {
|
|
window.log.info(
|
|
`job runner (${this.jobRunnerType}) has already a job with id:"${job.persistedData.identifier}" planned so not adding another one`
|
|
);
|
|
return 'identifier_exists';
|
|
}
|
|
|
|
const serializedNonRunningJobs = this.jobsScheduled
|
|
.filter(j => j !== this.currentJob)
|
|
.map(k => k.serializeJob());
|
|
|
|
const addJobChecks = job.addJobCheck(serializedNonRunningJobs);
|
|
if (addJobChecks === 'skipAddSameJobPresent') {
|
|
// window.log.warn(`addjobCheck returned "${addJobChecks}" so not adding it`);
|
|
return 'type_exists';
|
|
}
|
|
|
|
// make sure there is no job with that same identifier already .
|
|
|
|
window.log.info(`job runner adding type :"${job.persistedData.jobType}" `);
|
|
return this.addJobUnchecked(job);
|
|
}
|
|
|
|
/**
|
|
* Only used for testing
|
|
*/
|
|
public getJobList() {
|
|
return this.getSerializedJobs();
|
|
}
|
|
|
|
public resetForTesting() {
|
|
this.jobsScheduled = [];
|
|
this.isInit = false;
|
|
|
|
if (this.nextJobStartTimer) {
|
|
clearTimeout(this.nextJobStartTimer);
|
|
this.nextJobStartTimer = null;
|
|
}
|
|
this.currentJob = null;
|
|
}
|
|
|
|
public getCurrentJobIdentifier(): string | null {
|
|
return this.currentJob?.persistedData?.identifier || null;
|
|
}
|
|
|
|
/**
|
|
* if we are running a job, this call will await until the job is done and stop the queue
|
|
*/
|
|
public async stopAndWaitCurrentJob(): Promise<'no_await' | 'await'> {
|
|
if (!this.isStarted || !this.currentJob) {
|
|
return 'no_await';
|
|
}
|
|
this.isStarted = false;
|
|
await this.currentJob.waitForCurrentTry();
|
|
return 'await';
|
|
}
|
|
|
|
/**
|
|
* if we are running a job, this call will await until the job is done.
|
|
* If another job must be run right away this one, we will also add the upcoming one as the currentJob.
|
|
*/
|
|
public async waitCurrentJob(): Promise<'no_await' | 'await'> {
|
|
if (!this.isStarted || !this.currentJob) {
|
|
return 'no_await';
|
|
}
|
|
await this.currentJob.waitForCurrentTry();
|
|
return 'await';
|
|
}
|
|
|
|
public startProcessing(): StartProcessingResult {
|
|
if (this.isStarted) {
|
|
return this.planNextJob();
|
|
}
|
|
this.isStarted = true;
|
|
return this.planNextJob();
|
|
}
|
|
|
|
private sortJobsList() {
|
|
this.jobsScheduled.sort(
|
|
(a, b) => a.persistedData.nextAttemptTimestamp - b.persistedData.nextAttemptTimestamp
|
|
);
|
|
}
|
|
|
|
private async writeJobsToDB() {
|
|
const serialized = this.getSerializedJobs();
|
|
window.log.debug(`writing to db for "${this.jobRunnerType}": `, serialized);
|
|
await Storage.put(this.getJobRunnerItemId(), JSON.stringify(serialized));
|
|
}
|
|
|
|
private async addJobUnchecked(job: PersistedJob<T>) {
|
|
this.jobsScheduled.push(cloneDeep(job));
|
|
this.sortJobsList();
|
|
await this.writeJobsToDB();
|
|
// a new job was added. trigger it if we can/have to start it
|
|
const result = this.planNextJob();
|
|
|
|
if (result === 'no_job') {
|
|
throw new Error('We just pushed a job, there cannot be no job');
|
|
}
|
|
if (result === 'job_in_progress') {
|
|
return 'job_deferred';
|
|
}
|
|
return result;
|
|
}
|
|
|
|
private getSerializedJobs() {
|
|
return this.jobsScheduled.map(m => m.serializeJob());
|
|
}
|
|
|
|
private getJobRunnerItemId() {
|
|
return `jobRunner-${this.jobRunnerType}`;
|
|
}
|
|
|
|
/**
|
|
* Returns 'job_in_progress' if there is already a job running
|
|
* Returns 'none' if there are no jobs to be started at all (or the runner is not running)
|
|
* Returns 'started' if there the next jobs was just started
|
|
* Returns 'job_deferred' if there is a next job but it is in the future and so wasn't started yet, but a timer is set.
|
|
*/
|
|
private planNextJob(): StartProcessingResult {
|
|
if (!this.isStarted) {
|
|
if (this.jobsScheduled.length) {
|
|
return 'job_deferred';
|
|
} else {
|
|
return 'no_job';
|
|
}
|
|
}
|
|
|
|
if (this.currentJob) {
|
|
return 'job_in_progress';
|
|
}
|
|
const nextJob = this.jobsScheduled?.[0];
|
|
|
|
if (!nextJob) {
|
|
return 'no_job';
|
|
}
|
|
|
|
if (nextJob.persistedData.nextAttemptTimestamp <= Date.now()) {
|
|
if (this.nextJobStartTimer) {
|
|
global.clearTimeout(this.nextJobStartTimer);
|
|
this.nextJobStartTimer = null;
|
|
}
|
|
// nextJob should be started right away
|
|
void this.runNextJob();
|
|
return 'job_started';
|
|
}
|
|
|
|
// next job is not to be started right away, just plan our runner to be awakened when the time is right.
|
|
if (this.nextJobStartTimer) {
|
|
// remove the timer as there might be a more urgent job to be run before the one we have set here.
|
|
global.clearTimeout(this.nextJobStartTimer);
|
|
}
|
|
// plan a timer to wakeup when that timer is reached.
|
|
this.nextJobStartTimer = global.setTimeout(() => {
|
|
if (this.nextJobStartTimer) {
|
|
global.clearTimeout(this.nextJobStartTimer);
|
|
this.nextJobStartTimer = null;
|
|
}
|
|
void this.runNextJob();
|
|
}, Math.max(nextJob.persistedData.nextAttemptTimestamp - Date.now(), 1));
|
|
|
|
return 'job_deferred';
|
|
}
|
|
|
|
private deleteJobsByIdentifier(identifiers: Array<string>) {
|
|
identifiers.forEach(identifier => {
|
|
const jobIndex = this.jobsScheduled.findIndex(f => f.persistedData.identifier === identifier);
|
|
window.log.debug(
|
|
`removing job ${jobToLogId(
|
|
this.jobRunnerType,
|
|
this.jobsScheduled[jobIndex]
|
|
)} at ${jobIndex}`
|
|
);
|
|
|
|
if (jobIndex >= 0) {
|
|
this.jobsScheduled.splice(jobIndex, 1);
|
|
}
|
|
});
|
|
}
|
|
|
|
private async runNextJob() {
|
|
this.assertIsInitialized();
|
|
if (this.currentJob || !this.isStarted || !this.jobsScheduled.length) {
|
|
return;
|
|
}
|
|
|
|
const nextJob = this.jobsScheduled[0];
|
|
|
|
// if the time is 101, and that task is to be run at t=101, we need to start it right away.
|
|
if (nextJob.persistedData.nextAttemptTimestamp > Date.now()) {
|
|
window.log.warn(
|
|
'next job is not due to be run just yet. Going idle.',
|
|
nextJob.persistedData.nextAttemptTimestamp - Date.now()
|
|
);
|
|
this.planNextJob();
|
|
return;
|
|
}
|
|
let success: RunJobResult | null = null;
|
|
|
|
try {
|
|
if (this.currentJob) {
|
|
return;
|
|
}
|
|
this.currentJob = nextJob;
|
|
|
|
success = await timeout(this.currentJob.runJob(), this.currentJob.getJobTimeoutMs());
|
|
|
|
if (success !== RunJobResult.Success) {
|
|
throw new Error('return result was not "Success"');
|
|
}
|
|
|
|
// here the job did not throw and didn't return false. Consider it OK then and remove it from the list of jobs to run.
|
|
this.deleteJobsByIdentifier([this.currentJob.persistedData.identifier]);
|
|
await this.writeJobsToDB();
|
|
} catch (e) {
|
|
window.log.info(`${jobToLogId(this.jobRunnerType, nextJob)} failed with "${e.message}"`);
|
|
if (
|
|
success === RunJobResult.PermanentFailure ||
|
|
nextJob.persistedData.currentRetry >= nextJob.persistedData.maxAttempts - 1
|
|
) {
|
|
if (success === RunJobResult.PermanentFailure) {
|
|
window.log.info(
|
|
`${jobToLogId(this.jobRunnerType, nextJob)}:${
|
|
nextJob.persistedData.currentRetry
|
|
} permament failure for job`
|
|
);
|
|
} else {
|
|
window.log.info(
|
|
`Too many failures for ${jobToLogId(this.jobRunnerType, nextJob)}: ${
|
|
nextJob.persistedData.currentRetry
|
|
} out of ${nextJob.persistedData.maxAttempts}`
|
|
);
|
|
}
|
|
// we cannot restart this job anymore. Remove the entry completely
|
|
this.deleteJobsByIdentifier([nextJob.persistedData.identifier]);
|
|
} else {
|
|
window.log.info(
|
|
`Rescheduling ${jobToLogId(this.jobRunnerType, nextJob)} in ${
|
|
nextJob.persistedData.delayBetweenRetries
|
|
}...`
|
|
);
|
|
nextJob.persistedData.currentRetry = nextJob.persistedData.currentRetry + 1;
|
|
// that job can be restarted. Plan a retry later with the already defined retry
|
|
nextJob.persistedData.nextAttemptTimestamp =
|
|
Date.now() + nextJob.persistedData.delayBetweenRetries;
|
|
}
|
|
// in any case, either we removed a job or changed one of the timestamp.
|
|
// so sort the list again, and persist it
|
|
this.sortJobsList();
|
|
await this.writeJobsToDB();
|
|
} finally {
|
|
this.currentJob = null;
|
|
|
|
// start the next job if there is any to be started now, or just plan the wakeup of our runner for the right time.
|
|
this.planNextJob();
|
|
}
|
|
}
|
|
|
|
private assertIsInitialized() {
|
|
if (!this.isInit) {
|
|
throw new Error(
|
|
'persisted job runner was not initlized yet. Call loadJobsFromDb with what you have persisted first'
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
const configurationSyncRunner = new PersistedJobRunner<ConfigurationSyncPersistedData>(
|
|
'ConfigurationSyncJob',
|
|
null
|
|
);
|
|
|
|
const avatarDownloadRunner = new PersistedJobRunner<AvatarDownloadPersistedData>(
|
|
'AvatarDownloadJob',
|
|
null
|
|
);
|
|
|
|
export const runners = {
|
|
configurationSyncRunner,
|
|
avatarDownloadRunner,
|
|
};
|