@ -131,6 +131,10 @@ public final class JobRunner {
SNLog ( " [JobRunner] Unable to add \( job . map { " \( $0 . variant ) " } ? ? " unknown " ) job " )
SNLog ( " [JobRunner] Unable to add \( job . map { " \( $0 . variant ) " } ? ? " unknown " ) job " )
return
return
}
}
guard ! canStartJob || updatedJob . id != nil else {
SNLog ( " [JobRunner] Not starting \( job . map { " \( $0 . variant ) " } ? ? " unknown " ) job due to missing id " )
return
}
queues . mutate { $0 [ updatedJob . variant ] ? . add ( updatedJob , canStartJob : canStartJob ) }
queues . mutate { $0 [ updatedJob . variant ] ? . add ( updatedJob , canStartJob : canStartJob ) }
@ -150,6 +154,10 @@ public final class JobRunner {
// / i s i n t h e f u t u r e t h e n t h e j o b w o n ' t b e s t a r t e d
// / i s i n t h e f u t u r e t h e n t h e j o b w o n ' t b e s t a r t e d
public static func upsert ( _ db : Database , job : Job ? , canStartJob : Bool = true ) {
public static func upsert ( _ db : Database , job : Job ? , canStartJob : Bool = true ) {
guard let job : Job = job else { return } // I g n o r e n u l l j o b s
guard let job : Job = job else { return } // I g n o r e n u l l j o b s
guard job . id != nil else {
add ( db , job : job , canStartJob : canStartJob )
return
}
queues . wrappedValue [ job . variant ] ? . upsert ( job , canStartJob : canStartJob )
queues . wrappedValue [ job . variant ] ? . upsert ( job , canStartJob : canStartJob )
@ -159,7 +167,7 @@ public final class JobRunner {
}
}
}
}
@ discardableResult public static func insert ( _ db : Database , job : Job ? , before otherJob : Job ) -> Job ? {
@ discardableResult public static func insert ( _ db : Database , job : Job ? , before otherJob : Job ) -> ( Int64 , Job ) ? {
switch job ? . behaviour {
switch job ? . behaviour {
case . recurringOnActive , . recurringOnLaunch , . runOnceNextLaunch :
case . recurringOnActive , . recurringOnLaunch , . runOnceNextLaunch :
SNLog ( " [JobRunner] Attempted to insert \( job . map { " \( $0 . variant ) " } ? ? " unknown " ) job before the current one even though it's behaviour is \( job . map { " \( $0 . behaviour ) " } ? ? " unknown " ) " )
SNLog ( " [JobRunner] Attempted to insert \( job . map { " \( $0 . variant ) " } ? ? " unknown " ) job before the current one even though it's behaviour is \( job . map { " \( $0 . behaviour ) " } ? ? " unknown " ) " )
@ -173,6 +181,10 @@ public final class JobRunner {
SNLog ( " [JobRunner] Unable to add \( job . map { " \( $0 . variant ) " } ? ? " unknown " ) job " )
SNLog ( " [JobRunner] Unable to add \( job . map { " \( $0 . variant ) " } ? ? " unknown " ) job " )
return nil
return nil
}
}
guard let jobId : Int64 = updatedJob . id else {
SNLog ( " [JobRunner] Unable to add \( job . map { " \( $0 . variant ) " } ? ? " unknown " ) job due to missing id " )
return nil
}
queues . wrappedValue [ updatedJob . variant ] ? . insert ( updatedJob , before : otherJob )
queues . wrappedValue [ updatedJob . variant ] ? . insert ( updatedJob , before : otherJob )
@ -181,7 +193,7 @@ public final class JobRunner {
queues . wrappedValue [ updatedJob . variant ] ? . start ( )
queues . wrappedValue [ updatedJob . variant ] ? . start ( )
}
}
return updatedJob
return ( jobId , updatedJob )
}
}
public static func appDidFinishLaunching ( ) {
public static func appDidFinishLaunching ( ) {
@ -499,6 +511,10 @@ private final class JobQueue {
job . behaviour != . runOnceNextLaunch ,
job . behaviour != . runOnceNextLaunch ,
job . nextRunTimestamp <= Date ( ) . timeIntervalSince1970
job . nextRunTimestamp <= Date ( ) . timeIntervalSince1970
else { return }
else { return }
guard job . id != nil else {
SNLog ( " [JobRunner] Prevented attempt to add \( job . variant ) job without id to queue " )
return
}
queue . mutate { $0 . append ( job ) }
queue . mutate { $0 . append ( job ) }
}
}
@ -510,7 +526,7 @@ private final class JobQueue {
// / i s i n t h e f u t u r e t h e n t h e j o b w o n ' t b e s t a r t e d
// / i s i n t h e f u t u r e t h e n t h e j o b w o n ' t b e s t a r t e d
fileprivate func upsert ( _ job : Job , canStartJob : Bool = true ) {
fileprivate func upsert ( _ job : Job , canStartJob : Bool = true ) {
guard let jobId : Int64 = job . id else {
guard let jobId : Int64 = job . id else {
add( job , canStartJob : canStartJob )
SNLog( " [JobRunner] Prevented attempt to upsert \( job . variant ) job without id to queue " )
return
return
}
}
@ -535,6 +551,11 @@ private final class JobQueue {
}
}
fileprivate func insert ( _ job : Job , before otherJob : Job ) {
fileprivate func insert ( _ job : Job , before otherJob : Job ) {
guard job . id != nil else {
SNLog ( " [JobRunner] Prevented attempt to insert \( job . variant ) job without id to queue " )
return
}
// I n s e r t t h e j o b b e f o r e t h e c u r r e n t j o b ( r e - a d d i n g t h e c u r r e n t j o b t o
// I n s e r t t h e j o b b e f o r e t h e c u r r e n t j o b ( r e - a d d i n g t h e c u r r e n t j o b t o
// t h e s t a r t o f t h e q u e u e i f i t ' s n o t i n t h e r e ) - t h i s w i l l m e a n t h e n e w
// t h e s t a r t o f t h e q u e u e i f i t ' s n o t i n t h e r e ) - t h i s w i l l m e a n t h e n e w
// j o b w i l l r u n a n d t h e n t h e o t h e r J o b w i l l r u n ( o r r u n a g a i n ) o n c e i t ' s
// j o b w i l l r u n a n d t h e n t h e o t h e r J o b w i l l r u n ( o r r u n a g a i n ) o n c e i t ' s
@ -634,7 +655,12 @@ private final class JobQueue {
let jobIdsAlreadyRunning : Set < Int64 > = jobsCurrentlyRunning . wrappedValue
let jobIdsAlreadyRunning : Set < Int64 > = jobsCurrentlyRunning . wrappedValue
let jobsAlreadyInQueue : Set < Int64 > = queue . wrappedValue . compactMap { $0 . id } . asSet ( )
let jobsAlreadyInQueue : Set < Int64 > = queue . wrappedValue . compactMap { $0 . id } . asSet ( )
let jobsToRun : [ Job ] = Storage . shared . read { db in
let jobsToRun : [ Job ] = Storage . shared . read { db in
try Job . filterPendingJobs ( variants : jobVariants )
try Job
. filterPendingJobs (
variants : jobVariants ,
excludeFutureJobs : true ,
includeJobsWithDependencies : false
)
. filter ( ! jobIdsAlreadyRunning . contains ( Job . Columns . id ) ) // E x c l u d e j o b s a l r e a d y r u n n i n g
. filter ( ! jobIdsAlreadyRunning . contains ( Job . Columns . id ) ) // E x c l u d e j o b s a l r e a d y r u n n i n g
. filter ( ! jobsAlreadyInQueue . contains ( Job . Columns . id ) ) // E x c l u d e j o b s a l r e a d y i n t h e q u e u e
. filter ( ! jobsAlreadyInQueue . contains ( Job . Columns . id ) ) // E x c l u d e j o b s a l r e a d y i n t h e q u e u e
. fetchAll ( db )
. fetchAll ( db )
@ -709,6 +735,11 @@ private final class JobQueue {
handleJobFailed ( nextJob , error : JobRunnerError . requiredInteractionIdMissing , permanentFailure : true )
handleJobFailed ( nextJob , error : JobRunnerError . requiredInteractionIdMissing , permanentFailure : true )
return
return
}
}
guard nextJob . id != nil else {
SNLog ( " [JobRunner] \( queueContext ) Unable to run \( nextJob . variant ) job due to missing id " )
handleJobFailed ( nextJob , error : JobRunnerError . jobIdMissing , permanentFailure : false )
return
}
// I f t h e ' n e x t R u n T i m e s t a m p ' f o r t h e j o b i s i n t h e f u t u r e t h e n d o n ' t r u n i t y e t
// I f t h e ' n e x t R u n T i m e s t a m p ' f o r t h e j o b i s i n t h e f u t u r e t h e n d o n ' t r u n i t y e t
guard nextJob . nextRunTimestamp <= Date ( ) . timeIntervalSince1970 else {
guard nextJob . nextRunTimestamp <= Date ( ) . timeIntervalSince1970 else {
@ -787,7 +818,7 @@ private final class JobQueue {
numJobsRunning = jobsCurrentlyRunning . count
numJobsRunning = jobsCurrentlyRunning . count
}
}
detailsForCurrentlyRunningJobs . mutate { $0 = $0 . setting ( nextJob . id , nextJob . details ) }
detailsForCurrentlyRunningJobs . mutate { $0 = $0 . setting ( nextJob . id , nextJob . details ) }
SNLog ( " [JobRunner] \( queueContext ) started job (\( executionType = = . concurrent ? " \( numJobsRunning ) currently running, " : " " ) \( numJobsRemaining ) remaining) " )
SNLog ( " [JobRunner] \( queueContext ) started \( nextJob . variant ) job (\( executionType = = . concurrent ? " \( numJobsRunning ) currently running, " : " " ) \( numJobsRemaining ) remaining) " )
jobExecutor . run (
jobExecutor . run (
nextJob ,
nextJob ,
@ -809,7 +840,12 @@ private final class JobQueue {
private func scheduleNextSoonestJob ( ) {
private func scheduleNextSoonestJob ( ) {
let jobIdsAlreadyRunning : Set < Int64 > = jobsCurrentlyRunning . wrappedValue
let jobIdsAlreadyRunning : Set < Int64 > = jobsCurrentlyRunning . wrappedValue
let nextJobTimestamp : TimeInterval ? = Storage . shared . read { db in
let nextJobTimestamp : TimeInterval ? = Storage . shared . read { db in
try Job . filterPendingJobs ( variants : jobVariants , excludeFutureJobs : false )
try Job
. filterPendingJobs (
variants : jobVariants ,
excludeFutureJobs : false ,
includeJobsWithDependencies : false
)
. select ( . nextRunTimestamp )
. select ( . nextRunTimestamp )
. filter ( ! jobIdsAlreadyRunning . contains ( Job . Columns . id ) ) // E x c l u d e j o b s a l r e a d y r u n n i n g
. filter ( ! jobIdsAlreadyRunning . contains ( Job . Columns . id ) ) // E x c l u d e j o b s a l r e a d y r u n n i n g
. asRequest ( of : TimeInterval . self )
. asRequest ( of : TimeInterval . self )