Further work on the JobRunner

Fixed an issue where the hash retrieved when fetching messages from the service node might not be the latest one
Updated the MessageReceiveJob to batch-process messages (on failure, only the failed messages will be retried)
pull/612/head
Morgan Pretty 3 years ago
parent ed9f4ea6c6
commit 94742c80ec
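
The retry behaviour described in the commit message is easiest to see in isolation. The sketch below is illustrative only (`processBatch`, `handle` and the plain `[Data]` batch are hypothetical, not types from this commit); the real implementation is in the MessageReceiveJob hunk further down: each message in a batch is attempted, and the job is re-saved containing only the messages that failed.

// Hypothetical, simplified model of the batch/retry behaviour (not the actual Session types):
// every message in the batch is attempted, and only the failures are kept for the next run
func processBatch(_ messages: [Data], handle: (Data) throws -> Void) -> [Data] {
    var failed: [Data] = []

    for message in messages {
        do { try handle(message) }
        catch { failed.append(message) }
    }

    // This is what would be written back onto the job, so a retry never
    // re-processes messages that already succeeded
    return failed
}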

@@ -45,33 +45,76 @@ public final class BackgroundPoller : NSObject {
     private static func getMessages(for publicKey: String) -> Promise<Void> {
         return SnodeAPI.getSwarm(for: publicKey).then(on: DispatchQueue.main) { swarm -> Promise<Void> in
             guard let snode = swarm.randomElement() else { throw SnodeAPI.Error.generic }
             return attempt(maxRetryCount: 4, recoveringOn: DispatchQueue.main) {
                 return SnodeAPI.getRawMessages(from: snode, associatedWith: publicKey).then(on: DispatchQueue.main) { rawResponse -> Promise<Void> in
                     let messages: [SnodeReceivedMessage] = SnodeAPI.parseRawMessagesResponse(rawResponse, from: snode, associatedWith: publicKey)
-                    let promises = messages.compactMap { message -> Promise<Void>? in
-                        // Use a best attempt approach here; we don't want to fail the entire process
-                        // if one of the messages failed to parse
-                        guard
-                            let envelope = SNProtoEnvelope.from(message),
-                            let data = try? envelope.serializedData()
-                        else { return nil }
-                        let job = MessageReceiveJob(
-                            data: data,
-                            serverHash: message.info.hash,
-                            isBackgroundPoll: true
-                        )
-                        return job.execute()
-                    }
-                    // Now that the MessageReceiveJob's have been created we can persist the received messages
-                    if !messages.isEmpty {
-                        GRDBStorage.shared.write { db in
-                            messages.forEach { try? $0.info.save(db) }
-                        }
-                    }
-                    return when(fulfilled: promises) // The promise returned by MessageReceiveJob never rejects
+                    guard !messages.isEmpty else { return Promise.value(()) }
+                    
+                    var jobsToRun: [Job] = []
+                    
+                    GRDBStorage.shared.write { db in
+                        var threadMessages: [String: [MessageReceiveJob.Details.MessageInfo]] = [:]
+                        // TODO: Test this updated logic
+                        messages.forEach { message in
+                            guard let envelope = SNProtoEnvelope.from(message) else { return }
+                            
+                            // Extract the threadId and add that to the messageReceive job for
+                            // multi-threading and garbage collection purposes
+                            let threadId: String? = MessageReceiver.extractSenderPublicKey(db, from: envelope)
+                            
+                            do {
+                                threadMessages[threadId ?? ""] = (threadMessages[threadId ?? ""] ?? [])
+                                    .appending(
+                                        MessageReceiveJob.Details.MessageInfo(
+                                            data: try envelope.serializedData(),
+                                            serverHash: message.info.hash
+                                        )
+                                    )
+                                
+                                // Persist the received message after the MessageReceiveJob is created
+                                _ = try message.info.saved(db)
+                            }
+                            catch {
+                                SNLog("Failed to deserialize envelope due to error: \(error).")
+                            }
+                        }
+                        
+                        threadMessages
+                            .forEach { threadId, threadMessages in
+                                let maybeJob: Job? = Job(
+                                    variant: .messageReceive,
+                                    behaviour: .runOnce,
+                                    threadId: threadId,
+                                    details: MessageReceiveJob.Details(
+                                        messages: threadMessages,
+                                        isBackgroundPoll: false
+                                    )
+                                )
+                                
+                                guard let job: Job = maybeJob else { return }
+                                
+                                JobRunner.add(db, job: job)
+                                jobsToRun.append(job)
+                            }
+                    }
+                    
+                    let promises = jobsToRun.compactMap { job -> Promise<Void>? in
+                        let (promise, seal) = Promise<Void>.pending()
+                        
+                        // Note: In the background we just want jobs to fail silently
+                        MessageReceiveJob.run(
+                            job,
+                            success: { _, _ in seal.fulfill(()) },
+                            failure: { _, _, _ in seal.fulfill(()) },
+                            deferred: { _ in seal.fulfill(()) }
+                        )
+                        
+                        return promise
+                    }
+                    
+                    return when(fulfilled: promises)
                 }
             }
         }
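
Worth calling out from the hunk above: because the background poll only cares that every job has finished (not whether it succeeded), each callback-based `MessageReceiveJob.run` call is wrapped in a promise that fulfils on every outcome and the results are combined with `when(fulfilled:)`. A generic version of that wrapper might look like the sketch below (PromiseKit assumed, as in the diff; `runSilently` is an illustrative helper, not something added by this commit).

import PromiseKit

// Wrap any callback-style completion into a promise that never rejects,
// so `when(fulfilled:)` waits for everything without failing the chain
func runSilently(_ work: (@escaping () -> Void) -> Void) -> Promise<Void> {
    let (promise, seal) = Promise<Void>.pending()
    work { seal.fulfill(()) }   // success, failure and deferral all end up here
    return promise
}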

@@ -814,8 +814,12 @@ enum _003_YDBToGRDBMigration: Migration {
                     nextRunTimestamp: 0,
                     threadId: threadId,
                     details: MessageReceiveJob.Details(
-                        data: legacyJob.data,
-                        serverHash: legacyJob.serverHash,
+                        messages: [
+                            MessageReceiveJob.Details.MessageInfo(
+                                data: legacyJob.data,
+                                serverHash: legacyJob.serverHash
+                            )
+                        ],
                         isBackgroundPoll: legacyJob.isBackgroundPoll
                     )
                 )?.inserted(db)

@@ -23,33 +23,66 @@ public enum MessageReceiveJob: JobExecutor {
             return
         }
         
-        var processingError: Error?
+        var updatedJob: Job = job
+        var leastSevereError: Error?
         
         GRDBStorage.shared.write { db in
-            do {
-                let isRetry: Bool = (job.failureCount > 0)
-                let (message, proto) = try MessageReceiver.parse(
-                    db,
-                    data: details.data,
-                    isRetry: isRetry
-                )
-                message.serverHash = details.serverHash
-                
-                try MessageReceiver.handle(
-                    db,
-                    message: message,
-                    associatedWithProto: proto,
-                    openGroupId: nil,
-                    isBackgroundPoll: details.isBackgroundPoll
-                )
-            }
-            catch {
-                processingError = error
-            }
+            var remainingMessagesToProcess: [Details.MessageInfo] = []
+            
+            for messageInfo in details.messages {
+                do {
+                    // Note: The main reason why the 'MessageReceiver.parse' can fail but then succeed
+                    // later on is when we get a closed group message which is signed using a new key
+                    // but haven't received the key yet (the key gets sent directly to the user rather
+                    // than via the closed group so this is unfortunately a possible case)
+                    let isRetry: Bool = (job.failureCount > 0)
+                    let (message, proto) = try MessageReceiver.parse(
+                        db,
+                        data: messageInfo.data,
+                        isRetry: isRetry
+                    )
+                    message.serverHash = messageInfo.serverHash
+                    
+                    try MessageReceiver.handle(
+                        db,
+                        message: message,
+                        associatedWithProto: proto,
+                        openGroupId: nil,
+                        isBackgroundPoll: details.isBackgroundPoll
+                    )
+                }
+                catch {
+                    // We failed to process this message so add it to the list to re-process
+                    remainingMessagesToProcess.append(messageInfo)
+                    
+                    // If the current message is a permanent failure then override it with the new error (we want
+                    // to retry if there is a single non-permanent error)
+                    switch leastSevereError {
+                        case let error as MessageReceiverError where !error.isRetryable:
+                            leastSevereError = error
+                            
+                        default: break
+                    }
+                }
+            }
+            
+            // If any messages failed to process then we want to update the job to only include
+            // those failed messages
+            updatedJob = try job
+                .with(
+                    details: Details(
+                        messages: remainingMessagesToProcess,
+                        isBackgroundPoll: details.isBackgroundPoll
+                    )
+                )
+                .defaulting(to: job)
+                .saved(db)
         }
         
         // Handle the result
-        switch processingError {
+        switch leastSevereError {
             case let error as MessageReceiverError where !error.isRetryable:
                 SNLog("Message receive job permanently failed due to error: \(error)")
                 failure(job, error, true)
@@ -68,8 +101,28 @@ public enum MessageReceiveJob: JobExecutor {
 extension MessageReceiveJob {
     public struct Details: Codable {
-        public let data: Data
-        public let serverHash: String?
+        public struct MessageInfo: Codable {
+            public let data: Data
+            public let serverHash: String?
+            
+            public init(
+                data: Data,
+                serverHash: String?
+            ) {
+                self.data = data
+                self.serverHash = serverHash
+            }
+        }
+        
+        public let messages: [MessageInfo]
         public let isBackgroundPoll: Bool
+        
+        public init(
+            messages: [MessageInfo],
+            isBackgroundPoll: Bool
+        ) {
+            self.messages = messages
+            self.isBackgroundPoll = isBackgroundPoll
+        }
     }
 }
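
Since `Details` and its nested `MessageInfo` are plain `Codable` structs, the batched payload can be round-tripped through any standard encoder. A minimal sketch follows, using Foundation's `JSONEncoder`/`JSONDecoder` purely for illustration (how `Job` actually serialises its details is not shown in this diff, and the data/hash values are placeholders).

import Foundation

let details = MessageReceiveJob.Details(
    messages: [
        MessageReceiveJob.Details.MessageInfo(
            data: Data([0x01, 0x02]),       // placeholder envelope bytes
            serverHash: "someServerHash"    // placeholder hash
        )
    ],
    isBackgroundPoll: false
)

// Round-trip through JSON purely to show the nested shape survives encoding
do {
    let encoded = try JSONEncoder().encode(details)
    let decoded = try JSONDecoder().decode(MessageReceiveJob.Details.self, from: encoded)
    assert(decoded.messages.count == 1 && decoded.isBackgroundPoll == false)
}
catch {
    print("Round-trip failed: \(error)")
}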

@@ -33,6 +33,9 @@ enum _001_InitialSetupMigration: Migration {
         }
         
         try db.create(table: SnodeReceivedMessageInfo.self) { t in
+            t.column(.id, .integer)
+                .notNull()
+                .primaryKey(autoincrement: true)
             t.column(.key, .text)
                 .notNull()
                 .indexed()
@@ -41,7 +44,7 @@ enum _001_InitialSetupMigration: Migration {
                 .notNull()
                 .indexed()
             
-            t.primaryKey([.key, .hash])
+            t.uniqueKey([.key, .hash])
         }
     }
 }
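
The schema change above swaps the composite primary key on `(key, hash)` for an auto-incrementing `id` plus a unique key, which is what later lets the most recently stored hash be found by ordering on `id`. A standalone sketch against an in-memory GRDB database (the `demoSchema` function and raw string table/column names are purely illustrative, not the project's migration code):

import GRDB

// Standalone check against an in-memory database (not the project's migration code)
func demoSchema() throws {
    let dbQueue = try DatabaseQueue()
    
    try dbQueue.write { db in
        try db.create(table: "snodeReceivedMessageInfo") { t in
            t.autoIncrementedPrimaryKey("id")
            t.column("key", .text).notNull().indexed()
            t.column("hash", .text).notNull()
            t.column("expirationDateMs", .integer).notNull().indexed()
            t.uniqueKey(["key", "hash"])    // duplicate (key, hash) pairs are still rejected
        }
        
        try db.execute(sql: "INSERT INTO snodeReceivedMessageInfo (key, hash, expirationDateMs) VALUES ('someKey', 'hash1', 0)")
        try db.execute(sql: "INSERT INTO snodeReceivedMessageInfo (key, hash, expirationDateMs) VALUES ('someKey', 'hash2', 0)")
        
        // Ordering by the new `id` column recovers insertion order, i.e. the most recently stored hash
        let lastHash = try String.fetchOne(db, sql: "SELECT hash FROM snodeReceivedMessageInfo ORDER BY id DESC LIMIT 1")
        assert(lastHash == "hash2")
    }
}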

@@ -7,7 +7,6 @@ import SessionUtilitiesKit
 enum _002_YDBToGRDBMigration: Migration {
     static let identifier: String = "YDBToGRDBMigration"
     
-    // TODO: Autorelease pool???
     static func migrate(_ db: Database) throws {
         // MARK: - OnionRequestPath, Snode Pool & Swarm
@@ -126,20 +125,20 @@ enum _002_YDBToGRDBMigration: Migration {
         try autoreleasepool {
             try receivedMessageResults.forEach { key, hashes in
                 try hashes.forEach { hash in
-                    try SnodeReceivedMessageInfo(
+                    _ = try SnodeReceivedMessageInfo(
                         key: key,
                         hash: hash,
                         expirationDateMs: 0
-                    ).insert(db)
+                    ).inserted(db)
                 }
             }
             
             try lastMessageResults.forEach { key, data in
-                try SnodeReceivedMessageInfo(
+                _ = try SnodeReceivedMessageInfo(
                     key: key,
                     hash: data.hash,
                     expirationDateMs: ((data.json["expirationDate"] as? Int64) ?? 0)
-                ).insert(db)
+                ).inserted(db)
             }
         }

@@ -4,19 +4,39 @@ import Foundation
 import GRDB
 import SessionUtilitiesKit
 
-public struct SnodeReceivedMessageInfo: Codable, FetchableRecord, PersistableRecord, TableRecord, ColumnExpressible {
+public struct SnodeReceivedMessageInfo: Codable, FetchableRecord, MutablePersistableRecord, TableRecord, ColumnExpressible {
     public static var databaseTableName: String { "snodeReceivedMessageInfo" }
     
     public typealias Columns = CodingKeys
     public enum CodingKeys: String, CodingKey, ColumnExpression {
+        case id
         case key
         case hash
        case expirationDateMs
     }
     
+    /// The `id` value is auto incremented by the database, if the `Job` hasn't been inserted into
+    /// the database yet this value will be `nil`
+    public var id: Int64? = nil
+    
+    /// The key this message hash is associated to
+    ///
+    /// This will be a combination of {address}.{port}.{publicKey} for new rows and just the {publicKey} for legacy rows
     public let key: String
+    
+    /// The is the hash for the received message
     public let hash: String
+    
+    /// This is the timestamp (in milliseconds since epoch) when the message hash should expire
+    ///
+    /// **Note:** A value of `0` means this hash should not expire
     public let expirationDateMs: Int64
+    
+    // MARK: - Custom Database Interaction
+    
+    public mutating func didInsert(with rowID: Int64, for column: String?) {
+        self.id = rowID
+    }
 }
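
The switch from `PersistableRecord` to `MutablePersistableRecord` above is what makes `didInsert(with:for:)` fire: after an insert the struct learns its auto-assigned row id. A small sketch of that behaviour follows; the initializer call mirrors the one used in the `_002` migration, the key/hash values are placeholders, and `GRDBStorage.shared.write` is used the same way the diff uses it.

GRDBStorage.shared.write { db in
    // `insert` mutates the value: didInsert(with:for:) copies the new rowID into `id`
    var info = SnodeReceivedMessageInfo(key: "address.port.publicKey", hash: "hash1", expirationDateMs: 0)
    _ = try? info.insert(db)
    print(info.id as Any)   // non-nil after a successful insert
    
    // `inserted(db)` (as used in the migration) is the non-mutating variant:
    // it leaves the original untouched and returns a copy with `id` populated
    let copy = try? SnodeReceivedMessageInfo(key: "address.port.publicKey", hash: "hash2", expirationDateMs: 0).inserted(db)
    print(copy?.id as Any)
}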
// MARK: - Convenience // MARK: - Convenience
@@ -51,12 +71,17 @@ public extension SnodeReceivedMessageInfo {
         }
     }
     
+    /// This method fetches the last non-expired hash from the database for message retrieval
+    ///
+    /// **Note:** This method uses a `write` instead of a read because there is a single write queue for the database and it's very common for
+    /// this method to be called after the hash value has been updated but before the various `read` threads have been updated, resulting in a
+    /// pointless fetch for data the app has already received
     static func fetchLastNotExpired(for snode: Snode, associatedWith publicKey: String) -> SnodeReceivedMessageInfo? {
-        return GRDBStorage.shared.read { db in
-            try? SnodeReceivedMessageInfo
+        return GRDBStorage.shared.write { db in
+            try SnodeReceivedMessageInfo
                 .filter(SnodeReceivedMessageInfo.Columns.key == key(for: snode, publicKey: publicKey))
-                .order(SnodeReceivedMessageInfo.Columns.expirationDateMs)
-                .reversed()
+                .filter(SnodeReceivedMessageInfo.Columns.expirationDateMs <= (Date().timeIntervalSince1970 * 1000))
+                .order(SnodeReceivedMessageInfo.Columns.id.desc)
                 .fetchOne(db)
         }
     }
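
The hash returned here is the value the poller ultimately cares about: it is handed back to the service node so that only newer messages are returned. A hypothetical call site is sketched below; `snode` and `publicKey` are assumed to be in scope as in `BackgroundPoller`, and the exact SnodeAPI parameter the hash feeds into is not part of this diff.

// Illustrative only: fetch the last stored hash for this snode/publicKey pair
let lastHash: String? = SnodeReceivedMessageInfo
    .fetchLastNotExpired(for: snode, associatedWith: publicKey)?
    .hash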
