From b3995c117b05716a919158fa239e72db5bd5da72 Mon Sep 17 00:00:00 2001 From: Audric Ackermann Date: Fri, 20 Jan 2023 14:36:27 +1100 Subject: [PATCH] feat: add a PersistedJobRunner which can handle SyncConfJobs --- .../utils/job_runners/JobDeserialization.ts | 22 ++ ts/session/utils/job_runners/JobRunner.ts | 268 ++++++++++++++++++ ts/session/utils/job_runners/PersistedJob.ts | 107 +++++++ .../job_runners/jobs/ConfigurationSyncJob.ts | 45 +++ .../utils/job_runners/jobs/JobRunnerType.ts | 1 + .../unit/utils/job_runner/JobRunner_test.ts | 144 ++++++++++ 6 files changed, 587 insertions(+) create mode 100644 ts/session/utils/job_runners/JobDeserialization.ts create mode 100644 ts/session/utils/job_runners/JobRunner.ts create mode 100644 ts/session/utils/job_runners/PersistedJob.ts create mode 100644 ts/session/utils/job_runners/jobs/ConfigurationSyncJob.ts create mode 100644 ts/session/utils/job_runners/jobs/JobRunnerType.ts create mode 100644 ts/test/session/unit/utils/job_runner/JobRunner_test.ts diff --git a/ts/session/utils/job_runners/JobDeserialization.ts b/ts/session/utils/job_runners/JobDeserialization.ts new file mode 100644 index 000000000..07fc2141d --- /dev/null +++ b/ts/session/utils/job_runners/JobDeserialization.ts @@ -0,0 +1,22 @@ +import { isEmpty, isString } from 'lodash'; +import { ConfigurationSyncJob } from './jobs/ConfigurationSyncJob'; +import { Persistedjob, PersistedJobType, SerializedPersistedJob } from './PersistedJob'; + +export function persistedJobFromData(data: SerializedPersistedJob): Persistedjob | null { + if (!data || isEmpty(data.jobType) || !isString(data?.jobType)) { + return null; + } + const jobType: PersistedJobType = data.jobType as PersistedJobType; + switch (jobType) { + case 'ConfigurationSyncJobType': + return new ConfigurationSyncJob({ + maxAttempts: data.maxAttempts, + identifier: data.identifier, + nextAttemptTimestamp: data.nextAttemptTimestamp, + currentRetry: data.currentRetry, + }); + default: + console.warn('unknown persisted job type:', jobType); + return null; + } +} diff --git a/ts/session/utils/job_runners/JobRunner.ts b/ts/session/utils/job_runners/JobRunner.ts new file mode 100644 index 000000000..497096c0c --- /dev/null +++ b/ts/session/utils/job_runners/JobRunner.ts @@ -0,0 +1,268 @@ +import { cloneDeep, compact, isArray, isString } from 'lodash'; +import { Data } from '../../../data/data'; +import { persistedJobFromData } from './JobDeserialization'; +import { JobRunnerType } from './jobs/JobRunnerType'; +import { Persistedjob, SerializedPersistedJob } from './PersistedJob'; + +/** + * 'not_running' when the queue is not running + * 'already_started' if startProcessing was called already once before + * 'job_in_progress' if there is already a job in progress + * 'job_deferred' if there is a next job, but too far in the future to start it now + * 'job_started' a job was pending to be started and we could start it, so we started it + * 'no_job' if there are no jobs to be run at all + */ +export type StartProcessingResult = + | 'not_running' + | 'already_started' + | 'job_in_progress' + | 'job_deferred' + | 'job_started' + | 'no_job'; + +export class PersistedJobRunner { + private isInit = false; + private jobsScheduled: Array = []; + private isStarted = false; + private readonly jobRunnerType: JobRunnerType; + private nextJobStartTimer: NodeJS.Timeout | null = null; + private currentJob: Persistedjob | null = null; + + constructor(jobRunnerType: JobRunnerType) { + this.jobRunnerType = jobRunnerType; + console.warn('new runner'); + } + + public async loadJobsFromDb() { + if (this.isInit) { + throw new Error('job runner already init'); + } + let jobsArray: Array = []; + const found = await Data.getItemById(this.getJobRunnerItemId()); + if (found && found.value && isString(found.value)) { + const asStr = found.value; + + try { + const parsed = JSON.parse(asStr); + if (!isArray(parsed)) { + jobsArray = []; + } else { + jobsArray = parsed; + } + } catch (e) { + window.log.warn(`Failed to parse jobs of type ${this.jobRunnerType} from DB`); + jobsArray = []; + } + } + const jobs: Array = compact(jobsArray.map(persistedJobFromData)); + this.jobsScheduled = cloneDeep(jobs); + // make sure the list is sorted + this.sortJobsList(); + this.isInit = true; + } + + public async addJob(job: Persistedjob) { + this.assertIsInitialized(); + + if (job.singleJobInQueue) { + // make sure there is no job with that same type already scheduled. + if (this.jobsScheduled.find(j => j.jobType === job.jobType)) { + console.info( + `job runner has already a job "${job.identifier}" planned so not adding another one` + ); + return; + } + + this.jobsScheduled.push(job); + this.sortJobsList(); + await this.writeJobsToDB(); + + if (this.isStarted) { + // a new job was added. trigger it if we can/have to start it + this.planNextJob(); + } + + return; + } + throw new Error('persisted job runner does not support non single type for now.'); + } + + /** + * Only used for testing + */ + public getJobList() { + return this.getSerializedJobs(); + } + + public resetForTesting() { + this.jobsScheduled = []; + this.isInit = false; + + if (this.nextJobStartTimer) { + clearTimeout(this.nextJobStartTimer); + this.nextJobStartTimer = null; + } + this.currentJob = null; + } + + /** + * if we are running a job, this call will await until the job is done + */ + public async stopAndWaitCurrentJob() { + if (!this.isStarted || !this.currentJob) { + return; + } + this.isStarted = false; + if (this.currentJob) { + await this.currentJob.waitForCurrentTry(); + } + } + + public startProcessing(): StartProcessingResult { + if (this.isStarted) { + return 'already_started'; + } + this.isStarted = true; + return this.planNextJob(); + } + + private sortJobsList() { + this.jobsScheduled.sort((a, b) => a.nextAttemptTimestamp - b.nextAttemptTimestamp); + } + + private async writeJobsToDB() { + const serialized = this.getSerializedJobs(); + console.warn('writing to db', serialized); + await Data.createOrUpdateItem({ + id: this.getJobRunnerItemId(), + value: JSON.stringify(serialized), + }); + } + + private getSerializedJobs() { + return this.jobsScheduled.map(m => m.serializeJob()); + } + + private getJobRunnerItemId() { + return `jobRunner-${this.jobRunnerType}`; + } + + /** + * Returns 'not_running' if that job runner is not started at all + * Returns 'in_progress' if there is already a job running + * Returns 'none' if there are no jobs to be started at all (or the runner is not running) + * Returns 'started' if there the next jobs was just started + * Returns 'deferred' if there is a next job but it is in the future and so wasn't started yet, but a timer is set. + */ + private planNextJob(): StartProcessingResult { + if (!this.isStarted) { + return 'not_running'; + } + if (this.currentJob) { + return 'job_in_progress'; + } + if (!this.jobsScheduled.length) { + return 'no_job'; + } + + const nextJob = this.jobsScheduled[0]; + + if (!nextJob) { + return 'no_job'; + } + + if (nextJob.nextAttemptTimestamp <= Date.now()) { + if (this.nextJobStartTimer) { + global.clearTimeout(this.nextJobStartTimer); + this.nextJobStartTimer = null; + } + // nextJob should be started right away + void this.runNextJob(); + return 'job_started'; + } + + // next job is not to be started right away, just plan our runner to be awakened when the time is right. + if (this.nextJobStartTimer) { + global.clearTimeout(this.nextJobStartTimer); + } + // plan a timer to wakeup when that timer is reached. + this.nextJobStartTimer = global.setTimeout(() => { + console.warn('wakeup timer'); + if (this.nextJobStartTimer) { + global.clearTimeout(this.nextJobStartTimer); + this.nextJobStartTimer = null; + } + void this.runNextJob(); + }, Math.max(nextJob.nextAttemptTimestamp - Date.now(), 1)); + + return 'job_deferred'; + } + + private deleteJobByIdentifier(identifier: string) { + const jobIndex = this.jobsScheduled.findIndex(f => f.identifier === identifier); + if (jobIndex >= 0) { + this.jobsScheduled.splice(jobIndex, 1); + } + } + + private async runNextJob() { + console.warn('runNextJob called'); + this.assertIsInitialized(); + if (this.currentJob || !this.isStarted || !this.jobsScheduled.length) { + return; + } + + const nextJob = this.jobsScheduled[0]; + + if (nextJob.nextAttemptTimestamp >= Date.now()) { + window.log.info('next job is not to be run just yet. Going idle.'); + this.planNextJob(); + return; + } + + try { + if (this.currentJob) { + return; + } + this.currentJob = nextJob; + const success = await this.currentJob.runJob(); + if (!success) { + throw new Error(`job ${nextJob.identifier} failed`); + } + // here the job did not throw and didn't return false. Consider it OK then and remove it from the list of jobs to run. + this.deleteJobByIdentifier(this.currentJob.identifier); + await this.writeJobsToDB(); + } catch (e) { + // either the job throw or didn't return 'OK' + if (nextJob.currentRetry >= nextJob.maxAttempts) { + // we cannot restart this job anymore. Remove the entry completely + this.deleteJobByIdentifier(nextJob.identifier); + } else { + // that job can be restarted. Plan a retry later with the already defined retry + nextJob.nextAttemptTimestamp = Date.now() + nextJob.delayBetweenRetries; + } + // in any case, either we removed a job or changed one of the timestamp. + // so sort the list again, and persist it + this.sortJobsList(); + await this.writeJobsToDB(); + } finally { + this.currentJob = null; + // start the next job if there is any to be started now, or just plan the wakeup of our runner for the right time. + this.planNextJob(); + } + } + + private assertIsInitialized() { + if (!this.isInit) { + throw new Error( + 'persisted job runner was not initlized yet. Call initWithData with what you have persisted first' + ); + } + } +} + +const configurationSyncRunner = new PersistedJobRunner('ConfigurationSyncJob'); + +export const runners = { + configurationSyncRunner, +}; diff --git a/ts/session/utils/job_runners/PersistedJob.ts b/ts/session/utils/job_runners/PersistedJob.ts new file mode 100644 index 000000000..c73616b0c --- /dev/null +++ b/ts/session/utils/job_runners/PersistedJob.ts @@ -0,0 +1,107 @@ +import { isEmpty } from 'lodash'; + +export type PersistedJobType = 'ConfigurationSyncJobType'; + +export type SerializedPersistedJob = { + // we need at least those as they are needed to do lookups of the list of jobs. + jobType: string; + identifier: string; + nextAttemptTimestamp: number; + maxAttempts: number; // to run try to run it twice, set this to 2. + currentRetry: number; // + // then we can have other details on a specific type of job case + [key: string]: any; +}; + +export abstract class Persistedjob { + public readonly identifier: string; + public readonly singleJobInQueue: boolean; + public readonly delayBetweenRetries: number; + public readonly maxAttempts: number; + public readonly jobType: PersistedJobType; + public currentRetry: number; + public nextAttemptTimestamp: number; + + private runningPromise: Promise | null = null; + + public constructor({ + maxAttempts, + delayBetweenRetries, + identifier, + singleJobInQueue, + jobType, + nextAttemptTimestamp, + }: { + identifier: string; + maxAttempts: number; + delayBetweenRetries: number; + singleJobInQueue: boolean; + jobType: PersistedJobType; + nextAttemptTimestamp: number; + currentRetry: number; + }) { + this.identifier = identifier; + this.jobType = jobType; + this.delayBetweenRetries = delayBetweenRetries; + this.maxAttempts = maxAttempts; + this.currentRetry = 0; + this.singleJobInQueue = singleJobInQueue; + this.nextAttemptTimestamp = nextAttemptTimestamp; + + if (maxAttempts < 1) { + throw new Error('maxAttempts must be >= 1'); + } + + if (isEmpty(identifier)) { + throw new Error('identifier must be not empty'); + } + + if (isEmpty(jobType)) { + throw new Error('identifier must be not empty'); + } + + if (delayBetweenRetries <= 0) { + throw new Error('delayBetweenRetries must be at least > 0'); + } + + if (nextAttemptTimestamp <= 0) { + throw new Error('nextAttemptTimestamp must be set and > 0'); + } + } + + public async runJob() { + if (!this.runningPromise) { + this.runningPromise = this.run(); + } + return this.runningPromise; + } + + /** + * If that job is running, wait for its completion (success or failure) before returning. + * Can be used to wait for the task to be done before exiting the JobRunner + */ + public async waitForCurrentTry() { + // tslint:disable-next-line: no-promise-as-boolean + return this.runningPromise || Promise.resolve(); + } + + /** + * This one must be reimplemented in the child class, and must first call `super.serializeBase()` + */ + public abstract serializeJob(): SerializedPersistedJob; + + protected abstract run(): Promise; // must return true if that job is a success and doesn't need to be retried + + protected serializeBase(): SerializedPersistedJob { + return { + // those are mandatory + jobType: this.jobType, + identifier: this.identifier, + nextAttemptTimestamp: this.nextAttemptTimestamp, + maxAttempts: this.maxAttempts, + currentRetry: this.currentRetry, + delayBetweenRetries: this.delayBetweenRetries, + singleJobInQueue: this.singleJobInQueue, + }; + } +} diff --git a/ts/session/utils/job_runners/jobs/ConfigurationSyncJob.ts b/ts/session/utils/job_runners/jobs/ConfigurationSyncJob.ts new file mode 100644 index 000000000..0be0d07cc --- /dev/null +++ b/ts/session/utils/job_runners/jobs/ConfigurationSyncJob.ts @@ -0,0 +1,45 @@ +import { isNumber } from 'lodash'; +import { v4 } from 'uuid'; +import { sleepFor } from '../../Promise'; +import { Persistedjob, SerializedPersistedJob } from '../PersistedJob'; + +export class ConfigurationSyncJob extends Persistedjob { + constructor({ + identifier, + nextAttemptTimestamp, + maxAttempts, + currentRetry, + }: { + identifier: string | null; + nextAttemptTimestamp: number | null; + maxAttempts: number | null; + currentRetry: number; + }) { + super({ + jobType: 'ConfigurationSyncJobType', + identifier: identifier || v4(), + delayBetweenRetries: 3000, + maxAttempts: isNumber(maxAttempts) ? maxAttempts : 3, + nextAttemptTimestamp: nextAttemptTimestamp || Date.now() + 3000, + singleJobInQueue: true, + currentRetry, + }); + } + + public async run() { + // blablha do everything from the notion page, and if success, return true. + console.warn(`running job ${this.jobType} with id:"${this.identifier}" `); + + await sleepFor(5000); + console.warn( + `running job ${this.jobType} with id:"${this.identifier}" done and returning failed ` + ); + + return false; + } + + public serializeJob(): SerializedPersistedJob { + const fromParent = super.serializeBase(); + return fromParent; + } +} diff --git a/ts/session/utils/job_runners/jobs/JobRunnerType.ts b/ts/session/utils/job_runners/jobs/JobRunnerType.ts new file mode 100644 index 000000000..fdf5c9bd4 --- /dev/null +++ b/ts/session/utils/job_runners/jobs/JobRunnerType.ts @@ -0,0 +1 @@ +export type JobRunnerType = 'ConfigurationSyncJob'; diff --git a/ts/test/session/unit/utils/job_runner/JobRunner_test.ts b/ts/test/session/unit/utils/job_runner/JobRunner_test.ts new file mode 100644 index 000000000..beaff7e90 --- /dev/null +++ b/ts/test/session/unit/utils/job_runner/JobRunner_test.ts @@ -0,0 +1,144 @@ +import { expect } from 'chai'; +import _ from 'lodash'; +import Sinon from 'sinon'; +import { v4 } from 'uuid'; +import { persistedJobFromData } from '../../../../../session/utils/job_runners/JobDeserialization'; +import { PersistedJobRunner } from '../../../../../session/utils/job_runners/JobRunner'; +import { SerializedPersistedJob } from '../../../../../session/utils/job_runners/PersistedJob'; +import { stubData } from '../../../../test-utils/utils'; + +function getFakeConfigurationJobPersisted(timestamp: number): SerializedPersistedJob { + return { + jobType: 'ConfigurationSyncJobType', + identifier: v4(), + delayBetweenRetries: 3000, + maxAttempts: 3, + nextAttemptTimestamp: timestamp || Date.now() + 3000, + singleJobInQueue: true, + currentRetry: 0, + }; +} + +function getFakeConfigurationJob(timestamp: number) { + const job = persistedJobFromData(getFakeConfigurationJobPersisted(timestamp)); + if (!job) { + throw new Error('persistedJobFromData failed'); + } + return job; +} + +describe('JobRunner', () => { + let getItemById: Sinon.SinonStub; + let createOrUpdateItem: Sinon.SinonStub; + let clock: Sinon.SinonFakeTimers; + + let runner: PersistedJobRunner; + + beforeEach(() => { + getItemById = stubData('getItemById'); + createOrUpdateItem = stubData('createOrUpdateItem'); + clock = Sinon.useFakeTimers(); + runner = new PersistedJobRunner('ConfigurationSyncJob'); + }); + + afterEach(() => { + Sinon.restore(); + runner.resetForTesting(); + }); + + describe('loadJobsFromDb', () => { + it('throw if already loaded', async () => { + await runner.loadJobsFromDb(); + try { + await runner.loadJobsFromDb(); + throw new Error('PLOP'); // the line above should throw something else + } catch (e) { + expect(e.message).to.not.eq('PLOP'); + } + }); + it('unsorted list is sorted after loading', async () => { + const unsorted = [ + getFakeConfigurationJobPersisted(1), + getFakeConfigurationJobPersisted(5), + getFakeConfigurationJobPersisted(0), + ]; + getItemById.resolves({ + id: '', + value: JSON.stringify(unsorted), + }); + + await runner.loadJobsFromDb(); + + const jobList = runner.getJobList(); + expect(jobList).to.be.deep.eq( + unsorted.sort((a, b) => a.nextAttemptTimestamp - b.nextAttemptTimestamp) + ); + }); + + it('invalid stored data results in empty array of jobs', async () => { + const unsorted = { invalid: 'data' }; + getItemById.resolves({ + id: '', + value: JSON.stringify(unsorted), + }); + + await runner.loadJobsFromDb(); + + const jobList = runner.getJobList(); + expect(jobList).to.be.deep.eq([]); + }); + + it('no stored data results in empty array of jobs', async () => { + getItemById.resolves(null); + + await runner.loadJobsFromDb(); + + const jobList = runner.getJobList(); + expect(jobList).to.be.deep.eq([]); + }); + }); + + describe('addJob', () => { + it('can add configurationSyncJob ', async () => { + await runner.loadJobsFromDb(); + const job = getFakeConfigurationJob(123); + const persisted = job.serializeJob(); + await runner.addJob(job); + + expect(runner.getJobList()).to.deep.eq([persisted]); + }); + it('does not add a second configurationSyncJob if one is already there', async () => { + await runner.loadJobsFromDb(); + const job = getFakeConfigurationJob(123); + const job2 = getFakeConfigurationJob(1234); + await runner.addJob(job); + await runner.addJob(job2); + const persisted = job.serializeJob(); + + expect(runner.getJobList()).to.deep.eq([persisted]); + }); + }); + + describe('startProcessing Config Sync Jobs', () => { + it('does not trigger anything if no job present ', async () => { + await runner.loadJobsFromDb(); + expect(runner.startProcessing()).to.be.eq('no_job'); + }); + + it('triggers a job right away if there is a job which should already be running', async () => { + await runner.loadJobsFromDb(); + clock.tick(100); + const job = getFakeConfigurationJob(50); + await runner.addJob(job); + expect(runner.startProcessing()).to.be.eq('job_started'); + }); + + it('plans a deffered job if there is a job starting later', async () => { + await runner.loadJobsFromDb(); + clock.tick(100); + const job = getFakeConfigurationJob(150); + await runner.addJob(job); + expect(runner.startProcessing()).to.be.eq('job_deferred'); + }); + }); +});