mirror of https://github.com/oxen-io/session-ios
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
125 lines
4.6 KiB
Swift
125 lines
4.6 KiB
Swift
// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.
|
|
|
|
import Foundation
|
|
import Combine
|
|
|
|
/// A subject that stores the last `bufferSize` emissions and emits them for every new subscriber
|
|
///
|
|
/// Note: This implementation was found here: https://github.com/sgl0v/OnSwiftWings
|
|
public final class ReplaySubject<Output, Failure: Error>: Subject {
|
|
private var buffer: [Output] = [Output]()
|
|
private let bufferSize: Int
|
|
private let lock: NSRecursiveLock = NSRecursiveLock()
|
|
private var completion: Subscribers.Completion<Failure>?
|
|
@ThreadSafeObject private var subscriptions: [ReplaySubjectSubscription<Output, Failure>] = []
|
|
|
|
// MARK: - Initialization
|
|
|
|
init(_ bufferSize: Int = 0) {
|
|
self.bufferSize = bufferSize
|
|
}
|
|
|
|
// MARK: - Subject Methods
|
|
|
|
/// Sends a value to the subscriber
|
|
public func send(_ value: Output) {
|
|
lock.lock(); defer { lock.unlock() }
|
|
|
|
buffer.append(value)
|
|
buffer = buffer.suffix(bufferSize)
|
|
subscriptions.forEach { $0.receive(value) }
|
|
}
|
|
|
|
/// Sends a completion signal to the subscriber
|
|
public func send(completion: Subscribers.Completion<Failure>) {
|
|
lock.lock(); defer { lock.unlock() }
|
|
|
|
self.completion = completion
|
|
subscriptions.forEach { $0.receive(completion: completion) }
|
|
}
|
|
|
|
/// Provides this Subject an opportunity to establish demand for any new upstream subscriptions
|
|
public func send(subscription: Subscription) {
|
|
lock.lock(); defer { lock.unlock() }
|
|
|
|
subscription.request(.unlimited)
|
|
}
|
|
|
|
/// This function is called to attach the specified `Subscriber` to the`Publisher
|
|
public func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
|
|
lock.lock(); defer { lock.unlock() }
|
|
|
|
/// According to the below comment the `subscriber.receive(subscription: subscription)` code runs asynchronously
|
|
/// which aligns with testing (resulting in the `request(_ newDemand: Subscribers.Demand)` function getting called after this
|
|
/// function returns
|
|
///
|
|
/// Later in the thread it's mentioned that as of `iOS 13.3` this behaviour changed to be synchronous but as of writing the minimum
|
|
/// deployment version is set to `iOS 13.0` which I assume is why we are seeing the async behaviour which results in `receiveValue`
|
|
/// not being called in some cases
|
|
///
|
|
/// When the project is eventually updated to have a minimum version higher than `iOS 13.3` we should re-test this behaviour to see if
|
|
/// we can revert this change
|
|
///
|
|
/// https://forums.swift.org/t/combine-receive-on-runloop-main-loses-sent-value-how-can-i-make-it-work/28631/20
|
|
let subscription: ReplaySubjectSubscription = ReplaySubjectSubscription<Output, Failure>(downstream: AnySubscriber(subscriber)) { [weak self, buffer = buffer, completion = completion] subscription in
|
|
self?._subscriptions.performUpdate { $0.appending(subscription) }
|
|
subscription.replay(buffer, completion: completion)
|
|
}
|
|
subscriber.receive(subscription: subscription)
|
|
}
|
|
}
|
|
|
|
// MARK: -
|
|
|
|
public final class ReplaySubjectSubscription<Output, Failure: Error>: Subscription {
|
|
private let downstream: AnySubscriber<Output, Failure>
|
|
private var isCompleted: Bool = false
|
|
private var demand: Subscribers.Demand = .none
|
|
private var onInitialDemand: ((ReplaySubjectSubscription) -> ())?
|
|
|
|
// MARK: - Initialization
|
|
|
|
init(downstream: AnySubscriber<Output, Failure>, onInitialDemand: @escaping (ReplaySubjectSubscription) -> ()) {
|
|
self.downstream = downstream
|
|
self.onInitialDemand = onInitialDemand
|
|
}
|
|
|
|
// MARK: - Subscription
|
|
|
|
public func request(_ newDemand: Subscribers.Demand) {
|
|
demand += newDemand
|
|
onInitialDemand?(self)
|
|
onInitialDemand = nil
|
|
}
|
|
|
|
public func cancel() {
|
|
isCompleted = true
|
|
}
|
|
|
|
// MARK: - Functions
|
|
|
|
public func receive(_ value: Output) {
|
|
guard !isCompleted, demand > 0 else { return }
|
|
|
|
demand += downstream.receive(value)
|
|
demand -= 1
|
|
}
|
|
|
|
public func receive(completion: Subscribers.Completion<Failure>) {
|
|
guard !isCompleted else { return }
|
|
|
|
isCompleted = true
|
|
downstream.receive(completion: completion)
|
|
}
|
|
|
|
public func replay(_ values: [Output], completion: Subscribers.Completion<Failure>?) {
|
|
guard !isCompleted else { return }
|
|
|
|
values.forEach { value in receive(value) }
|
|
|
|
if let completion = completion {
|
|
receive(completion: completion)
|
|
}
|
|
}
|
|
}
|