Reduced unneeded DB write operations and fixed a few minor UI bugs

Updated the database to better support the application getting suspended (0xdead10cc crash)
Updated the SOGS message handling to delete messages based on a new 'deleted' flag instead of 'data' being null
Updated the code to prevent the typing indicator from needing a DB write block as frequently
Updated the code to stop any pending jobs when entering the background (in an attempt to prevent the database suspension from causing issues)
Removed the duplicate 'Capabilities.Capability' type (updated 'Capability.Variant' to work in the same way)
Fixed a bug where a number of icons (inc. the "download document" icon) were the wrong colour in dark mode
Fixed a bug where the '@You' highlight could incorrectly have it's width reduced in some cases (had protection to prevent it being larger than the line, but that is a valid case)
Fixed a bug where the JobRunner was starting the background (which could lead to trying to access the database once it had been suspended)
Updated to the latest version of GRDB
Added some logic to the BackgroundPoller process to try and stop processing if the timeout is triggered (will catch some cases but others will end up logging a bunch of "Database is suspended" errors)
Added in some protection to prevent future deferral loops in the JobRunner
pull/612/head
Morgan Pretty 2 years ago
parent ecbded3819
commit 1224e539ea

@ -27,7 +27,7 @@ PODS:
- DifferenceKit/Core (1.2.0)
- DifferenceKit/UIKitExtension (1.2.0):
- DifferenceKit/Core
- GRDB.swift/SQLCipher (5.24.1):
- GRDB.swift/SQLCipher (5.26.0):
- SQLCipher (>= 3.4.0)
- libwebp (1.2.1):
- libwebp/demux (= 1.2.1)
@ -222,7 +222,7 @@ SPEC CHECKSUMS:
CryptoSwift: a532e74ed010f8c95f611d00b8bbae42e9fe7c17
Curve25519Kit: e63f9859ede02438ae3defc5e1a87e09d1ec7ee6
DifferenceKit: 5659c430bb7fe45876fa32ce5cba5d6167f0c805
GRDB.swift: b3180ce2135fc06a453297889b746b1478c4d8c7
GRDB.swift: 1395cb3556df6b16ed69dfc74c3886abc75d2825
libwebp: 98a37e597e40bfdb4c911fc98f2c53d0b12d05fc
Nimble: 5316ef81a170ce87baf72dd961f22f89a602ff84
NVActivityIndicatorView: 1f6c5687f1171810aa27a3296814dc2d7dec3667

@ -520,16 +520,18 @@ extension ConversationVC:
let threadId: String = self.viewModel.threadData.threadId
let threadVariant: SessionThread.Variant = self.viewModel.threadData.threadVariant
let threadIsMessageRequest: Bool = (self.viewModel.threadData.threadIsMessageRequest == true)
let needsToStartTypingIndicator: Bool = TypingIndicators.didStartTypingNeedsToStart(
threadId: threadId,
threadVariant: threadVariant,
threadIsMessageRequest: threadIsMessageRequest,
direction: .outgoing,
timestampMs: Int64(floor(Date().timeIntervalSince1970 * 1000))
)
Storage.shared.writeAsync { db in
TypingIndicators.didStartTyping(
db,
threadId: threadId,
threadVariant: threadVariant,
threadIsMessageRequest: threadIsMessageRequest,
direction: .outgoing,
timestampMs: Int64(floor(Date().timeIntervalSince1970 * 1000))
)
if needsToStartTypingIndicator {
Storage.shared.writeAsync { db in
TypingIndicators.start(db, threadId: threadId, direction: .outgoing)
}
}
}

