mirror of https://github.com/oxen-io/session-ios
				
				
				
			
			You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			124 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Swift
		
	
			
		
		
	
	
			124 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Swift
		
	
| // Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.
 | |
| 
 | |
| import Combine
 | |
| 
 | |
| /// A subject that stores the last `bufferSize` emissions and emits them for every new subscriber
 | |
| ///
 | |
| /// Note: This implementation was found here: https://github.com/sgl0v/OnSwiftWings
 | |
| public final class ReplaySubject<Output, Failure: Error>: Subject {
 | |
|     private var buffer: [Output] = [Output]()
 | |
|     private let bufferSize: Int
 | |
|     private let lock: NSRecursiveLock = NSRecursiveLock()
 | |
|     private var subscriptions: Atomic<[ReplaySubjectSubscription<Output, Failure>]> = Atomic([])
 | |
|     private var completion: Subscribers.Completion<Failure>?
 | |
|     
 | |
|     // MARK: - Initialization
 | |
| 
 | |
|     init(_ bufferSize: Int = 0) {
 | |
|         self.bufferSize = bufferSize
 | |
|     }
 | |
|     
 | |
|     // MARK: - Subject Methods
 | |
|     
 | |
|     /// Sends a value to the subscriber
 | |
|     public func send(_ value: Output) {
 | |
|         lock.lock(); defer { lock.unlock() }
 | |
|         
 | |
|         buffer.append(value)
 | |
|         buffer = buffer.suffix(bufferSize)
 | |
|         subscriptions.wrappedValue.forEach { $0.receive(value) }
 | |
|     }
 | |
|     
 | |
|     /// Sends a completion signal to the subscriber
 | |
|     public func send(completion: Subscribers.Completion<Failure>) {
 | |
|         lock.lock(); defer { lock.unlock() }
 | |
|         
 | |
|         self.completion = completion
 | |
|         subscriptions.wrappedValue.forEach { $0.receive(completion: completion) }
 | |
|     }
 | |
|     
 | |
|     /// Provides this Subject an opportunity to establish demand for any new upstream subscriptions
 | |
|     public func send(subscription: Subscription) {
 | |
|         lock.lock(); defer { lock.unlock() }
 | |
|         
 | |
|         subscription.request(.unlimited)
 | |
|     }
 | |
|     
 | |
|     /// This function is called to attach the specified `Subscriber` to the`Publisher
 | |
|     public func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
 | |
|         lock.lock(); defer { lock.unlock() }
 | |
|         
 | |
|         /// According to the below comment the `subscriber.receive(subscription: subscription)` code runs asynchronously
 | |
|         /// which aligns with testing (resulting in the `request(_ newDemand: Subscribers.Demand)` function getting called after this
 | |
|         /// function returns
 | |
|         ///
 | |
|         /// Later in the thread it's mentioned that as of `iOS 13.3` this behaviour changed to be synchronous but as of writing the minimum
 | |
|         /// deployment version is set to `iOS 13.0` which I assume is why we are seeing the async behaviour which results in `receiveValue`
 | |
|         /// not being called in some cases
 | |
|         ///
 | |
|         /// When the project is eventually updated to have a minimum version higher than `iOS 13.3` we should re-test this behaviour to see if
 | |
|         /// we can revert this change
 | |
|         ///
 | |
|         /// https://forums.swift.org/t/combine-receive-on-runloop-main-loses-sent-value-how-can-i-make-it-work/28631/20
 | |
|         let subscription: ReplaySubjectSubscription = ReplaySubjectSubscription<Output, Failure>(downstream: AnySubscriber(subscriber)) { [weak self, buffer = buffer, completion = completion] subscription in
 | |
|             self?.subscriptions.mutate { $0.append(subscription) }
 | |
|             subscription.replay(buffer, completion: completion)
 | |
|         }
 | |
|         subscriber.receive(subscription: subscription)
 | |
|     }
 | |
| }
 | |
| 
 | |
| // MARK: -
 | |
| 
 | |
| public final class ReplaySubjectSubscription<Output, Failure: Error>: Subscription {
 | |
|     private let downstream: AnySubscriber<Output, Failure>
 | |
|     private var isCompleted: Bool = false
 | |
|     private var demand: Subscribers.Demand = .none
 | |
|     private var onInitialDemand: ((ReplaySubjectSubscription) -> ())?
 | |
|     
 | |
|     // MARK: - Initialization
 | |
| 
 | |
|     init(downstream: AnySubscriber<Output, Failure>, onInitialDemand: @escaping (ReplaySubjectSubscription) -> ()) {
 | |
|         self.downstream = downstream
 | |
|         self.onInitialDemand = onInitialDemand
 | |
|     }
 | |
|     
 | |
|     // MARK: - Subscription
 | |
| 
 | |
|     public func request(_ newDemand: Subscribers.Demand) {
 | |
|         demand += newDemand
 | |
|         onInitialDemand?(self)
 | |
|         onInitialDemand = nil
 | |
|     }
 | |
| 
 | |
|     public func cancel() {
 | |
|         isCompleted = true
 | |
|     }
 | |
|     
 | |
|     // MARK: - Functions
 | |
| 
 | |
|     public func receive(_ value: Output) {
 | |
|         guard !isCompleted, demand > 0 else { return }
 | |
| 
 | |
|         demand += downstream.receive(value)
 | |
|         demand -= 1
 | |
|     }
 | |
| 
 | |
|     public func receive(completion: Subscribers.Completion<Failure>) {
 | |
|         guard !isCompleted else { return }
 | |
|         
 | |
|         isCompleted = true
 | |
|         downstream.receive(completion: completion)
 | |
|     }
 | |
| 
 | |
|     public func replay(_ values: [Output], completion: Subscribers.Completion<Failure>?) {
 | |
|         guard !isCompleted else { return }
 | |
|         
 | |
|         values.forEach { value in receive(value) }
 | |
|         
 | |
|         if let completion = completion {
 | |
|             receive(completion: completion)
 | |
|         }
 | |
|     }
 | |
| }
 |