Merge pull request #673 from mpretty-cyro/fix/various-bugs

Fixed a few bugs
pull/677/head
Morgan Pretty 2 years ago committed by GitHub
commit 64189b0bf0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -150,7 +150,8 @@ extension ContextMenuVC {
)
let canDelete: Bool = (
cellViewModel.threadVariant != .openGroup ||
currentUserIsOpenGroupModerator
currentUserIsOpenGroupModerator ||
cellViewModel.state == .failed
)
let canBan: Bool = (
cellViewModel.threadVariant == .openGroup &&

@ -1134,7 +1134,58 @@ extension ConversationVC:
.filter(id: cellViewModel.id)
.asRequest(of: Int64.self)
.fetchOne(db)
else { return }
else {
// If the message hasn't been sent yet then just delete locally
guard cellViewModel.state == .sending || cellViewModel.state == .failed else { return }
// Retrieve any message send jobs for this interaction
let jobs: [Job] = Storage.shared
.read { db in
try? Job
.filter(Job.Columns.variant == Job.Variant.messageSend)
.filter(Job.Columns.interactionId == cellViewModel.id)
.fetchAll(db)
}
.defaulting(to: [])
// If the job is currently running then wait until it's done before triggering
// the deletion
let targetJob: Job? = jobs.first(where: { JobRunner.isCurrentlyRunning($0) })
guard targetJob == nil else {
JobRunner.afterCurrentlyRunningJob(targetJob) { [weak self] result in
switch result {
// If it succeeded then we'll need to delete from the server so re-run
// this function (if we still don't have the server id for some reason
// then this would result in a local-only deletion which should be fine
case .succeeded: self?.delete(cellViewModel)
// Otherwise we just need to cancel the pending job (in case it retries)
// and delete the interaction
default:
JobRunner.removePendingJob(targetJob)
Storage.shared.writeAsync { db in
_ = try Interaction
.filter(id: cellViewModel.id)
.deleteAll(db)
}
}
}
return
}
// If it's not currently running then remove any pending jobs (just to be safe) and
// delete the interaction locally
jobs.forEach { JobRunner.removePendingJob($0) }
Storage.shared.writeAsync { db in
_ = try Interaction
.filter(id: cellViewModel.id)
.deleteAll(db)
}
return
}
if remove {
OpenGroupAPI

@ -608,6 +608,7 @@ public final class OpenGroupManager: NSObject {
guard !messageServerIdsToRemove.isEmpty else { return }
_ = try? Interaction
.filter(Interaction.Columns.threadId == openGroup.threadId)
.filter(messageServerIdsToRemove.contains(Interaction.Columns.openGroupServerMessageId))
.deleteAll(db)
}

@ -637,6 +637,7 @@ public extension MessageViewModel {
let attachmentIdColumnLiteral: SQL = SQL(stringLiteral: Attachment.Columns.id.name)
let groupMemberModeratorTableLiteral: SQL = SQL(stringLiteral: "groupMemberModerator")
let groupMemberAdminTableLiteral: SQL = SQL(stringLiteral: "groupMemberAdmin")
let groupMemberGroupIdColumnLiteral: SQL = SQL(stringLiteral: GroupMember.Columns.groupId.name)
let groupMemberProfileIdColumnLiteral: SQL = SQL(stringLiteral: GroupMember.Columns.profileId.name)
let groupMemberRoleColumnLiteral: SQL = SQL(stringLiteral: GroupMember.Columns.role.name)
@ -715,11 +716,13 @@ public extension MessageViewModel {
)
LEFT JOIN \(GroupMember.self) AS \(groupMemberModeratorTableLiteral) ON (
\(SQL("\(thread[.variant]) = \(SessionThread.Variant.openGroup)")) AND
\(groupMemberModeratorTableLiteral).\(groupMemberGroupIdColumnLiteral) = \(interaction[.threadId]) AND
\(groupMemberModeratorTableLiteral).\(groupMemberProfileIdColumnLiteral) = \(interaction[.authorId]) AND
\(SQL("\(groupMemberModeratorTableLiteral).\(groupMemberRoleColumnLiteral) = \(GroupMember.Role.moderator)"))
)
LEFT JOIN \(GroupMember.self) AS \(groupMemberAdminTableLiteral) ON (
\(SQL("\(thread[.variant]) = \(SessionThread.Variant.openGroup)")) AND
\(groupMemberAdminTableLiteral).\(groupMemberGroupIdColumnLiteral) = \(interaction[.threadId]) AND
\(groupMemberAdminTableLiteral).\(groupMemberProfileIdColumnLiteral) = \(interaction[.authorId]) AND
\(SQL("\(groupMemberAdminTableLiteral).\(groupMemberRoleColumnLiteral) = \(GroupMember.Role.admin)"))
)

@ -110,9 +110,39 @@ public class PagedDatabaseObserver<ObservedTable, T>: TransactionObserver where
// changes only include table and column info at this stage
guard allObservedTableNames.contains(event.tableName) else { return }
// When generating the tracked change we need to check if the change was
// a deletion to a related table (if so then once the change is performed
// there won't be a way to associated the deleted related record to the
// original so we need to retrieve the association in here)
let trackedChange: PagedData.TrackedChange = {
guard
event.tableName != pagedTableName,
event.kind == .delete,
let observedChange: PagedData.ObservedChanges = observedTableChangeTypes[event.tableName],
let joinToPagedType: SQL = observedChange.joinToPagedType
else { return PagedData.TrackedChange(event: event) }
// Retrieve the pagedRowId for the related value that is
// getting deleted
let pagedRowIds: [Int64] = Storage.shared
.read { db in
PagedData.pagedRowIdsForRelatedRowIds(
db,
tableName: event.tableName,
pagedTableName: pagedTableName,
relatedRowIds: [event.rowID],
joinToPagedType: joinToPagedType
)
}
.defaulting(to: [])
return PagedData.TrackedChange(event: event, pagedRowIdsForRelatedDeletion: pagedRowIds)
}()
// The 'event' object only exists during this method so we need to copy the info
// from it, otherwise it will cease to exist after this metod call finishes
changesInCommit.mutate { $0.insert(PagedData.TrackedChange(event: event)) }
changesInCommit.mutate { $0.insert(trackedChange) }
}
// Note: We will process all updates which come through this method even if
@ -180,13 +210,17 @@ public class PagedDatabaseObserver<ObservedTable, T>: TransactionObserver where
.filter { $0.tableName == pagedTableName }
let relatedChanges: [String: [PagedData.TrackedChange]] = committedChanges
.filter { $0.tableName != pagedTableName }
.filter { $0.kind != .delete }
.reduce(into: [:]) { result, next in
guard observedTableChangeTypes[next.tableName] != nil else { return }
result[next.tableName] = (result[next.tableName] ?? []).appending(next)
}
let relatedDeletions: [PagedData.TrackedChange] = committedChanges
.filter { $0.tableName != pagedTableName }
.filter { $0.kind == .delete }
guard !directChanges.isEmpty || !relatedChanges.isEmpty else {
guard !directChanges.isEmpty || !relatedChanges.isEmpty || !relatedDeletions.isEmpty else {
updateDataAndCallbackIfNeeded(self.dataCache.wrappedValue, self.pageInfo.wrappedValue, false)
return
}
@ -219,7 +253,7 @@ public class PagedDatabaseObserver<ObservedTable, T>: TransactionObserver where
let changesToQuery: [PagedData.TrackedChange] = directChanges
.filter { $0.kind != .delete }
guard !changesToQuery.isEmpty || !relatedChanges.isEmpty else {
guard !changesToQuery.isEmpty || !relatedChanges.isEmpty || !relatedDeletions.isEmpty else {
updateDataAndCallbackIfNeeded(updatedDataCache, updatedPageInfo, !deletionChanges.isEmpty)
return
}
@ -248,7 +282,7 @@ public class PagedDatabaseObserver<ObservedTable, T>: TransactionObserver where
.asSet()
}()
guard !changesToQuery.isEmpty || !pagedRowIdsForRelatedChanges.isEmpty else {
guard !changesToQuery.isEmpty || !pagedRowIdsForRelatedChanges.isEmpty || !relatedDeletions.isEmpty else {
updateDataAndCallbackIfNeeded(updatedDataCache, updatedPageInfo, !deletionChanges.isEmpty)
return
}
@ -270,6 +304,16 @@ public class PagedDatabaseObserver<ObservedTable, T>: TransactionObserver where
orderSQL: orderSQL,
filterSQL: filterSQL
)
let relatedDeletionIndexes: [PagedData.RowIndexInfo] = PagedData.indexes(
db,
rowIds: relatedDeletions
.compactMap { $0.pagedRowIdsForRelatedDeletion }
.flatMap { $0 },
tableName: pagedTableName,
requiredJoinSQL: joinSQL,
orderSQL: orderSQL,
filterSQL: filterSQL
)
// Determine if the indexes for the row ids should be displayed on the screen and remove any
// which shouldn't - values less than 'currentCount' or if there is at least one value less than
@ -306,6 +350,7 @@ public class PagedDatabaseObserver<ObservedTable, T>: TransactionObserver where
}
let validChangeRowIds: [Int64] = determineValidChanges(for: itemIndexes)
let validRelatedChangeRowIds: [Int64] = determineValidChanges(for: relatedChangeIndexes)
let validRelatedDeletionRowIds: [Int64] = determineValidChanges(for: relatedDeletionIndexes)
let countBefore: Int = itemIndexes.filter { $0.rowIndex < updatedPageInfo.pageOffset }.count
// Update the offset and totalCount even if the rows are outside of the current page (need to
@ -325,13 +370,13 @@ public class PagedDatabaseObserver<ObservedTable, T>: TransactionObserver where
// If there are no valid row ids then stop here (trigger updates though since the page info
// has changes)
guard !validChangeRowIds.isEmpty || !validRelatedChangeRowIds.isEmpty else {
guard !validChangeRowIds.isEmpty || !validRelatedChangeRowIds.isEmpty || !validRelatedDeletionRowIds.isEmpty else {
updateDataAndCallbackIfNeeded(updatedDataCache, updatedPageInfo, true)
return
}
// Fetch the inserted/updated rows
let targetRowIds: [Int64] = Array((validChangeRowIds + validRelatedChangeRowIds).asSet())
let targetRowIds: [Int64] = Array((validChangeRowIds + validRelatedChangeRowIds + validRelatedDeletionRowIds).asSet())
let updatedItems: [T] = (try? dataQuery(targetRowIds)
.fetchAll(db))
.defaulting(to: [])
@ -904,11 +949,13 @@ public enum PagedData {
let tableName: String
let kind: DatabaseEvent.Kind
let rowId: Int64
let pagedRowIdsForRelatedDeletion: [Int64]?
init(event: DatabaseEvent) {
init(event: DatabaseEvent, pagedRowIdsForRelatedDeletion: [Int64]? = nil) {
self.tableName = event.tableName
self.kind = event.kind
self.rowId = event.rowID
self.pagedRowIdsForRelatedDeletion = pagedRowIdsForRelatedDeletion
}
}

@ -36,6 +36,13 @@ public protocol JobExecutor {
}
public final class JobRunner {
public enum JobResult {
case succeeded
case failed
case deferred
case notFound
}
private static let blockingQueue: Atomic<JobQueue?> = Atomic(
JobQueue(
type: .blocking,
@ -332,6 +339,15 @@ public final class JobRunner {
.defaulting(to: [:])
}
public static func afterCurrentlyRunningJob(_ job: Job?, callback: @escaping (JobResult) -> ()) {
guard let job: Job = job, let jobId: Int64 = job.id, let queue: JobQueue = queues.wrappedValue[job.variant] else {
callback(.notFound)
return
}
queue.afterCurrentlyRunningJob(jobId, callback: callback)
}
public static func hasPendingOrRunningJob<T: Encodable>(with variant: Job.Variant, details: T) -> Bool {
guard let targetQueue: JobQueue = queues.wrappedValue[variant] else { return false }
guard let detailsData: Data = try? JSONEncoder().encode(details) else { return false }
@ -339,6 +355,12 @@ public final class JobRunner {
return targetQueue.hasPendingOrRunningJob(with: detailsData)
}
public static func removePendingJob(_ job: Job?) {
guard let job: Job = job, let jobId: Int64 = job.id else { return }
queues.wrappedValue[job.variant]?.removePendingJob(jobId)
}
// MARK: - Convenience
fileprivate static func getRetryInterval(for job: Job) -> TimeInterval {
@ -445,6 +467,7 @@ private final class JobQueue {
fileprivate var isRunning: Atomic<Bool> = Atomic(false)
private var queue: Atomic<[Job]> = Atomic([])
private var jobsCurrentlyRunning: Atomic<Set<Int64>> = Atomic([])
private var jobCallbacks: Atomic<[Int64: [(JobRunner.JobResult) -> ()]]> = Atomic([:])
private var detailsForCurrentlyRunningJobs: Atomic<[Int64: Data?]> = Atomic([:])
private var deferLoopTracker: Atomic<[Int64: (count: Int, times: [TimeInterval])]> = Atomic([:])
@ -560,12 +583,29 @@ private final class JobQueue {
return detailsForCurrentlyRunningJobs.wrappedValue
}
fileprivate func afterCurrentlyRunningJob(_ jobId: Int64, callback: @escaping (JobRunner.JobResult) -> ()) {
guard isCurrentlyRunning(jobId) else {
callback(.notFound)
return
}
jobCallbacks.mutate { jobCallbacks in
jobCallbacks[jobId] = (jobCallbacks[jobId] ?? []).appending(callback)
}
}
fileprivate func hasPendingOrRunningJob(with detailsData: Data?) -> Bool {
let pendingJobs: [Job] = queue.wrappedValue
return pendingJobs.contains { job in job.details == detailsData }
}
fileprivate func removePendingJob(_ jobId: Int64) {
queue.mutate { queue in
queue = queue.filter { $0.id != jobId }
}
}
// MARK: - Job Running
fileprivate func start(force: Bool = false) {
@ -900,10 +940,8 @@ private final class JobQueue {
}
}
// The job is removed from the queue before it runs so all we need to to is remove it
// from the 'currentlyRunning' set and start the next one
jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }
detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) }
// Perform job cleanup and start the next job
performCleanUp(for: job, result: .succeeded)
internalQueue.async { [weak self] in
self?.runNextJob()
}
@ -914,8 +952,7 @@ private final class JobQueue {
private func handleJobFailed(_ job: Job, error: Error?, permanentFailure: Bool) {
guard Storage.shared.read({ db in try Job.exists(db, id: job.id ?? -1) }) == true else {
SNLog("[JobRunner] \(queueContext) \(job.variant) job canceled")
jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }
detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) }
performCleanUp(for: job, result: .failed)
internalQueue.async { [weak self] in
self?.runNextJob()
@ -923,12 +960,30 @@ private final class JobQueue {
return
}
// If this is the blocking queue and a "blocking" job failed then rerun it immediately
// If this is the blocking queue and a "blocking" job failed then rerun it
// immediately (in this case we don't trigger any job callbacks because the
// job isn't actually done, it's going to try again immediately)
if self.type == .blocking && job.shouldBlock {
SNLog("[JobRunner] \(queueContext) \(job.variant) job failed; retrying immediately")
jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }
detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) }
queue.mutate { $0.insert(job, at: 0) }
// If it was a possible deferral loop then we don't actually want to
// retry the job (even if it's a blocking one, this gives a small chance
// that the app could continue to function)
let wasPossibleDeferralLoop: Bool = {
if let error = error, case JobRunnerError.possibleDeferralLoop = error { return true }
return false
}()
performCleanUp(
for: job,
result: .failed,
shouldTriggerCallbacks: wasPossibleDeferralLoop
)
// Only add it back to the queue if it wasn't a deferral loop
if !wasPossibleDeferralLoop {
queue.mutate { $0.insert(job, at: 0) }
}
internalQueue.async { [weak self] in
self?.runNextJob()
@ -1003,8 +1058,7 @@ private final class JobQueue {
}
}
jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }
detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) }
performCleanUp(for: job, result: .failed)
internalQueue.async { [weak self] in
self?.runNextJob()
}
@ -1014,8 +1068,7 @@ private final class JobQueue {
/// on other jobs, and it should automatically manage those dependencies)
private func handleJobDeferred(_ job: Job) {
var stuckInDeferLoop: Bool = false
jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }
detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) }
deferLoopTracker.mutate {
guard let lastRecord: (count: Int, times: [TimeInterval]) = $0[job.id] else {
$0 = $0.setting(
@ -1055,8 +1108,29 @@ private final class JobQueue {
return
}
performCleanUp(for: job, result: .deferred)
internalQueue.async { [weak self] in
self?.runNextJob()
}
}
private func performCleanUp(for job: Job, result: JobRunner.JobResult, shouldTriggerCallbacks: Bool = true) {
// The job is removed from the queue before it runs so all we need to to is remove it
// from the 'currentlyRunning' set
jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }
detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) }
guard shouldTriggerCallbacks else { return }
// Run any job callbacks now that it's done
var jobCallbacksToRun: [(JobRunner.JobResult) -> ()] = []
jobCallbacks.mutate { jobCallbacks in
jobCallbacksToRun = (jobCallbacks[job.id] ?? [])
jobCallbacks = jobCallbacks.removingValue(forKey: job.id)
}
DispatchQueue.global(qos: .default).async {
jobCallbacksToRun.forEach { $0(result) }
}
}
}

Loading…
Cancel
Save