@ -418,15 +418,34 @@ public class ConversationViewModel: OWSAudioPlayerDelegate {
// MARK: - Functions
public func updateDraft(to draft: String) {
let threadId: String = self.threadId
let currentDraft: String = Storage.shared
.read { db in
try SessionThread
.select(.messageDraft)
.filter(id: threadId)
.asRequest(of: String.self)
.fetchOne(db)
}
.defaulting(to: "")
// Only write the updated draft to the database if it's changed (avoid unnecessary writes)
guard draft != currentDraft else { return }
Storage.shared.writeAsync { db in
try SessionThread
.filter(id: self.threadId)
.filter(id: threadId)
.updateAll(db, SessionThread.Columns.messageDraft.set(to: draft))
}
}
public func markAllAsRead() {
guard let lastInteractionId: Int64 = self.threadData.interactionId else { return }
// Don't bother marking anything as read if there are no unread interactions (we can rely
// on the 'threadData.threadUnreadCount' to always be accurate)
guard
(self.threadData.threadUnreadCount ?? 0) > 0,
let lastInteractionId: Int64 = self.threadData.interactionId
else { return }
let threadId: String = self.threadData.threadId
let trySendReadReceipt: Bool = (self.threadData.threadIsMessageRequest == false)

@ -59,8 +59,8 @@ final class InputViewButton : UIView {
isUserInteractionEnabled = true
widthConstraint.isActive = true
heightConstraint.isActive = true
let tint = isSendButton ? UIColor.black : Colors.text
let iconImageView = UIImageView(image: icon.withTint(tint))
let iconImageView = UIImageView(image: icon.withRenderingMode(.alwaysTemplate))
iconImageView.tintColor = (isSendButton ? UIColor.black : Colors.text)
iconImageView.contentMode = .scaleAspectFit
let iconSize = InputViewButton.iconSize
iconImageView.set(.width, to: iconSize)

@ -28,8 +28,8 @@ final class CallMessageView: UIView {
// Image view
let imageView: UIImageView = UIImageView(
image: UIImage(named: "Phone")?
.resizedImage(to: CGSize(width: CallMessageView.iconSize, height: CallMessageView.iconSize))?
.withRenderingMode(.alwaysTemplate)
.resizedImage(to: CGSize(width: CallMessageView.iconSize, height: CallMessageView.iconSize))
)
imageView.tintColor = textColor
imageView.contentMode = .center

@ -27,11 +27,11 @@ final class DeletedMessageView: UIView {
private func setUpViewHierarchy(textColor: UIColor) {
// Image view
let icon = UIImage(named: "ic_trash")?
.withRenderingMode(.alwaysTemplate)
.resizedImage(to: CGSize(
width: DeletedMessageView.iconSize,
height: DeletedMessageView.iconSize
))
))?
.withRenderingMode(.alwaysTemplate)
let imageView = UIImageView(image: icon)
imageView.tintColor = textColor

@ -44,13 +44,13 @@ final class MediaPlaceholderView: UIView {
// Image view
let imageView = UIImageView(
image: UIImage(named: iconName)?
.withRenderingMode(.alwaysTemplate)
.resizedImage(
to: CGSize(
width: MediaPlaceholderView.iconSize,
height: MediaPlaceholderView.iconSize
)
)
)?
.withRenderingMode(.alwaysTemplate)
)
imageView.tintColor = textColor
imageView.contentMode = .center

@ -68,8 +68,8 @@ final class OpenGroupInvitationView: UIView {
let iconImageViewSize = OpenGroupInvitationView.iconImageViewSize
let iconImageView = UIImageView(
image: UIImage(named: iconName)?
.resizedImage(to: CGSize(width: iconSize, height: iconSize))?
.withRenderingMode(.alwaysTemplate)
.resizedImage(to: CGSize(width: iconSize, height: iconSize))
)
iconImageView.tintColor = .white
iconImageView.contentMode = .center

@ -122,6 +122,9 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD
/// `appDidFinishLaunching` seems to fix this odd behaviour (even though it doesn't match
/// Apple's documentation on the matter)
UNUserNotificationCenter.current().delegate = self
// Resume database
NotificationCenter.default.post(name: Database.resumeNotification, object: self)
}
func applicationDidEnterBackground(_ application: UIApplication) {
@ -130,6 +133,10 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD
// NOTE: Fix an edge case where user taps on the callkit notification
// but answers the call on another device
stopPollers(shouldStopUserPoller: !self.hasIncomingCallWaiting())
JobRunner.stopAndClearPendingJobs()
// Suspend database
NotificationCenter.default.post(name: Database.suspendNotification, object: self)
}
func applicationDidReceiveMemoryWarning(_ application: UIApplication) {
@ -185,8 +192,16 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD
// MARK: - Background Fetching
func application(_ application: UIApplication, performFetchWithCompletionHandler completionHandler: @escaping (UIBackgroundFetchResult) -> Void) {
// Resume database
NotificationCenter.default.post(name: Database.resumeNotification, object: self)
AppReadiness.runNowOrWhenAppDidBecomeReady {
BackgroundPoller.poll(completionHandler: completionHandler)
BackgroundPoller.poll { result in
// Suspend database
NotificationCenter.default.post(name: Database.suspendNotification, object: self)
completionHandler(result)
}
}
}

@ -137,9 +137,6 @@ class HighlightMentionBackgroundView: UIView {
extraYOffset
)
// We don't want to draw too far to the right
runBounds.size.width = (runBounds.width > lineWidth ? lineWidth : runBounds.width)
let path = UIBezierPath(roundedRect: runBounds, cornerRadius: cornerRadius)
mentionBackgroundColor.setFill()
path.fill()

@ -9,8 +9,11 @@ import SessionUtilitiesKit
public final class BackgroundPoller {
private static var promises: [Promise<Void>] = []
private static var isValid: Bool = false
public static func poll(completionHandler: @escaping (UIBackgroundFetchResult) -> Void) {
BackgroundPoller.isValid = true
promises = []
.appending(pollForMessages())
.appending(contentsOf: pollForClosedGroupMessages())
@ -32,7 +35,11 @@ public final class BackgroundPoller {
let poller: OpenGroupAPI.Poller = OpenGroupAPI.Poller(for: server)
poller.stop()
return poller.poll(isBackgroundPoll: true, isPostCapabilitiesRetry: false)
return poller.poll(
isBackgroundPoll: true,
isBackgroundPollerValid: { BackgroundPoller.isValid },
isPostCapabilitiesRetry: false
)
}
)
@ -41,6 +48,7 @@ public final class BackgroundPoller {
// after 25 seconds allowing us to cancel all pending promises
let cancelTimer: Timer = Timer.scheduledTimerOnMainThread(withTimeInterval: 25, repeats: false) { timer in
timer.invalidate()
BackgroundPoller.isValid = false
guard promises.contains(where: { !$0.isResolved }) else { return }
@ -50,6 +58,9 @@ public final class BackgroundPoller {
when(resolved: promises)
.done { _ in
// If we have already invalidated the timer then do nothing (we essentially timed out)
guard cancelTimer.isValid else { return }
cancelTimer.invalidate()
completionHandler(.newData)
}
@ -88,7 +99,8 @@ public final class BackgroundPoller {
groupPublicKey,
on: DispatchQueue.main,
maxRetryCount: 0,
isBackgroundPoll: true
isBackgroundPoll: true,
isBackgroundPollValid: { BackgroundPoller.isValid }
)
}
}
@ -100,44 +112,45 @@ public final class BackgroundPoller {
return SnodeAPI.getMessages(from: snode, associatedWith: publicKey)
.then(on: DispatchQueue.main) { messages -> Promise<Void> in
guard !messages.isEmpty else { return Promise.value(()) }
guard !messages.isEmpty, BackgroundPoller.isValid else { return Promise.value(()) }
var jobsToRun: [Job] = []
Storage.shared.write { db in
var threadMessages: [String: [MessageReceiveJob.Details.MessageInfo]] = [:]
messages.forEach { message in
do {
let processedMessage: ProcessedMessage? = try Message.processRawReceivedMessage(db, rawMessage: message)
let key: String = (processedMessage?.threadId ?? Message.nonThreadMessageId)
threadMessages[key] = (threadMessages[key] ?? [])
.appending(processedMessage?.messageInfo)
}
catch {
switch error {
// Ignore duplicate & selfSend message errors (and don't bother logging
// them as there will be a lot since we each service node duplicates messages)
case DatabaseError.SQLITE_CONSTRAINT_UNIQUE,
MessageReceiverError.duplicateMessage,
MessageReceiverError.duplicateControlMessage,
MessageReceiverError.selfSend:
break
messages
.compactMap { message -> ProcessedMessage? in
do {
return try Message.processRawReceivedMessage(db, rawMessage: message)
}
catch {
switch error {
// Ignore duplicate & selfSend message errors (and don't bother
// logging them as there will be a lot since we each service node
// duplicates messages)
case DatabaseError.SQLITE_CONSTRAINT_UNIQUE,
MessageReceiverError.duplicateMessage,
MessageReceiverError.duplicateControlMessage,
MessageReceiverError.selfSend:
break
// In the background ignore 'SQLITE_ABORT' (it generally means
// the BackgroundPoller has timed out
case DatabaseError.SQLITE_ABORT: break
default: SNLog("Failed to deserialize envelope due to error: \(error).")
}
default: SNLog("Failed to deserialize envelope due to error: \(error).")
return nil
}
}
}
threadMessages
.grouped { threadId, _, _ in (threadId ?? Message.nonThreadMessageId) }
.forEach { threadId, threadMessages in
let maybeJob: Job? = Job(
variant: .messageReceive,
behaviour: .runOnce,
threadId: threadId,
details: MessageReceiveJob.Details(
messages: threadMessages,
messages: threadMessages.map { $0.messageInfo },
isBackgroundPoll: true
)
)

@ -59,3 +59,37 @@ public struct Capability: Codable, FetchableRecord, PersistableRecord, TableReco
self.isMissing = isMissing
}
}
extension Capability.Variant {
// MARK: - Codable
public init(from decoder: Decoder) throws {
let container: SingleValueDecodingContainer = try decoder.singleValueContainer()
let valueString: String = try container.decode(String.self)
// FIXME: Remove this code
// There was a point where we didn't have custom Codable handling for the Capability.Variant
// which resulted in the data being encoded into the database as a JSON dict - this code catches
// that case and extracts the standard string value so it can be processed the same as the
// "proper" custom Codable logic)
if valueString.starts(with: "{") {
self = Capability.Variant(
from: valueString
.replacingOccurrences(of: "\":{}}", with: "")
.replacingOccurrences(of: "\"}}", with: "")
.replacingOccurrences(of: "{\"unsupported\":{\"_0\":\"", with: "")
.replacingOccurrences(of: "{\"", with: "")
)
return
}
// FIXME: Remove this code ^^^
self = Capability.Variant(from: valueString)
}
public func encode(to encoder: Encoder) throws {
var container: SingleValueEncodingContainer = encoder.singleValueContainer()
try container.encode(rawValue)
}
}

@ -4,60 +4,14 @@ import Foundation
extension OpenGroupAPI {
public struct Capabilities: Codable, Equatable {
public enum Capability: Equatable, CaseIterable, Codable {
public static var allCases: [Capability] {
[.sogs, .blind]
}
case sogs
case blind
/// Fallback case if the capability isn't supported by this version of the app
case unsupported(String)
// MARK: - Convenience
public var rawValue: String {
switch self {
case .unsupported(let originalValue): return originalValue
default: return "\(self)"
}
}
// MARK: - Initialization
public init(from valueString: String) {
let maybeValue: Capability? = Capability.allCases.first { $0.rawValue == valueString }
self = (maybeValue ?? .unsupported(valueString))
}
}
public let capabilities: [Capability]
public let missing: [Capability]?
public let capabilities: [Capability.Variant]
public let missing: [Capability.Variant]?
// MARK: - Initialization
public init(capabilities: [Capability], missing: [Capability]? = nil) {
public init(capabilities: [Capability.Variant], missing: [Capability.Variant]? = nil) {
self.capabilities = capabilities
self.missing = missing
}
}
}
extension OpenGroupAPI.Capabilities.Capability {
// MARK: - Codable
public init(from decoder: Decoder) throws {
let container: SingleValueDecodingContainer = try decoder.singleValueContainer()
let valueString: String = try container.decode(String.self)
self = OpenGroupAPI.Capabilities.Capability(from: valueString)
}
public func encode(to encoder: Encoder) throws {
var container: SingleValueEncodingContainer = encoder.singleValueContainer()
try container.encode(rawValue)
}
}

@ -10,6 +10,7 @@ extension OpenGroupAPI {
case sender = "session_id"
case posted
case edited
case deleted
case seqNo = "seqno"
case whisper
case whisperMods = "whisper_mods"
@ -23,6 +24,7 @@ extension OpenGroupAPI {
public let sender: String?
public let posted: TimeInterval
public let edited: TimeInterval?
public let deleted: Bool?
public let seqNo: Int64
public let whisper: Bool
public let whisperMods: Bool
@ -79,6 +81,7 @@ extension OpenGroupAPI.Message {
sender: try? container.decode(String.self, forKey: .sender),
posted: try container.decode(TimeInterval.self, forKey: .posted),
edited: try? container.decode(TimeInterval.self, forKey: .edited),
deleted: try? container.decode(Bool.self, forKey: .deleted),
seqNo: try container.decode(Int64.self, forKey: .seqNo),
whisper: ((try? container.decode(Bool.self, forKey: .whisper)) ?? false),
whisperMods: ((try? container.decode(Bool.self, forKey: .whisperMods)) ?? false),

@ -348,7 +348,7 @@ public final class OpenGroupManager: NSObject {
capabilities.capabilities.forEach { capability in
_ = try? Capability(
openGroupServer: server.lowercased(),
variant: Capability.Variant(from: capability.rawValue),
variant: capability,
isMissing: false
)
.saved(db)
@ -356,7 +356,7 @@ public final class OpenGroupManager: NSObject {
capabilities.missing?.forEach { capability in
_ = try? Capability(
openGroupServer: server.lowercased(),
variant: Capability.Variant(from: capability.rawValue),
variant: capability,
isMissing: true
)
.saved(db)
@ -499,9 +499,12 @@ public final class OpenGroupManager: NSObject {
}
let sortedMessages: [OpenGroupAPI.Message] = messages
.filter { $0.deleted != true }
.sorted { lhs, rhs in lhs.id < rhs.id }
let messageServerIdsToRemove: [Int64] = messages
.filter { $0.deleted == true }
.map { $0.id }
let seqNo: Int64? = sortedMessages.map { $0.seqNo }.max()
var messageServerIdsToRemove: [UInt64] = []
// Update the 'openGroupSequenceNumber' value (Note: SOGS V4 uses the 'seqNo' instead of the 'serverId')
if let seqNo: Int64 = seqNo {
@ -515,11 +518,7 @@ public final class OpenGroupManager: NSObject {
guard
let base64EncodedString: String = message.base64EncodedData,
let data = Data(base64Encoded: base64EncodedString)
else {
// A message with no data has been deleted so add it to the list to remove
messageServerIdsToRemove.append(UInt64(message.id))
return
}
else { return }
do {
let processedMessage: ProcessedMessage? = try Message.processReceivedOpenGroupMessage(

@ -13,8 +13,7 @@ extension MessageReceiver {
switch message.kind {
case .started:
TypingIndicators.didStartTyping(
db,
let needsToStartTypingIndicator: Bool = TypingIndicators.didStartTypingNeedsToStart(
threadId: thread.id,
threadVariant: thread.variant,
threadIsMessageRequest: thread.isMessageRequest(db),
@ -22,6 +21,10 @@ extension MessageReceiver {
timestampMs: message.sentTimestamp.map { Int64($0) }
)
if needsToStartTypingIndicator {
TypingIndicators.start(db, threadId: thread.id, direction: .incoming)
}
case .stopped:
TypingIndicators.didStopTyping(db, threadId: thread.id, direction: .incoming)

@ -291,7 +291,7 @@ public final class MessageSender {
errorCount += 1
guard errorCount == promiseCount else { return } // Only error out if all promises failed
Storage.shared.write { db in
Storage.shared.read { db in
handleFailure(db, with: .other(error))
}
}
@ -300,7 +300,7 @@ public final class MessageSender {
.catch(on: DispatchQueue.global(qos: .default)) { error in
SNLog("Couldn't send message due to error: \(error).")
Storage.shared.write { db in
Storage.shared.read { db in
handleFailure(db, with: .other(error))
}
}
@ -447,7 +447,7 @@ public final class MessageSender {
}
}
.catch(on: DispatchQueue.global(qos: .default)) { error in
dependencies.storage.write { db in
dependencies.storage.read { db in
handleFailure(db, with: .other(error))
}
}
@ -557,7 +557,7 @@ public final class MessageSender {
}
}
.catch(on: DispatchQueue.global(qos: .default)) { error in
dependencies.storage.write { db in
dependencies.storage.read { db in
handleFailure(db, with: .other(error))
}
}
@ -652,15 +652,34 @@ public final class MessageSender {
with error: MessageSenderError,
interactionId: Int64?
) {
// Mark any "sending" recipients as "failed"
_ = try? RecipientState
// Check if we need to mark any "sending" recipients as "failed"
//
// Note: The 'db' could be either read-only or writeable so we determine
// if a change is required, and if so dispatch to a separate queue for the
// actual write
let rowIds: [Int64] = (try? RecipientState
.select(Column.rowID)
.filter(RecipientState.Columns.interactionId == interactionId)
.filter(RecipientState.Columns.state == RecipientState.State.sending)
.updateAll(
db,
RecipientState.Columns.state.set(to: RecipientState.State.failed),
RecipientState.Columns.mostRecentFailureText.set(to: error.localizedDescription)
)
.asRequest(of: Int64.self)
.fetchAll(db))
.defaulting(to: [])
guard !rowIds.isEmpty else { return }
// Need to dispatch to a different thread to prevent a potential db re-entrancy
// issue from occuring in some cases
DispatchQueue.global(qos: .background).async {
Storage.shared.write { db in
try RecipientState
.filter(rowIds.contains(Column.rowID))
.updateAll(
db,
RecipientState.Columns.state.set(to: RecipientState.State.failed),
RecipientState.Columns.mostRecentFailureText.set(to: error.localizedDescription)
)
}
}
}
// MARK: - Convenience

@ -152,6 +152,7 @@ public final class ClosedGroupPoller {
on queue: DispatchQueue = SessionSnodeKit.Threading.workQueue,
maxRetryCount: UInt = 0,
isBackgroundPoll: Bool = false,
isBackgroundPollValid: @escaping (() -> Bool) = { true },
poller: ClosedGroupPoller? = nil
) -> Promise<Void> {
let promise: Promise<Void> = SnodeAPI.getSwarm(for: groupPublicKey)
@ -160,9 +161,10 @@ public final class ClosedGroupPoller {
guard let snode = swarm.randomElement() else { return Promise(error: Error.insufficientSnodes) }
return attempt(maxRetryCount: maxRetryCount, recoveringOn: queue) {
guard isBackgroundPoll || poller?.isPolling.wrappedValue[groupPublicKey] == true else {
return Promise(error: Error.pollingCanceled)
}
guard
(isBackgroundPoll && isBackgroundPollValid()) ||
poller?.isPolling.wrappedValue[groupPublicKey] == true
else { return Promise(error: Error.pollingCanceled) }
let promises: [Promise<[SnodeReceivedMessage]>] = {
if SnodeAPI.hardfork >= 19 && SnodeAPI.softfork >= 1 {
@ -181,9 +183,13 @@ public final class ClosedGroupPoller {
return when(resolved: promises)
.then(on: queue) { messageResults -> Promise<Void> in
guard isBackgroundPoll || poller?.isPolling.wrappedValue[groupPublicKey] == true else { return Promise.value(()) }
guard
(isBackgroundPoll && isBackgroundPollValid()) ||
poller?.isPolling.wrappedValue[groupPublicKey] == true
else { return Promise.value(()) }
var promises: [Promise<Void>] = []
var jobToRun: Job? = nil
let allMessages: [SnodeReceivedMessage] = messageResults
.reduce([]) { result, next in
switch next {
@ -192,8 +198,16 @@ public final class ClosedGroupPoller {
}
}
var messageCount: Int = 0
let totalMessagesCount: Int = allMessages.count
// No need to do anything if there are no messages
guard !allMessages.isEmpty else {
if !isBackgroundPoll {
SNLog("Received no new messages in closed group with public key: \(groupPublicKey)")
}
return Promise.value(())
}
// Otherwise process the messages and add them to the queue for handling
Storage.shared.write { db in
let processedMessages: [ProcessedMessage] = allMessages
.compactMap { message -> ProcessedMessage? in
@ -209,6 +223,14 @@ public final class ClosedGroupPoller {
MessageReceiverError.duplicateControlMessage,
MessageReceiverError.selfSend:
break
// In the background ignore 'SQLITE_ABORT' (it generally means
// the BackgroundPoller has timed out
case DatabaseError.SQLITE_ABORT:
guard !isBackgroundPoll else { break }
SNLog("Failed to the database being suspended (running in background with no background task).")
break
default: SNLog("Failed to deserialize envelope due to error: \(error).")
}
@ -219,7 +241,7 @@ public final class ClosedGroupPoller {
messageCount = processedMessages.count
let jobToRun: Job? = Job(
jobToRun = Job(
variant: .messageReceive,
behaviour: .runOnce,
threadId: groupPublicKey,
@ -232,35 +254,29 @@ public final class ClosedGroupPoller {
// If we are force-polling then add to the JobRunner so they are persistent and will retry on
// the next app run if they fail but don't let them auto-start
JobRunner.add(db, job: jobToRun, canStartJob: !isBackgroundPoll)
}
if isBackgroundPoll {
// We want to try to handle the receive jobs immediately in the background
if isBackgroundPoll {
promises = promises.appending(
jobToRun.map { job -> Promise<Void> in
let (promise, seal) = Promise<Void>.pending()
// Note: In the background we just want jobs to fail silently
MessageReceiveJob.run(
job,
queue: queue,
success: { _, _ in seal.fulfill(()) },
failure: { _, _, _ in seal.fulfill(()) },
deferred: { _ in seal.fulfill(()) }
)
promises = promises.appending(
jobToRun.map { job -> Promise<Void> in
let (promise, seal) = Promise<Void>.pending()
// Note: In the background we just want jobs to fail silently
MessageReceiveJob.run(
job,
queue: queue,
success: { _, _ in seal.fulfill(()) },
failure: { _, _, _ in seal.fulfill(()) },
deferred: { _ in seal.fulfill(()) }
)
return promise
}
)
}
return promise
}
)
}
if !isBackgroundPoll {
if totalMessagesCount > 0 {
SNLog("Received \(messageCount) new message\(messageCount == 1 ? "" : "s") in closed group with public key: \(groupPublicKey) (duplicates: \(totalMessagesCount - messageCount))")
}
else {
SNLog("Received no new messages in closed group with public key: \(groupPublicKey)")
}
else {
SNLog("Received \(messageCount) new message\(messageCount == 1 ? "" : "s") in closed group with public key: \(groupPublicKey) (duplicates: \(allMessages.count - messageCount))")
}
return when(fulfilled: promises)

@ -8,6 +8,8 @@ import SessionUtilitiesKit
extension OpenGroupAPI {
public final class Poller {
typealias PollResponse = [OpenGroupAPI.Endpoint: (info: OnionRequestResponseInfoType, data: Codable?)]
private let server: String
private var timer: Timer? = nil
private var hasStarted = false
@ -71,6 +73,7 @@ extension OpenGroupAPI {
@discardableResult
public func poll(
isBackgroundPoll: Bool,
isBackgroundPollerValid: @escaping (() -> Bool) = { true },
isPostCapabilitiesRetry: Bool,
using dependencies: OpenGroupManager.OGMDependencies = OpenGroupManager.OGMDependencies()
) -> Promise<Void> {
@ -83,8 +86,14 @@ extension OpenGroupAPI {
Threading.pollerQueue.async {
dependencies.storage
.read { db in
OpenGroupAPI
.read { db -> Promise<(Int64, PollResponse)> in
let failureCount: Int64 = (try? OpenGroup
.select(max(OpenGroup.Columns.pollFailureCount))
.asRequest(of: Int64.self)
.fetchOne(db))
.defaulting(to: 0)
return OpenGroupAPI
.poll(
db,
server: server,
@ -95,10 +104,24 @@ extension OpenGroupAPI {
),
using: dependencies
)
.map(on: OpenGroupAPI.workQueue) { (failureCount, $0) }
}
.done(on: OpenGroupAPI.workQueue) { [weak self] response in
.done(on: OpenGroupAPI.workQueue) { [weak self] failureCount, response in
guard !isBackgroundPoll || isBackgroundPollerValid() else {
// If this was a background poll and the background poll is no longer valid
// then just stop
self?.isPolling = false
seal.fulfill(())
return
}
self?.isPolling = false
self?.handlePollResponse(response, isBackgroundPoll: isBackgroundPoll, using: dependencies)
self?.handlePollResponse(
response,
failureCount: failureCount,
isBackgroundPoll: isBackgroundPoll,
using: dependencies
)
dependencies.mutableCache.mutate { cache in
cache.hasPerformedInitialPoll[server] = true
@ -106,17 +129,18 @@ extension OpenGroupAPI {
UserDefaults.standard[.lastOpen] = Date()
}
// Reset the failure count
Storage.shared.writeAsync { db in
try OpenGroup
.filter(OpenGroup.Columns.server == server)
.updateAll(db, OpenGroup.Columns.pollFailureCount.set(to: 0))
}
SNLog("Open group polling finished for \(server).")
seal.fulfill(())
}
.catch(on: OpenGroupAPI.workQueue) { [weak self] error in
guard !isBackgroundPoll || isBackgroundPollerValid() else {
// If this was a background poll and the background poll is no longer valid
// then just stop
self?.isPolling = false
seal.fulfill(())
return
}
// If we are retrying then the error is being handled so no need to continue (this
// method will always resolve)
self?.updateCapabilitiesAndRetryIfNeeded(
@ -141,7 +165,10 @@ extension OpenGroupAPI {
Storage.shared.writeAsync { db in
try OpenGroup
.filter(OpenGroup.Columns.server == server)
.updateAll(db, OpenGroup.Columns.pollFailureCount.set(to: (pollFailureCount + 1)))
.updateAll(
db,
OpenGroup.Columns.pollFailureCount.set(to: (pollFailureCount + 1))
)
}
SNLog("Open group polling failed due to error: \(error). Setting failure count to \(pollFailureCount).")
@ -221,50 +248,47 @@ extension OpenGroupAPI {
return promise
}
private func handlePollResponse(_ response: [OpenGroupAPI.Endpoint: (info: OnionRequestResponseInfoType, data: Codable?)], isBackgroundPoll: Bool, using dependencies: OpenGroupManager.OGMDependencies = OpenGroupManager.OGMDependencies()) {
private func handlePollResponse(
_ response: PollResponse,
failureCount: Int64,
isBackgroundPoll: Bool,
using dependencies: OpenGroupManager.OGMDependencies = OpenGroupManager.OGMDependencies()
) {
let server: String = self.server
dependencies.storage.write { db in
try response.forEach { endpoint, endpointResponse in
let validResponses: PollResponse = response
.filter { endpoint, endpointResponse in
switch endpoint {
case .capabilities:
guard let responseData: BatchSubResponse<Capabilities> = endpointResponse.data as? BatchSubResponse<Capabilities>, let responseBody: Capabilities = responseData.body else {
guard (endpointResponse.data as? BatchSubResponse<Capabilities>)?.body != nil else {
SNLog("Open group polling failed due to invalid capability data.")
return
return false
}
OpenGroupManager.handleCapabilities(
db,
capabilities: responseBody,
on: server
)
return true
case .roomPollInfo(let roomToken, _):
guard let responseData: BatchSubResponse<RoomPollInfo> = endpointResponse.data as? BatchSubResponse<RoomPollInfo>, let responseBody: RoomPollInfo = responseData.body else {
guard (endpointResponse.data as? BatchSubResponse<RoomPollInfo>)?.body != nil else {
switch (endpointResponse.data as? BatchSubResponse<RoomPollInfo>)?.code {
case 404: SNLog("Open group polling failed to retrieve info for unknown room '\(roomToken)'.")
default: SNLog("Open group polling failed due to invalid room info data.")
}
return
return false
}
try OpenGroupManager.handlePollInfo(
db,
pollInfo: responseBody,
publicKey: nil,
for: roomToken,
on: server,
dependencies: dependencies
)
return true
case .roomMessagesRecent(let roomToken), .roomMessagesBefore(let roomToken, _), .roomMessagesSince(let roomToken, _):
guard let responseData: BatchSubResponse<[Failable<Message>]> = endpointResponse.data as? BatchSubResponse<[Failable<Message>]>, let responseBody: [Failable<Message>] = responseData.body else {
guard
let responseData: BatchSubResponse<[Failable<Message>]> = endpointResponse.data as? BatchSubResponse<[Failable<Message>]>,
let responseBody: [Failable<Message>] = responseData.body
else {
switch (endpointResponse.data as? BatchSubResponse<[Failable<Message>]>)?.code {
case 404: SNLog("Open group polling failed to retrieve messages for unknown room '\(roomToken)'.")
default: SNLog("Open group polling failed due to invalid messages data.")
}
return
return false
}
let successfulMessages: [Message] = responseBody.compactMap { $0.value }
if successfulMessages.count != responseBody.count {
@ -273,9 +297,147 @@ extension OpenGroupAPI {
SNLog("Dropped \(droppedCount) invalid open group message\(droppedCount == 1 ? "" : "s").")
}
return !successfulMessages.isEmpty
case .inbox, .inboxSince, .outbox, .outboxSince:
guard
let responseData: BatchSubResponse<[DirectMessage]?> = endpointResponse.data as? BatchSubResponse<[DirectMessage]?>,
!responseData.failedToParseBody
else {
SNLog("Open group polling failed due to invalid inbox/outbox data.")
return false
}
// Double optional because the server can return a `304` with an empty body
let messages: [OpenGroupAPI.DirectMessage] = ((responseData.body ?? []) ?? [])
return !messages.isEmpty
default: return false // No custom handling needed
}
}
// If there are no remaining 'validResponses' and there hasn't been a failure then there is
// no need to do anything else
guard !validResponses.isEmpty || failureCount != 0 else { return }
// Retrieve the current capability & group info to check if anything changed
let rooms: [String] = validResponses
.keys
.compactMap { endpoint -> String? in
switch endpoint {
case .roomPollInfo(let roomToken, _): return roomToken
default: return nil
}
}
let currentInfo: (capabilities: Capabilities, groups: [OpenGroup])? = dependencies.storage.read { db in
let allCapabilities: [Capability] = try Capability
.filter(Capability.Columns.openGroupServer == server)
.fetchAll(db)
let capabilities: Capabilities = Capabilities(
capabilities: allCapabilities
.filter { !$0.isMissing }
.map { $0.variant },
missing: {
let missingCapabilities: [Capability.Variant] = allCapabilities
.filter { $0.isMissing }
.map { $0.variant }
return (missingCapabilities.isEmpty ? nil : missingCapabilities)
}()
)
let openGroupIds: [String] = rooms
.map { OpenGroup.idFor(roomToken: $0, server: server) }
let groups: [OpenGroup] = try OpenGroup
.filter(ids: openGroupIds)
.fetchAll(db)
return (capabilities, groups)
}
let changedResponses: PollResponse = validResponses
.filter { endpoint, endpointResponse in
switch endpoint {
case .capabilities:
guard
let responseData: BatchSubResponse<Capabilities> = endpointResponse.data as? BatchSubResponse<Capabilities>,
let responseBody: Capabilities = responseData.body
else { return false }
return (responseBody != currentInfo?.capabilities)
case .roomPollInfo(let roomToken, _):
guard
let responseData: BatchSubResponse<RoomPollInfo> = endpointResponse.data as? BatchSubResponse<RoomPollInfo>,
let responseBody: RoomPollInfo = responseData.body
else { return false }
guard let existingOpenGroup: OpenGroup = currentInfo?.groups.first(where: { $0.roomToken == roomToken }) else {
return true
}
// Note: This might need to be updated in the future when we start tracking
// user permissions if changes to permissions don't trigger a change to
// the 'infoUpdates'
return (
responseBody.activeUsers != existingOpenGroup.userCount || (
responseBody.details != nil &&
responseBody.details?.infoUpdates != existingOpenGroup.infoUpdates
)
)
default: return true
}
}
// If there are no 'changedResponses' and there hasn't been a failure then there is
// no need to do anything else
guard !changedResponses.isEmpty || failureCount != 0 else { return }
dependencies.storage.write { db in
// Reset the failure count
if failureCount > 0 {
try OpenGroup
.filter(OpenGroup.Columns.server == server)
.updateAll(db, OpenGroup.Columns.pollFailureCount.set(to: 0))
}
try changedResponses.forEach { endpoint, endpointResponse in
switch endpoint {
case .capabilities:
guard
let responseData: BatchSubResponse<Capabilities> = endpointResponse.data as? BatchSubResponse<Capabilities>,
let responseBody: Capabilities = responseData.body
else { return }
OpenGroupManager.handleCapabilities(
db,
capabilities: responseBody,
on: server
)
case .roomPollInfo(let roomToken, _):
guard
let responseData: BatchSubResponse<RoomPollInfo> = endpointResponse.data as? BatchSubResponse<RoomPollInfo>,
let responseBody: RoomPollInfo = responseData.body
else { return }
try OpenGroupManager.handlePollInfo(
db,
pollInfo: responseBody,
publicKey: nil,
for: roomToken,
on: server,
dependencies: dependencies
)
case .roomMessagesRecent(let roomToken), .roomMessagesBefore(let roomToken, _), .roomMessagesSince(let roomToken, _):
guard
let responseData: BatchSubResponse<[Failable<Message>]> = endpointResponse.data as? BatchSubResponse<[Failable<Message>]>,
let responseBody: [Failable<Message>] = responseData.body
else { return }
OpenGroupManager.handleMessages(
db,
messages: successfulMessages,
messages: responseBody.compactMap { $0.value },
for: roomToken,
on: server,
isBackgroundPoll: isBackgroundPoll,
@ -283,10 +445,10 @@ extension OpenGroupAPI {
)
case .inbox, .inboxSince, .outbox, .outboxSince:
guard let responseData: BatchSubResponse<[DirectMessage]?> = endpointResponse.data as? BatchSubResponse<[DirectMessage]?>, !responseData.failedToParseBody else {
SNLog("Open group polling failed due to invalid inbox/outbox data.")
return
}
guard
let responseData: BatchSubResponse<[DirectMessage]?> = endpointResponse.data as? BatchSubResponse<[DirectMessage]?>,
!responseData.failedToParseBody
else { return }
// Double optional because the server can return a `304` with an empty body
let messages: [OpenGroupAPI.DirectMessage] = ((responseData.body ?? []) ?? [])

@ -150,6 +150,10 @@ public final class Poller {
MessageReceiverError.duplicateControlMessage,
MessageReceiverError.selfSend:
break
case DatabaseError.SQLITE_ABORT:
SNLog("Failed to the database being suspended (running in background with no background task).")
break
default: SNLog("Failed to deserialize envelope due to error: \(error).")
}

@ -44,10 +44,7 @@ public class TypingIndicators {
self.timestampMs = (timestampMs ?? Int64(floor(Date().timeIntervalSince1970 * 1000)))
}
fileprivate func starting(_ db: Database) -> Indicator {
let direction: Direction = self.direction
let timestampMs: Int64 = self.timestampMs
fileprivate func start(_ db: Database) {
// Start the typing indicator
switch direction {
case .outgoing:
@ -55,27 +52,17 @@ public class TypingIndicators {
case .incoming:
try? ThreadTypingIndicator(
threadId: self.threadId,
threadId: threadId,
timestampMs: timestampMs
)
.save(db)
}
// Schedule the 'stopCallback' to cancel the typing indicator
stopTimer?.invalidate()
stopTimer = Timer.scheduledTimerOnMainThread(
withTimeInterval: (direction == .outgoing ? 3 : 5),
repeats: false
) { [weak self] _ in
Storage.shared.write { db in
self?.stoping(db)
}
}
return self
// Refresh the timeout since we just started
refreshTimeout()
}
@discardableResult fileprivate func stoping(_ db: Database) -> Indicator? {
fileprivate func stop(_ db: Database) {
self.refreshTimer?.invalidate()
self.refreshTimer = nil
self.stopTimer?.invalidate()
@ -84,7 +71,7 @@ public class TypingIndicators {
switch direction {
case .outgoing:
guard let thread: SessionThread = try? SessionThread.fetchOne(db, id: self.threadId) else {
return nil
return
}
try? MessageSender.send(
@ -99,8 +86,22 @@ public class TypingIndicators {
.filter(ThreadTypingIndicator.Columns.threadId == self.threadId)
.deleteAll(db)
}
}
fileprivate func refreshTimeout() {
let threadId: String = self.threadId
let direction: Direction = self.direction
return nil
// Schedule the 'stopCallback' to cancel the typing indicator
stopTimer?.invalidate()
stopTimer = Timer.scheduledTimerOnMainThread(
withTimeInterval: (direction == .outgoing ? 3 : 5),
repeats: false
) { _ in
Storage.shared.write { db in
TypingIndicators.didStopTyping(db, threadId: threadId, direction: direction)
}
}
}
private func scheduleRefreshCallback(_ db: Database, shouldSend: Bool = true) {
@ -138,56 +139,76 @@ public class TypingIndicators {
// MARK: - Functions
public static func didStartTyping(
_ db: Database,
public static func didStartTypingNeedsToStart(
threadId: String,
threadVariant: SessionThread.Variant,
threadIsMessageRequest: Bool,
direction: Direction,
timestampMs: Int64?
) {
) -> Bool {
switch direction {
case .outgoing:
let updatedIndicator: Indicator? = (
outgoing.wrappedValue[threadId] ??
Indicator(
threadId: threadId,
threadVariant: threadVariant,
threadIsMessageRequest: threadIsMessageRequest,
direction: direction,
timestampMs: timestampMs
)
)?.starting(db)
// If we already have an existing typing indicator for this thread then just
// refresh it's timeout (no need to do anything else)
if let existingIndicator: Indicator = outgoing.wrappedValue[threadId] {
existingIndicator.refreshTimeout()
return false
}
outgoing.mutate { $0[threadId] = updatedIndicator }
let newIndicator: Indicator? = Indicator(
threadId: threadId,
threadVariant: threadVariant,
threadIsMessageRequest: threadIsMessageRequest,
direction: direction,
timestampMs: timestampMs
)
newIndicator?.refreshTimeout()
outgoing.mutate { $0[threadId] = newIndicator }
return true
case .incoming:
let updatedIndicator: Indicator? = (
incoming.wrappedValue[threadId] ??
Indicator(
threadId: threadId,
threadVariant: threadVariant,
threadIsMessageRequest: threadIsMessageRequest,
direction: direction,
timestampMs: timestampMs
)
)?.starting(db)
// If we already have an existing typing indicator for this thread then just
// refresh it's timeout (no need to do anything else)
if let existingIndicator: Indicator = incoming.wrappedValue[threadId] {
existingIndicator.refreshTimeout()
return false
}
let newIndicator: Indicator? = Indicator(
threadId: threadId,
threadVariant: threadVariant,
threadIsMessageRequest: threadIsMessageRequest,
direction: direction,
timestampMs: timestampMs
)
newIndicator?.refreshTimeout()
incoming.mutate { $0[threadId] = updatedIndicator }
incoming.mutate { $0[threadId] = newIndicator }
return true
}
}
public static func start(_ db: Database, threadId: String, direction: Direction) {
switch direction {
case .outgoing: outgoing.wrappedValue[threadId]?.start(db)
case .incoming: incoming.wrappedValue[threadId]?.start(db)
}
}
public static func didStopTyping(_ db: Database, threadId: String, direction: Direction) {
switch direction {
case .outgoing:
let updatedIndicator: Indicator? = outgoing.wrappedValue[threadId]?.stoping(db)
outgoing.mutate { $0[threadId] = updatedIndicator }
if let indicator: Indicator = outgoing.wrappedValue[threadId] {
indicator.stop(db)
outgoing.mutate { $0[threadId] = nil }
}
case .incoming:
let updatedIndicator: Indicator? = incoming.wrappedValue[threadId]?.stoping(db)
incoming.mutate { $0[threadId] = updatedIndicator }
if let indicator: Indicator = incoming.wrappedValue[threadId] {
indicator.stop(db)
incoming.mutate { $0[threadId] = nil }
}
}
}
}

@ -68,19 +68,34 @@ public extension SnodeReceivedMessageInfo {
public extension SnodeReceivedMessageInfo {
static func pruneExpiredMessageHashInfo(for snode: Snode, namespace: Int, associatedWith publicKey: String) {
// Delete any expired SnodeReceivedMessageInfo values associated to a specific node
// Delete any expired SnodeReceivedMessageInfo values associated to a specific node (even though
// this runs very quickly we fetch the rowIds we want to delete from a 'read' call to avoid
// blocking the write queue since this method is called very frequently)
let rowIds: [Int64] = Storage.shared
.read { db in
// Only prune the hashes if new hashes exist for this Snode (if they don't then we don't want
// to clear out the legacy hashes)
let hasNonLegacyHash: Bool = try SnodeReceivedMessageInfo
.filter(SnodeReceivedMessageInfo.Columns.key == key(for: snode, publicKey: publicKey, namespace: namespace))
.isNotEmpty(db)
guard hasNonLegacyHash else { return [] }
return try SnodeReceivedMessageInfo
.select(Column.rowID)
.filter(SnodeReceivedMessageInfo.Columns.key == key(for: snode, publicKey: publicKey, namespace: namespace))
.filter(SnodeReceivedMessageInfo.Columns.expirationDateMs <= (Date().timeIntervalSince1970 * 1000))
.asRequest(of: Int64.self)
.fetchAll(db)
}
.defaulting(to: [])
// If there are no rowIds to delete then do nothing
guard !rowIds.isEmpty else { return }
Storage.shared.write { db in
// Only prune the hashes if new hashes exist for this Snode (if they don't then we don't want
// to clear out the legacy hashes)
let hasNonLegacyHash: Bool = try SnodeReceivedMessageInfo
.filter(SnodeReceivedMessageInfo.Columns.key == key(for: snode, publicKey: publicKey, namespace: namespace))
.isNotEmpty(db)
guard hasNonLegacyHash else { return }
try SnodeReceivedMessageInfo
.filter(SnodeReceivedMessageInfo.Columns.key == key(for: snode, publicKey: publicKey, namespace: namespace))
.filter(SnodeReceivedMessageInfo.Columns.expirationDateMs <= (Date().timeIntervalSince1970 * 1000))
.filter(rowIds.contains(Column.rowID))
.deleteAll(db)
}
}

@ -60,6 +60,7 @@ public final class Storage {
// Configure the database and create the DatabasePool for interacting with the database
var config = Configuration()
config.maximumReaderCount = 10 // Increase the max read connection limit - Default is 5
config.observesSuspensionNotifications = true // Minimise `0xDEAD10CC` exceptions
config.prepareDatabase { db in
var keySpec: Data = Storage.getOrGenerateDatabaseKeySpec()
defer { keySpec.resetBytes(in: 0..<keySpec.count) } // Reset content immediately after use
@ -180,7 +181,6 @@ public final class Storage {
self?.hasCompletedMigrations = true
self?.migrationProgressUpdater = nil
SUKLegacy.clearLegacyDatabaseInstance()
// SUKLegacy.deleteLegacyDatabaseFilesAndKey() // TODO: Add a "Delete legacy database" migration to run after the '003' migrations
if let error = error {
SNLog("[Migration Error] Migration failed with error: \(error)")

@ -28,6 +28,12 @@ public extension Dictionary.Values {
// MARK: - Functional Convenience
public extension Dictionary {
public subscript(_ key: Key?) -> Value? {
guard let key: Key = key else { return nil }
return self[key]
}
func setting(_ key: Key?, _ value: Value?) -> [Key: Value] {
guard let key: Key = key else { return self }

@ -126,6 +126,9 @@ public final class JobRunner {
queues.mutate { $0[updatedJob.variant]?.add(updatedJob, canStartJob: canStartJob) }
// Don't start the queue if the job can't be started
guard canStartJob else { return }
// Start the job runner if needed
db.afterNextTransactionCommit { _ in
queues.wrappedValue[updatedJob.variant]?.start()
@ -253,6 +256,15 @@ public final class JobRunner {
JobRunner.hasCompletedInitialBecomeActive.mutate { $0 = true }
}
/// Calling this will clear the JobRunner queues and stop it from running new jobs, any currently executing jobs will continue to run
/// though (this means if we suspend the database it's likely that any currently running jobs will fail to complete and fail to record their
/// failure - they _should_ be picked up again the next time the app is launched)
public static func stopAndClearPendingJobs() {
queues.wrappedValue.values.forEach { queue in
queue.stopAndClearPendingJobs()
}
}
public static func isCurrentlyRunning(_ job: Job?) -> Bool {
guard let job: Job = job, let jobId: Int64 = job.id else { return false }
@ -347,6 +359,8 @@ private final class JobQueue {
}
}
private static let deferralLoopThreshold: Int = 3
private let type: QueueType
private let executionType: ExecutionType
private let qosClass: DispatchQoS
@ -376,6 +390,7 @@ private final class JobQueue {
private var queue: Atomic<[Job]> = Atomic([])
private var jobsCurrentlyRunning: Atomic<Set<Int64>> = Atomic([])
private var detailsForCurrentlyRunningJobs: Atomic<[Int64: Data?]> = Atomic([:])
private var deferLoopTracker: Atomic<[Int64: (count: Int, times: [TimeInterval])]> = Atomic([:])
fileprivate var hasPendingJobs: Bool { !queue.wrappedValue.isEmpty }
@ -555,7 +570,16 @@ private final class JobQueue {
runNextJob()
}
fileprivate func stopAndClearPendingJobs() {
isRunning.mutate { $0 = false }
queue.mutate { $0 = [] }
deferLoopTracker.mutate { $0 = [:] }
}
private func runNextJob() {
// Ensure the queue is running (if we've stopped the queue then we shouldn't start the next job)
guard isRunning.wrappedValue else { return }
// Ensure this is running on the correct queue
guard DispatchQueue.getSpecific(key: queueKey) == queueContext else {
internalQueue.async { [weak self] in
@ -652,7 +676,7 @@ private final class JobQueue {
return
}
// Update the state to indicate it's running
// Update the state to indicate the particular job is running
//
// Note: We need to store 'numJobsRemaining' in it's own variable because
// the 'SNLog' seems to dispatch to it's own queue which ends up getting
@ -662,7 +686,6 @@ private final class JobQueue {
trigger?.invalidate() // Need to invalidate to prevent a memory leak
trigger = nil
}
isRunning.mutate { $0 = true }
jobsCurrentlyRunning.mutate { jobsCurrentlyRunning in
jobsCurrentlyRunning = jobsCurrentlyRunning.inserting(nextJob.id)
numJobsRunning = jobsCurrentlyRunning.count
@ -779,13 +802,20 @@ private final class JobQueue {
// `failureCount` and `nextRunTimestamp` to prevent them from endlessly running over
// and over and reset their retry backoff in case they fail next time
case .recurringOnLaunch, .recurringOnActive:
Storage.shared.write { db in
_ = try job
.with(
failureCount: 0,
nextRunTimestamp: 0
)
.saved(db)
if
let jobId: Int64 = job.id,
job.failureCount != 0 &&
job.nextRunTimestamp > TimeInterval.leastNonzeroMagnitude
{
Storage.shared.write { db in
_ = try Job
.filter(id: jobId)
.updateAll(
db,
Job.Columns.failureCount.set(to: 0),
Job.Columns.nextRunTimestamp.set(to: 0)
)
}
}
default: break
@ -927,8 +957,48 @@ private final class JobQueue {
/// This function is called when a job neither succeeds or fails (this should only occur if the job has specific logic that makes it dependant
/// on other jobs, and it should automatically manage those dependencies)
private func handleJobDeferred(_ job: Job) {
var stuckInDeferLoop: Bool = false
jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }
detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) }
deferLoopTracker.mutate {
guard let lastRecord: (count: Int, times: [TimeInterval]) = $0[job.id] else {
$0 = $0.setting(
job.id,
(1, [Date().timeIntervalSince1970])
)
return
}
let timeNow: TimeInterval = Date().timeIntervalSince1970
stuckInDeferLoop = (
lastRecord.count >= JobQueue.deferralLoopThreshold &&
(timeNow - lastRecord.times[0]) < CGFloat(lastRecord.count)
)
$0 = $0.setting(
job.id,
(
lastRecord.count + 1,
// Only store the last 'deferralLoopThreshold' times to ensure we aren't running faster
// than one loop per second
lastRecord.times.suffix(JobQueue.deferralLoopThreshold - 1) + [timeNow]
)
)
}
// It's possible (by introducing bugs) to create a loop where a Job tries to run and immediately
// defers itself but then attempts to run again (resulting in an infinite loop); this won't block
// the app since it's on a background thread but can result in 100% of a CPU being used (and a
// battery drain)
//
// This code will maintain an in-memory store for any jobs which are deferred too quickly (ie.
// more than 'deferralLoopThreshold' times within 'deferralLoopThreshold' seconds)
guard !stuckInDeferLoop else {
deferLoopTracker.mutate { $0 = $0.removingValue(forKey: job.id) }
handleJobFailed(job, error: JobRunnerError.possibleDeferralLoop, permanentFailure: false)
return
}
internalQueue.async { [weak self] in
self?.runNextJob()
}

@ -11,4 +11,6 @@ public enum JobRunnerError: Error {
case missingRequiredDetails
case missingDependencies
case possibleDeferralLoop
}

Loading…
Cancel
Save