// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.
//
// stringlint:disable

import Foundation
import Combine

/// Marker protocol with no requirements.
public protocol CombineCompatible {}

/// Errors produced by the `Publisher` convenience operators declared in this file.
public enum PublisherError: Error, CustomStringConvertible {
    /// A transform/handler returned `nil` where a publisher was required.
    case targetPublisherIsNull
    case invalidCollectionType

    // stringlint:ignore_contents
    public var description: String {
        switch self {
            case .targetPublisherIsNull: return "The target publisher is null, likely due to a 'weak self' (PublisherError.targetPublisherIsNull)."
            case .invalidCollectionType: return "Failed to convert array literal to desired Publisher type (PublisherError.invalidCollectionType)."
        }
    }
}

public extension Publisher {
    /// Provides a subject that shares a single subscription to the upstream publisher and replays at most
    /// `bufferSize` items emitted by that publisher
    /// - Parameter bufferSize: limits the number of items that can be replayed
    func shareReplay(_ bufferSize: Int) -> AnyPublisher<Output, Failure> {
        return multicast(subject: ReplaySubject(bufferSize))
            .autoconnect()
            .eraseToAnyPublisher()
    }

    /// Forwards every value from this publisher into `subject`.
    ///
    /// - Parameters:
    ///   - subject: The subject that receives the forwarded events.
    ///   - includeCompletions: When `true` the completion event (finished or failure) is forwarded as well;
    ///     when `false` (the default) completions are dropped so `subject` stays open.
    /// - Returns: The cancellable for the underlying subscription.
    func sink(into subject: PassthroughSubject<Output, Failure>, includeCompletions: Bool = false) -> AnyCancellable {
        return sink(
            receiveCompletion: { completion in
                guard includeCompletions else { return }

                subject.send(completion: completion)
            },
            receiveValue: { value in subject.send(value) }
        )
    }

    /// A `flatMap` whose transform may return `nil`; a `nil` result fails the stream with
    /// `PublisherError.targetPublisherIsNull`.
    ///
    /// The upstream failure type is widened to `Error` before flat-mapping.
    ///
    /// - Parameters:
    ///   - maxPublishers: Passed straight through to `flatMap(maxPublishers:_:)`.
    ///   - transform: Produces the inner publisher for each upstream value, or `nil`.
    func flatMapOptional<T, P>(
        maxPublishers: Subscribers.Demand = .unlimited,
        _ transform: @escaping (Self.Output) -> P?
    ) -> AnyPublisher<T, Error> where T == P.Output, P : Publisher, P.Failure == Error {
        return self
            .mapError { $0 }
            .flatMap(maxPublishers: maxPublishers) { output -> AnyPublisher<T, Error> in
                guard let result: AnyPublisher<T, Error> = transform(output)?.eraseToAnyPublisher() else {
                    return Fail(error: PublisherError.targetPublisherIsNull)
                        .eraseToAnyPublisher()
                }

                return result
            }
            .eraseToAnyPublisher()
    }

    /// A `flatMap` whose transform may throw; a thrown error is delivered as a failure of the resulting publisher.
    ///
    /// The upstream failure type is widened to `Error` before flat-mapping.
    ///
    /// - Parameters:
    ///   - maxPublishers: Passed straight through to `flatMap(maxPublishers:_:)`.
    ///   - transform: Produces the inner publisher for each upstream value, or throws.
    func tryFlatMap<T, P>(
        maxPublishers: Subscribers.Demand = .unlimited,
        _ transform: @escaping (Self.Output) throws -> P
    ) -> AnyPublisher<T, Error> where T == P.Output, P : Publisher, P.Failure == Error {
        return self
            .mapError { $0 }
            .flatMap(maxPublishers: maxPublishers) { output -> AnyPublisher<T, Error> in
                do {
                    return try transform(output)
                        .eraseToAnyPublisher()
                }
                catch {
                    return Fail(error: error)
                        .eraseToAnyPublisher()
                }
            }
            .eraseToAnyPublisher()
    }

    /// Combination of `tryFlatMap` and `flatMapOptional`: the transform may throw or return `nil`.
    ///
    /// A thrown error is delivered as a failure; a `nil` result fails with
    /// `PublisherError.targetPublisherIsNull`.
    ///
    /// - Parameters:
    ///   - maxPublishers: Passed straight through to `flatMap(maxPublishers:_:)`.
    ///   - transform: Produces the inner publisher for each upstream value, throws, or returns `nil`.
    func tryFlatMapOptional<T, P>(
        maxPublishers: Subscribers.Demand = .unlimited,
        _ transform: @escaping (Self.Output) throws -> P?
    ) -> AnyPublisher<T, Error> where T == P.Output, P : Publisher, P.Failure == Error {
        return self
            .mapError { $0 }
            .flatMap(maxPublishers: maxPublishers) { output -> AnyPublisher<T, Error> in
                do {
                    guard let result: AnyPublisher<T, Error> = try transform(output)?.eraseToAnyPublisher() else {
                        throw PublisherError.targetPublisherIsNull
                    }

                    return result
                }
                catch {
                    return Fail(error: error)
                        .eraseToAnyPublisher()
                }
            }
            .eraseToAnyPublisher()
    }

