// Copyright © 2023 Rangeproof Pty Ltd. All rights reserved.

import Foundation
import GRDB
import Combine
import DifferenceKit
import SessionUtilitiesKit

// NOTE(review): several generic parameter/argument lists in this file appear to be missing (for example
// `TableObservation` refers to `T` and `finalPublisher` refers to `S` without either being declared) - confirm
// the affected declarations against version control before relying on them.

// MARK: - ObservableTableSource

/// A class-bound table data source which exposes its section data as a publisher of
/// `(sections, changeset)` pairs (see `TargetPublisher`).
public protocol ObservableTableSource: AnyObject, SectionedTableData {
    /// The observation type which `finalPublisher` can convert directly into a `TargetPublisher`
    typealias TargetObservation = TableObservation<[SectionModel]>
    
    /// Emits the full section data along with the `StagedChangeset` describing how it changed
    typealias TargetPublisher = AnyPublisher<(([SectionModel], StagedChangeset<[SectionModel]>)), Error>
    
    var dependencies: Dependencies { get }
    var state: TableDataState { get }
    var observableState: ObservableTableSourceState { get }
    var observation: TargetObservation { get }
    
    // MARK: - Functions
    
    func didReturnFromBackground()
}

/// Default implementations for `ObservableTableSource`
extension ObservableTableSource {
    /// Forwards to the subject stored on `observableState`
    public var pendingTableDataSubject: CurrentValueSubject<([SectionModel], StagedChangeset<[SectionModel]>), Never> {
        self.observableState.pendingTableDataSubject
    }
    
    /// Default observation: built from the `pendingTableDataSubject` stored on `observableState`
    public var observation: TargetObservation {
        ObservationBuilder.changesetSubject(self.observableState.pendingTableDataSubject)
    }
    
    /// The publisher produced by passing this source and its `dependencies` to `observation.finalPublisher`
    public var tableDataPublisher: TargetPublisher { self.observation.finalPublisher(self, using: dependencies) }
    
    /// Default implementation does nothing
    public func didReturnFromBackground() {}
    
    /// Sends a value through the private `_forcedRefresh` subject (which `forcedRefresh` is derived from)
    public func forceRefresh() { self.observableState._forcedRefresh.send(()) }
}

// MARK: - State Manager (ObservableTableSource)

/// Holds the mutable observation state for an `ObservableTableSource`: the forced-refresh trigger, the pending
/// table data subject and a flag recording whether any data has been emitted yet.
///
/// NOTE(review): the generic parameter lists on this class and on `AnyPublisher`/`PassthroughSubject` below
/// appear to be missing - confirm.
public class ObservableTableSourceState: SectionedTableData {
    /// Derived from `_forcedRefresh` via `shareReplay(0)` in `init`
    public let forcedRefresh: AnyPublisher
    public let pendingTableDataSubject: CurrentValueSubject<([SectionModel], StagedChangeset<[SectionModel]>), Never>
    
    // MARK: - Internal Variables
    
    /// Set to `true` by `Publisher.mapToSessionTableViewData` once a value has passed its filter
    fileprivate var hasEmittedInitialData: Bool
    
    /// Backing subject for `forcedRefresh`, values are sent by `ObservableTableSource.forceRefresh()`
    fileprivate let _forcedRefresh: PassthroughSubject = PassthroughSubject()
    
    // MARK: - Initialization
    
    /// Starts with no data emitted and an empty `([], StagedChangeset())` pending value
    init() {
        self.hasEmittedInitialData = false
        self.forcedRefresh = _forcedRefresh.shareReplay(0)
        self.pendingTableDataSubject = CurrentValueSubject(([], StagedChangeset()))
    }
}

// MARK: - TableObservation

/// Wraps the closures used to generate a publisher for an `ObservableTableSource`.
///
/// An instance is created with either a plain publisher generator or a generator which also provides a
/// changeset, `finalPublisher` prefers the changeset generator when it is present.
///
/// NOTE(review): the struct's generic parameter (referred to as `T` throughout) and the generic arguments of
/// the `AnyPublisher` closure return types appear to be missing - confirm.
public struct TableObservation {
    fileprivate let generatePublisher: (any ObservableTableSource, Dependencies) -> AnyPublisher
    
