Fixed a few more bugs and made a couple of optimisations to the GarbageCollectionJob

Added an index on Quote.authorId and added a garbage collection job to remove orphaned Profile entries
Added a few more indexes to improve the GarbageCollectionJob performance
Added some debug code to force a re-migration on next launch if the DB is invalid (only affects testers so code should be removed)
Fixed an issue where the GetSnodePool job wasn't properly blocking
Fixed an issue where a user could send the same message multiple times if they clicked the send button quickly enough
Fixed an issue where profiles might not have been getting created correctly for ClosedGroup members which have no threads/interactions
pull/612/head
Morgan Pretty 3 years ago
parent 34fea96db3
commit 6b9a19c761

@ -6798,7 +6798,7 @@
CODE_SIGN_ENTITLEMENTS = Session/Meta/Signal.entitlements; CODE_SIGN_ENTITLEMENTS = Session/Meta/Signal.entitlements;
CODE_SIGN_IDENTITY = "iPhone Developer"; CODE_SIGN_IDENTITY = "iPhone Developer";
"CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer"; "CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer";
CURRENT_PROJECT_VERSION = 354; CURRENT_PROJECT_VERSION = 355;
DEVELOPMENT_TEAM = SUQ8J2PCT7; DEVELOPMENT_TEAM = SUQ8J2PCT7;
FRAMEWORK_SEARCH_PATHS = ( FRAMEWORK_SEARCH_PATHS = (
"$(inherited)", "$(inherited)",
@ -6870,7 +6870,7 @@
CODE_SIGN_ENTITLEMENTS = Session/Meta/Signal.entitlements; CODE_SIGN_ENTITLEMENTS = Session/Meta/Signal.entitlements;
CODE_SIGN_IDENTITY = "iPhone Developer"; CODE_SIGN_IDENTITY = "iPhone Developer";
"CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer"; "CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer";
CURRENT_PROJECT_VERSION = 354; CURRENT_PROJECT_VERSION = 355;
DEVELOPMENT_TEAM = SUQ8J2PCT7; DEVELOPMENT_TEAM = SUQ8J2PCT7;
FRAMEWORK_SEARCH_PATHS = ( FRAMEWORK_SEARCH_PATHS = (
"$(inherited)", "$(inherited)",

@ -338,6 +338,15 @@ extension ConversationVC:
return present(modal, animated: true, completion: nil) return present(modal, animated: true, completion: nil)
} }
// Clearing this out immediately (even though it already happens in 'messageSent') to prevent
// "double sending" if the user rapidly taps the send button
DispatchQueue.main.async { [weak self] in
self?.snInputView.text = ""
self?.snInputView.quoteDraftInfo = nil
self?.resetMentions()
}
// Note: 'shouldBeVisible' is set to true the first time a thread is saved so we can // Note: 'shouldBeVisible' is set to true the first time a thread is saved so we can
// use it to determine if the user is creating a new thread and update the 'isApproved' // use it to determine if the user is creating a new thread and update the 'isApproved'
// flags appropriately // flags appropriately

@ -66,9 +66,9 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD
minEstimatedTotalTime: minEstimatedTotalTime minEstimatedTotalTime: minEstimatedTotalTime
) )
}, },
migrationsCompletion: { [weak self] successful, needsConfigSync in migrationsCompletion: { [weak self] error, needsConfigSync in
guard successful else { guard error == nil else {
self?.showFailedMigrationAlert() self?.showFailedMigrationAlert(error: error)
return return
} }
@ -225,15 +225,28 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD
} }
} }
private func showFailedMigrationAlert() { private func showFailedMigrationAlert(error: Error?) {
let alert = UIAlertController( let alert = UIAlertController(
title: "Session", title: "Session",
message: "DATABASE_MIGRATION_FAILED".localized(), message: ((error as? StorageError) == StorageError.devRemigrationRequired ?
"The database has changed since the last version and you need to re-migrate (this will close the app and migrate on the next launch)" :
"DATABASE_MIGRATION_FAILED".localized()
),
preferredStyle: .alert preferredStyle: .alert
) )
switch (error as? StorageError) {
case .devRemigrationRequired:
alert.addAction(UIAlertAction(title: "Re-Migrate Database", style: .default) { _ in
Storage.deleteDatabaseFiles()
try? Storage.deleteDbKeys()
exit(1)
})
default:
alert.addAction(UIAlertAction(title: "modal_share_logs_title".localized(), style: .default) { _ in alert.addAction(UIAlertAction(title: "modal_share_logs_title".localized(), style: .default) { _ in
ShareLogsModal.shareLogs(from: alert) { [weak self] in ShareLogsModal.shareLogs(from: alert) { [weak self] in
self?.showFailedMigrationAlert() self?.showFailedMigrationAlert(error: error)
} }
}) })
alert.addAction(UIAlertAction(title: "vc_restore_title".localized(), style: .destructive) { _ in alert.addAction(UIAlertAction(title: "vc_restore_title".localized(), style: .destructive) { _ in
@ -252,9 +265,9 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD
minEstimatedTotalTime: minEstimatedTotalTime minEstimatedTotalTime: minEstimatedTotalTime
) )
}, },
migrationsCompletion: { [weak self] successful, needsConfigSync in migrationsCompletion: { [weak self] error, needsConfigSync in
guard successful else { guard error == nil else {
self?.showFailedMigrationAlert() self?.showFailedMigrationAlert(error: error)
return return
} }
@ -262,6 +275,8 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD
} }
) )
}) })
}
alert.addAction(UIAlertAction(title: "Close", style: .default) { _ in alert.addAction(UIAlertAction(title: "Close", style: .default) { _ in
DDLog.flushLog() DDLog.flushLog()
exit(0) exit(0)

@ -123,7 +123,9 @@ enum _001_InitialSetupMigration: Migration {
t.column(.threadId, .text) t.column(.threadId, .text)
.notNull() .notNull()
.primaryKey() .primaryKey()
t.column(.server, .text).notNull() t.column(.server, .text)
.indexed() // Quicker querying
.notNull()
t.column(.roomToken, .text).notNull() t.column(.roomToken, .text).notNull()
t.column(.publicKey, .text).notNull() t.column(.publicKey, .text).notNull()
t.column(.isActive, .boolean) t.column(.isActive, .boolean)
@ -328,10 +330,12 @@ enum _001_InitialSetupMigration: Migration {
.references(Interaction.self, onDelete: .cascade) // Delete if interaction deleted .references(Interaction.self, onDelete: .cascade) // Delete if interaction deleted
t.column(.authorId, .text) t.column(.authorId, .text)
.notNull() .notNull()
.indexed() // Quicker querying
.references(Profile.self) .references(Profile.self)
t.column(.timestampMs, .double).notNull() t.column(.timestampMs, .double).notNull()
t.column(.body, .text) t.column(.body, .text)
t.column(.attachmentId, .text) t.column(.attachmentId, .text)
.indexed() // Quicker querying
.references(Attachment.self, onDelete: .setNull) // Clear if attachment deleted .references(Attachment.self, onDelete: .setNull) // Clear if attachment deleted
} }
@ -345,6 +349,7 @@ enum _001_InitialSetupMigration: Migration {
t.column(.variant, .integer).notNull() t.column(.variant, .integer).notNull()
t.column(.title, .text) t.column(.title, .text)
t.column(.attachmentId, .text) t.column(.attachmentId, .text)
.indexed() // Quicker querying
.references(Attachment.self) // Managed via garbage collection .references(Attachment.self) // Managed via garbage collection
t.primaryKey([.url, .timestamp]) t.primaryKey([.url, .timestamp])

@ -21,19 +21,19 @@ enum _002_SetupStandardJobs: Migration {
_ = try Job( _ = try Job(
variant: .disappearingMessages, variant: .disappearingMessages,
behaviour: .recurringOnLaunch, behaviour: .recurringOnLaunch,
shouldBlockFirstRunEachSession: true shouldBlock: true
).inserted(db) ).inserted(db)
_ = try Job( _ = try Job(
variant: .failedMessageSends, variant: .failedMessageSends,
behaviour: .recurringOnLaunch, behaviour: .recurringOnLaunch,
shouldBlockFirstRunEachSession: true shouldBlock: true
).inserted(db) ).inserted(db)
_ = try Job( _ = try Job(
variant: .failedAttachmentDownloads, variant: .failedAttachmentDownloads,
behaviour: .recurringOnLaunch, behaviour: .recurringOnLaunch,
shouldBlockFirstRunEachSession: true shouldBlock: true
).inserted(db) ).inserted(db)
_ = try Job( _ = try Job(

@ -88,7 +88,10 @@ enum _003_YDBToGRDBMigration: Migration {
transaction.enumerateRows(inCollection: SMKLegacy.contactCollection) { _, object, _, _ in transaction.enumerateRows(inCollection: SMKLegacy.contactCollection) { _, object, _, _ in
guard let contact = object as? SMKLegacy._Contact else { return } guard let contact = object as? SMKLegacy._Contact else { return }
contacts.insert(contact) contacts.insert(contact)
/// Store a record of the all valid profiles (so we can create dummy entries if we need to for closed group members)
validProfileIds.insert(contact.sessionID) validProfileIds.insert(contact.sessionID)
} }
@ -628,12 +631,28 @@ enum _003_YDBToGRDBMigration: Migration {
// Create the 'GroupMember' models for the group (even if the current user is no longer // Create the 'GroupMember' models for the group (even if the current user is no longer
// a member as these objects are used to generate the group avatar icon) // a member as these objects are used to generate the group avatar icon)
func createDummyProfile(profileId: String) {
SNLog("[Migration Warning] Closed group member with unknown user found - Creating empty profile")
// Note: Need to upsert here because it's possible multiple quotes
// will use the same invalid 'authorId' value resulting in a unique
// constraint violation
try? Profile(
id: profileId,
name: profileId
).save(db)
}
try groupModel.groupMemberIds.forEach { memberId in try groupModel.groupMemberIds.forEach { memberId in
try GroupMember( try GroupMember(
groupId: threadId, groupId: threadId,
profileId: memberId, profileId: memberId,
role: .standard role: .standard
).insert(db) ).insert(db)
if !validProfileIds.contains(memberId) {
createDummyProfile(profileId: memberId)
}
} }
try groupModel.groupAdminIds.forEach { adminId in try groupModel.groupAdminIds.forEach { adminId in
@ -642,6 +661,10 @@ enum _003_YDBToGRDBMigration: Migration {
profileId: adminId, profileId: adminId,
role: .admin role: .admin
).insert(db) ).insert(db)
if !validProfileIds.contains(adminId) {
createDummyProfile(profileId: adminId)
}
} }
try (closedGroupZombieMemberIds[legacyThread.uniqueId] ?? []).forEach { zombieId in try (closedGroupZombieMemberIds[legacyThread.uniqueId] ?? []).forEach { zombieId in
@ -650,6 +673,10 @@ enum _003_YDBToGRDBMigration: Migration {
profileId: zombieId, profileId: zombieId,
role: .zombie role: .zombie
).insert(db) ).insert(db)
if !validProfileIds.contains(zombieId) {
createDummyProfile(profileId: zombieId)
}
} }
} }

@ -234,6 +234,41 @@ public enum GarbageCollectionJob: JobExecutor {
) )
""") """)
} }
if typesToCollect.contains(.orphanedProfiles) {
let profile: TypedTableAlias<Profile> = TypedTableAlias()
let thread: TypedTableAlias<SessionThread> = TypedTableAlias()
let interaction: TypedTableAlias<Interaction> = TypedTableAlias()
let quote: TypedTableAlias<Quote> = TypedTableAlias()
let groupMember: TypedTableAlias<GroupMember> = TypedTableAlias()
let contact: TypedTableAlias<Contact> = TypedTableAlias()
let blindedIdLookup: TypedTableAlias<BlindedIdLookup> = TypedTableAlias()
try db.execute(literal: """
DELETE FROM \(Profile.self)
WHERE \(Column.rowID) IN (
SELECT \(profile.alias[Column.rowID])
FROM \(Profile.self)
LEFT JOIN \(SessionThread.self) ON \(thread[.id]) = \(profile[.id])
LEFT JOIN \(Interaction.self) ON \(interaction[.authorId]) = \(profile[.id])
LEFT JOIN \(Quote.self) ON \(quote[.authorId]) = \(profile[.id])
LEFT JOIN \(GroupMember.self) ON \(groupMember[.profileId]) = \(profile[.id])
LEFT JOIN \(Contact.self) ON \(contact[.id]) = \(profile[.id])
LEFT JOIN \(BlindedIdLookup.self) ON (
blindedIdLookup.blindedId = \(profile[.id]) OR
blindedIdLookup.sessionId = \(profile[.id])
)
WHERE (
\(thread[.id]) IS NULL AND
\(interaction[.authorId]) IS NULL AND
\(quote[.authorId]) IS NULL AND
\(groupMember[.profileId]) IS NULL AND
\(contact[.id]) IS NULL AND
\(blindedIdLookup[.blindedId]) IS NULL
)
)
""")
}
}, },
completion: { _, _ in completion: { _, _ in
// Dispatch async so we can swap from the write queue to a read one (we are done writing) // Dispatch async so we can swap from the write queue to a read one (we are done writing)
@ -353,6 +388,9 @@ public enum GarbageCollectionJob: JobExecutor {
return return
} }
// Update the 'lastGarbageCollection' date to prevent this job from running again
// for the next 23 hours
UserDefaults.standard[.lastGarbageCollection] = Date()
success(job, false) success(job, false)
} }
} }
@ -373,6 +411,7 @@ extension GarbageCollectionJob {
case orphanedOpenGroupCapabilities case orphanedOpenGroupCapabilities
case orphanedBlindedIdLookups case orphanedBlindedIdLookups
case approvedBlindedContactRecords case approvedBlindedContactRecords
case orphanedProfiles
case orphanedAttachments case orphanedAttachments
case orphanedAttachmentFiles case orphanedAttachmentFiles
case orphanedProfileAvatars case orphanedProfileAvatars

@ -14,10 +14,18 @@ enum _002_SetupStandardJobs: Migration {
static func migrate(_ db: Database) throws { static func migrate(_ db: Database) throws {
try autoreleasepool { try autoreleasepool {
_ = try Job(
variant: .getSnodePool,
behaviour: .recurringOnLaunch,
shouldBlock: true
).inserted(db)
// Note: We also want this job to run both onLaunch and onActive as we want it to block
// 'onLaunch' and 'onActive' doesn't support blocking jobs
_ = try Job( _ = try Job(
variant: .getSnodePool, variant: .getSnodePool,
behaviour: .recurringOnActive, behaviour: .recurringOnActive,
shouldBlockFirstRunEachSession: true shouldSkipLaunchBecomeActive: true
).inserted(db) ).inserted(db)
} }

@ -31,10 +31,13 @@ enum _001_InitialSetupMigration: Migration {
t.column(.behaviour, .integer) t.column(.behaviour, .integer)
.notNull() .notNull()
.indexed() // Quicker querying .indexed() // Quicker querying
t.column(.shouldBlockFirstRunEachSession, .boolean) t.column(.shouldBlock, .boolean)
.notNull() .notNull()
.indexed() // Quicker querying .indexed() // Quicker querying
.defaults(to: false) .defaults(to: false)
t.column(.shouldSkipLaunchBecomeActive, .boolean)
.notNull()
.defaults(to: false)
t.column(.nextRunTimestamp, .double) t.column(.nextRunTimestamp, .double)
.notNull() .notNull()
.indexed() // Quicker querying .indexed() // Quicker querying

@ -25,7 +25,8 @@ enum _002_SetupStandardJobs: Migration {
// in 'onActive' (see the `SyncPushTokensJob` for more info) // in 'onActive' (see the `SyncPushTokensJob` for more info)
_ = try Job( _ = try Job(
variant: .syncPushTokens, variant: .syncPushTokens,
behaviour: .recurringOnActive behaviour: .recurringOnActive,
shouldSkipLaunchBecomeActive: true
).inserted(db) ).inserted(db)
} }

@ -31,7 +31,8 @@ public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePer
case failureCount case failureCount
case variant case variant
case behaviour case behaviour
case shouldBlockFirstRunEachSession case shouldBlock
case shouldSkipLaunchBecomeActive
case nextRunTimestamp case nextRunTimestamp
case threadId case threadId
case interactionId case interactionId
@ -136,12 +137,16 @@ public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePer
/// How the job should behave /// How the job should behave
public let behaviour: Behaviour public let behaviour: Behaviour
/// When the app starts or returns from the background this flag controls whether the job should prevent other /// When the app starts this flag controls whether the job should prevent other jobs from starting until after it completes
/// jobs from starting until after it completes
/// ///
/// **Note:** `OnLaunch` blocking jobs will be started on launch and all others will be triggered when becoming /// **Note:** This flag is only supported for jobs with an `OnLaunch` behaviour because there is no way to guarantee
/// active but the "blocking" behaviour will only occur if there are no other jobs already running /// jobs with any other behaviours will be added to the JobRunner before all the `OnLaunch` blocking jobs are completed
public let shouldBlockFirstRunEachSession: Bool /// resulting in the JobRunner no longer blocking
public let shouldBlock: Bool
/// When the app starts it also triggers any `OnActive` jobs, this flag controls whether the job should skip this initial `OnActive`
/// trigger (generally used for the same job registered with both `OnLaunch` and `OnActive` behaviours)
public let shouldSkipLaunchBecomeActive: Bool
/// Seconds since epoch to indicate the next datetime that this job should run /// Seconds since epoch to indicate the next datetime that this job should run
public let nextRunTimestamp: TimeInterval public let nextRunTimestamp: TimeInterval
@ -184,17 +189,25 @@ public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePer
failureCount: UInt, failureCount: UInt,
variant: Variant, variant: Variant,
behaviour: Behaviour, behaviour: Behaviour,
shouldBlockFirstRunEachSession: Bool, shouldBlock: Bool,
shouldSkipLaunchBecomeActive: Bool,
nextRunTimestamp: TimeInterval, nextRunTimestamp: TimeInterval,
threadId: String?, threadId: String?,
interactionId: Int64?, interactionId: Int64?,
details: Data? details: Data?
) { ) {
Job.ensureValidBehaviour(
behaviour: behaviour,
shouldBlock: shouldBlock,
shouldSkipLaunchBecomeActive: shouldSkipLaunchBecomeActive
)
self.id = id self.id = id
self.failureCount = failureCount self.failureCount = failureCount
self.variant = variant self.variant = variant
self.behaviour = behaviour self.behaviour = behaviour
self.shouldBlockFirstRunEachSession = shouldBlockFirstRunEachSession self.shouldBlock = shouldBlock
self.shouldSkipLaunchBecomeActive = shouldSkipLaunchBecomeActive
self.nextRunTimestamp = nextRunTimestamp self.nextRunTimestamp = nextRunTimestamp
self.threadId = threadId self.threadId = threadId
self.interactionId = interactionId self.interactionId = interactionId
@ -205,15 +218,23 @@ public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePer
failureCount: UInt = 0, failureCount: UInt = 0,
variant: Variant, variant: Variant,
behaviour: Behaviour = .runOnce, behaviour: Behaviour = .runOnce,
shouldBlockFirstRunEachSession: Bool = false, shouldBlock: Bool = false,
shouldSkipLaunchBecomeActive: Bool = false,
nextRunTimestamp: TimeInterval = 0, nextRunTimestamp: TimeInterval = 0,
threadId: String? = nil, threadId: String? = nil,
interactionId: Int64? = nil interactionId: Int64? = nil
) { ) {
Job.ensureValidBehaviour(
behaviour: behaviour,
shouldBlock: shouldBlock,
shouldSkipLaunchBecomeActive: shouldSkipLaunchBecomeActive
)
self.failureCount = failureCount self.failureCount = failureCount
self.variant = variant self.variant = variant
self.behaviour = behaviour self.behaviour = behaviour
self.shouldBlockFirstRunEachSession = shouldBlockFirstRunEachSession self.shouldBlock = shouldBlock
self.shouldSkipLaunchBecomeActive = shouldSkipLaunchBecomeActive
self.nextRunTimestamp = nextRunTimestamp self.nextRunTimestamp = nextRunTimestamp
self.threadId = threadId self.threadId = threadId
self.interactionId = interactionId self.interactionId = interactionId
@ -224,13 +245,19 @@ public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePer
failureCount: UInt = 0, failureCount: UInt = 0,
variant: Variant, variant: Variant,
behaviour: Behaviour = .runOnce, behaviour: Behaviour = .runOnce,
shouldBlockFirstRunEachSession: Bool = false, shouldBlock: Bool = false,
shouldSkipLaunchBecomeActive: Bool = false,
nextRunTimestamp: TimeInterval = 0, nextRunTimestamp: TimeInterval = 0,
threadId: String? = nil, threadId: String? = nil,
interactionId: Int64? = nil, interactionId: Int64? = nil,
details: T? details: T?
) { ) {
precondition(T.self != Job.self, "[Job] Fatal error trying to create a Job with a Job as it's details") precondition(T.self != Job.self, "[Job] Fatal error trying to create a Job with a Job as it's details")
Job.ensureValidBehaviour(
behaviour: behaviour,
shouldBlock: shouldBlock,
shouldSkipLaunchBecomeActive: shouldSkipLaunchBecomeActive
)
guard guard
let details: T = details, let details: T = details,
@ -240,13 +267,31 @@ public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePer
self.failureCount = failureCount self.failureCount = failureCount
self.variant = variant self.variant = variant
self.behaviour = behaviour self.behaviour = behaviour
self.shouldBlockFirstRunEachSession = shouldBlockFirstRunEachSession self.shouldBlock = shouldBlock
self.shouldSkipLaunchBecomeActive = shouldSkipLaunchBecomeActive
self.nextRunTimestamp = nextRunTimestamp self.nextRunTimestamp = nextRunTimestamp
self.threadId = threadId self.threadId = threadId
self.interactionId = interactionId self.interactionId = interactionId
self.details = detailsData self.details = detailsData
} }
fileprivate static func ensureValidBehaviour(
behaviour: Behaviour,
shouldBlock: Bool,
shouldSkipLaunchBecomeActive: Bool
) {
// Blocking jobs can only run on launch as we can't guarantee that any other behaviours will get added
// to the JobRunner before any prior blocking jobs have completed (resulting in them being non-blocking)
precondition(
!shouldBlock || behaviour == .recurringOnLaunch || behaviour == .runOnceNextLaunch,
"[Job] Fatal error trying to create a blocking job which doesn't run on launch"
)
precondition(
!shouldSkipLaunchBecomeActive || behaviour == .recurringOnActive,
"[Job] Fatal error trying to create a job which skips on 'OnActive' triggered during launch with doesn't run on active"
)
}
// MARK: - Custom Database Interaction // MARK: - Custom Database Interaction
public mutating func didInsert(with rowID: Int64, for column: String?) { public mutating func didInsert(with rowID: Int64, for column: String?) {
@ -306,7 +351,8 @@ public extension Job {
failureCount: failureCount, failureCount: failureCount,
variant: self.variant, variant: self.variant,
behaviour: self.behaviour, behaviour: self.behaviour,
shouldBlockFirstRunEachSession: self.shouldBlockFirstRunEachSession, shouldBlock: self.shouldBlock,
shouldSkipLaunchBecomeActive: self.shouldSkipLaunchBecomeActive,
nextRunTimestamp: nextRunTimestamp, nextRunTimestamp: nextRunTimestamp,
threadId: self.threadId, threadId: self.threadId,
interactionId: self.interactionId, interactionId: self.interactionId,
@ -322,7 +368,8 @@ public extension Job {
failureCount: self.failureCount, failureCount: self.failureCount,
variant: self.variant, variant: self.variant,
behaviour: self.behaviour, behaviour: self.behaviour,
shouldBlockFirstRunEachSession: self.shouldBlockFirstRunEachSession, shouldBlock: self.shouldBlock,
shouldSkipLaunchBecomeActive: self.shouldSkipLaunchBecomeActive,
nextRunTimestamp: self.nextRunTimestamp, nextRunTimestamp: self.nextRunTimestamp,
threadId: self.threadId, threadId: self.threadId,
interactionId: self.interactionId, interactionId: self.interactionId,

@ -100,7 +100,7 @@ public final class Storage {
migrations: [TargetMigrations], migrations: [TargetMigrations],
async: Bool = true, async: Bool = true,
onProgressUpdate: ((CGFloat, TimeInterval) -> ())?, onProgressUpdate: ((CGFloat, TimeInterval) -> ())?,
onComplete: @escaping (Bool, Bool) -> () onComplete: @escaping (Error?, Bool) -> ()
) { ) {
guard isValid, let dbWriter: DatabaseWriter = dbWriter else { return } guard isValid, let dbWriter: DatabaseWriter = dbWriter else { return }
@ -176,7 +176,7 @@ public final class Storage {
} }
// Store the logic to run when the migration completes // Store the logic to run when the migration completes
let migrationCompleted: (Error?) -> () = { [weak self] error in let migrationCompleted: (Database, Error?) -> () = { [weak self] db, error in
self?.hasCompletedMigrations = true self?.hasCompletedMigrations = true
self?.migrationProgressUpdater = nil self?.migrationProgressUpdater = nil
SUKLegacy.clearLegacyDatabaseInstance() SUKLegacy.clearLegacyDatabaseInstance()
@ -186,18 +186,27 @@ public final class Storage {
SNLog("[Migration Error] Migration failed with error: \(error)") SNLog("[Migration Error] Migration failed with error: \(error)")
} }
onComplete((error == nil), needsConfigSync) // TODO: Remove this once everyone has updated
var finalError: Error? = error
let jobTableInfo: [Row] = (try? Row.fetchAll(db, sql: "PRAGMA table_info(\(Job.databaseTableName))"))
.defaulting(to: [])
if !jobTableInfo.contains(where: { $0["name"] == "shouldSkipLaunchBecomeActive" }) {
finalError = StorageError.devRemigrationRequired
}
// TODO: Remove this once everyone has updated
onComplete(finalError, needsConfigSync)
} }
// Note: The non-async migration should only be used for unit tests // Note: The non-async migration should only be used for unit tests
guard async else { guard async else {
do { try self.migrator?.migrate(dbWriter) } do { try self.migrator?.migrate(dbWriter) }
catch { migrationCompleted(error) } catch { try? dbWriter.read { db in migrationCompleted(db, error) } }
return return
} }
self.migrator?.asyncMigrate(dbWriter) { _, error in self.migrator?.asyncMigrate(dbWriter) { db, error in
migrationCompleted(error) migrationCompleted(db, error)
} }
} }

@ -14,4 +14,6 @@ public enum StorageError: Error {
case objectNotSaved case objectNotSaved
case invalidSearchPattern case invalidSearchPattern
case devRemigrationRequired
} }

@ -102,6 +102,7 @@ public final class JobRunner {
internal static var executorMap: Atomic<[Job.Variant: JobExecutor.Type]> = Atomic([:]) internal static var executorMap: Atomic<[Job.Variant: JobExecutor.Type]> = Atomic([:])
fileprivate static var perSessionJobsCompleted: Atomic<Set<Int64>> = Atomic([]) fileprivate static var perSessionJobsCompleted: Atomic<Set<Int64>> = Atomic([])
private static var hasCompletedInitialBecomeActive: Atomic<Bool> = Atomic(false)
// MARK: - Configuration // MARK: - Configuration
@ -184,7 +185,7 @@ public final class JobRunner {
Job.Behaviour.runOnceNextLaunch Job.Behaviour.runOnceNextLaunch
].contains(Job.Columns.behaviour) ].contains(Job.Columns.behaviour)
) )
.filter(Job.Columns.shouldBlockFirstRunEachSession == true) .filter(Job.Columns.shouldBlock == true)
.order(Job.Columns.id) .order(Job.Columns.id)
.fetchAll(db) .fetchAll(db)
let nonblockingJobs: [Job] = try Job let nonblockingJobs: [Job] = try Job
@ -194,7 +195,7 @@ public final class JobRunner {
Job.Behaviour.runOnceNextLaunch Job.Behaviour.runOnceNextLaunch
].contains(Job.Columns.behaviour) ].contains(Job.Columns.behaviour)
) )
.filter(Job.Columns.shouldBlockFirstRunEachSession == false) .filter(Job.Columns.shouldBlock == false)
.order(Job.Columns.id) .order(Job.Columns.id)
.fetchAll(db) .fetchAll(db)
@ -218,65 +219,38 @@ public final class JobRunner {
} }
public static func appDidBecomeActive() { public static func appDidBecomeActive() {
// Note: When becoming active we want to start all non-on-launch blocking jobs as let hasCompletedInitialBecomeActive: Bool = JobRunner.hasCompletedInitialBecomeActive.wrappedValue
// long as there are no other jobs already running let jobsToRun: [Job] = Storage.shared
let alreadyRunningOtherJobs: Bool = queues.wrappedValue
.contains(where: { _, queue -> Bool in queue.isRunning.wrappedValue })
let jobsToRun: (blocking: [Job], nonBlocking: [Job]) = Storage.shared
.read { db in .read { db in
guard !alreadyRunningOtherJobs else { return try Job
let onActiveJobs: [Job] = try Job
.filter(Job.Columns.behaviour == Job.Behaviour.recurringOnActive) .filter(Job.Columns.behaviour == Job.Behaviour.recurringOnActive)
.order(Job.Columns.id) .order(Job.Columns.id)
.fetchAll(db) .fetchAll(db)
return ([], onActiveJobs)
}
let blockingJobs: [Job] = try Job
.filter(
Job.Behaviour.allCases
.filter {
$0 != .recurringOnLaunch &&
$0 != .runOnceNextLaunch
} }
.contains(Job.Columns.behaviour) .defaulting(to: [])
) .filter { hasCompletedInitialBecomeActive || !$0.shouldSkipLaunchBecomeActive }
.filter(Job.Columns.shouldBlockFirstRunEachSession == true)
.order(Job.Columns.id)
.fetchAll(db)
let nonBlockingJobs: [Job] = try Job
.filter(Job.Columns.behaviour == Job.Behaviour.recurringOnActive)
.filter(Job.Columns.shouldBlockFirstRunEachSession == false)
.order(Job.Columns.id)
.fetchAll(db)
return (blockingJobs, nonBlockingJobs)
}
.defaulting(to: ([], []))
// Store the current queue state locally to avoid multiple atomic retrievals // Store the current queue state locally to avoid multiple atomic retrievals
let jobQueues: [Job.Variant: JobQueue] = queues.wrappedValue let jobQueues: [Job.Variant: JobQueue] = queues.wrappedValue
let blockingQueueIsRunning: Bool = (blockingQueue.wrappedValue?.isRunning.wrappedValue == true) let blockingQueueIsRunning: Bool = (blockingQueue.wrappedValue?.isRunning.wrappedValue == true)
guard !jobsToRun.blocking.isEmpty || !jobsToRun.nonBlocking.isEmpty else { guard !jobsToRun.isEmpty else {
if !blockingQueueIsRunning { if !blockingQueueIsRunning {
jobQueues.forEach { _, queue in queue.start() } jobQueues.forEach { _, queue in queue.start() }
} }
return return
} }
// Add and start any blocking jobs
blockingQueue.wrappedValue?.appDidFinishLaunching(with: jobsToRun.blocking, canStart: true)
// Add and start any non-blocking jobs (if there are no blocking jobs) // Add and start any non-blocking jobs (if there are no blocking jobs)
let jobsByVariant: [Job.Variant: [Job]] = jobsToRun.nonBlocking.grouped(by: \.variant) let jobsByVariant: [Job.Variant: [Job]] = jobsToRun.grouped(by: \.variant)
jobQueues.forEach { variant, queue in jobQueues.forEach { variant, queue in
queue.appDidBecomeActive( queue.appDidBecomeActive(
with: (jobsByVariant[variant] ?? []), with: (jobsByVariant[variant] ?? []),
canStart: (!blockingQueueIsRunning && jobsToRun.blocking.isEmpty) canStart: !blockingQueueIsRunning
) )
} }
JobRunner.hasCompletedInitialBecomeActive.mutate { $0 = true }
} }
public static func isCurrentlyRunning(_ job: Job?) -> Bool { public static func isCurrentlyRunning(_ job: Job?) -> Bool {
@ -849,7 +823,7 @@ private final class JobQueue {
} }
// If this is the blocking queue and a "blocking" job failed then rerun it immediately // If this is the blocking queue and a "blocking" job failed then rerun it immediately
if self.type == .blocking && job.shouldBlockFirstRunEachSession { if self.type == .blocking && job.shouldBlock {
SNLog("[JobRunner] \(queueContext) \(job.variant) job failed; retrying immediately") SNLog("[JobRunner] \(queueContext) \(job.variant) job failed; retrying immediately")
jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) } jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }
detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) } detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) }

