diff --git a/SessionMessagingKit/Jobs/Types/DisappearingMessagesJob.swift b/SessionMessagingKit/Jobs/Types/DisappearingMessagesJob.swift index cd00ea312..2e968c007 100644 --- a/SessionMessagingKit/Jobs/Types/DisappearingMessagesJob.swift +++ b/SessionMessagingKit/Jobs/Types/DisappearingMessagesJob.swift @@ -86,8 +86,8 @@ public extension DisappearingMessagesJob { Interaction.Columns.expiresInSeconds != nil && Interaction.Columns.expiresStartedAtMs == nil ) - .select(Interaction.Columns.serverHash) - .fetchAll(db)) + .select(Interaction.Columns.serverHash) + .fetchAll(db)) .defaulting(to: []) // If there were no message hashes then none of the messages sent before lastReadTimestampMs are expiring messages @@ -103,18 +103,16 @@ public extension DisappearingMessagesJob { let userPublicKey: String = getUserHexEncodedPublicKey(db) SnodeAPI.getSwarm(for: userPublicKey) - .tryFlatMap { swarm -> AnyPublisher<(ResponseInfoType, GetExpiriesResponse), Error> in + .tryFlatMap { swarm -> AnyPublisher in guard let snode = swarm.randomElement() else { throw SnodeAPIError.generic } return SnodeAPI.getExpiries( from: snode, associatedWith: userPublicKey, of: messageHashes ) - } - .sinkUntilComplete ( - receiveValue: { response in + .map { (_, response) in Storage.shared.writeAsync { db in - try response.1.expiries.forEach { hash, exipreAtMs in + try response.expiries.forEach { hash, exipreAtMs in let expiresInSeconds: TimeInterval = TimeInterval((exipreAtMs - UInt64(lastReadTimestampMs)) / 1000) _ = try Interaction @@ -126,7 +124,9 @@ public extension DisappearingMessagesJob { } } } - ) + .eraseToAnyPublisher() + } + .sinkUntilComplete () return updateNextRunIfNeeded(db) }