    /// Only set by `init(generatePublisherWithChangeset:)`, the output is type-erased via `$0 as Any`
    fileprivate let generatePublisherWithChangeset: ((any ObservableTableSource, Dependencies) -> AnyPublisher)?
    
    /// Creates an observation from a plain publisher generator (no changeset generator is stored)
    init(generatePublisher: @escaping (any ObservableTableSource, Dependencies) -> AnyPublisher) {
        self.generatePublisher = generatePublisher
        self.generatePublisherWithChangeset = nil
    }
    
    /// Creates an observation from a generator which emits the data alongside its changeset.
    ///
    /// `generatePublisher` is set to a publisher which always fails with `StorageError.invalidData`, so anything
    /// reading `generatePublisher` directly (eg. `map`/`mapWithPrevious`) will get that failure for an
    /// observation created this way.
    init(generatePublisherWithChangeset: @escaping (any ObservableTableSource, Dependencies) -> AnyPublisher<(T, StagedChangeset), Error>) where T: Collection {
        self.generatePublisher = { _, _ in Fail(error: StorageError.invalidData).eraseToAnyPublisher() }
        self.generatePublisherWithChangeset = { source, dependencies in
            generatePublisherWithChangeset(source, dependencies).map { $0 as Any }.eraseToAnyPublisher()
        }
    }
    
    /// Converts this observation into the `TargetPublisher` for the given `source`.
    ///
    /// - If a changeset generator is stored its output is cast back to `TargetData`, a failed cast throws
    ///   `StorageError.invalidData`
    /// - Otherwise, if this observation can be cast to `S.TargetObservation`, the plain publisher is de-duplicated
    ///   and passed through `mapToSessionTableViewData(for:)`
    /// - Otherwise the result is a publisher which fails with `StorageError.invalidData`
    ///
    /// NOTE(review): `S` is not declared in the visible signature - confirm the generic clause.
    fileprivate func finalPublisher(
        _ source: S,
        using dependencies: Dependencies
    ) -> S.TargetPublisher {
        typealias TargetData = (([S.SectionModel], StagedChangeset<[S.SectionModel]>))
        
        switch (self, self.generatePublisherWithChangeset) {
        case (_, .some(let generatePublisherWithChangeset)):
            return generatePublisherWithChangeset(source, dependencies)
                .tryMap { data -> TargetData in
                    // The generator output was erased to `Any` in the initializer so it needs to be cast back
                    guard let convertedData: TargetData = data as? TargetData else {
                        throw StorageError.invalidData
                    }
                    
                    return convertedData
                }
                .eraseToAnyPublisher()
            
        case (let validObservation as S.TargetObservation, _):
            // Doing `removeDuplicates` in case the conversion from the original data to [SectionModel]
            // can result in duplicate output even with some different inputs
            return validObservation.generatePublisher(source, dependencies)
                .removeDuplicates()
                .mapToSessionTableViewData(for: source)
            
        default: return Fail(error: StorageError.invalidData).eraseToAnyPublisher()
        }
    }
}

/// Allows a `TableObservation` of a collection type to be created from an array literal, the resulting
/// observation emits the literal's non-nil elements once.
extension TableObservation: ExpressibleByArrayLiteral where T: Collection {
    public init(arrayLiteral elements: T.Element?...) {
        self.init(
            generatePublisher: { _, _ in
                // `nil` elements are dropped, if the remaining array can't be cast to `T` the publisher fails
                guard let convertedElements: T = Array(elements.compactMap { $0 }) as? T else {
                    return Fail(error: StorageError.invalidData).eraseToAnyPublisher()
                }
                
                return Just(convertedElements)
                    .setFailureType(to: Error.self)
                    .eraseToAnyPublisher()
            }
        )
    }
}

// MARK: - ObservationBuilder

