You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
session-ios/SessionUtilitiesKit/Combine/Publisher+Utilities.swift

142 lines
5.3 KiB
Swift

// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.
import Combine
/// Marker protocol which declares no requirements of its own.
///
/// NOTE(review): nothing in the visible part of this file conforms to or extends this protocol; presumably
/// types adopt it elsewhere to opt in to Combine-related helpers — confirm against its usages.
public protocol CombineCompatible {}
public extension Publisher {
    /// Shares a single subscription to the upstream publisher between all subscribers by multicasting
    /// through a `ReplaySubject`, which replays at most `bufferSize` items emitted by that publisher
    ///
    /// - Parameter bufferSize: limits the number of items that can be replayed
    /// - Returns: A type-erased publisher which connects to the upstream automatically (`autoconnect()`)
    func shareReplay(_ bufferSize: Int) -> AnyPublisher<Output, Failure> {
        let multicasted = multicast(subject: ReplaySubject(bufferSize))

        return multicasted
            .autoconnect()
            .eraseToAnyPublisher()
    }

    /// Subscribes to this publisher and forwards every value it emits into `subject`
    ///
    /// - Parameters:
    ///   - subject: The subject which receives the forwarded values
    ///   - includeCompletions: When `true` the completion event (finished or failure) is also forwarded
    ///     to `subject`; when `false` (the default) completion events are dropped
    /// - Returns: The cancellable for the underlying subscription, the caller is responsible for retaining it
    func sink(into subject: PassthroughSubject<Output, Failure>, includeCompletions: Bool = false) -> AnyCancellable {
        return sink(
            receiveCompletion: { terminalEvent in
                if includeCompletions {
                    subject.send(completion: terminalEvent)
                }
            },
            receiveValue: { element in
                subject.send(element)
            }
        )
    }

