WIP: implement new approach to sync disappearing after read across devices

pull/941/head
ryanzhao 1 year ago
parent b05bb7f9bd
commit 2b85c0fde0

@ -72,6 +72,57 @@ public extension DisappearingMessagesJob {
.saved(db)
}
@discardableResult static func updateNextRunIfNeeded(_ db: Database, lastReadTimestampMs: Int64, threadId: String) -> Job? {
let messageHashes: [String] = (try? Interaction
.filter(
Interaction.Columns.threadId == threadId &&
Interaction.Columns.timestampMs <= lastReadTimestampMs &&
Interaction.Columns.expiresInSeconds != nil &&
Interaction.Columns.expiresStartedAtMs == nil
)
.select(Interaction.Columns.serverHash)
.fetchAll(db))
.defaulting(to: [])
// If there were no message hashes then none of the messages sent before lastReadTimestampMs are expiring messages
guard (messageHashes.count > 0) else { return nil }
// Update the expiring messages expiresStartedAtMs value
_ = try? Interaction
.filter(messageHashes.contains(Interaction.Columns.serverHash))
.updateAll(
db,
Interaction.Columns.expiresStartedAtMs.set(to: Double(lastReadTimestampMs))
)
let userPublicKey: String = getUserHexEncodedPublicKey(db)
SnodeAPI.getSwarm(for: userPublicKey)
.tryFlatMap { swarm -> AnyPublisher<(ResponseInfoType, GetExpiriesResponse), Error> in
guard let snode = swarm.randomElement() else { throw SnodeAPIError.generic }
return SnodeAPI.getExpiries(
from: snode,
associatedWith: userPublicKey,
of: messageHashes
)
}
.map { response in
Storage.shared.writeAsync { db in
try response.1.expiries.forEach { hash, exipreAtMs in
let expiresInSeconds: TimeInterval = TimeInterval((exipreAtMs - UInt64(lastReadTimestampMs)) / 1000)
_ = try Interaction
.filter(Interaction.Columns.serverHash == hash)
.updateAll(
db,
Interaction.Columns.expiresInSeconds.set(to: expiresInSeconds)
)
}
}
}
return updateNextRunIfNeeded(db)
}
@discardableResult static func updateNextRunIfNeeded(_ db: Database, interactionIds: [Int64], startedAtMs: Double, threadId: String) -> Job? {
struct ExpirationInfo: Codable, Hashable, FetchableRecord {
let id: Int64
@ -102,7 +153,10 @@ public extension DisappearingMessagesJob {
Interaction.Columns.expiresInSeconds != nil &&
Interaction.Columns.expiresStartedAtMs == nil
)
.updateAll(db, Interaction.Columns.expiresStartedAtMs.set(to: startedAtMs))
.updateAll(
db,
Interaction.Columns.expiresStartedAtMs.set(to: startedAtMs)
)
// If there were no changes then none of the provided `interactionIds` are expiring messages
guard (changeCount ?? 0) > 0 else { return nil }

@ -152,6 +152,15 @@ internal extension SessionUtil {
db,
Interaction.Columns.wasRead.set(to: true)
)
// Update old disappearing after read messages to start
JobRunner.upsert(
db,
job: DisappearingMessagesJob.updateNextRunIfNeeded(
db,
lastReadTimestampMs: lastReadTimestampMs,
threadId: threadId
)
)
return nil
}

Loading…
Cancel
Save