/// Factory functions for creating `TableObservation` values from subjects and database observations
public enum ObservationBuilder {
    /// The `subject` will emit immediately when there is a subscriber and store the most recent value to be emitted whenever a new subscriber is
    /// added
    ///
    /// NOTE(review): this overload and the one below have identical visible signatures, the generic clauses
    /// appear to be missing - confirm.
    static func subject(_ subject: CurrentValueSubject) -> TableObservation {
        return TableObservation { _, _ in
            return subject
                .removeDuplicates()
                .eraseToAnyPublisher()
        }
    }
    
    /// The `subject` will emit immediately when there is a subscriber and store the most recent value to be emitted whenever a new subscriber is
    /// added
    ///
    /// Unlike the overload above this one applies `setFailureType(to: Error.self)`, so it presumably accepts a
    /// subject whose failure type is `Never` - confirm.
    static func subject(_ subject: CurrentValueSubject) -> TableObservation {
        return TableObservation { _, _ in
            return subject
                .removeDuplicates()
                .setFailureType(to: Error.self)
                .eraseToAnyPublisher()
        }
    }
    
    /// The `ValueObservation` will trigger whenever any of the data fetched in the closure is updated, please see the following link for tips
    /// to help optimise performance https://github.com/groue/GRDB.swift#valueobservation-performance
    ///
    /// The resulting publisher is also passed through `manualRefreshFrom` with the `forcedRefresh` publisher from
    /// the `observableState` of the `source`.
    static func databaseObservation(_ source: S, fetch: @escaping (Database) throws -> T) -> TableObservation {
        /// **Note:** This observation will be triggered twice immediately (and be de-duped by the `removeDuplicates`)
        /// this is due to the behaviour of `ValueConcurrentObserver.asyncStartObservation` which triggers its own
        /// fetch (after the ones in `ValueConcurrentObserver.asyncStart`/`ValueConcurrentObserver.syncStart`)
        /// just in case the database has changed between the two reads - unfortunately it doesn't look like there is a way to prevent this
        return TableObservation { viewModel, dependencies in
            return ValueObservation
                .trackingConstantRegion(fetch)
                .removeDuplicates()
                .handleEvents(didFail: { SNLog("[\(type(of: viewModel))] Observation failed with error: \($0)") })
                .publisher(in: dependencies.storage, scheduling: dependencies.scheduler)
                .manualRefreshFrom(source.observableState.forcedRefresh)
        }
    }
    
    /// The `ValueObservation` will trigger whenever any of the data fetched in the closure is updated, please see the following link for tips
    /// to help optimise performance https://github.com/groue/GRDB.swift#valueobservation-performance
    ///
    /// This overload is for a `fetch` closure which returns an array, the body is otherwise the same as the
    /// overload above.
    static func databaseObservation(_ source: S, fetch: @escaping (Database) throws -> [T]) -> TableObservation<[T]> {
        /// **Note:** This observation will be triggered twice immediately (and be de-duped by the `removeDuplicates`)
        /// this is due to the behaviour of `ValueConcurrentObserver.asyncStartObservation` which triggers its own
        /// fetch (after the ones in `ValueConcurrentObserver.asyncStart`/`ValueConcurrentObserver.syncStart`)
        /// just in case the database has changed between the two reads - unfortunately it doesn't look like there is a way to prevent this
        return TableObservation { viewModel, dependencies in
            return ValueObservation
                .trackingConstantRegion(fetch)
                .removeDuplicates()
                .handleEvents(didFail: { SNLog("[\(type(of: viewModel))] Observation failed with error: \($0)") })
                .publisher(in: dependencies.storage, scheduling: dependencies.scheduler)
                .manualRefreshFrom(source.observableState.forcedRefresh)
        }
    }
    
