Merge pull request #814 from mpretty-cyro/fix/job-runner-issues

JobRunner fixes
pull/815/head
Morgan Pretty 2 years ago committed by GitHub
commit e3f40642d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -2016,7 +2016,8 @@ extension ConversationVC:
try MessageSender.send(
db,
message: DataExtractionNotification(
kind: .mediaSaved(timestamp: UInt64(cellViewModel.timestampMs))
kind: .mediaSaved(timestamp: UInt64(cellViewModel.timestampMs)),
sentTimestamp: UInt64(SnodeAPI.currentOffsetTimestampMs())
),
interactionId: nil,
in: thread
@ -2270,7 +2271,8 @@ extension ConversationVC:
try MessageSender.send(
db,
message: DataExtractionNotification(
kind: .screenshot
kind: .screenshot,
sentTimestamp: UInt64(SnodeAPI.currentOffsetTimestampMs())
),
interactionId: nil,
in: thread

@ -540,7 +540,8 @@ class MediaPageViewController: UIPageViewController, UIPageViewControllerDataSou
message: DataExtractionNotification(
kind: .mediaSaved(
timestamp: UInt64(currentViewController.galleryItem.interactionTimestampMs)
)
),
sentTimestamp: UInt64(SnodeAPI.currentOffsetTimestampMs())
),
interactionId: nil, // Show no interaction for the current user
in: thread

@ -27,8 +27,13 @@ public final class DataExtractionNotification: ControlMessage {
// MARK: - Initialization
public init(kind: Kind) {
super.init()
public init(
kind: Kind,
sentTimestamp: UInt64? = nil
) {
super.init(
sentTimestamp: sentTimestamp
)
self.kind = kind
}

@ -3,7 +3,7 @@
import Foundation
import GRDB
public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePersistableRecord, TableRecord, ColumnExpressible {
public struct Job: Codable, Equatable, Hashable, Identifiable, FetchableRecord, MutablePersistableRecord, TableRecord, ColumnExpressible {
public static var databaseTableName: String { "job" }
internal static let dependencyForeignKey = ForeignKey([Columns.id], to: [JobDependencies.Columns.dependantId])
public static let dependantJobDependency = hasMany(

@ -3,7 +3,7 @@
import Foundation
import GRDB
public struct JobDependencies: Codable, FetchableRecord, PersistableRecord, TableRecord, ColumnExpressible {
public struct JobDependencies: Codable, Equatable, Hashable, FetchableRecord, PersistableRecord, TableRecord, ColumnExpressible {
public static var databaseTableName: String { "jobDependencies" }
internal static let jobForeignKey = ForeignKey([Columns.jobId], to: [Job.Columns.id])
internal static let dependantForeignKey = ForeignKey([Columns.dependantId], to: [Job.Columns.id])

@ -45,6 +45,14 @@ public extension Array {
return updatedArray
}
func inserting(contentsOf other: [Element]?, at index: Int) -> [Element] {
guard let other: [Element] = other else { return self }
var updatedArray: [Element] = self
updatedArray.insert(contentsOf: other, at: 0)
return updatedArray
}
func grouped<Key: Hashable>(by keyForValue: (Element) throws -> Key) -> [Key: [Element]] {
return ((try? Dictionary(grouping: self, by: keyForValue)) ?? [:])
}

@ -627,9 +627,13 @@ private final class JobQueue {
}
fileprivate func hasPendingOrRunningJob(with detailsData: Data?) -> Bool {
guard let detailsData: Data = detailsData else { return false }
let pendingJobs: [Job] = queue.wrappedValue
return pendingJobs.contains { job in job.details == detailsData }
guard !pendingJobs.contains(where: { job in job.details == detailsData }) else { return true }
return detailsForCurrentlyRunningJobs.wrappedValue.values.contains(detailsData)
}
fileprivate func removePendingJob(_ jobId: Int64) {
@ -760,13 +764,15 @@ private final class JobQueue {
}
// Check if the next job has any dependencies
let dependencyInfo: (expectedCount: Int, jobs: [Job]) = Storage.shared.read { db in
let numExpectedDependencies: Int = try JobDependencies
let dependencyInfo: (expectedCount: Int, jobs: Set<Job>) = Storage.shared.read { db in
let expectedDependencies: Set<JobDependencies> = try JobDependencies
.filter(JobDependencies.Columns.jobId == nextJob.id)
.fetchCount(db)
let jobDependencies: [Job] = try nextJob.dependencies.fetchAll(db)
.fetchSet(db)
let jobDependencies: Set<Job> = try Job
.filter(ids: expectedDependencies.compactMap { $0.dependantId })
.fetchSet(db)
return (numExpectedDependencies, jobDependencies)
return (expectedDependencies.count, jobDependencies)
}
.defaulting(to: (0, []))
@ -778,39 +784,15 @@ private final class JobQueue {
guard dependencyInfo.jobs.isEmpty else {
SNLog("[JobRunner] \(queueContext) found job with \(dependencyInfo.jobs.count) dependencies, running those first")
let jobDependencyIds: [Int64] = dependencyInfo.jobs
.compactMap { $0.id }
let jobIdsNotInQueue: Set<Int64> = jobDependencyIds
.asSet()
.subtracting(queue.wrappedValue.compactMap { $0.id })
// If there are dependencies which aren't in the queue we should just append them
guard !jobIdsNotInQueue.isEmpty else {
queue.mutate { queue in
queue.append(
contentsOf: dependencyInfo.jobs
.filter { jobIdsNotInQueue.contains($0.id ?? -1) }
)
queue.append(nextJob)
}
handleJobDeferred(nextJob)
return
}
// Otherwise re-add the current job after it's dependencies (if this isn't a concurrent
// queue - don't want to immediately try to start the job again only for it to end up back
// in here)
if executionType != .concurrent {
queue.mutate { queue in
guard let lastDependencyIndex: Int = queue.lastIndex(where: { jobDependencyIds.contains($0.id ?? -1) }) else {
queue.append(nextJob)
return
}
queue.insert(nextJob, at: lastDependencyIndex + 1)
}
/// Remove all jobs this one is dependant on from the queue and re-insert them at the start of the queue
///
/// **Note:** We don't add the current job back the the queue because it should only be re-added if it's dependencies
/// are successfully completed
queue.mutate { queue in
queue = queue
.filter { !dependencyInfo.jobs.contains($0) }
.inserting(contentsOf: Array(dependencyInfo.jobs), at: 0)
}
handleJobDeferred(nextJob)
return
}
@ -910,6 +892,12 @@ private final class JobQueue {
/// This function is called when a job succeeds
private func handleJobSucceeded(_ job: Job, shouldStop: Bool) {
/// Retrieve the dependant jobs first (the `JobDependecies` table has cascading deletion when the original `Job` is
/// removed so we need to retrieve these records before that happens)
let dependantJobs: [Job] = Storage.shared
.read { db in try job.dependantJobs.fetchAll(db) }
.defaulting(to: [])
switch job.behaviour {
case .runOnce, .runOnceNextLaunch:
Storage.shared.write { db in
@ -972,26 +960,17 @@ private final class JobQueue {
default: break
}
// For concurrent queues retrieve any 'dependant' jobs and re-add them here (if they have other
// dependencies they will be removed again when they try to execute)
if executionType == .concurrent {
let dependantJobs: [Job] = Storage.shared
.read { db in try job.dependantJobs.fetchAll(db) }
.defaulting(to: [])
let dependantJobIds: [Int64] = dependantJobs
.compactMap { $0.id }
let jobIdsNotInQueue: Set<Int64> = dependantJobIds
.asSet()
.subtracting(queue.wrappedValue.compactMap { $0.id })
// If there are dependant jobs which aren't in the queue we should just append them
if !jobIdsNotInQueue.isEmpty {
queue.mutate { queue in
queue.append(
contentsOf: dependantJobs
.filter { jobIdsNotInQueue.contains($0.id ?? -1) }
)
}
/// Now that the job has been completed we want to insert any jobs that were dependant on it to the start of the queue (the
/// most likely case is that we want an entire job chain to be completed at the same time rather than being blocked by other
/// unrelated jobs)
///
/// **Note:** If any of these `dependantJobs` have other dependencies then when they attempt to start they will be
/// removed from the queue, replaced by their dependencies
if !dependantJobs.isEmpty {
queue.mutate { queue in
queue = queue
.filter { !dependantJobs.contains($0) }
.inserting(contentsOf: dependantJobs, at: 0)
}
}
@ -1051,19 +1030,30 @@ private final class JobQueue {
let nextRunTimestamp: TimeInterval = (Date().timeIntervalSince1970 + JobRunner.getRetryInterval(for: job))
Storage.shared.write { db in
/// Remove any dependant jobs from the queue (shouldn't be in there but filter the queue just in case so we don't try
/// to run a deleted job or get stuck in a loop of trying to run dependencies indefinitely)
let dependantJobIds: [Int64] = try job.dependantJobs
.select(.id)
.asRequest(of: Int64.self)
.fetchAll(db)
if !dependantJobIds.isEmpty {
queue.mutate { queue in
queue = queue.filter { !dependantJobIds.contains($0.id ?? -1) }
}
}
/// Delete/update the failed jobs and any dependencies
let updatedFailureCount: UInt = (job.failureCount + 1)
guard
!permanentFailure && (
maxFailureCount < 0 ||
job.failureCount + 1 < maxFailureCount
updatedFailureCount <= maxFailureCount
)
else {
SNLog("[JobRunner] \(queueContext) \(job.variant) failed permanently\(maxFailureCount >= 0 ? "; too many retries" : "")")
let dependantJobIds: [Int64] = try job.dependantJobs
.select(.id)
.asRequest(of: Int64.self)
.fetchAll(db)
// If the job permanently failed or we have performed all of our retry attempts
// then delete the job and all of it's dependant jobs (it'll probably never succeed)
_ = try job.dependantJobs
@ -1071,13 +1061,6 @@ private final class JobQueue {
_ = try job.delete(db)
// Remove the dependant jobs from the queue (so we don't try to run a deleted job)
if !dependantJobIds.isEmpty {
queue.mutate { queue in
queue = queue.filter { !dependantJobIds.contains($0.id ?? -1) }
}
}
performCleanUp(for: job, result: .failed)
return
}
@ -1086,7 +1069,7 @@ private final class JobQueue {
_ = try job
.with(
failureCount: (job.failureCount + 1),
failureCount: updatedFailureCount,
nextRunTimestamp: nextRunTimestamp
)
.saved(db)
@ -1097,22 +1080,9 @@ private final class JobQueue {
try job.dependantJobs
.updateAll(
db,
Job.Columns.failureCount.set(to: (job.failureCount + 1)),
Job.Columns.failureCount.set(to: updatedFailureCount),
Job.Columns.nextRunTimestamp.set(to: (nextRunTimestamp + (1 / 1000)))
)
let dependantJobIds: [Int64] = try job.dependantJobs
.select(.id)
.asRequest(of: Int64.self)
.fetchAll(db)
// Remove the dependant jobs from the queue (so we don't get stuck in a loop of trying
// to run dependecies indefinitely)
if !dependantJobIds.isEmpty {
queue.mutate { queue in
queue = queue.filter { !dependantJobIds.contains($0.id ?? -1) }
}
}
}
performCleanUp(for: job, result: .failed)

Loading…
Cancel
Save