    /// A `catch` whose handler may return `nil`; a `nil` result fails the stream with
    /// `PublisherError.targetPublisherIsNull`.
    ///
    /// - Parameter handler: Produces a replacement publisher for the upstream failure, or `nil`.
    func catchOptional<P>(
        _ handler: @escaping (Self.Failure) -> P?
    ) -> AnyPublisher<Output, Error> where P : Publisher, Self.Output == P.Output, P.Failure == Error {
        return self
            .catch { error -> AnyPublisher<Output, Error> in
                guard let result: AnyPublisher<Output, Error> = handler(error)?.eraseToAnyPublisher() else {
                    return Fail(error: PublisherError.targetPublisherIsNull)
                        .eraseToAnyPublisher()
                }

                return result
            }
            .eraseToAnyPublisher()
    }

    /// Dependency-aware variant of `subscribe(on:options:)`.
    ///
    /// When `dependencies.forceSynchronous` is `true` the scheduler hop is skipped and the publisher is
    /// returned as-is (type-erased).
    func subscribe<S>(
        on scheduler: S,
        options: S.SchedulerOptions? = nil,
        using dependencies: Dependencies
    ) -> AnyPublisher<Output, Failure> where S: Scheduler {
        guard !dependencies.forceSynchronous else { return self.eraseToAnyPublisher() }

        return self.subscribe(on: scheduler, options: options)
            .eraseToAnyPublisher()
    }

    /// Dependency-aware variant of `receive(on:options:)`.
    ///
    /// When `dependencies.forceSynchronous` is `true` the scheduler hop is skipped and the publisher is
    /// returned as-is (type-erased).
    func receive<S>(
        on scheduler: S,
        options: S.SchedulerOptions? = nil,
        using dependencies: Dependencies
    ) -> AnyPublisher<Output, Failure> where S: Scheduler {
        guard !dependencies.forceSynchronous else { return self.eraseToAnyPublisher() }

        return self.receive(on: scheduler, options: options)
            .eraseToAnyPublisher()
    }

    /// Combines this publisher with `refreshTrigger` (via `CombineLatest`) and outputs only this
    /// publisher's value from each combined emission.
    ///
    /// The trigger is prepended with an initial `()` so that it never holds back the first combined emission.
    func manualRefreshFrom(_ refreshTrigger: some Publisher<Void, Never>) -> AnyPublisher<Output, Failure> {
        return Publishers
            .CombineLatest(refreshTrigger.prepend(()).setFailureType(to: Failure.self), self)
            .map { _, value in value }
            .eraseToAnyPublisher()
    }

    /// Pairs each value with the value emitted before it; `previous` is `nil` for the first emission.
    func withPrevious() -> AnyPublisher<(previous: Output?, current: Output), Failure> {
        scan(Optional<(Output?, Output)>.none) { ($0?.1, $1) }
            .compactMap { $0 }
            .eraseToAnyPublisher()
    }

    /// Pairs each value with the value emitted before it, using `initialPreviousValue` as `previous` for the
    /// first emission.
    func withPrevious(_ initialPreviousValue: Output) -> AnyPublisher<(previous: Output, current: Output), Failure> {
        scan((initialPreviousValue, initialPreviousValue)) { ($0.1, $1) }.eraseToAnyPublisher()
    }
}

// MARK: - Convenience

public extension Publisher {
    /// Optional-subject overload of `sink(into:includeCompletions:)`.
    ///
    /// When `subject` is `nil` nothing is subscribed and a no-op cancellable is returned.
    func sink(into subject: PassthroughSubject<Output, Failure>?, includeCompletions: Bool = false) -> AnyCancellable {
        guard let targetSubject: PassthroughSubject<Output, Failure> = subject else { return AnyCancellable {} }

        return sink(into: targetSubject, includeCompletions: includeCompletions)
    }

    /// Automatically retains the subscription until it emits a 'completion' event
    ///
    /// - Parameters:
    ///   - receiveCompletion: Optional callback invoked with the completion event.
    ///   - receiveValue: Optional callback invoked for each value; values are ignored when omitted.
    func sinkUntilComplete(
        receiveCompletion: ((Subscribers.Completion<Failure>) -> Void)? = nil,
        receiveValue: ((Output) -> Void)? = nil
    ) {
        // The completion closure captures this variable, which in turn holds the subscription; clearing
        // it on completion releases that reference
        var retainCycle: Cancellable? = nil
        retainCycle = self
            .sink(
                receiveCompletion: { result in
                    receiveCompletion?(result)

                    // Redundant but without reading 'retainCycle' it will warn that the variable
                    // isn't used
                    if retainCycle != nil { retainCycle = nil }
                },
                receiveValue: (receiveValue ?? { _ in })
            )
    }
}

public extension Publisher {
    /// Converts the publisher to output a Result instead of throwing an error, can be used to ensure a subscription never
    /// closes due to a failure
    func asResult() -> AnyPublisher<Result<Output, Failure>, Never> {
        self
            .map { Result<Output, Failure>.success($0) }
            .catch { Just(Result<Output, Failure>.failure($0)).eraseToAnyPublisher() }
            .eraseToAnyPublisher()
    }
}

/// Allows an `AnyPublisher` whose output is a collection to be written as an array literal; the literal's
/// elements are emitted as a single collection value.
extension AnyPublisher: @retroactive ExpressibleByArrayLiteral where Output: RangeReplaceableCollection {
    public init(arrayLiteral elements: Output.Element...) {
        self = Just(Output(elements)).setFailureType(to: Failure.self).eraseToAnyPublisher()
    }
}