You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			141 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			TypeScript
		
	
			
		
		
	
	
			141 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			TypeScript
		
	
import { cloneDeep, isEmpty } from 'lodash';
 | 
						|
 | 
						|
export type PersistedJobType =
 | 
						|
  | 'ConfigurationSyncJobType'
 | 
						|
  | 'AvatarDownloadJobType'
 | 
						|
  | 'FakeSleepForJobType'
 | 
						|
  | 'FakeSleepForJobMultiType';
 | 
						|
 | 
						|
interface PersistedJobData {
 | 
						|
  jobType: PersistedJobType;
 | 
						|
  identifier: string;
 | 
						|
  nextAttemptTimestamp: number;
 | 
						|
  delayBetweenRetries: number;
 | 
						|
  maxAttempts: number; // to try to run this job twice, set this to 2.
 | 
						|
  currentRetry: number;
 | 
						|
}
 | 
						|
 | 
						|
export interface FakeSleepJobData extends PersistedJobData {
 | 
						|
  jobType: 'FakeSleepForJobType';
 | 
						|
  returnResult: boolean;
 | 
						|
  sleepDuration: number;
 | 
						|
}
 | 
						|
export interface FakeSleepForMultiJobData extends PersistedJobData {
 | 
						|
  jobType: 'FakeSleepForJobMultiType';
 | 
						|
  returnResult: boolean;
 | 
						|
  sleepDuration: number;
 | 
						|
}
 | 
						|
 | 
						|
export interface AvatarDownloadPersistedData extends PersistedJobData {
 | 
						|
  jobType: 'AvatarDownloadJobType';
 | 
						|
  conversationId: string;
 | 
						|
}
 | 
						|
 | 
						|
export interface ConfigurationSyncPersistedData extends PersistedJobData {
 | 
						|
  jobType: 'ConfigurationSyncJobType';
 | 
						|
}
 | 
						|
 | 
						|
export type TypeOfPersistedData =
 | 
						|
  | ConfigurationSyncPersistedData
 | 
						|
  | AvatarDownloadPersistedData
 | 
						|
  | FakeSleepJobData
 | 
						|
  | FakeSleepForMultiJobData;
 | 
						|
 | 
						|
export type AddJobCheckReturn = 'skipAddSameJobPresent' | 'sameJobDataAlreadyInQueue' | null;
 | 
						|
 | 
						|
export enum RunJobResult {
 | 
						|
  Success = 1,
 | 
						|
  RetryJobIfPossible = 2,
 | 
						|
  PermanentFailure = 3,
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * This class can be used to save and run jobs from the database.
 | 
						|
 * Every child class must take the minimum amount of arguments, and make sure they are unlikely to change.
 | 
						|
 * For instance, don't have the attachments to downloads as arguments, just the messageId and the index.
 | 
						|
 * Don't have the new profileImage url for an avatar download job, just the conversationId.
 | 
						|
 *
 | 
						|
 * It is the role of the job to fetch the latest data, and decide if a process is needed or not
 | 
						|
 * If the job throws or returns false, it will be retried by the corresponding job runner.
 | 
						|
 */
 | 
						|
export abstract class PersistedJob<T extends PersistedJobData> {
 | 
						|
  public persistedData: T;
 | 
						|
 | 
						|
  private runningPromise: Promise<RunJobResult> | null = null;
 | 
						|
 | 
						|
  public constructor(data: T) {
 | 
						|
    if (data.maxAttempts < 1) {
 | 
						|
      throw new Error('maxAttempts must be >= 1');
 | 
						|
    }
 | 
						|
 | 
						|
    if (isEmpty(data.identifier)) {
 | 
						|
      throw new Error('identifier must be not empty');
 | 
						|
    }
 | 
						|
 | 
						|
    if (isEmpty(data.jobType)) {
 | 
						|
      throw new Error('jobType must be not empty');
 | 
						|
    }
 | 
						|
 | 
						|
    if (data.delayBetweenRetries <= 0) {
 | 
						|
      throw new Error('delayBetweenRetries must be at least > 0');
 | 
						|
    }
 | 
						|
 | 
						|
    if (data.nextAttemptTimestamp <= 0) {
 | 
						|
      throw new Error('nextAttemptTimestamp must be set and > 0');
 | 
						|
    }
 | 
						|
 | 
						|
    this.persistedData = data;
 | 
						|
  }
 | 
						|
 | 
						|
  public async runJob() {
 | 
						|
    if (!this.runningPromise) {
 | 
						|
      this.runningPromise = this.run();
 | 
						|
    }
 | 
						|
    return this.runningPromise;
 | 
						|
  }
 | 
						|
 | 
						|
  /**
 | 
						|
   * If that job is running, wait for its completion (success or failure) before returning.
 | 
						|
   * Can be used to wait for the task to be done before exiting the JobRunner
 | 
						|
   */
 | 
						|
  public async waitForCurrentTry() {
 | 
						|
    try {
 | 
						|
      // tslint:disable-next-line: no-promise-as-boolean
 | 
						|
      return this.runningPromise || Promise.resolve(true);
 | 
						|
    } catch (e) {
 | 
						|
      window.log.warn('waitForCurrentTry got an error: ', e.message);
 | 
						|
      return Promise.resolve(true);
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  /**
 | 
						|
   * This one must be reimplemented in the child class, and must first call `super.serializeBase()`
 | 
						|
   */
 | 
						|
  public abstract serializeJob(): T;
 | 
						|
 | 
						|
  public abstract nonRunningJobsToRemove(jobs: Array<T>): Array<T>;
 | 
						|
 | 
						|
  public abstract addJobCheck(jobs: Array<T>): AddJobCheckReturn;
 | 
						|
 | 
						|
  public addJobCheckSameTypePresent(jobs: Array<T>): 'skipAddSameJobPresent' | null {
 | 
						|
    return jobs.some(j => j.jobType === this.persistedData.jobType)
 | 
						|
      ? 'skipAddSameJobPresent'
 | 
						|
      : null;
 | 
						|
  }
 | 
						|
 | 
						|
  public abstract getJobTimeoutMs(): number;
 | 
						|
 | 
						|
  /**
 | 
						|
   * This function will be called by the runner do run the logic of that job.
 | 
						|
   * It **must** return true if that job is a success and doesn't need to be retried.
 | 
						|
   * If it returns false, or throws, it will be retried (if not reach the retries limit yet).
 | 
						|
   *
 | 
						|
   * Note: you should check the this.isAborted() to know if you should cancel the current processing of your logic.
 | 
						|
   */
 | 
						|
  protected abstract run(): Promise<RunJobResult>;
 | 
						|
 | 
						|
  protected serializeBase(): T {
 | 
						|
    return cloneDeep(this.persistedData);
 | 
						|
  }
 | 
						|
}
 |