    /// The standard `.receive(on: DispatchQueue.main)` seems to occasionally dispatch to the next run loop
    /// before emitting data. When `receiveImmediately` is `true` this method checks, for each value, whether
    /// it is already running on the main thread and if so emits the value directly rather than routing it
    /// via `.receive(on:)`; values arriving on any other thread are still moved to the main queue
    ///
    /// NOTE(review): only values are inspected in the immediate path; completion events are delivered by
    /// `flatMap` and are not explicitly moved to the main queue here — confirm this is acceptable for callers
    ///
    /// - Parameter receiveImmediately: When `false` (the default) this is just `.receive(on: DispatchQueue.main)`
    func receiveOnMain(immediately receiveImmediately: Bool = false) -> AnyPublisher<Output, Failure> {
        if !receiveImmediately {
            return self
                .receive(on: DispatchQueue.main)
                .eraseToAnyPublisher()
        }

        return self
            .flatMap { element -> AnyPublisher<Output, Failure> in
                let wrapped = Just(element).setFailureType(to: Failure.self)

                // Already on the main thread so there is no need to hop queues
                if Thread.isMainThread {
                    return wrapped.eraseToAnyPublisher()
                }

                return wrapped
                    .receive(on: DispatchQueue.main)
                    .eraseToAnyPublisher()
            }
            .eraseToAnyPublisher()
    }
}
// MARK: - Convenience
public extension Publisher {
/// Optional-subject variant of `sink(into:includeCompletions:)`
///
/// When `subject` is `nil` nothing is subscribed and a cancellable which does nothing is returned,
/// otherwise this forwards to the overload which takes a non-optional subject
func sink(into subject: PassthroughSubject<Output, Failure>?, includeCompletions: Bool = false) -> AnyCancellable {
    if let unwrappedSubject: PassthroughSubject<Output, Failure> = subject {
        return sink(into: unwrappedSubject, includeCompletions: includeCompletions)
    }

    return AnyCancellable {}
}
Work on the PromiseKit refactor # Conflicts: # Session.xcodeproj/project.pbxproj # Session/Conversations/ConversationVC+Interaction.swift # Session/Home/Message Requests/MessageRequestsViewModel.swift # Session/Notifications/AppNotifications.swift # Session/Notifications/PushRegistrationManager.swift # Session/Notifications/SyncPushTokensJob.swift # Session/Notifications/UserNotificationsAdaptee.swift # Session/Settings/BlockedContactsViewModel.swift # Session/Settings/NukeDataModal.swift # Session/Settings/SettingsViewModel.swift # Session/Utilities/BackgroundPoller.swift # SessionMessagingKit/Database/Models/ClosedGroup.swift # SessionMessagingKit/File Server/FileServerAPI.swift # SessionMessagingKit/Open Groups/OpenGroupAPI.swift # SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+ClosedGroups.swift # SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+UnsendRequests.swift # SessionMessagingKit/Sending & Receiving/Message Handling/MessageSender+ClosedGroups.swift # SessionMessagingKit/Sending & Receiving/MessageSender+Convenience.swift # SessionMessagingKit/Sending & Receiving/MessageSender.swift # SessionMessagingKit/Sending & Receiving/Notifications/PushNotificationAPI.swift # SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift # SessionMessagingKit/Sending & Receiving/Pollers/CurrentUserPoller.swift # SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift # SessionMessagingKit/Utilities/ProfileManager.swift # SessionSnodeKit/Networking/SnodeAPI.swift # SessionSnodeKit/OnionRequestAPI.swift # SessionUtilitiesKit/Networking/HTTP.swift
2 years ago
/// Automatically retains the subscription until it emits a 'completion' event
///
/// The subscription is kept alive by an intentional retain cycle (the completion closure captures the
/// variable which stores the cancellable) and that cycle is broken once the completion event arrives
///
/// - Parameters:
///   - receiveCompletion: Optional closure called with the completion event (finished or failure)
///   - receiveValue: Optional closure called for every value, values are ignored when omitted
func sinkUntilComplete(
    receiveCompletion: ((Subscribers.Completion<Failure>) -> Void)? = nil,
    receiveValue: ((Output) -> Void)? = nil
) {
    var retainCycle: Cancellable? = nil
    var hasCompleted: Bool = false

    retainCycle = self
        .sink(
            receiveCompletion: { result in
                receiveCompletion?(result)
                hasCompleted = true

                // Redundant but without reading 'retainCycle' it will warn that the variable
                // isn't used
                if retainCycle != nil { retainCycle = nil }
            },
            receiveValue: (receiveValue ?? { _ in })
        )

    // If the publisher completed synchronously (ie. during the 'sink' call above) then the completion
    // closure ran before 'retainCycle' was assigned, so the cycle would never be broken and the
    // subscription (and everything the closures capture) would leak - release it here instead
    //
    // Note: these captured variables are not synchronised (same as before this check was added)
    if hasCompleted { retainCycle = nil }
}
}
public extension AnyPublisher {
    /// Converts the publisher to output a `Result` instead of failing: every value is wrapped in `.success`
    /// and an upstream error is emitted as a single `.failure` value (via `Just`), which means the returned
    /// publisher has a `Failure` type of `Never` and subscribers never receive a failure completion
    func asResult() -> AnyPublisher<Result<Output, Failure>, Never> {
        return self
            .map { element in Result<Output, Failure>.success(element) }
            .catch { error -> Just<Result<Output, Failure>> in
                Just(Result<Output, Failure>.failure(error))
            }
            .eraseToAnyPublisher()
    }
}
// MARK: - Data Decoding
public extension AnyPublisher where Output == Data, Failure == Error {
    /// Decodes every `Data` value emitted by this publisher into an `R` by calling
    /// `data.decoded(as:using:)`
    ///
    /// If decoding throws then the thrown error is sent downstream as the failure
    ///
    /// - Parameters:
    ///   - type: The `Decodable` type to decode the data as
    ///   - dependencies: Passed through to `data.decoded(as:using:)`
    /// - Returns: A publisher which emits the decoded values
    func decoded<R: Decodable>(
        as type: R.Type,
        using dependencies: Dependencies = Dependencies()
    ) -> AnyPublisher<R, Failure> {
        return self
            .flatMap { payload -> AnyPublisher<R, Error> in
                // Capture either the decoded value or the thrown error, then publish whichever we got
                let outcome: Result<R, Error> = Result {
                    try payload.decoded(as: type, using: dependencies)
                }

                return outcome.publisher.eraseToAnyPublisher()
            }
            .eraseToAnyPublisher()
    }
}
public extension AnyPublisher where Output == (ResponseInfoType, Data?), Failure == Error {
    /// Decodes the `Data` half of every `(ResponseInfoType, Data?)` value into an `R` by calling
    /// `data.decoded(as:using:)`, passing the `ResponseInfoType` through untouched
    ///
    /// The publisher fails with `HTTPError.parsingFailed` when the data is `nil` as well as when decoding
    /// throws (the underlying decoding error is not forwarded)
    ///
    /// - Parameters:
    ///   - type: The `Decodable` type to decode the data as
    ///   - dependencies: Passed through to `data.decoded(as:using:)`
    /// - Returns: A publisher which emits the response info alongside the decoded value
    func decoded<R: Decodable>(
        as type: R.Type,
        using dependencies: Dependencies = Dependencies()
    ) -> AnyPublisher<(ResponseInfoType, R), Error> {
        return self
            .flatMap { info, body -> AnyPublisher<(ResponseInfoType, R), Error> in
                // Missing data and a decoding failure both result in the same error
                guard
                    let data: Data = body,
                    let decodedValue: R = try? data.decoded(as: type, using: dependencies)
                else {
                    return Fail(error: HTTPError.parsingFailed)
                        .eraseToAnyPublisher()
                }

                return Just((info, decodedValue))
                    .setFailureType(to: Error.self)
                    .eraseToAnyPublisher()
            }
            .eraseToAnyPublisher()
    }
}