use new endpoint to shorten messages' TTL

pull/941/head
Ryan Zhao 1 year ago
parent 99610cf374
commit 73e05ec19e

@ -75,8 +75,25 @@ public extension DisappearingMessagesJob {
struct ExpirationInfo: Codable, Hashable, FetchableRecord {
let id: Int64
let expiresInSeconds: TimeInterval
let serverHash: String
}
let interactionExpirationInfosByExpiresInSeconds: [TimeInterval: [ExpirationInfo]] = (try? Interaction
.filter(interactionIds.contains(Interaction.Columns.id))
.filter(
Interaction.Columns.expiresInSeconds != nil &&
Interaction.Columns.expiresStartedAtMs == nil
)
.select(
Interaction.Columns.id,
Interaction.Columns.expiresInSeconds,
Interaction.Columns.serverHash
)
.asRequest(of: ExpirationInfo.self)
.fetchAll(db))
.defaulting(to: [])
.grouped(by: \.expiresInSeconds)
// Update the expiring messages expiresStartedAtMs value
let changeCount: Int? = try? Interaction
.filter(interactionIds.contains(Interaction.Columns.id))
@ -89,6 +106,33 @@ public extension DisappearingMessagesJob {
// If there were no changes then none of the provided `interactionIds` are expiring messages
guard (changeCount ?? 0) > 0 else { return nil }
interactionExpirationInfosByExpiresInSeconds.forEach { expiresInSeconds, expirationInfos in
let expirationTimestampMs: Int64 = Int64(ceil(startedAtMs + expiresInSeconds * 1000))
SnodeAPI.updateExpiry(
publicKey: getUserHexEncodedPublicKey(db),
updatedExpiryMs: expirationTimestampMs,
serverHashes: expirationInfos.map { $0.serverHash },
shortenOnly: true
).map2 { results in
var unchangedMessages: [String: UInt64] = [:]
results.forEach { _, result in
guard let unchanged = result.unchanged else { return }
unchangedMessages.merge(unchanged) { (current, _) in current }
}
guard !unchangedMessages.isEmpty else { return }
unchangedMessages.forEach { serverHash, serverExpirationTimestampMs in
let expiresInSeconds: TimeInterval = (TimeInterval(serverExpirationTimestampMs) - startedAtMs) / 1000
_ = try? Interaction
.filter(Interaction.Columns.serverHash == serverHash)
.updateAll(db, Interaction.Columns.expiresInSeconds.set(to: expiresInSeconds))
}
}.retainUntilComplete()
}
return updateNextRunIfNeeded(db)
}

@ -772,9 +772,9 @@ public final class SnodeAPI {
publicKey: String,
updatedExpiryMs: Int64,
serverHashes: [String],
shortenOnly: Bool = true,
shortenOnly: Bool = false,
extendOnly: Bool = false
) -> Promise<[String: (hashes: [String], expiry: UInt64, unchanged: [String: UInt64])]> {
) -> Promise<[String: (hashes: [String], expiry: UInt64, unchanged: [String: UInt64]?)]> {
guard let userED25519KeyPair = Identity.fetchUserEd25519KeyPair() else {
return Promise(error: SnodeAPIError.noKeyPair)
}
@ -796,7 +796,7 @@ public final class SnodeAPI {
return attempt(maxRetryCount: maxRetryCount, recoveringOn: Threading.workQueue) {
getSwarm(for: publicKey)
.then2 { swarm -> Promise<[String: (hashes: [String], expiry: UInt64, unchanged: [String: UInt64])]> in
.then2 { swarm -> Promise<[String: (hashes: [String], expiry: UInt64, unchanged: [String: UInt64]?)]> in
// "expire" || ShortenOrExtend || expiry || messages[0] || ... || messages[N]
let verificationBytes = SnodeAPIEndpoint.expire.rawValue.bytes
.appending(contentsOf: shortenOrExtend?.data(using: .ascii)?.bytes)
@ -823,13 +823,13 @@ public final class SnodeAPI {
return attempt(maxRetryCount: maxRetryCount, recoveringOn: Threading.workQueue) {
invoke(.expire, on: snode, associatedWith: publicKey, parameters: parameters)
.map2 { responseData -> [String: (hashes: [String], expiry: UInt64, unchanged: [String: UInt64])] in
.map2 { responseData -> [String: (hashes: [String], expiry: UInt64, unchanged: [String: UInt64]?)] in
guard let responseJson: JSON = try? JSONSerialization.jsonObject(with: responseData, options: [ .fragmentsAllowed ]) as? JSON else {
throw HTTP.Error.invalidJSON
}
guard let swarm = responseJson["swarm"] as? JSON else { throw HTTP.Error.invalidJSON }
var result: [String: (hashes: [String], expiry: UInt64, unchanged: [String: UInt64])] = [:]
var result: [String: (hashes: [String], expiry: UInt64, unchanged: [String: UInt64]?)] = [:]
for (snodePublicKey, rawJSON) in swarm {
guard let json = rawJSON as? JSON else { throw HTTP.Error.invalidJSON }
@ -846,13 +846,14 @@ public final class SnodeAPI {
guard
let hashes: [String] = json["updated"] as? [String],
let unchanged: [String: UInt64] = json["unchanged"] as? [String: UInt64],
let expiryApplied: UInt64 = json["expiry"] as? UInt64,
let signature: String = json["signature"] as? String
else {
throw HTTP.Error.invalidJSON
}
let maybeUnchanged: [String: UInt64]? = json["unchanged"] as? [String: UInt64]
// The signature format is ( PUBKEY_HEX || EXPIRY || RMSGs... || UMSGs... || CMSG_EXPs... )
// where RMSGs are the requested expiry hashes, UMSGs are the actual updated hashes, and
// CMSG_EXPs are (HASH || EXPIRY) values, ascii-sorted by hash, for the unchanged message
@ -861,7 +862,7 @@ public final class SnodeAPI {
.appending(contentsOf: "\(expiryApplied)".data(using: .ascii)?.bytes)
.appending(contentsOf: serverHashes.joined().bytes)
.appending(contentsOf: hashes.joined().bytes)
.appending(contentsOf: unchanged.map { "\($0)\($1)" }.sorted().joined().bytes)
.appending(contentsOf: maybeUnchanged?.map { "\($0)\($1)" }.sorted().joined().bytes)
let isValid = sodium.sign.verify(
message: verificationBytes,
publicKey: Bytes(Data(hex: snodePublicKey)),
@ -873,7 +874,7 @@ public final class SnodeAPI {
throw SnodeAPIError.signatureVerificationFailed
}
result[snodePublicKey] = (hashes, expiryApplied, unchanged)
result[snodePublicKey] = (hashes, expiryApplied, maybeUnchanged)
}
return result

Loading…
Cancel
Save