[WIP] Started adding logic to ignore messages invalidated by config

Created a ConfigMessageReceiveJob just to clean up the logs a bit
Updated the poller to make any MessageReceive jobs dependent on any ConfigMessageReceive jobs which are created
Updated legacy groups to delete the group content when you are removed
Fixed an issue where the JobRunner wouldn't stop pending jobs while clearing data
Fixed another issue with the profile view in the message cell
pull/751/head
Morgan Pretty 1 year ago
parent 9794877692
commit 3b772b7f90
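
A condensed, illustrative sketch of the ordering this commit introduces (placeholder names such as `threadId`, `configMessageInfo` and `standardMessageInfo` are not from the codebase; the real implementation is in the Poller diff below): config messages for a thread get their own `ConfigMessageReceiveJob`, and the thread's standard `MessageReceiveJob` is made dependent on it so the config state is updated first.

Storage.shared.write { db in
    // Persist the config-message job first and keep its id
    let configJob: Job? = JobRunner.add(
        db,
        job: Job(
            variant: .configMessageReceive,
            behaviour: .runOnce,
            threadId: threadId,
            details: ConfigMessageReceiveJob.Details(
                messages: configMessageInfo,                 // placeholder [Details.MessageInfo]
                calledFromBackgroundPoller: false
            )
        ),
        canStartJob: true
    )
    // Then the standard message job for the same thread
    let standardJob: Job? = JobRunner.add(
        db,
        job: Job(
            variant: .messageReceive,
            behaviour: .runOnce,
            threadId: threadId,
            details: MessageReceiveJob.Details(
                messages: standardMessageInfo,               // placeholder [Details.MessageInfo]
                calledFromBackgroundPoller: false
            )
        ),
        canStartJob: true
    )
    // The dependency row is what forces the config job to finish before the standard one runs
    if let standardJobId: Int64 = standardJob?.id, let configJobId: Int64 = configJob?.id {
        do { try JobDependencies(jobId: standardJobId, dependantId: configJobId).insert(db) }
        catch { SNLog("Failed to add dependency between config and standard messageReceive jobs.") }
    }
}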

@ -587,6 +587,7 @@
FD2B4AFD294688D000AB4848 /* SessionUtil+Contacts.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD2B4AFC294688D000AB4848 /* SessionUtil+Contacts.swift */; };
FD2B4AFF2946C93200AB4848 /* ConfigurationSyncJob.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD2B4AFE2946C93200AB4848 /* ConfigurationSyncJob.swift */; };
FD2B4B042949887A00AB4848 /* QueryInterfaceRequest+Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD2B4B032949887A00AB4848 /* QueryInterfaceRequest+Utilities.swift */; };
FD3003662A25D5B300B5A5FB /* ConfigMessageReceiveJob.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD3003652A25D5B300B5A5FB /* ConfigMessageReceiveJob.swift */; };
FD368A6829DE8F9C000DBF1E /* _012_AddFTSIfNeeded.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD368A6729DE8F9B000DBF1E /* _012_AddFTSIfNeeded.swift */; };
FD368A6A29DE9E30000DBF1E /* UIContextualAction+Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD368A6929DE9E30000DBF1E /* UIContextualAction+Utilities.swift */; };
FD37E9C328A1C6F3003AE748 /* ThemeManager.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD37E9C228A1C6F3003AE748 /* ThemeManager.swift */; };
@ -1725,6 +1726,7 @@
FD2B4AFC294688D000AB4848 /* SessionUtil+Contacts.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "SessionUtil+Contacts.swift"; sourceTree = "<group>"; };
FD2B4AFE2946C93200AB4848 /* ConfigurationSyncJob.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ConfigurationSyncJob.swift; sourceTree = "<group>"; };
FD2B4B032949887A00AB4848 /* QueryInterfaceRequest+Utilities.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "QueryInterfaceRequest+Utilities.swift"; sourceTree = "<group>"; };
FD3003652A25D5B300B5A5FB /* ConfigMessageReceiveJob.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ConfigMessageReceiveJob.swift; sourceTree = "<group>"; };
FD368A6729DE8F9B000DBF1E /* _012_AddFTSIfNeeded.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = _012_AddFTSIfNeeded.swift; sourceTree = "<group>"; };
FD368A6929DE9E30000DBF1E /* UIContextualAction+Utilities.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "UIContextualAction+Utilities.swift"; sourceTree = "<group>"; };
FD37E9C228A1C6F3003AE748 /* ThemeManager.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ThemeManager.swift; sourceTree = "<group>"; };
@ -4296,6 +4298,7 @@
FDD2506F2837199200198BDA /* GarbageCollectionJob.swift */,
C352A2FE25574B6300338F3E /* MessageSendJob.swift */,
C352A31225574F5200338F3E /* MessageReceiveJob.swift */,
FD3003652A25D5B300B5A5FB /* ConfigMessageReceiveJob.swift */,
C352A32E2557549C00338F3E /* NotifyPushServerJob.swift */,
FDF0B74E28079E5E004C14C5 /* SendReadReceiptsJob.swift */,
C352A348255781F400338F3E /* AttachmentDownloadJob.swift */,
@ -5729,6 +5732,7 @@
C3471ECB2555356A00297E91 /* MessageSender+Encryption.swift in Sources */,
FDF40CDE2897A1BC006A0CC4 /* _004_RemoveLegacyYDB.swift in Sources */,
FDF0B74928060D13004C14C5 /* QuotedReplyModel.swift in Sources */,
FD3003662A25D5B300B5A5FB /* ConfigMessageReceiveJob.swift in Sources */,
7B81682C28B72F480069F315 /* PendingChange.swift in Sources */,
FD77289A284AF1BD0018502F /* Sodium+Utilities.swift in Sources */,
FD5C7309285007920029977D /* BlindedIdLookup.swift in Sources */,

@ -299,8 +299,14 @@ final class VisibleMessageCell: MessageCell, TappableLabelDelegate {
)
// Profile picture view (should always be handled as a standard 'contact' profile picture)
let profileShouldBeVisible: Bool = (
cellViewModel.canHaveProfile &&
cellViewModel.shouldShowProfile &&
cellViewModel.profile != nil
)
profilePictureViewLeadingConstraint.constant = (isGroupThread ? VisibleMessageCell.groupThreadHSpacing : 0)
profilePictureView.isHidden = (!cellViewModel.shouldShowProfile || cellViewModel.profile == nil)
profilePictureView.isHidden = !cellViewModel.canHaveProfile
profilePictureView.alpha = (profileShouldBeVisible ? 1 : 0)
profilePictureView.update(
publicKey: cellViewModel.authorId,
threadVariant: .contact, // Always show the display picture in 'contact' mode

@ -229,6 +229,12 @@ final class NukeDataModal: Modal {
PushNotificationAPI.unregister(data).sinkUntilComplete()
}
/// Stop and cancel all current jobs (don't want to inadvertently have a job store data after its table has already been cleared)
///
/// **Note:** This is fine as long as this process kills the app; if it doesn't then we need an alternate mechanism to flag that
/// the `JobRunner` is allowed to start its queues again
JobRunner.stopAndClearPendingJobs()
// Clear the app badge and notifications
AppEnvironment.shared.notificationPresenter.clearAllNotifications()
CurrentAppContext().setMainAppBadgeNumber(0)

@ -58,5 +58,6 @@ public enum SNMessagingKit { // Just to make the external API nice
JobRunner.add(executor: GroupLeavingJob.self, for: .groupLeaving)
JobRunner.add(executor: AttachmentDownloadJob.self, for: .attachmentDownload)
JobRunner.add(executor: ConfigurationSyncJob.self, for: .configurationSync)
JobRunner.add(executor: ConfigMessageReceiveJob.self, for: .configMessageReceive)
}
}

@ -154,6 +154,8 @@ enum _013_SessionUtilChanges: Migration {
.indexed()
t.column(.data, .blob)
.notNull()
t.column(.timestampMs, .integer)
.notNull()
t.primaryKey([.variant, .publicKey])
}

@ -23,6 +23,7 @@ enum _014_GenerateInitialUserConfigDumps: Migration {
// Create the initial config state
let userPublicKey: String = getUserHexEncodedPublicKey(db)
let timestampMs: Int64 = Int64(Date().timeIntervalSince1970 * 1000)
SessionUtil.loadState(db, userPublicKey: userPublicKey, ed25519SecretKey: secretKey)
@ -56,7 +57,8 @@ enum _014_GenerateInitialUserConfigDumps: Migration {
.createDump(
conf: conf,
for: .userProfile,
publicKey: userPublicKey
publicKey: userPublicKey,
timestampMs: timestampMs
)?
.save(db)
}
@ -120,7 +122,8 @@ enum _014_GenerateInitialUserConfigDumps: Migration {
.createDump(
conf: conf,
for: .contacts,
publicKey: userPublicKey
publicKey: userPublicKey,
timestampMs: timestampMs
)?
.save(db)
}
@ -144,7 +147,8 @@ enum _014_GenerateInitialUserConfigDumps: Migration {
.createDump(
conf: conf,
for: .convoInfoVolatile,
publicKey: userPublicKey
publicKey: userPublicKey,
timestampMs: timestampMs
)?
.save(db)
}
@ -179,7 +183,8 @@ enum _014_GenerateInitialUserConfigDumps: Migration {
.createDump(
conf: conf,
for: .userGroups,
publicKey: userPublicKey
publicKey: userPublicKey,
timestampMs: timestampMs
)?
.save(db)
}

@ -13,6 +13,7 @@ public struct ConfigDump: Codable, Equatable, Hashable, FetchableRecord, Persist
case variant
case publicKey
case data
case timestampMs
}
public enum Variant: String, Codable, DatabaseValueConvertible {
@ -33,14 +34,19 @@ public struct ConfigDump: Codable, Equatable, Hashable, FetchableRecord, Persist
/// The data for this dump
public let data: Data
/// When the configDump was created in milliseconds since epoch
public let timestampMs: Int64
internal init(
variant: Variant,
publicKey: String,
data: Data
data: Data,
timestampMs: Int64
) {
self.variant = variant
self.publicKey = publicKey
self.data = data
self.timestampMs = timestampMs
}
}

@ -0,0 +1,63 @@
// Copyright © 2023 Rangeproof Pty Ltd. All rights reserved.
import Foundation
import GRDB
import SessionUtilitiesKit
public enum ConfigMessageReceiveJob: JobExecutor {
public static var maxFailureCount: Int = 0
public static var requiresThreadId: Bool = true
public static let requiresInteractionId: Bool = false
public static func run(
_ job: Job,
queue: DispatchQueue,
success: @escaping (Job, Bool) -> (),
failure: @escaping (Job, Error?, Bool) -> (),
deferred: @escaping (Job) -> ()
) {
guard
let detailsData: Data = job.details,
let details: Details = try? JSONDecoder().decode(Details.self, from: detailsData)
else {
failure(job, JobRunnerError.missingRequiredDetails, true)
return
}
// Ensure no standard messages are sent through this job
guard !details.messages.contains(where: { $0.variant != .sharedConfigMessage }) else {
SNLog("[ConfigMessageReceiveJob] Standard messages incorrectly sent to the 'configMessageReceive' job")
failure(job, MessageReceiverError.invalidMessage, true)
return
}
var lastError: Error?
let sharedConfigMessages: [SharedConfigMessage] = details.messages
.compactMap { $0.message as? SharedConfigMessage }
Storage.shared.write { db in
// Send any SharedConfigMessages to the SessionUtil to handle it
do {
try SessionUtil.handleConfigMessages(
db,
messages: sharedConfigMessages,
publicKey: (job.threadId ?? "")
)
}
catch { lastError = error }
}
// Handle the result
switch lastError {
case let error as MessageReceiverError where !error.isRetryable: failure(job, error, true)
case .some(let error): failure(job, error, false)
case .none: success(job, false)
}
}
}
// MARK: - ConfigMessageReceiveJob.Details
extension ConfigMessageReceiveJob {
typealias Details = MessageReceiveJob.Details
}

@ -25,10 +25,17 @@ public enum MessageReceiveJob: JobExecutor {
return
}
// Ensure no config messages are sent through this job
guard !details.messages.contains(where: { $0.variant == .sharedConfigMessage }) else {
SNLog("[MessageReceiveJob] Config messages incorrectly sent to the 'messageReceive' job")
failure(job, MessageReceiverError.invalidSharedConfigMessageHandling, true)
return
}
var updatedJob: Job = job
var lastError: Error?
var remainingMessagesToProcess: [Details.MessageInfo] = []
let nonConfigMessages: [(info: Details.MessageInfo, proto: SNProtoContent)] = details.messages
let messageData: [(info: Details.MessageInfo, proto: SNProtoContent)] = details.messages
.filter { $0.variant != .sharedConfigMessage }
.compactMap { messageInfo -> (info: Details.MessageInfo, proto: SNProtoContent)? in
do {
@ -44,19 +51,9 @@ public enum MessageReceiveJob: JobExecutor {
return nil
}
}
let sharedConfigMessages: [SharedConfigMessage] = details.messages
.compactMap { $0.message as? SharedConfigMessage }
Storage.shared.write { db in
// Send any SharedConfigMessages to the SessionUtil to handle it
for (messageInfo, protoContent) in messageData {
try SessionUtil.handleConfigMessages(
db,
messages: sharedConfigMessages,
publicKey: (job.threadId ?? "")
)
// Handle the remaining messages
for (messageInfo, protoContent) in nonConfigMessages {
do {
try MessageReceiver.handle(
db,
@ -98,6 +95,8 @@ public enum MessageReceiveJob: JobExecutor {
// If any messages failed to process then we want to update the job to only include
// those failed messages
guard !remainingMessagesToProcess.isEmpty else { return }
updatedJob = try job
.with(
details: Details(

@ -42,13 +42,14 @@ public final class SharedConfigMessage: ControlMessage {
public init(
kind: Kind,
seqNo: Int64,
data: Data
data: Data,
sentTimestamp: UInt64? = nil
) {
self.kind = kind
self.seqNo = seqNo
self.data = data
super.init()
super.init(sentTimestamp: sentTimestamp)
}
// MARK: - Codable

@ -69,7 +69,15 @@ extension MessageReceiver {
guard case let .new(publicKeyAsData, name, encryptionKeyPair, membersAsData, adminsAsData, expirationTimer) = message.kind else {
return
}
guard let sentTimestamp: UInt64 = message.sentTimestamp else { return }
guard
let sentTimestamp: UInt64 = message.sentTimestamp,
SessionUtil.canPerformChange(
db,
threadId: publicKeyAsData.toHexString(),
targetConfig: .userGroups,
changeTimestampMs: Int64(sentTimestamp)
)
else { return SNLog("Ignoring outdated NEW legacy group message due to more recent config state") }
try handleNewClosedGroup(
db,
@ -473,16 +481,11 @@ extension MessageReceiver {
let wasCurrentUserRemoved: Bool = !members.contains(userPublicKey)
if wasCurrentUserRemoved {
ClosedGroupPoller.shared.stopPolling(for: threadId)
_ = try closedGroup
.keyPairs
.deleteAll(db)
let _ = PushNotificationAPI.performOperation(
.unsubscribe,
for: threadId,
publicKey: userPublicKey
)
try ClosedGroup.removeKeysAndUnsubscribe(
db,
threadId: threadId,
removeGroupData: true,
calledFromConfigHandling: false
)
}
}
@ -584,27 +587,35 @@ extension MessageReceiver {
return SNLog("Ignoring group update for nonexistent group.")
}
// Legacy groups used these control messages for making changes, new groups only use them
// for information purposes
switch threadVariant {
case .legacyGroup:
// Check that the message isn't from before the group was created
guard Double(message.sentTimestamp ?? 0) > closedGroup.formationTimestamp else {
return SNLog("Ignoring legacy group update from before thread was created.")
}
// If these values are missing then we probably won't be able to validly handle the message
guard
let allMembers: [GroupMember] = try? closedGroup.allMembers.fetchAll(db),
allMembers.contains(where: { $0.profileId == sender })
else { return SNLog("Ignoring legacy group update from non-member.") }
try legacyGroupChanges(sender, closedGroup, allMembers)
case .group:
break
default: return // Ignore as invalid
}
let timestampMs: Int64 = (
message.sentTimestamp.map { Int64($0) } ??
SnodeAPI.currentOffsetTimestampMs()
)
// Only actually make the change if SessionUtil says we can (we always want to insert the info
// message though)
if SessionUtil.canPerformChange(db, threadId: threadId, targetConfig: .userGroups, changeTimestampMs: timestampMs) {
// Legacy groups used these control messages for making changes, new groups only use them
// for information purposes
switch threadVariant {
case .legacyGroup:
// Check that the message isn't from before the group was created
guard Double(message.sentTimestamp ?? 0) > closedGroup.formationTimestamp else {
return SNLog("Ignoring legacy group update from before thread was created.")
}
// If these values are missing then we probably won't be able to validly handle the message
guard
let allMembers: [GroupMember] = try? closedGroup.allMembers.fetchAll(db),
allMembers.contains(where: { $0.profileId == sender })
else { return SNLog("Ignoring legacy group update from non-member.") }
try legacyGroupChanges(sender, closedGroup, allMembers)
case .group:
break
default: return // Ignore as invalid
}
}
// Insert the info message for this group control message

@ -248,10 +248,11 @@ public class Poller {
var messageCount: Int = 0
var processedMessages: [Message] = []
var hadValidHashUpdate: Bool = false
var jobsToRun: [Job] = []
var configMessageJobsToRun: [Job] = []
var standardMessageJobsToRun: [Job] = []
Storage.shared.write { db in
allMessages
let allProcessedMessages: [ProcessedMessage] = allMessages
.compactMap { message -> ProcessedMessage? in
do {
return try Message.processRawReceivedMessage(db, rawMessage: message)
@ -284,6 +285,39 @@ public class Poller {
return nil
}
}
// Add a job to process the config messages first
let configJobIds: [Int64] = allProcessedMessages
.filter { $0.messageInfo.variant == .sharedConfigMessage }
.grouped { threadId, _, _, _ in threadId }
.compactMap { threadId, threadMessages in
messageCount += threadMessages.count
processedMessages += threadMessages.map { $0.messageInfo.message }
let jobToRun: Job? = Job(
variant: .configMessageReceive,
behaviour: .runOnce,
threadId: threadId,
details: ConfigMessageReceiveJob.Details(
messages: threadMessages.map { $0.messageInfo },
calledFromBackgroundPoller: calledFromBackgroundPoller
)
)
configMessageJobsToRun = configMessageJobsToRun.appending(jobToRun)
// If we are force-polling then add to the JobRunner so they are
// persistent and will retry on the next app run if they fail but
// don't let them auto-start
let updatedJob: Job? = JobRunner
.add(db, job: jobToRun, canStartJob: !calledFromBackgroundPoller)
return updatedJob?.id
}
// Add jobs for processing non-config messages which are dependent on the config message
// processing jobs
allProcessedMessages
.filter { $0.messageInfo.variant != .sharedConfigMessage }
.grouped { threadId, _, _, _ in threadId }
.forEach { threadId, threadMessages in
messageCount += threadMessages.count
@ -298,12 +332,29 @@ public class Poller {
calledFromBackgroundPoller: calledFromBackgroundPoller
)
)
jobsToRun = jobsToRun.appending(jobToRun)
standardMessageJobsToRun = standardMessageJobsToRun.appending(jobToRun)
// If we are force-polling then add to the JobRunner so they are
// persistent and will retry on the next app run if they fail but
// don't let them auto-start
JobRunner.add(db, job: jobToRun, canStartJob: !calledFromBackgroundPoller)
let updatedJob: Job? = JobRunner
.add(db, job: jobToRun, canStartJob: !calledFromBackgroundPoller)
// Create the dependency between the jobs
if let updatedJobId: Int64 = updatedJob?.id {
do {
try configJobIds.forEach { configJobId in
try JobDependencies(
jobId: updatedJobId,
dependantId: configJobId
)
.insert(db)
}
}
catch {
SNLog("Failed to add dependency between config processing and non-config processing messageReceive jobs.")
}
}
}
// Clean up message hashes and add some logs about the poll results
@ -334,11 +385,11 @@ public class Poller {
// We want to try to handle the receive jobs immediately in the background
return Publishers
.MergeMany(
jobsToRun.map { job -> AnyPublisher<Void, Error> in
configMessageJobsToRun.map { job -> AnyPublisher<Void, Error> in
Deferred {
Future<Void, Error> { resolver in
// Note: In the background we just want jobs to fail silently
MessageReceiveJob.run(
ConfigMessageReceiveJob.run(
job,
queue: queue,
success: { _, _ in resolver(Result.success(())) },
@ -351,6 +402,27 @@ public class Poller {
}
)
.collect()
.flatMap { _ in
Publishers
.MergeMany(
standardMessageJobsToRun.map { job -> AnyPublisher<Void, Error> in
Deferred {
Future<Void, Error> { resolver in
// Note: In the background we just want jobs to fail silently
MessageReceiveJob.run(
job,
queue: queue,
success: { _, _ in resolver(Result.success(())) },
failure: { _, _, _ in resolver(Result.success(())) },
deferred: { _ in resolver(Result.success(())) }
)
}
}
.eraseToAnyPublisher()
}
)
.collect()
}
.map { _ in processedMessages }
.eraseToAnyPublisher()
}
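
The background-poller path above chains the two batches with Combine: `.collect()` does not emit until every merged publisher has completed, so the standard `MessageReceiveJob`s cannot start until all `ConfigMessageReceiveJob`s have finished. A minimal, self-contained illustration of that pattern (not code from the app):

import Combine

// Run every publisher in `first` (concurrently), and only once they have all
// completed subscribe to the publishers in `second`.
func runInOrder(
    first: [AnyPublisher<Void, Error>],
    then second: [AnyPublisher<Void, Error>]
) -> AnyPublisher<Void, Error> {
    Publishers.MergeMany(first)
        .collect()                                            // waits for all of `first`
        .flatMap { _ in Publishers.MergeMany(second).collect() }
        .map { _ in () }
        .eraseToAnyPublisher()
}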

@ -9,6 +9,11 @@ import SessionUtilitiesKit
// MARK: - Convenience
internal extension SessionUtil {
/// This is a buffer period within which we will process messages which would result in a config change; any message that would normally
/// result in a config change but was sent before `lastConfigMessage.timestamp - configChangeBufferPeriod` will not
/// actually have its changes applied (info messages would still be inserted though)
static let configChangeBufferPeriod: TimeInterval = (2 * 60)
static let columnsRelatedToThreads: [ColumnExpression] = [
SessionThread.Columns.pinnedPriority,
SessionThread.Columns.shouldBeVisible
@ -66,7 +71,8 @@ internal extension SessionUtil {
try SessionUtil.createDump(
conf: conf,
for: variant,
publicKey: publicKey
publicKey: publicKey,
timestampMs: Int64(Date().timeIntervalSince1970 * 1000)
)?.save(db)
return config_needs_push(conf)
@ -293,6 +299,35 @@ internal extension SessionUtil {
}
}
}
static func canPerformChange(
_ db: Database,
threadId: String,
targetConfig: ConfigDump.Variant,
changeTimestampMs: Int64
) -> Bool {
// FIXME: Remove this once `useSharedUtilForUserConfig` is permanent
guard SessionUtil.userConfigsEnabled(db) else { return true }
let targetPublicKey: String = {
switch targetConfig {
default: return getUserHexEncodedPublicKey(db)
}
}()
let configDumpTimestampMs: Int64 = (try? ConfigDump
.filter(
ConfigDump.Columns.variant == targetConfig &&
ConfigDump.Columns.publicKey == targetPublicKey
)
.select(.timestampMs)
.asRequest(of: Int64.self)
.fetchOne(db))
.defaulting(to: 0)
// Ensure the change occurred after the last config message was handled (minus the buffer period)
return (changeTimestampMs > (configDumpTimestampMs - Int64(SessionUtil.configChangeBufferPeriod * 1000)))
}
}
// MARK: - External Outgoing Changes
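
For reference, the buffer check in `canPerformChange` above works out as follows (illustrative numbers only, not values from the codebase):

import Foundation

let configChangeBufferPeriod: TimeInterval = (2 * 60)       // 120 seconds
let configDumpTimestampMs: Int64 = 1_700_000_120_000        // when the dump was last written
let changeTimestampMs: Int64 = 1_700_000_030_000            // change sent 90s before the dump

// Applied because it is newer than (dump timestamp - 120_000 ms); anything more than
// two minutes older than the dump is ignored (info messages still get inserted though)
let canApply: Bool = (changeTimestampMs > (configDumpTimestampMs - Int64(configChangeBufferPeriod * 1000)))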

@ -29,7 +29,7 @@ internal extension SessionUtil {
_ db: Database,
in conf: UnsafeMutablePointer<config_object>?,
mergeNeedsDump: Bool,
latestConfigUpdateSentTimestamp: TimeInterval
latestConfigSentTimestampMs: Int64
) throws {
guard mergeNeedsDump else { return }
guard conf != nil else { throw SessionUtilError.nilConfigObject }
@ -219,7 +219,7 @@ internal extension SessionUtil {
.map { $0.profileId },
admins: updatedAdmins.map { $0.profileId },
expirationTimer: UInt32(group.disappearingConfig?.durationSeconds ?? 0),
formationTimestampMs: UInt64((group.joinedAt ?? Int64(latestConfigUpdateSentTimestamp)) * 1000),
formationTimestampMs: UInt64((group.joinedAt.map { $0 * 1000 } ?? latestConfigSentTimestampMs)),
calledFromConfigHandling: true
)
}

@ -18,7 +18,7 @@ internal extension SessionUtil {
_ db: Database,
in conf: UnsafeMutablePointer<config_object>?,
mergeNeedsDump: Bool,
latestConfigUpdateSentTimestamp: TimeInterval
latestConfigSentTimestampMs: Int64
) throws {
typealias ProfileData = (profileName: String, profilePictureUrl: String?, profilePictureKey: Data?)
@ -50,7 +50,7 @@ internal extension SessionUtil {
fileName: nil
)
}(),
sentTimestamp: latestConfigUpdateSentTimestamp,
sentTimestamp: (TimeInterval(latestConfigSentTimestampMs) / 1000),
calledFromConfigHandling: true
)

@ -260,7 +260,8 @@ public enum SessionUtil {
internal static func createDump(
conf: UnsafeMutablePointer<config_object>?,
for variant: ConfigDump.Variant,
publicKey: String
publicKey: String,
timestampMs: Int64
) throws -> ConfigDump? {
guard conf != nil else { throw SessionUtilError.nilConfigObject }
@ -279,7 +280,8 @@ public enum SessionUtil {
return ConfigDump(
variant: variant,
publicKey: publicKey,
data: dumpData
data: dumpData,
timestampMs: timestampMs
)
}
@ -363,7 +365,8 @@ public enum SessionUtil {
return try? SessionUtil.createDump(
conf: conf,
for: message.kind.configDumpVariant,
publicKey: publicKey
publicKey: publicKey,
timestampMs: (message.sentTimestamp.map { Int64($0) } ?? 0)
)
}
}
@ -427,9 +430,7 @@ public enum SessionUtil {
let needsPush: Bool = try groupedMessages
.sorted { lhs, rhs in lhs.key.processingOrder < rhs.key.processingOrder }
.reduce(false) { prevNeedsPush, next -> Bool in
let messageSentTimestamp: TimeInterval = TimeInterval(
(next.value.compactMap { $0.sentTimestamp }.max() ?? 0) / 1000
)
let latestConfigSentTimestampMs: Int64 = Int64(next.value.compactMap { $0.sentTimestamp }.max() ?? 0)
let needsPush: Bool = try SessionUtil
.config(for: next.key, publicKey: publicKey)
.mutate { conf in
@ -453,7 +454,7 @@ public enum SessionUtil {
db,
in: conf,
mergeNeedsDump: config_needs_dump(conf),
latestConfigUpdateSentTimestamp: messageSentTimestamp
latestConfigSentTimestampMs: latestConfigSentTimestampMs
)
case .contacts:
@ -475,7 +476,7 @@ public enum SessionUtil {
db,
in: conf,
mergeNeedsDump: config_needs_dump(conf),
latestConfigUpdateSentTimestamp: messageSentTimestamp
latestConfigSentTimestampMs: latestConfigSentTimestampMs
)
}
}
@ -486,12 +487,25 @@ public enum SessionUtil {
// Need to check if the config needs to be dumped (this might have changed
// after handling the merge changes)
guard config_needs_dump(conf) else { return config_needs_push(conf) }
guard config_needs_dump(conf) else {
try ConfigDump
.filter(
ConfigDump.Columns.variant == next.key &&
ConfigDump.Columns.publicKey == publicKey
)
.updateAll(
db,
ConfigDump.Columns.timestampMs.set(to: latestConfigSentTimestampMs)
)
return config_needs_push(conf)
}
try SessionUtil.createDump(
conf: conf,
for: next.key,
publicKey: publicKey
publicKey: publicKey,
timestampMs: latestConfigSentTimestampMs
)?.save(db)
return config_needs_push(conf)

@ -34,6 +34,7 @@ public struct MessageViewModel: FetchableRecordWithRowId, Decodable, Equatable,
public static let currentUserPublicKeyKey: SQL = SQL(stringLiteral: CodingKeys.currentUserPublicKey.stringValue)
public static let cellTypeKey: SQL = SQL(stringLiteral: CodingKeys.cellType.stringValue)
public static let authorNameKey: SQL = SQL(stringLiteral: CodingKeys.authorName.stringValue)
public static let canHaveProfileKey: SQL = SQL(stringLiteral: CodingKeys.canHaveProfile.stringValue)
public static let shouldShowProfileKey: SQL = SQL(stringLiteral: CodingKeys.shouldShowProfile.stringValue)
public static let shouldShowDateHeaderKey: SQL = SQL(stringLiteral: CodingKeys.shouldShowDateHeader.stringValue)
public static let positionInClusterKey: SQL = SQL(stringLiteral: CodingKeys.positionInCluster.stringValue)
@ -115,6 +116,9 @@ public struct MessageViewModel: FetchableRecordWithRowId, Decodable, Equatable,
/// **Note:** This will only be populated for incoming messages
public let senderName: String?
/// A flag indicating whether the profile view can be displayed
public let canHaveProfile: Bool
/// A flag indicating whether the profile view should be displayed
public let shouldShowProfile: Bool
@ -191,6 +195,7 @@ public struct MessageViewModel: FetchableRecordWithRowId, Decodable, Equatable,
cellType: self.cellType,
authorName: self.authorName,
senderName: self.senderName,
canHaveProfile: self.canHaveProfile,
shouldShowProfile: self.shouldShowProfile,
shouldShowDateHeader: self.shouldShowDateHeader,
containsOnlyEmoji: self.containsOnlyEmoji,
@ -393,6 +398,11 @@ public struct MessageViewModel: FetchableRecordWithRowId, Decodable, Equatable,
return authorDisplayName
}(),
canHaveProfile: (
// Only group threads and incoming messages
isGroupThread &&
self.variant == .standardIncoming
),
shouldShowProfile: (
// Only group threads
isGroupThread &&
@ -564,6 +574,7 @@ public extension MessageViewModel {
self.cellType = cellType
self.authorName = ""
self.senderName = nil
self.canHaveProfile = false
self.shouldShowProfile = false
self.shouldShowDateHeader = false
self.containsOnlyEmoji = nil
@ -733,6 +744,7 @@ public extension MessageViewModel {
-- query from crashing when decoding we need to provide default values
\(CellType.textOnlyMessage) AS \(ViewModel.cellTypeKey),
'' AS \(ViewModel.authorNameKey),
false AS \(ViewModel.canHaveProfileKey),
false AS \(ViewModel.shouldShowProfileKey),
false AS \(ViewModel.shouldShowDateHeaderKey),
\(Position.middle) AS \(ViewModel.positionInClusterKey),

@ -111,6 +111,11 @@ public struct Job: Codable, Equatable, Hashable, Identifiable, FetchableRecord,
/// This is a job that runs once whenever the user config or a closed group config changes, it retrieves the
/// state of all config objects and syncs any that are flagged as needing to be synced
case configurationSync
/// This is a job that runs once whenever a config message is received to attempt to decode it and update the
/// config state with the changes; this job will generally be scheduled alongside a `messageReceive` job
/// and will block the standard message receive job
case configMessageReceive
}
public enum Behaviour: Int, Codable, DatabaseValueConvertible, CaseIterable {

@ -80,7 +80,8 @@ public final class JobRunner {
executionType: .serial,
qos: .default,
jobVariants: [
jobVariants.remove(.messageReceive)
jobVariants.remove(.messageReceive),
jobVariants.remove(.configMessageReceive)
].compactMap { $0 }
)
let attachmentDownloadQueue: JobQueue = JobQueue(
@ -127,15 +128,15 @@ public final class JobRunner {
///
/// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp`
/// is in the future then the job won't be started
public static func add(_ db: Database, job: Job?, canStartJob: Bool = true) {
@discardableResult public static func add(_ db: Database, job: Job?, canStartJob: Bool = true) -> Job? {
// Store the job into the database (getting an id for it)
guard let updatedJob: Job = try? job?.inserted(db) else {
SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job")
return
return nil
}
guard !canStartJob || updatedJob.id != nil else {
SNLog("[JobRunner] Not starting \(job.map { "\($0.variant)" } ?? "unknown") job due to missing id")
return
return nil
}
// Wait until the transaction has been completed before updating the queue (to ensure anything
@ -149,6 +150,8 @@ public final class JobRunner {
queues.wrappedValue[updatedJob.variant]?.start()
}
return updatedJob
}
/// Upsert a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start
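
Since `add(_:job:canStartJob:)` now returns the inserted `Job?` and is marked `@discardableResult`, existing call sites compile unchanged while new callers (like the poller above) can capture the result to wire up dependencies; for example, with a hypothetical `someJob`:

// Existing call sites keep ignoring the return value:
JobRunner.add(db, job: someJob)

// New call sites can capture the inserted job (and its id) when they need it,
// e.g. to create a JobDependencies row against another job:
let addedJob: Job? = JobRunner.add(db, job: someJob, canStartJob: false)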
