From 141c22ed4308e7659e9ef84e1393b638d527b1aa Mon Sep 17 00:00:00 2001 From: Audric Ackermann Date: Mon, 23 Jan 2023 16:13:11 +1100 Subject: [PATCH] tesat: add tests for the JobRunner class --- ts/components/leftpane/ActionsPanel.tsx | 12 +- .../utils/job_runners/JobDeserialization.ts | 20 ++ ts/session/utils/job_runners/JobRunner.ts | 142 +++++--- ts/session/utils/job_runners/PersistedJob.ts | 14 +- .../utils/job_runners/jobs/JobRunnerType.ts | 2 +- .../unit/utils/job_runner/FakeSleepForJob.ts | 98 ++++++ .../unit/utils/job_runner/JobRunner_test.ts | 325 ++++++++++++++++-- tslint.json | 2 +- 8 files changed, 528 insertions(+), 87 deletions(-) create mode 100644 ts/test/session/unit/utils/job_runner/FakeSleepForJob.ts diff --git a/ts/components/leftpane/ActionsPanel.tsx b/ts/components/leftpane/ActionsPanel.tsx index cd6a639df..2edebaa29 100644 --- a/ts/components/leftpane/ActionsPanel.tsx +++ b/ts/components/leftpane/ActionsPanel.tsx @@ -26,7 +26,7 @@ import { cleanUpOldDecryptedMedias } from '../../session/crypto/DecryptedAttachm import { DURATION } from '../../session/constants'; -import { editProfileModal, onionPathModal } from '../../state/ducks/modalDialog'; +import { onionPathModal } from '../../state/ducks/modalDialog'; import { uploadOurAvatar } from '../../interactions/conversationInteractions'; import { debounce, isEmpty, isString } from 'lodash'; @@ -67,7 +67,15 @@ const Section = (props: { type: SectionType }) => { const handleClick = async () => { /* tslint:disable:no-void-expression */ if (type === SectionType.Profile) { - dispatch(editProfileModal({})); + const message = new SharedConfigMessage({ + data: new Uint8Array([1, 2, 3]), + kind: SignalService.SharedConfigMessage.Kind.USER_PROFILE, + seqno: Long.fromNumber(0), + timestamp: GetNetworkTime.getNowWithNetworkOffset(), + }); + await getMessageQueue().sendSyncMessage({ message, namespace: SnodeNamespaces.UserProfile }); + console.warn('FIXME'); + // dispatch(editProfileModal({})); } else if (type === SectionType.ColorMode) { const currentTheme = String(window.Events.getThemeSetting()); const newTheme = (isDarkMode diff --git a/ts/session/utils/job_runners/JobDeserialization.ts b/ts/session/utils/job_runners/JobDeserialization.ts index 07fc2141d..0e800613e 100644 --- a/ts/session/utils/job_runners/JobDeserialization.ts +++ b/ts/session/utils/job_runners/JobDeserialization.ts @@ -1,4 +1,8 @@ import { isEmpty, isString } from 'lodash'; +import { + FakeSleepForJob, + FakeSleepForMultiJob, +} from '../../../test/session/unit/utils/job_runner/FakeSleepForJob'; import { ConfigurationSyncJob } from './jobs/ConfigurationSyncJob'; import { Persistedjob, PersistedJobType, SerializedPersistedJob } from './PersistedJob'; @@ -15,6 +19,22 @@ export function persistedJobFromData(data: SerializedPersistedJob): Persistedjob nextAttemptTimestamp: data.nextAttemptTimestamp, currentRetry: data.currentRetry, }); + case 'FakeSleepForJobType': + return new FakeSleepForJob({ + maxAttempts: data.maxAttempts, + identifier: data.identifier, + nextAttemptTimestamp: data.nextAttemptTimestamp, + currentRetry: data.currentRetry, + }); + case 'FakeSleepForJobMultiType': + return new FakeSleepForMultiJob({ + maxAttempts: data.maxAttempts, + identifier: data.identifier, + nextAttemptTimestamp: data.nextAttemptTimestamp, + currentRetry: data.currentRetry, + returnResult: data.returnResult, + sleepDuration: data.sleepDuration, + }); default: console.warn('unknown persisted job type:', jobType); return null; diff --git a/ts/session/utils/job_runners/JobRunner.ts b/ts/session/utils/job_runners/JobRunner.ts index 497096c0c..b75beebe4 100644 --- a/ts/session/utils/job_runners/JobRunner.ts +++ b/ts/session/utils/job_runners/JobRunner.ts @@ -5,20 +5,21 @@ import { JobRunnerType } from './jobs/JobRunnerType'; import { Persistedjob, SerializedPersistedJob } from './PersistedJob'; /** - * 'not_running' when the queue is not running - * 'already_started' if startProcessing was called already once before * 'job_in_progress' if there is already a job in progress * 'job_deferred' if there is a next job, but too far in the future to start it now * 'job_started' a job was pending to be started and we could start it, so we started it * 'no_job' if there are no jobs to be run at all */ -export type StartProcessingResult = - | 'not_running' - | 'already_started' - | 'job_in_progress' - | 'job_deferred' - | 'job_started' - | 'no_job'; +export type StartProcessingResult = 'job_in_progress' | 'job_deferred' | 'job_started' | 'no_job'; + +export type AddJobResult = 'job_deferred' | 'job_started'; + +export type JobEventListener = { + onJobSuccess: (job: SerializedPersistedJob) => void; + onJobDeferred: (job: SerializedPersistedJob) => void; + onJobError: (job: SerializedPersistedJob) => void; + onJobStarted: (job: SerializedPersistedJob) => void; +}; export class PersistedJobRunner { private isInit = false; @@ -27,9 +28,11 @@ export class PersistedJobRunner { private readonly jobRunnerType: JobRunnerType; private nextJobStartTimer: NodeJS.Timeout | null = null; private currentJob: Persistedjob | null = null; + private readonly jobEventsListener: JobEventListener | null; - constructor(jobRunnerType: JobRunnerType) { + constructor(jobRunnerType: JobRunnerType, jobEventsListener: JobEventListener | null) { this.jobRunnerType = jobRunnerType; + this.jobEventsListener = jobEventsListener; console.warn('new runner'); } @@ -61,30 +64,31 @@ export class PersistedJobRunner { this.isInit = true; } - public async addJob(job: Persistedjob) { + public async addJob( + job: Persistedjob + ): Promise<'type_exists' | 'identifier_exists' | AddJobResult> { this.assertIsInitialized(); if (job.singleJobInQueue) { // make sure there is no job with that same type already scheduled. if (this.jobsScheduled.find(j => j.jobType === job.jobType)) { console.info( - `job runner has already a job "${job.identifier}" planned so not adding another one` + `job runner has already a job with type:"${job.jobType}" planned so not adding another one` ); - return; - } - - this.jobsScheduled.push(job); - this.sortJobsList(); - await this.writeJobsToDB(); - - if (this.isStarted) { - // a new job was added. trigger it if we can/have to start it - this.planNextJob(); + return 'type_exists'; } + return this.addJobUnchecked(job); + } - return; + // make sure there is no job with that same identifier already . + if (this.jobsScheduled.find(j => j.identifier === job.identifier)) { + console.info( + `job runner has already a job with id:"${job.identifier}" planned so not adding another one` + ); + return 'identifier_exists'; } - throw new Error('persisted job runner does not support non single type for now.'); + console.info(`job runner adding type :"${job.jobType}" `); + return this.addJobUnchecked(job); } /** @@ -106,21 +110,32 @@ export class PersistedJobRunner { } /** - * if we are running a job, this call will await until the job is done + * if we are running a job, this call will await until the job is done and stop the queue */ - public async stopAndWaitCurrentJob() { + public async stopAndWaitCurrentJob(): Promise<'no_await' | 'await'> { if (!this.isStarted || !this.currentJob) { - return; + return 'no_await'; } this.isStarted = false; - if (this.currentJob) { - await this.currentJob.waitForCurrentTry(); + await this.currentJob.waitForCurrentTry(); + return 'await'; + } + + /** + * if we are running a job, this call will await until the job is done. + * If another job must be run right away this one, we will also add the upcoming one as the currentJob. + */ + public async waitCurrentJob(): Promise<'no_await' | 'await'> { + if (!this.isStarted || !this.currentJob) { + return 'no_await'; } + await this.currentJob.waitForCurrentTry(); + return 'await'; } public startProcessing(): StartProcessingResult { if (this.isStarted) { - return 'already_started'; + throw new Error('startProcessing already called'); } this.isStarted = true; return this.planNextJob(); @@ -139,6 +154,23 @@ export class PersistedJobRunner { }); } + private async addJobUnchecked(job: Persistedjob) { + this.jobsScheduled.push(cloneDeep(job)); + this.sortJobsList(); + await this.writeJobsToDB(); + // a new job was added. trigger it if we can/have to start it + const result = this.planNextJob(); + + console.warn('addJobUnchecked: ', result); + if (result === 'no_job') { + throw new Error('We just pushed a job, there cannot be no job'); + } + if (result === 'job_in_progress') { + return 'job_deferred'; + } + return result; + } + private getSerializedJobs() { return this.jobsScheduled.map(m => m.serializeJob()); } @@ -148,24 +180,23 @@ export class PersistedJobRunner { } /** - * Returns 'not_running' if that job runner is not started at all - * Returns 'in_progress' if there is already a job running + * Returns 'job_in_progress' if there is already a job running * Returns 'none' if there are no jobs to be started at all (or the runner is not running) * Returns 'started' if there the next jobs was just started - * Returns 'deferred' if there is a next job but it is in the future and so wasn't started yet, but a timer is set. + * Returns 'job_deferred' if there is a next job but it is in the future and so wasn't started yet, but a timer is set. */ private planNextJob(): StartProcessingResult { if (!this.isStarted) { - return 'not_running'; + if (this.jobsScheduled.length) { + return 'job_deferred'; + } else { + return 'no_job'; + } } if (this.currentJob) { return 'job_in_progress'; } - if (!this.jobsScheduled.length) { - return 'no_job'; - } - - const nextJob = this.jobsScheduled[0]; + const nextJob = this.jobsScheduled?.[0]; if (!nextJob) { return 'no_job'; @@ -183,11 +214,11 @@ export class PersistedJobRunner { // next job is not to be started right away, just plan our runner to be awakened when the time is right. if (this.nextJobStartTimer) { + // remove the timer as there might be a more urgent job to be run before the one we have set here. global.clearTimeout(this.nextJobStartTimer); } // plan a timer to wakeup when that timer is reached. this.nextJobStartTimer = global.setTimeout(() => { - console.warn('wakeup timer'); if (this.nextJobStartTimer) { global.clearTimeout(this.nextJobStartTimer); this.nextJobStartTimer = null; @@ -200,13 +231,14 @@ export class PersistedJobRunner { private deleteJobByIdentifier(identifier: string) { const jobIndex = this.jobsScheduled.findIndex(f => f.identifier === identifier); + console.info('deleteJobByIdentifier job', identifier, ' index', jobIndex); + if (jobIndex >= 0) { this.jobsScheduled.splice(jobIndex, 1); } } private async runNextJob() { - console.warn('runNextJob called'); this.assertIsInitialized(); if (this.currentJob || !this.isStarted || !this.jobsScheduled.length) { return; @@ -214,8 +246,12 @@ export class PersistedJobRunner { const nextJob = this.jobsScheduled[0]; - if (nextJob.nextAttemptTimestamp >= Date.now()) { - window.log.info('next job is not to be run just yet. Going idle.'); + // if the time is 101, and that task is to be run at t=101, we need to start it right away. + if (nextJob.nextAttemptTimestamp > Date.now()) { + console.warn( + 'next job is not due to be run just yet. Going idle.', + nextJob.nextAttemptTimestamp - Date.now() + ); this.planNextJob(); return; } @@ -225,28 +261,44 @@ export class PersistedJobRunner { return; } this.currentJob = nextJob; + + this.jobEventsListener?.onJobStarted(this.currentJob.serializeJob()); + const success = await this.currentJob.runJob(); if (!success) { throw new Error(`job ${nextJob.identifier} failed`); } + // here the job did not throw and didn't return false. Consider it OK then and remove it from the list of jobs to run. this.deleteJobByIdentifier(this.currentJob.identifier); await this.writeJobsToDB(); } catch (e) { // either the job throw or didn't return 'OK' - if (nextJob.currentRetry >= nextJob.maxAttempts) { + if (nextJob.currentRetry >= nextJob.maxAttempts - 1) { // we cannot restart this job anymore. Remove the entry completely this.deleteJobByIdentifier(nextJob.identifier); + if (this.jobEventsListener && this.currentJob) { + this.jobEventsListener.onJobError(this.currentJob.serializeJob()); + } } else { + nextJob.currentRetry = nextJob.currentRetry + 1; // that job can be restarted. Plan a retry later with the already defined retry nextJob.nextAttemptTimestamp = Date.now() + nextJob.delayBetweenRetries; + if (this.jobEventsListener && this.currentJob) { + this.jobEventsListener.onJobDeferred(this.currentJob.serializeJob()); + } } // in any case, either we removed a job or changed one of the timestamp. // so sort the list again, and persist it this.sortJobsList(); await this.writeJobsToDB(); } finally { + if (this.jobEventsListener && this.currentJob) { + this.jobEventsListener.onJobSuccess(this.currentJob.serializeJob()); + } + this.currentJob = null; + // start the next job if there is any to be started now, or just plan the wakeup of our runner for the right time. this.planNextJob(); } @@ -261,7 +313,7 @@ export class PersistedJobRunner { } } -const configurationSyncRunner = new PersistedJobRunner('ConfigurationSyncJob'); +const configurationSyncRunner = new PersistedJobRunner('ConfigurationSyncJob', null); export const runners = { configurationSyncRunner, diff --git a/ts/session/utils/job_runners/PersistedJob.ts b/ts/session/utils/job_runners/PersistedJob.ts index c73616b0c..632429b4f 100644 --- a/ts/session/utils/job_runners/PersistedJob.ts +++ b/ts/session/utils/job_runners/PersistedJob.ts @@ -1,6 +1,9 @@ import { isEmpty } from 'lodash'; -export type PersistedJobType = 'ConfigurationSyncJobType'; +export type PersistedJobType = + | 'ConfigurationSyncJobType' + | 'FakeSleepForJobType' + | 'FakeSleepForJobMultiType'; export type SerializedPersistedJob = { // we need at least those as they are needed to do lookups of the list of jobs. @@ -81,8 +84,13 @@ export abstract class Persistedjob { * Can be used to wait for the task to be done before exiting the JobRunner */ public async waitForCurrentTry() { - // tslint:disable-next-line: no-promise-as-boolean - return this.runningPromise || Promise.resolve(); + try { + // tslint:disable-next-line: no-promise-as-boolean + return this.runningPromise || Promise.resolve(); + } catch (e) { + window.log.warn('waitForCurrentTry got an error: ', e.message); + return Promise.resolve(); + } } /** diff --git a/ts/session/utils/job_runners/jobs/JobRunnerType.ts b/ts/session/utils/job_runners/jobs/JobRunnerType.ts index fdf5c9bd4..1b448416b 100644 --- a/ts/session/utils/job_runners/jobs/JobRunnerType.ts +++ b/ts/session/utils/job_runners/jobs/JobRunnerType.ts @@ -1 +1 @@ -export type JobRunnerType = 'ConfigurationSyncJob'; +export type JobRunnerType = 'ConfigurationSyncJob' | 'FakeSleepForJob'; diff --git a/ts/test/session/unit/utils/job_runner/FakeSleepForJob.ts b/ts/test/session/unit/utils/job_runner/FakeSleepForJob.ts new file mode 100644 index 000000000..68a7340c7 --- /dev/null +++ b/ts/test/session/unit/utils/job_runner/FakeSleepForJob.ts @@ -0,0 +1,98 @@ +import { isNumber } from 'lodash'; +import { v4 } from 'uuid'; +import { sleepFor } from '../../../../../session/utils/Promise'; +import { + Persistedjob, + SerializedPersistedJob, +} from '../../../../../session/utils/job_runners/PersistedJob'; + +export class FakeSleepForMultiJob extends Persistedjob { + private readonly sleepDuration: number; + private readonly returnResult: boolean; + + constructor({ + identifier, + nextAttemptTimestamp, + maxAttempts, + currentRetry, + returnResult, + sleepDuration, + }: { + identifier: string | null; + nextAttemptTimestamp: number | null; + maxAttempts: number | null; + currentRetry: number; + sleepDuration: number; + returnResult: boolean; + }) { + super({ + jobType: 'FakeSleepForJobMultiType', + identifier: identifier || v4(), + delayBetweenRetries: 10000, + maxAttempts: isNumber(maxAttempts) ? maxAttempts : 3, + nextAttemptTimestamp: nextAttemptTimestamp || Date.now() + 3000, + singleJobInQueue: false, + currentRetry, + }); + this.returnResult = returnResult; + this.sleepDuration = sleepDuration; + if (process.env.NODE_APP_INSTANCE !== undefined) { + throw new Error('FakeSleepForJobMultiType are only meant for testing purposes'); + } + } + + public async run() { + console.warn( + `running job ${this.jobType} with id:"${this.identifier}". sleeping for ${this.sleepDuration} & returning ${this.returnResult} ` + ); + await sleepFor(this.sleepDuration); + console.warn(`${this.jobType} with id:"${this.identifier}" done. returning success `); + return this.returnResult; + } + + public serializeJob(): SerializedPersistedJob { + const fromParent = super.serializeBase(); + fromParent.sleepDuration = this.sleepDuration; + fromParent.returnResult = this.returnResult; + return fromParent; + } +} + +export class FakeSleepForJob extends Persistedjob { + constructor({ + identifier, + nextAttemptTimestamp, + maxAttempts, + currentRetry, + }: { + identifier: string | null; + nextAttemptTimestamp: number | null; + maxAttempts: number | null; + currentRetry: number; + }) { + super({ + jobType: 'FakeSleepForJobType', + identifier: identifier || v4(), + delayBetweenRetries: 10000, + maxAttempts: isNumber(maxAttempts) ? maxAttempts : 3, + nextAttemptTimestamp: nextAttemptTimestamp || Date.now() + 3000, + singleJobInQueue: true, + currentRetry, + }); + if (process.env.NODE_APP_INSTANCE !== undefined) { + throw new Error('FakeSleepForJob are only meant for testing purposes'); + } + } + + public async run() { + console.warn(`running job ${this.jobType} with id:"${this.identifier}" `); + await sleepFor(5000); + console.warn(`${this.jobType} with id:"${this.identifier}" done. returning failed `); + return false; + } + + public serializeJob(): SerializedPersistedJob { + const fromParent = super.serializeBase(); + return fromParent; + } +} diff --git a/ts/test/session/unit/utils/job_runner/JobRunner_test.ts b/ts/test/session/unit/utils/job_runner/JobRunner_test.ts index beaff7e90..8dc5ff062 100644 --- a/ts/test/session/unit/utils/job_runner/JobRunner_test.ts +++ b/ts/test/session/unit/utils/job_runner/JobRunner_test.ts @@ -1,44 +1,76 @@ import { expect } from 'chai'; -import _ from 'lodash'; +import _, { isUndefined } from 'lodash'; import Sinon from 'sinon'; import { v4 } from 'uuid'; -import { persistedJobFromData } from '../../../../../session/utils/job_runners/JobDeserialization'; -import { PersistedJobRunner } from '../../../../../session/utils/job_runners/JobRunner'; +import { + JobEventListener, + PersistedJobRunner, +} from '../../../../../session/utils/job_runners/JobRunner'; +import { FakeSleepForJob, FakeSleepForMultiJob } from './FakeSleepForJob'; import { SerializedPersistedJob } from '../../../../../session/utils/job_runners/PersistedJob'; +import { sleepFor } from '../../../../../session/utils/Promise'; import { stubData } from '../../../../test-utils/utils'; -function getFakeConfigurationJobPersisted(timestamp: number): SerializedPersistedJob { - return { - jobType: 'ConfigurationSyncJobType', +function getFakeSleepForJob(timestamp: number): FakeSleepForJob { + const job = new FakeSleepForJob({ identifier: v4(), - delayBetweenRetries: 3000, maxAttempts: 3, - nextAttemptTimestamp: timestamp || Date.now() + 3000, - singleJobInQueue: true, + nextAttemptTimestamp: timestamp || 3000, currentRetry: 0, - }; + }); + return job; +} + +function getFakeSleepForJobPersisted(timestamp: number): SerializedPersistedJob { + return getFakeSleepForJob(timestamp).serializeJob(); } -function getFakeConfigurationJob(timestamp: number) { - const job = persistedJobFromData(getFakeConfigurationJobPersisted(timestamp)); - if (!job) { - throw new Error('persistedJobFromData failed'); - } +function getFakeSleepForMultiJob({ + timestamp, + identifier, + returnResult, +}: { + timestamp: number; + identifier?: string; + returnResult?: boolean; +}): FakeSleepForMultiJob { + const job = new FakeSleepForMultiJob({ + identifier: identifier || v4(), + maxAttempts: 3, + nextAttemptTimestamp: timestamp || 3000, + currentRetry: 0, + returnResult: isUndefined(returnResult) ? true : returnResult, + sleepDuration: 5000, + }); return job; } +// tslint:disable-next-line: max-func-body-length describe('JobRunner', () => { let getItemById: Sinon.SinonStub; - let createOrUpdateItem: Sinon.SinonStub; let clock: Sinon.SinonFakeTimers; - let runner: PersistedJobRunner; + let jobEventsListener: JobEventListener; beforeEach(() => { getItemById = stubData('getItemById'); - createOrUpdateItem = stubData('createOrUpdateItem'); - clock = Sinon.useFakeTimers(); - runner = new PersistedJobRunner('ConfigurationSyncJob'); + stubData('createOrUpdateItem'); + clock = Sinon.useFakeTimers({ shouldAdvanceTime: true }); + jobEventsListener = { + onJobDeferred: (_job: SerializedPersistedJob) => { + // console.warn('listener got deferred for job ', job); + }, + onJobSuccess: (_job: SerializedPersistedJob) => { + // console.warn('listener got success for job ', job); + }, + onJobError: (_job: SerializedPersistedJob) => { + // console.warn('listener got error for job ', job); + }, + onJobStarted: (_job: SerializedPersistedJob) => { + // console.warn('listener got started for job ', job); + }, + }; + runner = new PersistedJobRunner('FakeSleepForJob', jobEventsListener); }); afterEach(() => { @@ -58,9 +90,9 @@ describe('JobRunner', () => { }); it('unsorted list is sorted after loading', async () => { const unsorted = [ - getFakeConfigurationJobPersisted(1), - getFakeConfigurationJobPersisted(5), - getFakeConfigurationJobPersisted(0), + getFakeSleepForJobPersisted(1), + getFakeSleepForJobPersisted(5), + getFakeSleepForJobPersisted(0), ]; getItemById.resolves({ id: '', @@ -99,27 +131,160 @@ describe('JobRunner', () => { }); describe('addJob', () => { - it('can add configurationSyncJob ', async () => { + it('can add FakeSleepForJob ', async () => { await runner.loadJobsFromDb(); - const job = getFakeConfigurationJob(123); + const job = getFakeSleepForJob(123); const persisted = job.serializeJob(); - await runner.addJob(job); + const result = await runner.addJob(job); + expect(result).to.be.eq('job_deferred'); expect(runner.getJobList()).to.deep.eq([persisted]); }); - it('does not add a second configurationSyncJob if one is already there', async () => { + it('does not add a second FakeSleepForJob if one is already there', async () => { await runner.loadJobsFromDb(); - const job = getFakeConfigurationJob(123); - const job2 = getFakeConfigurationJob(1234); - await runner.addJob(job); - await runner.addJob(job2); + const job = getFakeSleepForJob(123); + const job2 = getFakeSleepForJob(1234); + let result = await runner.addJob(job); + expect(result).to.eq('job_deferred'); + result = await runner.addJob(job2); + expect(result).to.eq('type_exists'); const persisted = job.serializeJob(); expect(runner.getJobList()).to.deep.eq([persisted]); }); + + it('can add a FakeSleepForJobMulti (sorted) even if one is already there', async () => { + await runner.loadJobsFromDb(); + const job = getFakeSleepForMultiJob({ timestamp: 1234 }); + const job2 = getFakeSleepForMultiJob({ timestamp: 123 }); + const job3 = getFakeSleepForMultiJob({ timestamp: 1 }); + + let result = await runner.addJob(job); + expect(result).to.eq('job_deferred'); + + result = await runner.addJob(job2); + expect(result).to.eq('job_deferred'); + + result = await runner.addJob(job3); + expect(result).to.eq('job_deferred'); + + expect(runner.getJobList()).to.deep.eq([ + job3.serializeJob(), + job2.serializeJob(), + job.serializeJob(), + ]); + }); + + it('cannot add a FakeSleepForJobMulti with an id already existing', async () => { + await runner.loadJobsFromDb(); + const job = getFakeSleepForMultiJob({ timestamp: 1234 }); + const job2 = getFakeSleepForMultiJob({ timestamp: 123, identifier: job.identifier }); + let result = await runner.addJob(job); + expect(result).to.be.eq('job_deferred'); + result = await runner.addJob(job2); + expect(result).to.be.eq('identifier_exists'); + + expect(runner.getJobList()).to.deep.eq([job.serializeJob()]); + }); + + it('two jobs are running sequentially', async () => { + await runner.loadJobsFromDb(); + const job = getFakeSleepForMultiJob({ timestamp: 100 }); + const job2 = getFakeSleepForMultiJob({ timestamp: 200 }); + runner.startProcessing(); + clock.tick(110); + // job should be started right away + let result = await runner.addJob(job); + expect(result).to.eq('job_started'); + result = await runner.addJob(job2); + expect(result).to.eq('job_deferred'); + expect(runner.getJobList()).to.deep.eq([job.serializeJob(), job2.serializeJob()]); + expect(runner.getJobList()).to.deep.eq([job.serializeJob(), job2.serializeJob()]); + + // each job takes 5s to finish, so let's tick once the first one should be done + clock.tick(5010); + await runner.waitCurrentJob(); + clock.tick(5010); + await runner.waitCurrentJob(); + + expect(runner.getJobList()).to.deep.eq([job2.serializeJob()]); + + clock.tick(5000); + await runner.waitCurrentJob(); + + expect(runner.getJobList()).to.deep.eq([]); + }); + + it('adding one job after the first is done starts it', async () => { + await runner.loadJobsFromDb(); + const job = getFakeSleepForMultiJob({ timestamp: 100 }); + const job2 = getFakeSleepForMultiJob({ timestamp: 120 }); + runner.startProcessing(); + clock.tick(110); + // job should be started right away + let result = await runner.addJob(job); + expect(runner.getJobList()).to.deep.eq([job.serializeJob()]); + + expect(result).to.eq('job_started'); + clock.tick(5010); + await runner.waitCurrentJob(); + clock.tick(5010); + + // just give some time for the runner to pick up a new job + await sleepFor(100); + + // the first job should already be finished now + result = await runner.addJob(job2); + expect(result).to.eq('job_started'); + expect(runner.getJobList()).to.deep.eq([job2.serializeJob()]); + + // each job takes 5s to finish, so let's tick once the first one should be done + clock.tick(5010); + await runner.waitCurrentJob(); + + expect(runner.getJobList()).to.deep.eq([]); + }); + + it('adding one job after the first is done schedules it', async () => { + await runner.loadJobsFromDb(); + const job = getFakeSleepForMultiJob({ timestamp: 100 }); + runner.startProcessing(); + clock.tick(110); + // job should be started right away + let result = await runner.addJob(job); + expect(runner.getJobList()).to.deep.eq([job.serializeJob()]); + + expect(result).to.eq('job_started'); + clock.tick(5010); + await runner.waitCurrentJob(); + clock.tick(5010); + // just give some time for the runner to pick up a new job + + await sleepFor(100); + + const job2 = getFakeSleepForMultiJob({ timestamp: clock.now + 100 }); + + // job should already be finished now + result = await runner.addJob(job2); + // new job should be deferred as timestamp is not in the past + expect(result).to.eq('job_deferred'); + expect(runner.getJobList()).to.deep.eq([job2.serializeJob()]); + + // tick enough for the job to need to be started + clock.tick(100); + + // that job2 should be running now + await sleepFor(100); + clock.tick(5000); + + await job2.waitForCurrentTry(); + await runner.waitCurrentJob(); + + expect(runner.getJobList()).to.deep.eq([]); + }); }); - describe('startProcessing Config Sync Jobs', () => { + describe('startProcessing FakeSleepForJob', () => { it('does not trigger anything if no job present ', async () => { await runner.loadJobsFromDb(); expect(runner.startProcessing()).to.be.eq('no_job'); @@ -128,17 +293,107 @@ describe('JobRunner', () => { it('triggers a job right away if there is a job which should already be running', async () => { await runner.loadJobsFromDb(); clock.tick(100); - const job = getFakeConfigurationJob(50); + const job = getFakeSleepForJob(50); await runner.addJob(job); expect(runner.startProcessing()).to.be.eq('job_started'); }); - it('plans a deffered job if there is a job starting later', async () => { + it('plans a deferred job if there is a job starting later', async () => { await runner.loadJobsFromDb(); clock.tick(100); - const job = getFakeConfigurationJob(150); + const job = getFakeSleepForJob(150); await runner.addJob(job); expect(runner.startProcessing()).to.be.eq('job_deferred'); }); }); + + describe('stopAndWaitCurrentJob', () => { + it('does not await if no job at all ', async () => { + await runner.loadJobsFromDb(); + runner.startProcessing(); + expect(runner.stopAndWaitCurrentJob()).to.be.eventually.eq('no_await'); + }); + + it('does not await if there are jobs but none are started', async () => { + await runner.loadJobsFromDb(); + clock.tick(100); + const job = getFakeSleepForJob(150); + await runner.addJob(job); + expect(runner.startProcessing()).to.be.eq('job_deferred'); + expect(runner.stopAndWaitCurrentJob()).to.be.eventually.eq('no_await'); + }); + + it('does await if there are jobs and one is started', async () => { + await runner.loadJobsFromDb(); + clock.tick(200); + const job = getFakeSleepForJob(150); + await runner.addJob(job); + expect(runner.startProcessing()).to.be.eq('job_started'); + clock.tick(5000); + + expect(runner.stopAndWaitCurrentJob()).to.be.eventually.eq('await'); + }); + }); + + describe('retriesFailing Jobns', () => { + it('does not await if no job at all ', async () => { + await runner.loadJobsFromDb(); + runner.startProcessing(); + expect(runner.stopAndWaitCurrentJob()).to.be.eventually.eq('no_await'); + }); + + it('does not await if there are jobs but none are started', async () => { + await runner.loadJobsFromDb(); + clock.tick(100); + const job = getFakeSleepForJob(150); + await runner.addJob(job); + expect(runner.startProcessing()).to.be.eq('job_deferred'); + expect(runner.stopAndWaitCurrentJob()).to.be.eventually.eq('no_await'); + }); + + it('does await if there are jobs and one is started', async () => { + await runner.loadJobsFromDb(); + const job = getFakeSleepForMultiJob({ timestamp: 100, returnResult: false }); // this job keeps failing + runner.startProcessing(); + clock.tick(110); + // job should be started right away + const result = await runner.addJob(job); + expect(runner.getJobList()).to.deep.eq([job.serializeJob()]); + + expect(result).to.eq('job_started'); + clock.tick(5010); + await runner.waitCurrentJob(); + const jobUpdated = { + ...job.serializeJob(), + nextAttemptTimestamp: clock.now + 10000, + currentRetry: 1, + }; + // just give time for the runner to pick up a new job + await sleepFor(10); + + // the job failed, so the job should still be there + expect(runner.getJobList()).to.deep.eq([jobUpdated]); + + // that job should be retried now + clock.tick(11000); + await runner.waitCurrentJob(); + const jobUpdated2 = { + ...job.serializeJob(), + nextAttemptTimestamp: clock.now + 10000, + currentRetry: 2, + }; + await sleepFor(10); + + await runner.waitCurrentJob(); + expect(runner.getJobList()).to.deep.eq([jobUpdated2]); + + // that job should be retried one more time and then removed from the list of jobs to be run + clock.tick(11000); + await runner.waitCurrentJob(); + await sleepFor(10); + + await runner.waitCurrentJob(); + expect(runner.getJobList()).to.deep.eq([]); + }); + }); }); diff --git a/tslint.json b/tslint.json index 357cccdee..0838b513f 100644 --- a/tslint.json +++ b/tslint.json @@ -99,7 +99,7 @@ "comment": "Usage has been approved by Maxim on 13 Dec 2019" } ], - "max-func-body-length": [true, 150] + "max-func-body-length": [true, 350] }, "rulesDirectory": ["node_modules/tslint-microsoft-contrib"] }