@ -20,7 +20,6 @@ public protocol JobRunnerType {
func appDidFinishLaunching ( using dependencies : Dependencies )
func appDidBecomeActive ( using dependencies : Dependencies )
func startNonBlockingQueues ( using dependencies : Dependencies )
func enableNewSingleExecutionJobsOnly ( using dependencies : Dependencies )
func stopAndClearPendingJobs ( exceptForVariant : Job . Variant ? , using dependencies : Dependencies , onComplete : ( ( ) -> ( ) ) ? )
// MARK: - J o b S c h e d u l i n g
@ -205,7 +204,6 @@ public final class JobRunner: JobRunnerType {
internal var appReadyToStartQueues : Atomic < Bool > = Atomic ( false )
internal var appHasBecomeActive : Atomic < Bool > = Atomic ( false )
internal var forceAllowSingleExecutionJobs : Atomic < Bool > = Atomic ( false )
internal var perSessionJobsCompleted : Atomic < Set < Int64 > > = Atomic ( [ ] )
internal var hasCompletedInitialBecomeActive : Atomic < Bool > = Atomic ( false )
internal var shutdownBackgroundTask : Atomic < OWSBackgroundTask ? > = Atomic ( nil )
@ -230,6 +228,7 @@ public final class JobRunner: JobRunnerType {
self . allowToExecuteJobs = (
isTestingJobRunner || (
Singleton . hasAppContext &&
Singleton . appContext . isMainApp &&
! SNUtilitiesKit . isRunningTests
)
)
@ -322,7 +321,6 @@ public final class JobRunner: JobRunnerType {
// N o w t h a t w e ' v e f i n i s h e d s e t t i n g u p t h e J o b R u n n e r , u p d a t e t h e q u e u e c l o s u r e s
self . blockingQueue . mutate {
$0 ? . canStart = { [ weak self ] queue -> Bool in ( self ? . canStart ( queue : queue ) = = true ) }
$0 ? . canStartPendingJobs = { [ weak self ] queue -> Bool in ( self ? . canStartPendingJobs ( queue : queue ) = = true ) }
$0 ? . onQueueDrained = { [ weak self ] in
// O n c e a l l b l o c k i n g j o b s h a v e b e e n c o m p l e t e d w e w a n t t o s t a r t r u n n i n g
// t h e r e m a i n i n g j o b q u e u e s
@ -338,9 +336,6 @@ public final class JobRunner: JobRunnerType {
self . queues . mutate {
$0 . values . forEach { queue in
queue . canStart = { [ weak self ] targetQueue -> Bool in ( self ? . canStart ( queue : targetQueue ) = = true ) }
queue . canStartPendingJobs = { [ weak self ] targetQueue -> Bool in
( self ? . canStartPendingJobs ( queue : targetQueue ) = = true )
}
}
}
}
@ -353,19 +348,6 @@ public final class JobRunner: JobRunnerType {
}
public func canStart ( queue : JobQueue ? ) -> Bool {
return (
allowToExecuteJobs && (
forceAllowSingleExecutionJobs . wrappedValue || (
appReadyToStartQueues . wrappedValue && (
queue ? . type = = . blocking ||
canStartNonBlockingQueue
)
)
)
)
}
public func canStartPendingJobs ( queue : JobQueue ? ) -> Bool {
return (
allowToExecuteJobs &&
appReadyToStartQueues . wrappedValue && (
@ -465,17 +447,6 @@ public final class JobRunner: JobRunnerType {
}
public func appDidFinishLaunching ( using dependencies : Dependencies ) {
// C l e a r a n y ' r u n O n c e T r a n s i e n t ' e n t r i e s i n t h e d a t a b a s e ( t h e y s h o u l d o n l y e v e r b e r u n d u r i n g
// t h e a p p s e s s i o n t h a t t h e y w e r e s c h e d u l e d i n )
//
// N o t e : I f w e a r e a l r e a d y i n " s i n g l e - e x e c u t i o n m o d e " t h e n d o n ' t d o t h i s a s t h e r e c o u l d b e r u n n i n g
// j o b s ( t h i s c a s e o c c u r s d u r i n g O n b o a r d i n g w h e n t r y i n g t o r e t r i e v e t h e e x i s t i n g p r o f i l e n a m e )
if ! forceAllowSingleExecutionJobs . wrappedValue {
dependencies . storage . writeAsync { db in
try Job . filter ( Job . Columns . behaviour = = Job . Behaviour . runOnceTransient ) . deleteAll ( db )
}
}
// F l a g t h a t t h e J o b R u n n e r c a n s t a r t i t ' s q u e u e s
appReadyToStartQueues . mutate { $0 = true }
@ -539,7 +510,6 @@ public final class JobRunner: JobRunnerType {
// F l a g t h a t t h e J o b R u n n e r c a n s t a r t i t ' s q u e u e s a n d s t a r t q u e u e i n g n o n - l a u n c h j o b s
appReadyToStartQueues . mutate { $0 = true }
appHasBecomeActive . mutate { $0 = true }
forceAllowSingleExecutionJobs . mutate { $0 = false }
// I f w e h a v e a r u n n i n g " s u t d o w n B a c k g r o u n d T a s k " t h e n w e w a n t t o c a n c e l i t a s o t h e r w i s e i t
// c a n r e s u l t i n t h e d a t a b a s e b e i n g s u s p e n d e d a n d u s b e i n g u n a b l e t o i n t e r a c t w i t h i t a t a l l
@ -603,27 +573,6 @@ public final class JobRunner: JobRunnerType {
}
}
public func enableNewSingleExecutionJobsOnly ( using dependencies : Dependencies ) {
// I f w e h a v e a l r e a d y f u l l y s t a r t e d t h e J o b R u n n e r t h e n d o n ' t b o t h e r d o i n g t h i s ( t h i s s h o u l d n ' t
// c u r r e n t l y b e p o s s i b l e b u t m i g h t b e i n t h e f u t u r e a n d s w a p p i n g t h i s f l a g w h i l e t h e J o b R u n n e r
// i s i n i t ' s " n o r m a l " m o d e c o u l d r e s u l t i n u n e x p e c t e d b e h a v i o u r )
guard ! appReadyToStartQueues . wrappedValue else { return }
// C l e a r a n y ' r u n O n c e T r a n s i e n t ' e n t r i e s i n t h e d a t a b a s e ( t h e y s h o u l d o n l y e v e r b e r u n d u r i n g
// t h e a p p s e s s i o n t h a t t h e y w e r e s c h e d u l e d i n )
dependencies . storage . writeAsync { db in
try Job . filter ( Job . Columns . behaviour = = Job . Behaviour . runOnceTransient ) . deleteAll ( db )
}
// T h i s f u n c t i o n i s c a l l e d b y t h e a p p e x t e n s i o n s t o a l l o w t h e m t o r u n j o b s d i r e c t l y w i t h o u t
// t r i g g e r i n g a n y r e c u r r i n g o r p e n d i n g j o b s
//
// N o t e : T h i s w i l l o n l y a l l o w j o b s t o r u n i f t h e y a r e d i r e c t l y a d d e d t o a j o b q u e u e a s i f
// ` c a n S t a r t P e n d i n g J o b s ` r e t u r n s ` f a l s e ` t h e n a n y p e r s i s t e d j o b s * * W I L L N O T * * b e f e t c h e d a n d
// a d d e d t o t h e q u e u e
forceAllowSingleExecutionJobs . mutate { $0 = true }
}
public func stopAndClearPendingJobs (
exceptForVariant : Job . Variant ? ,
using dependencies : Dependencies ,
@ -859,10 +808,6 @@ public final class JobRunner: JobRunnerType {
return (
job . behaviour = = . runOnceNextLaunch ||
job . behaviour = = . recurringOnLaunch ||
(
job . behaviour = = . runOnceTransient &&
forceAllowSingleExecutionJobs . wrappedValue
) ||
appHasBecomeActive . wrappedValue
)
}
@ -1014,7 +959,6 @@ public final class JobQueue: Hashable {
private var executorMap : Atomic < [ Job . Variant : JobExecutor . Type ] > = Atomic ( [ : ] )
fileprivate var canStart : ( ( JobQueue ? ) -> Bool ) ?
fileprivate var canStartPendingJobs : ( ( JobQueue ? ) -> Bool ) ?
fileprivate var onQueueDrained : ( ( ) -> ( ) ) ?
fileprivate var hasStartedAtLeastOnce : Atomic < Bool > = Atomic ( false )
fileprivate var isRunning : Atomic < Bool > = Atomic ( false )
@ -1319,13 +1263,11 @@ public final class JobQueue: Hashable {
hasStartedAtLeastOnce . mutate { $0 = true }
// G e t a n y p e n d i n g j o b s
var jobsToRun : [ Job ] = [ ]
let jobIdsAlreadyRunning : Set < Int64 > = currentlyRunningJobIds . wrappedValue
if canStartPendingJobs ? ( self ) = = true {
let jobVariants : [ Job . Variant ] = self . jobVariants
let jobIdsAlreadyRunning : Set < Int64 > = currentlyRunningJobIds . wrappedValue
let jobsAlreadyInQueue : Set < Int64 > = pendingJobsQueue . wrappedValue . compactMap { $0 . id } . asSet ( )
jobsToRun = dependencies . storage . read ( using : dependencies ) { db in
let jobsToRun : [ Job ] = dependencies . storage . read ( using : dependencies ) { db in
try Job
. filterPendingJobs (
variants : jobVariants ,
@ -1337,7 +1279,6 @@ public final class JobQueue: Hashable {
. fetchAll ( db )
}
. defaulting ( to : [ ] )
}
// D e t e r m i n e t h e n u m b e r o f j o b s t o r u n
var jobCount : Int = 0
@ -1524,14 +1465,6 @@ public final class JobQueue: Hashable {
}
private func scheduleNextSoonestJob ( using dependencies : Dependencies ) {
// I f w e c a n ' t s c h e d u l e p e n d i n g j o b s t h e n c o m p l e t e t h e q u e u e
guard canStartPendingJobs ? ( self ) = = true else {
if executionType != . concurrent || currentlyRunningJobIds . wrappedValue . isEmpty {
self . onQueueDrained ? ( )
}
return
}
// R e t r i e v e a n y p e n d i n g j o b s f r o m t h e d a t a b a s e
let jobVariants : [ Job . Variant ] = self . jobVariants
let jobIdsAlreadyRunning : Set < Int64 > = currentlyRunningJobIds . wrappedValue
@ -1914,10 +1847,6 @@ public extension JobRunner {
instance . appDidBecomeActive ( using : dependencies )
}
static func enableNewSingleExecutionJobsOnly ( using dependencies : Dependencies = Dependencies ( ) ) {
instance . enableNewSingleExecutionJobsOnly ( using : dependencies )
}
static func afterBlockingQueue ( callback : @ escaping ( ) -> ( ) ) {
instance . afterBlockingQueue ( callback : callback )
}