fix an edge case which may cause old messages coming back

pull/629/head
Ryan Zhao 3 years ago
parent ab7bd24ad5
commit 987db2f7ab

@ -42,16 +42,19 @@ public final class BackgroundPoller : NSObject {
return attempt(maxRetryCount: 4, recoveringOn: DispatchQueue.main) { return attempt(maxRetryCount: 4, recoveringOn: DispatchQueue.main) {
SnodeAPI.getRawMessages(from: snode, associatedWith: publicKey).then(on: DispatchQueue.main) { rawResponse -> Promise<Void> in SnodeAPI.getRawMessages(from: snode, associatedWith: publicKey).then(on: DispatchQueue.main) { rawResponse -> Promise<Void> in
let (messages, lastRawMessage) = SnodeAPI.parseRawMessagesResponse(rawResponse, from: snode, associatedWith: publicKey) let (messages, lastRawMessage) = SnodeAPI.parseRawMessagesResponse(rawResponse, from: snode, associatedWith: publicKey)
var processedMessages: [JSON] = []
let promises = messages.compactMap { json -> Promise<Void>? in let promises = messages.compactMap { json -> Promise<Void>? in
// Use a best attempt approach here; we don't want to fail the entire process if one of the // Use a best attempt approach here; we don't want to fail the entire process if one of the
// messages failed to parse. // messages failed to parse.
guard let envelope = SNProtoEnvelope.from(json), guard let envelope = SNProtoEnvelope.from(json),
let data = try? envelope.serializedData() else { return nil } let data = try? envelope.serializedData() else { return nil }
let job = MessageReceiveJob(data: data, serverHash: json["hash"] as? String, isBackgroundPoll: true) let job = MessageReceiveJob(data: data, serverHash: json["hash"] as? String, isBackgroundPoll: true)
processedMessages.append(json)
return job.execute() return job.execute()
} }
// Now that the MessageReceiveJob's have been created we can update the `lastMessageHash` value // Now that the MessageReceiveJob's have been created we can update the `lastMessageHash` value & `receivedMessageHashes`
SnodeAPI.updateLastMessageHashValueIfPossible(for: snode, namespace: SnodeAPI.defaultNamespace, associatedWith: publicKey, from: lastRawMessage) SnodeAPI.updateLastMessageHashValueIfPossible(for: snode, namespace: SnodeAPI.defaultNamespace, associatedWith: publicKey, from: lastRawMessage)
SnodeAPI.updateReceivedMessages(from: processedMessages, associatedWith: publicKey)
return when(fulfilled: promises) // The promise returned by MessageReceiveJob never rejects return when(fulfilled: promises) // The promise returned by MessageReceiveJob never rejects
} }
@ -82,16 +85,19 @@ public final class BackgroundPoller : NSObject {
for result in results { for result in results {
if case .fulfilled(let rawResponse) = result { if case .fulfilled(let rawResponse) = result {
let (messages, lastRawMessage) = SnodeAPI.parseRawMessagesResponse(rawResponse, from: snode, associatedWith: publicKey) let (messages, lastRawMessage) = SnodeAPI.parseRawMessagesResponse(rawResponse, from: snode, associatedWith: publicKey)
var processedMessages: [JSON] = []
let jobPromises = messages.compactMap { json -> Promise<Void>? in let jobPromises = messages.compactMap { json -> Promise<Void>? in
// Use a best attempt approach here; we don't want to fail the entire process if one of the // Use a best attempt approach here; we don't want to fail the entire process if one of the
// messages failed to parse. // messages failed to parse.
guard let envelope = SNProtoEnvelope.from(json), guard let envelope = SNProtoEnvelope.from(json),
let data = try? envelope.serializedData() else { return nil } let data = try? envelope.serializedData() else { return nil }
let job = MessageReceiveJob(data: data, serverHash: json["hash"] as? String, isBackgroundPoll: true) let job = MessageReceiveJob(data: data, serverHash: json["hash"] as? String, isBackgroundPoll: true)
processedMessages.append(json)
return job.execute() return job.execute()
} }
// Now that the MessageReceiveJob's have been created we can update the `lastMessageHash` value // Now that the MessageReceiveJob's have been created we can update the `lastMessageHash` value & `receivedMessageHashes`
SnodeAPI.updateLastMessageHashValueIfPossible(for: snode, namespace: namespaces[index], associatedWith: publicKey, from: lastRawMessage) SnodeAPI.updateLastMessageHashValueIfPossible(for: snode, namespace: namespaces[index], associatedWith: publicKey, from: lastRawMessage)
SnodeAPI.updateReceivedMessages(from: processedMessages, associatedWith: publicKey)
promises += jobPromises promises += jobPromises
} }
index += 1 index += 1

@ -122,6 +122,7 @@ public final class ClosedGroupPoller : NSObject {
if !rawMessages.isEmpty { if !rawMessages.isEmpty {
SNLog("Received \(rawMessages.count) new message(s) in closed group with public key: \(groupPublicKey).") SNLog("Received \(rawMessages.count) new message(s) in closed group with public key: \(groupPublicKey).")
} }
var processedMessages: [JSON] = []
rawMessages.forEach { json in rawMessages.forEach { json in
guard let envelope = SNProtoEnvelope.from(json) else { return } guard let envelope = SNProtoEnvelope.from(json) else { return }
do { do {
@ -130,13 +131,15 @@ public final class ClosedGroupPoller : NSObject {
SNMessagingKitConfiguration.shared.storage.write { transaction in SNMessagingKitConfiguration.shared.storage.write { transaction in
SessionMessagingKit.JobQueue.shared.add(job, using: transaction) SessionMessagingKit.JobQueue.shared.add(job, using: transaction)
} }
processedMessages.append(json)
} catch { } catch {
SNLog("Failed to deserialize envelope due to error: \(error).") SNLog("Failed to deserialize envelope due to error: \(error).")
} }
} }
// Now that the MessageReceiveJob's have been created we can update the `lastMessageHash` value // Now that the MessageReceiveJob's have been created we can update the `lastMessageHash` value & `receivedMessageHashes`
SnodeAPI.updateLastMessageHashValueIfPossible(for: snode, namespace: SnodeAPI.closedGroupNamespace, associatedWith: groupPublicKey, from: lastRawMessage) SnodeAPI.updateLastMessageHashValueIfPossible(for: snode, namespace: SnodeAPI.closedGroupNamespace, associatedWith: groupPublicKey, from: lastRawMessage)
SnodeAPI.updateReceivedMessages(from: processedMessages, associatedWith: groupPublicKey)
} }
promise.catch2 { error in promise.catch2 { error in
SNLog("Polling failed for closed group with public key: \(groupPublicKey) due to error: \(error).") SNLog("Polling failed for closed group with public key: \(groupPublicKey) due to error: \(error).")

@ -98,6 +98,7 @@ public final class Poller : NSObject {
if !messages.isEmpty { if !messages.isEmpty {
SNLog("Received \(messages.count) new message(s).") SNLog("Received \(messages.count) new message(s).")
} }
var processedMessages: [JSON] = []
messages.forEach { json in messages.forEach { json in
guard let envelope = SNProtoEnvelope.from(json) else { return } guard let envelope = SNProtoEnvelope.from(json) else { return }
do { do {
@ -106,13 +107,15 @@ public final class Poller : NSObject {
SNMessagingKitConfiguration.shared.storage.write { transaction in SNMessagingKitConfiguration.shared.storage.write { transaction in
SessionMessagingKit.JobQueue.shared.add(job, using: transaction) SessionMessagingKit.JobQueue.shared.add(job, using: transaction)
} }
processedMessages.append(json)
} catch { } catch {
SNLog("Failed to deserialize envelope due to error: \(error).") SNLog("Failed to deserialize envelope due to error: \(error).")
} }
} }
// Now that the MessageReceiveJob's have been created we can update the `lastMessageHash` value // Now that the MessageReceiveJob's have been created we can update the `lastMessageHash` value & `receivedMessageHashes`
SnodeAPI.updateLastMessageHashValueIfPossible(for: snode, namespace: SnodeAPI.defaultNamespace, associatedWith: userPublicKey, from: lastRawMessage) SnodeAPI.updateLastMessageHashValueIfPossible(for: snode, namespace: SnodeAPI.defaultNamespace, associatedWith: userPublicKey, from: lastRawMessage)
SnodeAPI.updateReceivedMessages(from: processedMessages, associatedWith: userPublicKey)
strongSelf.pollCount += 1 strongSelf.pollCount += 1
if strongSelf.pollCount == Poller.maxPollCount { if strongSelf.pollCount == Poller.maxPollCount {

@ -705,6 +705,20 @@ public final class SnodeAPI : NSObject {
} }
} }
public static func updateReceivedMessages(from messages: [JSON], associatedWith publicKey: String) {
let oldReceivedMessages = SNSnodeKitConfiguration.shared.storage.getReceivedMessages(for: publicKey)
var newReceivedMessages = oldReceivedMessages
for message in messages {
guard let hash = message["hash"] as? String else { continue }
newReceivedMessages.insert(hash)
}
if oldReceivedMessages != newReceivedMessages {
SNSnodeKitConfiguration.shared.storage.writeSync { transaction in
SNSnodeKitConfiguration.shared.storage.setReceivedMessages(to: newReceivedMessages, for: publicKey, using: transaction)
}
}
}
private static func removeDuplicates(from rawMessages: [JSON], associatedWith publicKey: String) -> [JSON] { private static func removeDuplicates(from rawMessages: [JSON], associatedWith publicKey: String) -> [JSON] {
let oldReceivedMessages = SNSnodeKitConfiguration.shared.storage.getReceivedMessages(for: publicKey) let oldReceivedMessages = SNSnodeKitConfiguration.shared.storage.getReceivedMessages(for: publicKey)
var newReceivedMessages = oldReceivedMessages var newReceivedMessages = oldReceivedMessages
@ -717,12 +731,6 @@ public final class SnodeAPI : NSObject {
newReceivedMessages.insert(hash) newReceivedMessages.insert(hash)
return !isDuplicate return !isDuplicate
} }
// Avoid the sync write transaction if possible
if oldReceivedMessages != newReceivedMessages {
SNSnodeKitConfiguration.shared.storage.writeSync { transaction in
SNSnodeKitConfiguration.shared.storage.setReceivedMessages(to: newReceivedMessages, for: publicKey, using: transaction)
}
}
return result return result
} }

@ -110,7 +110,6 @@ extension Storage {
if now >= expirationDate { if now >= expirationDate {
Storage.writeSync { transaction in Storage.writeSync { transaction in
self.removeLastMessageHashInfo(for: snode, namespace: namespace, associatedWith: publicKey, using: transaction) self.removeLastMessageHashInfo(for: snode, namespace: namespace, associatedWith: publicKey, using: transaction)
self.setReceivedMessages(to: Set(), for: publicKey, using: transaction)
} }
} }
} }

Loading…
Cancel
Save