    /// The `changesetSubject` will emit immediately when there is a subscriber and store the most recent value to be emitted whenever a new subscriber is
    /// added
    ///
    /// Values whose changeset is empty are filtered out before being emitted.
    static func changesetSubject(
        _ subject: CurrentValueSubject<([T], StagedChangeset<[T]>), Never>
    ) -> TableObservation<[T]> {
        return TableObservation { viewModel, dependencies in
            subject
                .withPrevious(([], StagedChangeset()))
                .filter { prev, next in
                    /// Suppress events with no changes (these will be sent in order to clear out the `StagedChangeset` value as if we
                    /// don't do so then resubscribing will result in an attempt to apply an invalid changeset to the `tableView` resulting
                    /// in a crash)
                    !next.1.isEmpty
                }
                .map { _, current -> ([T], StagedChangeset<[T]>) in current }
                .setFailureType(to: Error.self)
                .eraseToAnyPublisher()
        }
    }
}

// MARK: - Convenience Transforms

public extension TableObservation {
    /// Returns a new observation whose publisher applies `transform` to every value emitted by this
    /// observation's `generatePublisher`.
    ///
    /// Only `generatePublisher` is used, a stored `generatePublisherWithChangeset` closure is not carried over.
    ///
    /// NOTE(review): `R` is not declared in the visible signature - confirm the generic clause.
    func map(transform: @escaping (T) -> R) -> TableObservation {
        return TableObservation { viewModel, dependencies in
            self.generatePublisher(viewModel, dependencies).map(transform).eraseToAnyPublisher()
        }
    }
    
    /// Same as `map(transform:)` except the publisher is first passed through `withPrevious()` so `transform`
    /// receives two arguments: an optional `T` followed by a `T`.
    func mapWithPrevious(transform: @escaping (T?, T) -> R) -> TableObservation {
        return TableObservation { viewModel, dependencies in
            self.generatePublisher(viewModel, dependencies)
                .withPrevious()
                .map(transform)
                .eraseToAnyPublisher()
        }
    }
}

public extension Array {
    /// Rebuilds every section with each element replaced by the result of calling
    /// `updatedPosition(for:count:)` with its index and the number of elements in the section.
    ///
    /// The `source` argument is not read by this function.
    ///
    /// NOTE(review): the generic clause and the `ArraySection` generic arguments appear to be missing from this
    /// signature - confirm.
    func mapToSessionTableViewData(
        for source: S?
    ) -> [ArraySection>] where Element == ArraySection> {
        // Update the data to include the proper position for each element
        return self.map { section in
            ArraySection(
                model: section.model,
                elements: section.elements.enumerated().map { index, element in
                    element.updatedPosition(for: index, count: section.elements.count)
                }
            )
        }
    }
}

public extension Publisher {
    /// Converts each output into an `(updatedData, changeset)` pair where the changeset is generated between the
    /// current `state.tableData` of the `source` (or an empty array if the `source` no longer exists) and the
    /// updated data.
    ///
    /// The `source` is captured weakly by each closure. A value is only emitted when its changeset isn't empty
    /// or when the `source` hasn't emitted any data yet, the `hasEmittedInitialData` flag is set whenever a value
    /// gets through.
    ///
    /// NOTE(review): the generic clause and several generic arguments appear to be missing from this signature -
    /// confirm.
    func mapToSessionTableViewData(
        for source: S
    ) -> AnyPublisher<(Output, StagedChangeset), Failure> where Output == [ArraySection>] {
        return self
            .map { [weak source] updatedData -> (Output, StagedChangeset) in
                let updatedDataWithPositions: Output = updatedData
                    .mapToSessionTableViewData(for: source)
                
                // Generate an updated changeset
                let changeset = StagedChangeset(
                    source: (source?.state.tableData ?? []),
                    target: updatedDataWithPositions
                )
                
                return (updatedDataWithPositions, changeset)
            }
            .filter { [weak source] _, changeset in
                // If the `source` has been released the first condition is `nil == false` (ie. `false`) so only
                // non-empty changesets get through
                source?.observableState.hasEmittedInitialData == false ||   // Always emit at least once
                !changeset.isEmpty                                          // Do nothing if there were no changes
            }
            .handleEvents(receiveOutput: { [weak source] _ in
                source?.observableState.hasEmittedInitialData = true
            })
            .eraseToAnyPublisher()
    }
}