// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved. import Combine /// A subject that stores the last `bufferSize` emissions and emits them for every new subscriber /// /// Note: This implementation was found here: https://github.com/sgl0v/OnSwiftWings public final class ReplaySubject: Subject { private var buffer: [Output] = [Output]() private let bufferSize: Int private let lock: NSRecursiveLock = NSRecursiveLock() private var subscriptions: Atomic<[ReplaySubjectSubscription]> = Atomic([]) private var completion: Subscribers.Completion? // MARK: - Initialization init(_ bufferSize: Int = 0) { self.bufferSize = bufferSize } // MARK: - Subject Methods /// Sends a value to the subscriber public func send(_ value: Output) { lock.lock(); defer { lock.unlock() } buffer.append(value) buffer = buffer.suffix(bufferSize) subscriptions.wrappedValue.forEach { $0.receive(value) } } /// Sends a completion signal to the subscriber public func send(completion: Subscribers.Completion) { lock.lock(); defer { lock.unlock() } self.completion = completion subscriptions.wrappedValue.forEach { $0.receive(completion: completion) } } /// Provides this Subject an opportunity to establish demand for any new upstream subscriptions public func send(subscription: Subscription) { lock.lock(); defer { lock.unlock() } subscription.request(.unlimited) } /// This function is called to attach the specified `Subscriber` to the`Publisher public func receive(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input { lock.lock(); defer { lock.unlock() } /// According to the below comment the `subscriber.receive(subscription: subscription)` code runs asynchronously /// which aligns with testing (resulting in the `request(_ newDemand: Subscribers.Demand)` function getting called after this /// function returns /// /// Later in the thread it's mentioned that as of `iOS 13.3` this behaviour changed to be synchronous but as of writing the minimum /// deployment version is set to `iOS 13.0` which I assume is why we are seeing the async behaviour which results in `receiveValue` /// not being called in some cases /// /// When the project is eventually updated to have a minimum version higher than `iOS 13.3` we should re-test this behaviour to see if /// we can revert this change /// /// https://forums.swift.org/t/combine-receive-on-runloop-main-loses-sent-value-how-can-i-make-it-work/28631/20 let subscription: ReplaySubjectSubscription = ReplaySubjectSubscription(downstream: AnySubscriber(subscriber)) { [weak self, buffer = buffer, completion = completion] subscription in self?.subscriptions.mutate { $0.append(subscription) } subscription.replay(buffer, completion: completion) } subscriber.receive(subscription: subscription) } } // MARK: - public final class ReplaySubjectSubscription: Subscription { private let downstream: AnySubscriber private var isCompleted: Bool = false private var demand: Subscribers.Demand = .none private var onInitialDemand: ((ReplaySubjectSubscription) -> ())? // MARK: - Initialization init(downstream: AnySubscriber, onInitialDemand: @escaping (ReplaySubjectSubscription) -> ()) { self.downstream = downstream self.onInitialDemand = onInitialDemand } // MARK: - Subscription public func request(_ newDemand: Subscribers.Demand) { demand += newDemand onInitialDemand?(self) onInitialDemand = nil } public func cancel() { isCompleted = true } // MARK: - Functions public func receive(_ value: Output) { guard !isCompleted, demand > 0 else { return } demand += downstream.receive(value) demand -= 1 } public func receive(completion: Subscribers.Completion) { guard !isCompleted else { return } isCompleted = true downstream.receive(completion: completion) } public func replay(_ values: [Output], completion: Subscribers.Completion?) { guard !isCompleted else { return } values.forEach { value in receive(value) } if let completion = completion { receive(completion: completion) } } }