@ -36,6 +36,13 @@ public protocol JobExecutor {
}
}
public final class JobRunner {
public final class JobRunner {
public enum JobResult {
case succeeded
case failed
case deferred
case notFound
}
private static let blockingQueue : Atomic < JobQueue ? > = Atomic (
private static let blockingQueue : Atomic < JobQueue ? > = Atomic (
JobQueue (
JobQueue (
type : . blocking ,
type : . blocking ,
@ -332,6 +339,15 @@ public final class JobRunner {
. defaulting ( to : [ : ] )
. defaulting ( to : [ : ] )
}
}
public static func afterCurrentlyRunningJob ( _ job : Job ? , callback : @ escaping ( JobResult ) -> ( ) ) {
guard let job : Job = job , let jobId : Int64 = job . id , let queue : JobQueue = queues . wrappedValue [ job . variant ] else {
callback ( . notFound )
return
}
queue . afterCurrentlyRunningJob ( jobId , callback : callback )
}
public static func hasPendingOrRunningJob < T : Encodable > ( with variant : Job . Variant , details : T ) -> Bool {
public static func hasPendingOrRunningJob < T : Encodable > ( with variant : Job . Variant , details : T ) -> Bool {
guard let targetQueue : JobQueue = queues . wrappedValue [ variant ] else { return false }
guard let targetQueue : JobQueue = queues . wrappedValue [ variant ] else { return false }
guard let detailsData : Data = try ? JSONEncoder ( ) . encode ( details ) else { return false }
guard let detailsData : Data = try ? JSONEncoder ( ) . encode ( details ) else { return false }
@ -339,6 +355,12 @@ public final class JobRunner {
return targetQueue . hasPendingOrRunningJob ( with : detailsData )
return targetQueue . hasPendingOrRunningJob ( with : detailsData )
}
}
public static func removePendingJob ( _ job : Job ? ) {
guard let job : Job = job , let jobId : Int64 = job . id else { return }
queues . wrappedValue [ job . variant ] ? . removePendingJob ( jobId )
}
// MARK: - C o n v e n i e n c e
// MARK: - C o n v e n i e n c e
fileprivate static func getRetryInterval ( for job : Job ) -> TimeInterval {
fileprivate static func getRetryInterval ( for job : Job ) -> TimeInterval {
@ -445,6 +467,7 @@ private final class JobQueue {
fileprivate var isRunning : Atomic < Bool > = Atomic ( false )
fileprivate var isRunning : Atomic < Bool > = Atomic ( false )
private var queue : Atomic < [ Job ] > = Atomic ( [ ] )
private var queue : Atomic < [ Job ] > = Atomic ( [ ] )
private var jobsCurrentlyRunning : Atomic < Set < Int64 > > = Atomic ( [ ] )
private var jobsCurrentlyRunning : Atomic < Set < Int64 > > = Atomic ( [ ] )
private var jobCallbacks : Atomic < [ Int64 : [ ( JobRunner . JobResult ) -> ( ) ] ] > = Atomic ( [ : ] )
private var detailsForCurrentlyRunningJobs : Atomic < [ Int64 : Data ? ] > = Atomic ( [ : ] )
private var detailsForCurrentlyRunningJobs : Atomic < [ Int64 : Data ? ] > = Atomic ( [ : ] )
private var deferLoopTracker : Atomic < [ Int64 : ( count : Int , times : [ TimeInterval ] ) ] > = Atomic ( [ : ] )
private var deferLoopTracker : Atomic < [ Int64 : ( count : Int , times : [ TimeInterval ] ) ] > = Atomic ( [ : ] )
@ -560,12 +583,29 @@ private final class JobQueue {
return detailsForCurrentlyRunningJobs . wrappedValue
return detailsForCurrentlyRunningJobs . wrappedValue
}
}
fileprivate func afterCurrentlyRunningJob ( _ jobId : Int64 , callback : @ escaping ( JobRunner . JobResult ) -> ( ) ) {
guard isCurrentlyRunning ( jobId ) else {
callback ( . notFound )
return
}
jobCallbacks . mutate { jobCallbacks in
jobCallbacks [ jobId ] = ( jobCallbacks [ jobId ] ? ? [ ] ) . appending ( callback )
}
}
fileprivate func hasPendingOrRunningJob ( with detailsData : Data ? ) -> Bool {
fileprivate func hasPendingOrRunningJob ( with detailsData : Data ? ) -> Bool {
let pendingJobs : [ Job ] = queue . wrappedValue
let pendingJobs : [ Job ] = queue . wrappedValue
return pendingJobs . contains { job in job . details = = detailsData }
return pendingJobs . contains { job in job . details = = detailsData }
}
}
fileprivate func removePendingJob ( _ jobId : Int64 ) {
queue . mutate { queue in
queue = queue . filter { $0 . id != jobId }
}
}
// MARK: - J o b R u n n i n g
// MARK: - J o b R u n n i n g
fileprivate func start ( force : Bool = false ) {
fileprivate func start ( force : Bool = false ) {
@ -900,10 +940,8 @@ private final class JobQueue {
}
}
}
}
// T h e j o b i s r e m o v e d f r o m t h e q u e u e b e f o r e i t r u n s s o a l l w e n e e d t o t o i s r e m o v e i t
// P e r f o r m j o b c l e a n u p a n d s t a r t t h e n e x t j o b
// f r o m t h e ' c u r r e n t l y R u n n i n g ' s e t a n d s t a r t t h e n e x t o n e
performCleanUp ( for : job , result : . succeeded )
jobsCurrentlyRunning . mutate { $0 = $0 . removing ( job . id ) }
detailsForCurrentlyRunningJobs . mutate { $0 = $0 . removingValue ( forKey : job . id ) }
internalQueue . async { [ weak self ] in
internalQueue . async { [ weak self ] in
self ? . runNextJob ( )
self ? . runNextJob ( )
}
}
@ -914,8 +952,7 @@ private final class JobQueue {
private func handleJobFailed ( _ job : Job , error : Error ? , permanentFailure : Bool ) {
private func handleJobFailed ( _ job : Job , error : Error ? , permanentFailure : Bool ) {
guard Storage . shared . read ( { db in try Job . exists ( db , id : job . id ? ? - 1 ) } ) = = true else {
guard Storage . shared . read ( { db in try Job . exists ( db , id : job . id ? ? - 1 ) } ) = = true else {
SNLog ( " [JobRunner] \( queueContext ) \( job . variant ) job canceled " )
SNLog ( " [JobRunner] \( queueContext ) \( job . variant ) job canceled " )
jobsCurrentlyRunning . mutate { $0 = $0 . removing ( job . id ) }
performCleanUp ( for : job , result : . failed )
detailsForCurrentlyRunningJobs . mutate { $0 = $0 . removingValue ( forKey : job . id ) }
internalQueue . async { [ weak self ] in
internalQueue . async { [ weak self ] in
self ? . runNextJob ( )
self ? . runNextJob ( )
@ -923,12 +960,30 @@ private final class JobQueue {
return
return
}
}
// I f t h i s i s t h e b l o c k i n g q u e u e a n d a " b l o c k i n g " j o b f a i l e d t h e n r e r u n i t i m m e d i a t e l y
// I f t h i s i s t h e b l o c k i n g q u e u e a n d a " b l o c k i n g " j o b f a i l e d t h e n r e r u n i t
// i m m e d i a t e l y ( i n t h i s c a s e w e d o n ' t t r i g g e r a n y j o b c a l l b a c k s b e c a u s e t h e
// j o b i s n ' t a c t u a l l y d o n e , i t ' s g o i n g t o t r y a g a i n i m m e d i a t e l y )
if self . type = = . blocking && job . shouldBlock {
if self . type = = . blocking && job . shouldBlock {
SNLog ( " [JobRunner] \( queueContext ) \( job . variant ) job failed; retrying immediately " )
SNLog ( " [JobRunner] \( queueContext ) \( job . variant ) job failed; retrying immediately " )
jobsCurrentlyRunning . mutate { $0 = $0 . removing ( job . id ) }
detailsForCurrentlyRunningJobs . mutate { $0 = $0 . removingValue ( forKey : job . id ) }
// I f i t w a s a p o s s i b l e d e f e r r a l l o o p t h e n w e d o n ' t a c t u a l l y w a n t t o
queue . mutate { $0 . insert ( job , at : 0 ) }
// r e t r y t h e j o b ( e v e n i f i t ' s a b l o c k i n g o n e , t h i s g i v e s a s m a l l c h a n c e
// t h a t t h e a p p c o u l d c o n t i n u e t o f u n c t i o n )
let wasPossibleDeferralLoop : Bool = {
if let error = error , case JobRunnerError . possibleDeferralLoop = error { return true }
return false
} ( )
performCleanUp (
for : job ,
result : . failed ,
shouldTriggerCallbacks : wasPossibleDeferralLoop
)
// O n l y a d d i t b a c k t o t h e q u e u e i f i t w a s n ' t a d e f e r r a l l o o p
if ! wasPossibleDeferralLoop {
queue . mutate { $0 . insert ( job , at : 0 ) }
}
internalQueue . async { [ weak self ] in
internalQueue . async { [ weak self ] in
self ? . runNextJob ( )
self ? . runNextJob ( )
@ -1003,8 +1058,7 @@ private final class JobQueue {
}
}
}
}
jobsCurrentlyRunning . mutate { $0 = $0 . removing ( job . id ) }
performCleanUp ( for : job , result : . failed )
detailsForCurrentlyRunningJobs . mutate { $0 = $0 . removingValue ( forKey : job . id ) }
internalQueue . async { [ weak self ] in
internalQueue . async { [ weak self ] in
self ? . runNextJob ( )
self ? . runNextJob ( )
}
}
@ -1014,8 +1068,7 @@ private final class JobQueue {
// / o n o t h e r j o b s , a n d i t s h o u l d a u t o m a t i c a l l y m a n a g e t h o s e d e p e n d e n c i e s )
// / o n o t h e r j o b s , a n d i t s h o u l d a u t o m a t i c a l l y m a n a g e t h o s e d e p e n d e n c i e s )
private func handleJobDeferred ( _ job : Job ) {
private func handleJobDeferred ( _ job : Job ) {
var stuckInDeferLoop : Bool = false
var stuckInDeferLoop : Bool = false
jobsCurrentlyRunning . mutate { $0 = $0 . removing ( job . id ) }
detailsForCurrentlyRunningJobs . mutate { $0 = $0 . removingValue ( forKey : job . id ) }
deferLoopTracker . mutate {
deferLoopTracker . mutate {
guard let lastRecord : ( count : Int , times : [ TimeInterval ] ) = $0 [ job . id ] else {
guard let lastRecord : ( count : Int , times : [ TimeInterval ] ) = $0 [ job . id ] else {
$0 = $0 . setting (
$0 = $0 . setting (
@ -1055,8 +1108,29 @@ private final class JobQueue {
return
return
}
}
performCleanUp ( for : job , result : . deferred )
internalQueue . async { [ weak self ] in
internalQueue . async { [ weak self ] in
self ? . runNextJob ( )
self ? . runNextJob ( )
}
}
}
}
private func performCleanUp ( for job : Job , result : JobRunner . JobResult , shouldTriggerCallbacks : Bool = true ) {
// T h e j o b i s r e m o v e d f r o m t h e q u e u e b e f o r e i t r u n s s o a l l w e n e e d t o t o i s r e m o v e i t
// f r o m t h e ' c u r r e n t l y R u n n i n g ' s e t
jobsCurrentlyRunning . mutate { $0 = $0 . removing ( job . id ) }
detailsForCurrentlyRunningJobs . mutate { $0 = $0 . removingValue ( forKey : job . id ) }
guard shouldTriggerCallbacks else { return }
// R u n a n y j o b c a l l b a c k s n o w t h a t i t ' s d o n e
var jobCallbacksToRun : [ ( JobRunner . JobResult ) -> ( ) ] = [ ]
jobCallbacks . mutate { jobCallbacks in
jobCallbacksToRun = ( jobCallbacks [ job . id ] ? ? [ ] )
jobCallbacks = jobCallbacks . removingValue ( forKey : job . id )
}
DispatchQueue . global ( qos : . default ) . async {
jobCallbacksToRun . forEach { $0 ( result ) }
}
}
}
}