fix a bug on getting expirations from snode

pull/941/head
Ryan Zhao 1 year ago
parent 43e38c5644
commit 9dfaa09a95

@ -78,7 +78,7 @@ public extension DisappearingMessagesJob {
.saved(db)
}
@discardableResult static func updateNextRunIfNeeded(_ db: Database, lastReadTimestampMs: Int64, threadId: String) -> Job? {
static func updateNextRunIfNeeded(_ db: Database, lastReadTimestampMs: Int64, threadId: String) {
let messageHashes: [String] = (try? Interaction
.filter(
Interaction.Columns.threadId == threadId &&
@ -89,9 +89,12 @@ public extension DisappearingMessagesJob {
.select(Interaction.Columns.serverHash)
.fetchAll(db))
.defaulting(to: [])
.compactMap{ $0 }
// If there were no message hashes then none of the messages sent before lastReadTimestampMs are expiring messages
guard (messageHashes.count > 0) else { return nil }
guard (messageHashes.count > 0) else { return }
let timestampMs: Int64 = SnodeAPI.currentOffsetTimestampMs()
let userPublicKey: String = getUserHexEncodedPublicKey(db)
SnodeAPI.getSwarm(for: userPublicKey)
@ -104,24 +107,36 @@ public extension DisappearingMessagesJob {
)
.map { (_, response) in
Storage.shared.writeAsync { db in
try response.expiries.forEach { hash, exipreAtMs in
let expiresInSeconds: TimeInterval = TimeInterval((exipreAtMs - UInt64(lastReadTimestampMs)) / 1000)
try response.expiries.forEach { hash, expireAtMs in
let expiresInSeconds: TimeInterval = TimeInterval((expireAtMs - UInt64(timestampMs))) / 1000
_ = try Interaction
.filter(Interaction.Columns.serverHash == hash)
.updateAll(
db,
Interaction.Columns.expiresStartedAtMs.set(to: Double(lastReadTimestampMs)),
Interaction.Columns.expiresInSeconds.set(to: expiresInSeconds)
)
}
_ = try Interaction
.filter(messageHashes.contains(Interaction.Columns.serverHash))
.updateAll(
db,
Interaction.Columns.expiresStartedAtMs.set(to: TimeInterval(timestampMs))
)
JobRunner.upsert(
db,
job: updateNextRunIfNeeded(db)
)
}
}
.mapError { error in
return error
}
.eraseToAnyPublisher()
}
.sinkUntilComplete ()
return updateNextRunIfNeeded(db)
.sinkUntilComplete()
}
@discardableResult static func updateNextRunIfNeeded(_ db: Database, interactionIds: [Int64], startedAtMs: Double, threadId: String) -> Job? {

@ -150,13 +150,10 @@ internal extension SessionUtil {
Interaction.Columns.wasRead.set(to: true)
)
// Update old disappearing after read messages to start
JobRunner.upsert(
DisappearingMessagesJob.updateNextRunIfNeeded(
db,
job: DisappearingMessagesJob.updateNextRunIfNeeded(
db,
lastReadTimestampMs: lastReadTimestampMs,
threadId: threadId
)
lastReadTimestampMs: lastReadTimestampMs,
threadId: threadId
)
return nil
}

@ -4,7 +4,7 @@ import Foundation
import Sodium
import SessionUtilitiesKit
public class GetExpiriesResponse: SnodeResponse {
public class GetExpiriesResponse: Codable {
private enum CodingKeys: String, CodingKey {
case expiries
}
@ -13,11 +13,9 @@ public class GetExpiriesResponse: SnodeResponse {
// MARK: - Initialization
required init(from decoder: Decoder) throws {
required public init(from decoder: Decoder) throws {
let container: KeyedDecodingContainer<CodingKeys> = try decoder.container(keyedBy: CodingKeys.self)
expiries = ((try? container.decode([String: UInt64].self, forKey: .expiries)) ?? [:])
try super.init(from: decoder)
}
}

Loading…
Cancel
Save