fix an issue where message is not received & minor refactor on disappearing message job

pull/941/head
ryanzhao 2 years ago
parent ee5de25d4a
commit 5063feeb6a

@ -253,6 +253,10 @@ extension DisappearingMessagesConfiguration {
] ]
.map { TimeInterval($0) } .map { TimeInterval($0) }
#if targetEnvironment(simulator) #if targetEnvironment(simulator)
result.insert(
TimeInterval(60),
at: 0
)
result.insert( result.insert(
TimeInterval(10), TimeInterval(10),
at: 0 at: 0
@ -268,6 +272,10 @@ extension DisappearingMessagesConfiguration {
] ]
.map { TimeInterval($0) } .map { TimeInterval($0) }
#if targetEnvironment(simulator) #if targetEnvironment(simulator)
result.insert(
TimeInterval(60),
at: 0
)
result.insert( result.insert(
TimeInterval(10), TimeInterval(10),
at: 0 at: 0

@ -79,21 +79,30 @@ public extension DisappearingMessagesJob {
} }
static func updateNextRunIfNeeded(_ db: Database, lastReadTimestampMs: Int64, threadId: String) { static func updateNextRunIfNeeded(_ db: Database, lastReadTimestampMs: Int64, threadId: String) {
let messageHashes: [String] = (try? Interaction struct ExpirationInfo: Codable, Hashable, FetchableRecord {
let expiresInSeconds: TimeInterval
let serverHash: String
}
var expirationInfo: [String: TimeInterval] = (try? Interaction
.filter( .filter(
Interaction.Columns.threadId == threadId && Interaction.Columns.threadId == threadId &&
Interaction.Columns.timestampMs <= lastReadTimestampMs && Interaction.Columns.timestampMs <= lastReadTimestampMs &&
Interaction.Columns.expiresInSeconds > 0 && Interaction.Columns.expiresInSeconds > 0 &&
Interaction.Columns.expiresStartedAtMs == nil Interaction.Columns.expiresStartedAtMs == nil
) )
.select(Interaction.Columns.serverHash) .select(
Interaction.Columns.expiresInSeconds,
Interaction.Columns.serverHash
)
.asRequest(of: ExpirationInfo.self)
.fetchAll(db)) .fetchAll(db))
.defaulting(to: []) .defaulting(to: [])
.compactMap{ $0 } .grouped(by: \.serverHash)
.compactMapValues{ $0.first?.expiresInSeconds }
// If there were no message hashes then none of the messages sent before lastReadTimestampMs are expiring messages // If there were no message hashes then none of the messages sent before lastReadTimestampMs are expiring messages
guard (messageHashes.count > 0) else { return } guard (expirationInfo.count > 0) else { return }
let timestampMs: Int64 = SnodeAPI.currentOffsetTimestampMs() let timestampMs: Int64 = SnodeAPI.currentOffsetTimestampMs()
let userPublicKey: String = getUserHexEncodedPublicKey(db) let userPublicKey: String = getUserHexEncodedPublicKey(db)
@ -103,27 +112,33 @@ public extension DisappearingMessagesJob {
return SnodeAPI.getExpiries( return SnodeAPI.getExpiries(
from: snode, from: snode,
associatedWith: userPublicKey, associatedWith: userPublicKey,
of: messageHashes of: expirationInfo.map { $0.key }
) )
.map { (_, response) in .map { (_, response) in
Storage.shared.writeAsync { db in Storage.shared.writeAsync { db in
try response.expiries.forEach { hash, expireAtMs in try response.expiries.forEach { hash, expireAtMs in
let expiresInSeconds: TimeInterval = TimeInterval((expireAtMs - UInt64(timestampMs))) / 1000 guard let expireInSeconds: TimeInterval = expirationInfo[hash] else { return }
let startAtMs: TimeInterval = TimeInterval(expireAtMs - UInt64(expireInSeconds * 1000))
_ = try Interaction _ = try Interaction
.filter(Interaction.Columns.serverHash == hash) .filter(Interaction.Columns.serverHash == hash)
.updateAll( .updateAll(
db, db,
Interaction.Columns.expiresInSeconds.set(to: expiresInSeconds) Interaction.Columns.expiresStartedAtMs.set(to: startAtMs)
) )
guard let index = expirationInfo.index(forKey: hash) else { return }
expirationInfo.remove(at: index)
} }
_ = try Interaction try expirationInfo.forEach { key, _ in
.filter(messageHashes.contains(Interaction.Columns.serverHash)) _ = try Interaction
.updateAll( .filter(Interaction.Columns.serverHash == key)
db, .updateAll(
Interaction.Columns.expiresStartedAtMs.set(to: TimeInterval(timestampMs)) db,
) Interaction.Columns.expiresStartedAtMs.set(to: timestampMs)
)
}
JobRunner.upsert( JobRunner.upsert(
db, db,

@ -183,6 +183,8 @@ extension MessageReceiver {
) )
let updateControlMewssage: () throws -> () = { let updateControlMewssage: () throws -> () = {
guard message is ExpirationTimerUpdate else { return }
_ = try Interaction _ = try Interaction
.filter(Interaction.Columns.threadId == threadId) .filter(Interaction.Columns.threadId == threadId)
.filter(Interaction.Columns.variant == Interaction.Variant.infoDisappearingMessagesUpdate) .filter(Interaction.Columns.variant == Interaction.Variant.infoDisappearingMessagesUpdate)
@ -243,8 +245,6 @@ extension MessageReceiver {
} }
} }
guard message is ExpirationTimerUpdate else { return }
try updateControlMewssage() try updateControlMewssage()
} }
} }

Loading…
Cancel
Save