@ -11,7 +11,7 @@ public enum AppSetup {
public static func setupEnvironment( public static func setupEnvironment(
appSpecificBlock: @escaping () -> (), appSpecificBlock: @escaping () -> (),
migrationProgressChanged: ((CGFloat, TimeInterval) -> ())? = nil, migrationProgressChanged: ((CGFloat, TimeInterval) -> ())? = nil,
migrationsCompletion: @escaping (Bool, Bool) -> () migrationsCompletion: @escaping (Error?, Bool) -> ()
) { ) {
guard !AppSetup.hasRun else { return } guard !AppSetup.hasRun else { return }
@ -60,7 +60,7 @@ public enum AppSetup {
public static func runPostSetupMigrations( public static func runPostSetupMigrations(
backgroundTask: OWSBackgroundTask? = nil, backgroundTask: OWSBackgroundTask? = nil,
migrationProgressChanged: ((CGFloat, TimeInterval) -> ())? = nil, migrationProgressChanged: ((CGFloat, TimeInterval) -> ())? = nil,
migrationsCompletion: @escaping (Bool, Bool) -> () migrationsCompletion: @escaping (Error?, Bool) -> ()
) { ) {
var backgroundTask: OWSBackgroundTask? = (backgroundTask ?? OWSBackgroundTask(labelStr: #function)) var backgroundTask: OWSBackgroundTask? = (backgroundTask ?? OWSBackgroundTask(labelStr: #function))
@ -71,9 +71,9 @@ public enum AppSetup {
SNMessagingKit.migrations() SNMessagingKit.migrations()
], ],
onProgressUpdate: migrationProgressChanged, onProgressUpdate: migrationProgressChanged,
onComplete: { success, needsConfigSync in onComplete: { error, needsConfigSync in
DispatchQueue.main.async { DispatchQueue.main.async {
migrationsCompletion(success, needsConfigSync) migrationsCompletion(error, needsConfigSync)
// The 'if' is only there to prevent the "variable never read" warning from showing // The 'if' is only there to prevent the "variable never read" warning from showing
if backgroundTask != nil { backgroundTask = nil } if backgroundTask != nil { backgroundTask = nil }

Loading…
Cancel
Save