tesat: add tests for the JobRunner class

pull/2620/head
Audric Ackermann 2 years ago
parent b3995c117b
commit 141c22ed43

@ -26,7 +26,7 @@ import { cleanUpOldDecryptedMedias } from '../../session/crypto/DecryptedAttachm
import { DURATION } from '../../session/constants';
import { editProfileModal, onionPathModal } from '../../state/ducks/modalDialog';
import { onionPathModal } from '../../state/ducks/modalDialog';
import { uploadOurAvatar } from '../../interactions/conversationInteractions';
import { debounce, isEmpty, isString } from 'lodash';
@ -67,7 +67,15 @@ const Section = (props: { type: SectionType }) => {
const handleClick = async () => {
/* tslint:disable:no-void-expression */
if (type === SectionType.Profile) {
dispatch(editProfileModal({}));
const message = new SharedConfigMessage({
data: new Uint8Array([1, 2, 3]),
kind: SignalService.SharedConfigMessage.Kind.USER_PROFILE,
seqno: Long.fromNumber(0),
timestamp: GetNetworkTime.getNowWithNetworkOffset(),
});
await getMessageQueue().sendSyncMessage({ message, namespace: SnodeNamespaces.UserProfile });
console.warn('FIXME');
// dispatch(editProfileModal({}));
} else if (type === SectionType.ColorMode) {
const currentTheme = String(window.Events.getThemeSetting());
const newTheme = (isDarkMode

@ -1,4 +1,8 @@
import { isEmpty, isString } from 'lodash';
import {
FakeSleepForJob,
FakeSleepForMultiJob,
} from '../../../test/session/unit/utils/job_runner/FakeSleepForJob';
import { ConfigurationSyncJob } from './jobs/ConfigurationSyncJob';
import { Persistedjob, PersistedJobType, SerializedPersistedJob } from './PersistedJob';
@ -15,6 +19,22 @@ export function persistedJobFromData(data: SerializedPersistedJob): Persistedjob
nextAttemptTimestamp: data.nextAttemptTimestamp,
currentRetry: data.currentRetry,
});
case 'FakeSleepForJobType':
return new FakeSleepForJob({
maxAttempts: data.maxAttempts,
identifier: data.identifier,
nextAttemptTimestamp: data.nextAttemptTimestamp,
currentRetry: data.currentRetry,
});
case 'FakeSleepForJobMultiType':
return new FakeSleepForMultiJob({
maxAttempts: data.maxAttempts,
identifier: data.identifier,
nextAttemptTimestamp: data.nextAttemptTimestamp,
currentRetry: data.currentRetry,
returnResult: data.returnResult,
sleepDuration: data.sleepDuration,
});
default:
console.warn('unknown persisted job type:', jobType);
return null;

@ -5,20 +5,21 @@ import { JobRunnerType } from './jobs/JobRunnerType';
import { Persistedjob, SerializedPersistedJob } from './PersistedJob';
/**
* 'not_running' when the queue is not running
* 'already_started' if startProcessing was called already once before
* 'job_in_progress' if there is already a job in progress
* 'job_deferred' if there is a next job, but too far in the future to start it now
* 'job_started' a job was pending to be started and we could start it, so we started it
* 'no_job' if there are no jobs to be run at all
*/
export type StartProcessingResult =
| 'not_running'
| 'already_started'
| 'job_in_progress'
| 'job_deferred'
| 'job_started'
| 'no_job';
export type StartProcessingResult = 'job_in_progress' | 'job_deferred' | 'job_started' | 'no_job';
export type AddJobResult = 'job_deferred' | 'job_started';
export type JobEventListener = {
onJobSuccess: (job: SerializedPersistedJob) => void;
onJobDeferred: (job: SerializedPersistedJob) => void;
onJobError: (job: SerializedPersistedJob) => void;
onJobStarted: (job: SerializedPersistedJob) => void;
};
export class PersistedJobRunner {
private isInit = false;
@ -27,9 +28,11 @@ export class PersistedJobRunner {
private readonly jobRunnerType: JobRunnerType;
private nextJobStartTimer: NodeJS.Timeout | null = null;
private currentJob: Persistedjob | null = null;
private readonly jobEventsListener: JobEventListener | null;
constructor(jobRunnerType: JobRunnerType) {
constructor(jobRunnerType: JobRunnerType, jobEventsListener: JobEventListener | null) {
this.jobRunnerType = jobRunnerType;
this.jobEventsListener = jobEventsListener;
console.warn('new runner');
}
@ -61,30 +64,31 @@ export class PersistedJobRunner {
this.isInit = true;
}
public async addJob(job: Persistedjob) {
public async addJob(
job: Persistedjob
): Promise<'type_exists' | 'identifier_exists' | AddJobResult> {
this.assertIsInitialized();
if (job.singleJobInQueue) {
// make sure there is no job with that same type already scheduled.
if (this.jobsScheduled.find(j => j.jobType === job.jobType)) {
console.info(
`job runner has already a job "${job.identifier}" planned so not adding another one`
`job runner has already a job with type:"${job.jobType}" planned so not adding another one`
);
return;
}
this.jobsScheduled.push(job);
this.sortJobsList();
await this.writeJobsToDB();
if (this.isStarted) {
// a new job was added. trigger it if we can/have to start it
this.planNextJob();
return 'type_exists';
}
return this.addJobUnchecked(job);
}
return;
// make sure there is no job with that same identifier already .
if (this.jobsScheduled.find(j => j.identifier === job.identifier)) {
console.info(
`job runner has already a job with id:"${job.identifier}" planned so not adding another one`
);
return 'identifier_exists';
}
throw new Error('persisted job runner does not support non single type for now.');
console.info(`job runner adding type :"${job.jobType}" `);
return this.addJobUnchecked(job);
}
/**
@ -106,21 +110,32 @@ export class PersistedJobRunner {
}
/**
* if we are running a job, this call will await until the job is done
* if we are running a job, this call will await until the job is done and stop the queue
*/
public async stopAndWaitCurrentJob() {
public async stopAndWaitCurrentJob(): Promise<'no_await' | 'await'> {
if (!this.isStarted || !this.currentJob) {
return;
return 'no_await';
}
this.isStarted = false;
if (this.currentJob) {
await this.currentJob.waitForCurrentTry();
await this.currentJob.waitForCurrentTry();
return 'await';
}
/**
* if we are running a job, this call will await until the job is done.
* If another job must be run right away this one, we will also add the upcoming one as the currentJob.
*/
public async waitCurrentJob(): Promise<'no_await' | 'await'> {
if (!this.isStarted || !this.currentJob) {
return 'no_await';
}
await this.currentJob.waitForCurrentTry();
return 'await';
}
public startProcessing(): StartProcessingResult {
if (this.isStarted) {
return 'already_started';
throw new Error('startProcessing already called');
}
this.isStarted = true;
return this.planNextJob();
@ -139,6 +154,23 @@ export class PersistedJobRunner {
});
}
private async addJobUnchecked(job: Persistedjob) {
this.jobsScheduled.push(cloneDeep(job));
this.sortJobsList();
await this.writeJobsToDB();
// a new job was added. trigger it if we can/have to start it
const result = this.planNextJob();
console.warn('addJobUnchecked: ', result);
if (result === 'no_job') {
throw new Error('We just pushed a job, there cannot be no job');
}
if (result === 'job_in_progress') {
return 'job_deferred';
}
return result;
}
private getSerializedJobs() {
return this.jobsScheduled.map(m => m.serializeJob());
}
@ -148,24 +180,23 @@ export class PersistedJobRunner {
}
/**
* Returns 'not_running' if that job runner is not started at all
* Returns 'in_progress' if there is already a job running
* Returns 'job_in_progress' if there is already a job running
* Returns 'none' if there are no jobs to be started at all (or the runner is not running)
* Returns 'started' if there the next jobs was just started
* Returns 'deferred' if there is a next job but it is in the future and so wasn't started yet, but a timer is set.
* Returns 'job_deferred' if there is a next job but it is in the future and so wasn't started yet, but a timer is set.
*/
private planNextJob(): StartProcessingResult {
if (!this.isStarted) {
return 'not_running';
if (this.jobsScheduled.length) {
return 'job_deferred';
} else {
return 'no_job';
}
}
if (this.currentJob) {
return 'job_in_progress';
}
if (!this.jobsScheduled.length) {
return 'no_job';
}
const nextJob = this.jobsScheduled[0];
const nextJob = this.jobsScheduled?.[0];
if (!nextJob) {
return 'no_job';
@ -183,11 +214,11 @@ export class PersistedJobRunner {
// next job is not to be started right away, just plan our runner to be awakened when the time is right.
if (this.nextJobStartTimer) {
// remove the timer as there might be a more urgent job to be run before the one we have set here.
global.clearTimeout(this.nextJobStartTimer);
}
// plan a timer to wakeup when that timer is reached.
this.nextJobStartTimer = global.setTimeout(() => {
console.warn('wakeup timer');
if (this.nextJobStartTimer) {
global.clearTimeout(this.nextJobStartTimer);
this.nextJobStartTimer = null;
@ -200,13 +231,14 @@ export class PersistedJobRunner {
private deleteJobByIdentifier(identifier: string) {
const jobIndex = this.jobsScheduled.findIndex(f => f.identifier === identifier);
console.info('deleteJobByIdentifier job', identifier, ' index', jobIndex);
if (jobIndex >= 0) {
this.jobsScheduled.splice(jobIndex, 1);
}
}
private async runNextJob() {
console.warn('runNextJob called');
this.assertIsInitialized();
if (this.currentJob || !this.isStarted || !this.jobsScheduled.length) {
return;
@ -214,8 +246,12 @@ export class PersistedJobRunner {
const nextJob = this.jobsScheduled[0];
if (nextJob.nextAttemptTimestamp >= Date.now()) {
window.log.info('next job is not to be run just yet. Going idle.');
// if the time is 101, and that task is to be run at t=101, we need to start it right away.
if (nextJob.nextAttemptTimestamp > Date.now()) {
console.warn(
'next job is not due to be run just yet. Going idle.',
nextJob.nextAttemptTimestamp - Date.now()
);
this.planNextJob();
return;
}
@ -225,28 +261,44 @@ export class PersistedJobRunner {
return;
}
this.currentJob = nextJob;
this.jobEventsListener?.onJobStarted(this.currentJob.serializeJob());
const success = await this.currentJob.runJob();
if (!success) {
throw new Error(`job ${nextJob.identifier} failed`);
}
// here the job did not throw and didn't return false. Consider it OK then and remove it from the list of jobs to run.
this.deleteJobByIdentifier(this.currentJob.identifier);
await this.writeJobsToDB();
} catch (e) {
// either the job throw or didn't return 'OK'
if (nextJob.currentRetry >= nextJob.maxAttempts) {
if (nextJob.currentRetry >= nextJob.maxAttempts - 1) {
// we cannot restart this job anymore. Remove the entry completely
this.deleteJobByIdentifier(nextJob.identifier);
if (this.jobEventsListener && this.currentJob) {
this.jobEventsListener.onJobError(this.currentJob.serializeJob());
}
} else {
nextJob.currentRetry = nextJob.currentRetry + 1;
// that job can be restarted. Plan a retry later with the already defined retry
nextJob.nextAttemptTimestamp = Date.now() + nextJob.delayBetweenRetries;
if (this.jobEventsListener && this.currentJob) {
this.jobEventsListener.onJobDeferred(this.currentJob.serializeJob());
}
}
// in any case, either we removed a job or changed one of the timestamp.
// so sort the list again, and persist it
this.sortJobsList();
await this.writeJobsToDB();
} finally {
if (this.jobEventsListener && this.currentJob) {
this.jobEventsListener.onJobSuccess(this.currentJob.serializeJob());
}
this.currentJob = null;
// start the next job if there is any to be started now, or just plan the wakeup of our runner for the right time.
this.planNextJob();
}
@ -261,7 +313,7 @@ export class PersistedJobRunner {
}
}
const configurationSyncRunner = new PersistedJobRunner('ConfigurationSyncJob');
const configurationSyncRunner = new PersistedJobRunner('ConfigurationSyncJob', null);
export const runners = {
configurationSyncRunner,

@ -1,6 +1,9 @@
import { isEmpty } from 'lodash';
export type PersistedJobType = 'ConfigurationSyncJobType';
export type PersistedJobType =
| 'ConfigurationSyncJobType'
| 'FakeSleepForJobType'
| 'FakeSleepForJobMultiType';
export type SerializedPersistedJob = {
// we need at least those as they are needed to do lookups of the list of jobs.
@ -81,8 +84,13 @@ export abstract class Persistedjob {
* Can be used to wait for the task to be done before exiting the JobRunner
*/
public async waitForCurrentTry() {
// tslint:disable-next-line: no-promise-as-boolean
return this.runningPromise || Promise.resolve();
try {
// tslint:disable-next-line: no-promise-as-boolean
return this.runningPromise || Promise.resolve();
} catch (e) {
window.log.warn('waitForCurrentTry got an error: ', e.message);
return Promise.resolve();
}
}
/**

@ -1 +1 @@
export type JobRunnerType = 'ConfigurationSyncJob';
export type JobRunnerType = 'ConfigurationSyncJob' | 'FakeSleepForJob';

@ -0,0 +1,98 @@
import { isNumber } from 'lodash';
import { v4 } from 'uuid';
import { sleepFor } from '../../../../../session/utils/Promise';
import {
Persistedjob,
SerializedPersistedJob,
} from '../../../../../session/utils/job_runners/PersistedJob';
export class FakeSleepForMultiJob extends Persistedjob {
private readonly sleepDuration: number;
private readonly returnResult: boolean;
constructor({
identifier,
nextAttemptTimestamp,
maxAttempts,
currentRetry,
returnResult,
sleepDuration,
}: {
identifier: string | null;
nextAttemptTimestamp: number | null;
maxAttempts: number | null;
currentRetry: number;
sleepDuration: number;
returnResult: boolean;
}) {
super({
jobType: 'FakeSleepForJobMultiType',
identifier: identifier || v4(),
delayBetweenRetries: 10000,
maxAttempts: isNumber(maxAttempts) ? maxAttempts : 3,
nextAttemptTimestamp: nextAttemptTimestamp || Date.now() + 3000,
singleJobInQueue: false,
currentRetry,
});
this.returnResult = returnResult;
this.sleepDuration = sleepDuration;
if (process.env.NODE_APP_INSTANCE !== undefined) {
throw new Error('FakeSleepForJobMultiType are only meant for testing purposes');
}
}
public async run() {
console.warn(
`running job ${this.jobType} with id:"${this.identifier}". sleeping for ${this.sleepDuration} & returning ${this.returnResult} `
);
await sleepFor(this.sleepDuration);
console.warn(`${this.jobType} with id:"${this.identifier}" done. returning success `);
return this.returnResult;
}
public serializeJob(): SerializedPersistedJob {
const fromParent = super.serializeBase();
fromParent.sleepDuration = this.sleepDuration;
fromParent.returnResult = this.returnResult;
return fromParent;
}
}
export class FakeSleepForJob extends Persistedjob {
constructor({
identifier,
nextAttemptTimestamp,
maxAttempts,
currentRetry,
}: {
identifier: string | null;
nextAttemptTimestamp: number | null;
maxAttempts: number | null;
currentRetry: number;
}) {
super({
jobType: 'FakeSleepForJobType',
identifier: identifier || v4(),
delayBetweenRetries: 10000,
maxAttempts: isNumber(maxAttempts) ? maxAttempts : 3,
nextAttemptTimestamp: nextAttemptTimestamp || Date.now() + 3000,
singleJobInQueue: true,
currentRetry,
});
if (process.env.NODE_APP_INSTANCE !== undefined) {
throw new Error('FakeSleepForJob are only meant for testing purposes');
}
}
public async run() {
console.warn(`running job ${this.jobType} with id:"${this.identifier}" `);
await sleepFor(5000);
console.warn(`${this.jobType} with id:"${this.identifier}" done. returning failed `);
return false;
}
public serializeJob(): SerializedPersistedJob {
const fromParent = super.serializeBase();
return fromParent;
}
}

@ -1,44 +1,76 @@
import { expect } from 'chai';
import _ from 'lodash';
import _, { isUndefined } from 'lodash';
import Sinon from 'sinon';
import { v4 } from 'uuid';
import { persistedJobFromData } from '../../../../../session/utils/job_runners/JobDeserialization';
import { PersistedJobRunner } from '../../../../../session/utils/job_runners/JobRunner';
import {
JobEventListener,
PersistedJobRunner,
} from '../../../../../session/utils/job_runners/JobRunner';
import { FakeSleepForJob, FakeSleepForMultiJob } from './FakeSleepForJob';
import { SerializedPersistedJob } from '../../../../../session/utils/job_runners/PersistedJob';
import { sleepFor } from '../../../../../session/utils/Promise';
import { stubData } from '../../../../test-utils/utils';
function getFakeConfigurationJobPersisted(timestamp: number): SerializedPersistedJob {
return {
jobType: 'ConfigurationSyncJobType',
function getFakeSleepForJob(timestamp: number): FakeSleepForJob {
const job = new FakeSleepForJob({
identifier: v4(),
delayBetweenRetries: 3000,
maxAttempts: 3,
nextAttemptTimestamp: timestamp || Date.now() + 3000,
singleJobInQueue: true,
nextAttemptTimestamp: timestamp || 3000,
currentRetry: 0,
};
});
return job;
}
function getFakeSleepForJobPersisted(timestamp: number): SerializedPersistedJob {
return getFakeSleepForJob(timestamp).serializeJob();
}
function getFakeConfigurationJob(timestamp: number) {
const job = persistedJobFromData(getFakeConfigurationJobPersisted(timestamp));
if (!job) {
throw new Error('persistedJobFromData failed');
}
function getFakeSleepForMultiJob({
timestamp,
identifier,
returnResult,
}: {
timestamp: number;
identifier?: string;
returnResult?: boolean;
}): FakeSleepForMultiJob {
const job = new FakeSleepForMultiJob({
identifier: identifier || v4(),
maxAttempts: 3,
nextAttemptTimestamp: timestamp || 3000,
currentRetry: 0,
returnResult: isUndefined(returnResult) ? true : returnResult,
sleepDuration: 5000,
});
return job;
}
// tslint:disable-next-line: max-func-body-length
describe('JobRunner', () => {
let getItemById: Sinon.SinonStub;
let createOrUpdateItem: Sinon.SinonStub;
let clock: Sinon.SinonFakeTimers;
let runner: PersistedJobRunner;
let jobEventsListener: JobEventListener;
beforeEach(() => {
getItemById = stubData('getItemById');
createOrUpdateItem = stubData('createOrUpdateItem');
clock = Sinon.useFakeTimers();
runner = new PersistedJobRunner('ConfigurationSyncJob');
stubData('createOrUpdateItem');
clock = Sinon.useFakeTimers({ shouldAdvanceTime: true });
jobEventsListener = {
onJobDeferred: (_job: SerializedPersistedJob) => {
// console.warn('listener got deferred for job ', job);
},
onJobSuccess: (_job: SerializedPersistedJob) => {
// console.warn('listener got success for job ', job);
},
onJobError: (_job: SerializedPersistedJob) => {
// console.warn('listener got error for job ', job);
},
onJobStarted: (_job: SerializedPersistedJob) => {
// console.warn('listener got started for job ', job);
},
};
runner = new PersistedJobRunner('FakeSleepForJob', jobEventsListener);
});
afterEach(() => {
@ -58,9 +90,9 @@ describe('JobRunner', () => {
});
it('unsorted list is sorted after loading', async () => {
const unsorted = [
getFakeConfigurationJobPersisted(1),
getFakeConfigurationJobPersisted(5),
getFakeConfigurationJobPersisted(0),
getFakeSleepForJobPersisted(1),
getFakeSleepForJobPersisted(5),
getFakeSleepForJobPersisted(0),
];
getItemById.resolves({
id: '',
@ -99,27 +131,160 @@ describe('JobRunner', () => {
});
describe('addJob', () => {
it('can add configurationSyncJob ', async () => {
it('can add FakeSleepForJob ', async () => {
await runner.loadJobsFromDb();
const job = getFakeConfigurationJob(123);
const job = getFakeSleepForJob(123);
const persisted = job.serializeJob();
await runner.addJob(job);
const result = await runner.addJob(job);
expect(result).to.be.eq('job_deferred');
expect(runner.getJobList()).to.deep.eq([persisted]);
});
it('does not add a second configurationSyncJob if one is already there', async () => {
it('does not add a second FakeSleepForJob if one is already there', async () => {
await runner.loadJobsFromDb();
const job = getFakeConfigurationJob(123);
const job2 = getFakeConfigurationJob(1234);
await runner.addJob(job);
await runner.addJob(job2);
const job = getFakeSleepForJob(123);
const job2 = getFakeSleepForJob(1234);
let result = await runner.addJob(job);
expect(result).to.eq('job_deferred');
result = await runner.addJob(job2);
expect(result).to.eq('type_exists');
const persisted = job.serializeJob();
expect(runner.getJobList()).to.deep.eq([persisted]);
});
it('can add a FakeSleepForJobMulti (sorted) even if one is already there', async () => {
await runner.loadJobsFromDb();
const job = getFakeSleepForMultiJob({ timestamp: 1234 });
const job2 = getFakeSleepForMultiJob({ timestamp: 123 });
const job3 = getFakeSleepForMultiJob({ timestamp: 1 });
let result = await runner.addJob(job);
expect(result).to.eq('job_deferred');
result = await runner.addJob(job2);
expect(result).to.eq('job_deferred');
result = await runner.addJob(job3);
expect(result).to.eq('job_deferred');
expect(runner.getJobList()).to.deep.eq([
job3.serializeJob(),
job2.serializeJob(),
job.serializeJob(),
]);
});
it('cannot add a FakeSleepForJobMulti with an id already existing', async () => {
await runner.loadJobsFromDb();
const job = getFakeSleepForMultiJob({ timestamp: 1234 });
const job2 = getFakeSleepForMultiJob({ timestamp: 123, identifier: job.identifier });
let result = await runner.addJob(job);
expect(result).to.be.eq('job_deferred');
result = await runner.addJob(job2);
expect(result).to.be.eq('identifier_exists');
expect(runner.getJobList()).to.deep.eq([job.serializeJob()]);
});
it('two jobs are running sequentially', async () => {
await runner.loadJobsFromDb();
const job = getFakeSleepForMultiJob({ timestamp: 100 });
const job2 = getFakeSleepForMultiJob({ timestamp: 200 });
runner.startProcessing();
clock.tick(110);
// job should be started right away
let result = await runner.addJob(job);
expect(result).to.eq('job_started');
result = await runner.addJob(job2);
expect(result).to.eq('job_deferred');
expect(runner.getJobList()).to.deep.eq([job.serializeJob(), job2.serializeJob()]);
expect(runner.getJobList()).to.deep.eq([job.serializeJob(), job2.serializeJob()]);
// each job takes 5s to finish, so let's tick once the first one should be done
clock.tick(5010);
await runner.waitCurrentJob();
clock.tick(5010);
await runner.waitCurrentJob();
expect(runner.getJobList()).to.deep.eq([job2.serializeJob()]);
clock.tick(5000);
await runner.waitCurrentJob();
expect(runner.getJobList()).to.deep.eq([]);
});
it('adding one job after the first is done starts it', async () => {
await runner.loadJobsFromDb();
const job = getFakeSleepForMultiJob({ timestamp: 100 });
const job2 = getFakeSleepForMultiJob({ timestamp: 120 });
runner.startProcessing();
clock.tick(110);
// job should be started right away
let result = await runner.addJob(job);
expect(runner.getJobList()).to.deep.eq([job.serializeJob()]);
expect(result).to.eq('job_started');
clock.tick(5010);
await runner.waitCurrentJob();
clock.tick(5010);
// just give some time for the runner to pick up a new job
await sleepFor(100);
// the first job should already be finished now
result = await runner.addJob(job2);
expect(result).to.eq('job_started');
expect(runner.getJobList()).to.deep.eq([job2.serializeJob()]);
// each job takes 5s to finish, so let's tick once the first one should be done
clock.tick(5010);
await runner.waitCurrentJob();
expect(runner.getJobList()).to.deep.eq([]);
});
it('adding one job after the first is done schedules it', async () => {
await runner.loadJobsFromDb();
const job = getFakeSleepForMultiJob({ timestamp: 100 });
runner.startProcessing();
clock.tick(110);
// job should be started right away
let result = await runner.addJob(job);
expect(runner.getJobList()).to.deep.eq([job.serializeJob()]);
expect(result).to.eq('job_started');
clock.tick(5010);
await runner.waitCurrentJob();
clock.tick(5010);
// just give some time for the runner to pick up a new job
await sleepFor(100);
const job2 = getFakeSleepForMultiJob({ timestamp: clock.now + 100 });
// job should already be finished now
result = await runner.addJob(job2);
// new job should be deferred as timestamp is not in the past
expect(result).to.eq('job_deferred');
expect(runner.getJobList()).to.deep.eq([job2.serializeJob()]);
// tick enough for the job to need to be started
clock.tick(100);
// that job2 should be running now
await sleepFor(100);
clock.tick(5000);
await job2.waitForCurrentTry();
await runner.waitCurrentJob();
expect(runner.getJobList()).to.deep.eq([]);
});
});
describe('startProcessing Config Sync Jobs', () => {
describe('startProcessing FakeSleepForJob', () => {
it('does not trigger anything if no job present ', async () => {
await runner.loadJobsFromDb();
expect(runner.startProcessing()).to.be.eq('no_job');
@ -128,17 +293,107 @@ describe('JobRunner', () => {
it('triggers a job right away if there is a job which should already be running', async () => {
await runner.loadJobsFromDb();
clock.tick(100);
const job = getFakeConfigurationJob(50);
const job = getFakeSleepForJob(50);
await runner.addJob(job);
expect(runner.startProcessing()).to.be.eq('job_started');
});
it('plans a deffered job if there is a job starting later', async () => {
it('plans a deferred job if there is a job starting later', async () => {
await runner.loadJobsFromDb();
clock.tick(100);
const job = getFakeConfigurationJob(150);
const job = getFakeSleepForJob(150);
await runner.addJob(job);
expect(runner.startProcessing()).to.be.eq('job_deferred');
});
});
describe('stopAndWaitCurrentJob', () => {
it('does not await if no job at all ', async () => {
await runner.loadJobsFromDb();
runner.startProcessing();
expect(runner.stopAndWaitCurrentJob()).to.be.eventually.eq('no_await');
});
it('does not await if there are jobs but none are started', async () => {
await runner.loadJobsFromDb();
clock.tick(100);
const job = getFakeSleepForJob(150);
await runner.addJob(job);
expect(runner.startProcessing()).to.be.eq('job_deferred');
expect(runner.stopAndWaitCurrentJob()).to.be.eventually.eq('no_await');
});
it('does await if there are jobs and one is started', async () => {
await runner.loadJobsFromDb();
clock.tick(200);
const job = getFakeSleepForJob(150);
await runner.addJob(job);
expect(runner.startProcessing()).to.be.eq('job_started');
clock.tick(5000);
expect(runner.stopAndWaitCurrentJob()).to.be.eventually.eq('await');
});
});
describe('retriesFailing Jobns', () => {
it('does not await if no job at all ', async () => {
await runner.loadJobsFromDb();
runner.startProcessing();
expect(runner.stopAndWaitCurrentJob()).to.be.eventually.eq('no_await');
});
it('does not await if there are jobs but none are started', async () => {
await runner.loadJobsFromDb();
clock.tick(100);
const job = getFakeSleepForJob(150);
await runner.addJob(job);
expect(runner.startProcessing()).to.be.eq('job_deferred');
expect(runner.stopAndWaitCurrentJob()).to.be.eventually.eq('no_await');
});
it('does await if there are jobs and one is started', async () => {
await runner.loadJobsFromDb();
const job = getFakeSleepForMultiJob({ timestamp: 100, returnResult: false }); // this job keeps failing
runner.startProcessing();
clock.tick(110);
// job should be started right away
const result = await runner.addJob(job);
expect(runner.getJobList()).to.deep.eq([job.serializeJob()]);
expect(result).to.eq('job_started');
clock.tick(5010);
await runner.waitCurrentJob();
const jobUpdated = {
...job.serializeJob(),
nextAttemptTimestamp: clock.now + 10000,
currentRetry: 1,
};
// just give time for the runner to pick up a new job
await sleepFor(10);
// the job failed, so the job should still be there
expect(runner.getJobList()).to.deep.eq([jobUpdated]);
// that job should be retried now
clock.tick(11000);
await runner.waitCurrentJob();
const jobUpdated2 = {
...job.serializeJob(),
nextAttemptTimestamp: clock.now + 10000,
currentRetry: 2,
};
await sleepFor(10);
await runner.waitCurrentJob();
expect(runner.getJobList()).to.deep.eq([jobUpdated2]);
// that job should be retried one more time and then removed from the list of jobs to be run
clock.tick(11000);
await runner.waitCurrentJob();
await sleepFor(10);
await runner.waitCurrentJob();
expect(runner.getJobList()).to.deep.eq([]);
});
});
});

@ -99,7 +99,7 @@
"comment": "Usage has been approved by Maxim on 13 Dec 2019"
}
],
"max-func-body-length": [true, 150]
"max-func-body-length": [true, 350]
},
"rulesDirectory": ["node_modules/tslint-microsoft-contrib"]
}

Loading…
Cancel
Save