diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift b/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift index 71c4f15a5..f747a1cc2 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift @@ -184,31 +184,21 @@ public final class ClosedGroupPoller { guard isBackgroundPoll || poller?.isPolling.wrappedValue[groupPublicKey] == true else { return Promise.value(()) } var promises: [Promise] = [] - var messageCount: Int = 0 - let totalMessagesCount: Int = messageResults - .map { result -> Int in - switch result { - case .fulfilled(let messages): return messages.count - default: return 0 + let allMessages: [SnodeReceivedMessage] = messageResults + .reduce([]) { result, next in + switch next { + case .fulfilled(let messages): return result.appending(contentsOf: messages) + default: return result } } - .reduce(0, +) + var messageCount: Int = 0 + let totalMessagesCount: Int = allMessages.count - messageResults.forEach { result in - guard case .fulfilled(let messages) = result else { return } - guard !messages.isEmpty else { return } - - var jobToRun: Job? - - Storage.shared.write { db in - var jobDetailMessages: [MessageReceiveJob.Details.MessageInfo] = [] - - messages.forEach { message in + Storage.shared.write { db in + let processedMessages: [ProcessedMessage] = allMessages + .compactMap { message -> ProcessedMessage? in do { - let processedMessage: ProcessedMessage? = try Message.processRawReceivedMessage(db, rawMessage: message) - - jobDetailMessages = jobDetailMessages - .appending(processedMessage?.messageInfo) + return try Message.processRawReceivedMessage(db, rawMessage: message) } catch { switch error { @@ -219,28 +209,30 @@ public final class ClosedGroupPoller { MessageReceiverError.duplicateControlMessage, MessageReceiverError.selfSend: break - + default: SNLog("Failed to deserialize envelope due to error: \(error).") } + + return nil } } - - messageCount += jobDetailMessages.count - jobToRun = Job( - variant: .messageReceive, - behaviour: .runOnce, - threadId: groupPublicKey, - details: MessageReceiveJob.Details( - messages: jobDetailMessages, - isBackgroundPoll: isBackgroundPoll - ) - ) - - // If we are force-polling then add to the JobRunner so they are persistent and will retry on - // the next app run if they fail but don't let them auto-start - JobRunner.add(db, job: jobToRun, canStartJob: !isBackgroundPoll) - } + messageCount = processedMessages.count + + let jobToRun: Job? = Job( + variant: .messageReceive, + behaviour: .runOnce, + threadId: groupPublicKey, + details: MessageReceiveJob.Details( + messages: processedMessages.map { $0.messageInfo }, + isBackgroundPoll: isBackgroundPoll + ) + ) + + // If we are force-polling then add to the JobRunner so they are persistent and will retry on + // the next app run if they fail but don't let them auto-start + JobRunner.add(db, job: jobToRun, canStartJob: !isBackgroundPoll) + // We want to try to handle the receive jobs immediately in the background if isBackgroundPoll { promises = promises.appending( diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift b/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift index f63a29486..9e2c0d310 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift @@ -136,49 +136,44 @@ public final class Poller { var messageCount: Int = 0 Storage.shared.write { db in - var threadMessages: [String: [MessageReceiveJob.Details.MessageInfo]] = [:] - - messages.forEach { message in - do { - let processedMessage: ProcessedMessage? = try Message.processRawReceivedMessage(db, rawMessage: message) - let key: String = (processedMessage?.threadId ?? Message.nonThreadMessageId) - - threadMessages[key] = (threadMessages[key] ?? []) - .appending(processedMessage?.messageInfo) - } - catch { - switch error { - // Ignore duplicate & selfSend message errors (and don't bother logging - // them as there will be a lot since we each service node duplicates messages) - case DatabaseError.SQLITE_CONSTRAINT_UNIQUE, - MessageReceiverError.duplicateMessage, - MessageReceiverError.duplicateControlMessage, - MessageReceiverError.selfSend: - break + messages + .compactMap { message -> ProcessedMessage? in + do { + return try Message.processRawReceivedMessage(db, rawMessage: message) + } + catch { + switch error { + // Ignore duplicate & selfSend message errors (and don't bother logging + // them as there will be a lot since we each service node duplicates messages) + case DatabaseError.SQLITE_CONSTRAINT_UNIQUE, + MessageReceiverError.duplicateMessage, + MessageReceiverError.duplicateControlMessage, + MessageReceiverError.selfSend: + break + + default: SNLog("Failed to deserialize envelope due to error: \(error).") + } - default: SNLog("Failed to deserialize envelope due to error: \(error).") + return nil } } - } - - messageCount = threadMessages - .values - .reduce(into: 0) { prev, next in prev += next.count } - - threadMessages.forEach { threadId, threadMessages in - JobRunner.add( - db, - job: Job( - variant: .messageReceive, - behaviour: .runOnce, - threadId: threadId, - details: MessageReceiveJob.Details( - messages: threadMessages, - isBackgroundPoll: false + .grouped { threadId, _, _ in (threadId ?? Message.nonThreadMessageId) } + .forEach { threadId, threadMessages in + messageCount += threadMessages.count + + JobRunner.add( + db, + job: Job( + variant: .messageReceive, + behaviour: .runOnce, + threadId: threadId, + details: MessageReceiveJob.Details( + messages: threadMessages.map { $0.messageInfo }, + isBackgroundPoll: false + ) ) ) - ) - } + } } SNLog("Received \(messageCount) new message\(messageCount == 1 ? "" : "s") (duplicates: \(messages.count - messageCount))") diff --git a/SessionUtilitiesKit/JobRunner/JobRunner.swift b/SessionUtilitiesKit/JobRunner/JobRunner.swift index 6104b5680..0c1950947 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunner.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunner.swift @@ -688,9 +688,11 @@ private final class JobQueue { } private func scheduleNextSoonestJob() { + let jobIdsAlreadyRunning: Set = jobsCurrentlyRunning.wrappedValue let nextJobTimestamp: TimeInterval? = Storage.shared.read { db in try Job.filterPendingJobs(variants: jobVariants, excludeFutureJobs: false) .select(.nextRunTimestamp) + .filter(!jobIdsAlreadyRunning.contains(Job.Columns.id)) // Exclude jobs already running .asRequest(of: TimeInterval.self) .fetchOne(db) }