// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.
import Combine
import Foundation
/// Marker protocol that declares no requirements of its own.
///
/// NOTE(review): presumably adopted by types that want to expose Combine-based helpers via
/// constrained extensions — confirm against the conforming types elsewhere in the project.
public protocol CombineCompatible {}
public extension Publisher {
    /// Provides a publisher that shares a single subscription to the upstream publisher and replays at most
    /// `bufferSize` items emitted by that publisher.
    ///
    /// - Parameter bufferSize: Limits the number of items that can be replayed.
    /// - Returns: A type-erased publisher which multicasts the upstream through a `ReplaySubject`.
    func shareReplay(_ bufferSize: Int) -> AnyPublisher<Output, Failure> {
        // `multicast(subject:)` produces a connectable publisher which emits nothing until it is
        // connected, so `autoconnect()` is required for subscribers to actually receive values
        return multicast(subject: ReplaySubject(bufferSize))
            .autoconnect()
            .eraseToAnyPublisher()
    }

    /// Subscribes to this publisher and forwards every emitted value into `subject`.
    ///
    /// - Parameters:
    ///   - subject: The subject which receives each value emitted by this publisher.
    ///   - includeCompletions: When `true` the upstream completion (finished or failure) is also sent to
    ///     `subject`; when `false` (the default) completions are dropped so `subject` stays open.
    /// - Returns: The cancellable for the subscription; releasing or cancelling it stops the forwarding.
    func sink(into subject: PassthroughSubject<Output, Failure>, includeCompletions: Bool = false) -> AnyCancellable {
        return sink(
            receiveCompletion: { completion in
                guard includeCompletions else { return }

                subject.send(completion: completion)
            },
            receiveValue: { value in subject.send(value) }
        )
    }

    /// The standard `.receive(on: DispatchQueue.main)` seems to occasionally dispatch to the
    /// next run loop before emitting data, this method checks if it's running on the main thread already and
    /// if so just emits directly rather than routing via `.receive(on:)`.
    ///
    /// - Parameter receiveImmediately: When `false` (the default) this behaves exactly like
    ///   `.receive(on: DispatchQueue.main)`; when `true` values which arrive on the main thread are emitted
    ///   directly and only values arriving on other threads are routed via `.receive(on:)`.
    /// - Returns: A type-erased publisher which delivers values on the main thread.
    ///
    /// NOTE(review): in the `immediately` path only values are rerouted by the `flatMap` closure — confirm
    /// which thread completion events are expected to arrive on.
    func receiveOnMain(immediately receiveImmediately: Bool = false) -> AnyPublisher<Output, Failure> {
        guard receiveImmediately else {
            return self.receive(on: DispatchQueue.main)
                .eraseToAnyPublisher()
        }

        return self
            .flatMap { value -> AnyPublisher<Output, Failure> in
                // Off the main thread: hop over to the main queue before emitting
                guard Thread.isMainThread else {
                    return Just(value)
                        .setFailureType(to: Failure.self)
                        .receive(on: DispatchQueue.main)
                        .eraseToAnyPublisher()
                }

                // Already on the main thread: emit directly without scheduling
                return Just(value)
                    .setFailureType(to: Failure.self)
                    .eraseToAnyPublisher()
            }
            .eraseToAnyPublisher()
    }
}
// MARK: - Convenience
public extension Publisher {
    /// Optional-subject convenience for `sink(into:includeCompletions:)`.
    ///
    /// - Parameters:
    ///   - subject: The subject to forward values into; when `nil` nothing is subscribed.
    ///   - includeCompletions: Passed through to the non-optional overload; controls whether the upstream
    ///     completion is also forwarded to `subject`.
    /// - Returns: The cancellable from the non-optional overload, or a cancellable with an empty cancel
    ///   action when `subject` is `nil`.
    func sink(into subject: PassthroughSubject<Output, Failure>?, includeCompletions: Bool = false) -> AnyCancellable {
        // Without a target there is nothing to forward into, so avoid subscribing to the upstream at all
        guard let targetSubject: PassthroughSubject<Output, Failure> = subject else { return AnyCancellable {} }

        return sink(into: targetSubject, includeCompletions: includeCompletions)
    }
}