Files
swift-composable-architectu…/Sources/ComposableArchitecture/Internal/Create.swift
Brandon Williams 108e3a536f Concurrency Beta (#1189)
* more main actor audit

* wip

* wip

* fix

* better task result ==

* task result tests

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* fix merge conflicts

* wip

* wip

* lots of doc fixes and modernizations

* lots more docs and better hashable conformance for TaskResult

* more docs

* clean up

* more tests and docs

* clean up

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* small clean up

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* explicit

* wip

* fix bug in TestStore.receive

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* fixes

* wip

* tools for non-deterministic TestStore.receive

* fix

* wip

* wip

* remove inAnyOrder stuff

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* convert download case study to use async/await

* animations

* fix tests

* remove executor experiment

* wip

* wip

* wip

* wip

* wip

* speech simplification

* wip

* wip

* wip

* wip

* wip

* wip

* add a few todos

* wrote some tests

* simplify speech recognizer

* fix tests

* update some docs about error throwing behavior

* wip

* wip

* fix

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* Swift 5.5.2 fixes

* wip

* Bump timeout

* wip

* wip

* Finesse

* proper way to detect main queue

* extra guard

* revert main queue check

* move stuff around

* docs

* fixed a bunch of warnings

* Fix references

* clean up

* clean up

* fix a bunch of warnings

* clean up

* un-soft deprecate concatenate

* async teststore.send

* fix uikit tests

* drop sendable

* wip

* wip

* wip

* wip

* wip

* clean up

* clean up

* reorganize, remove extra task cancellation handler

* wip

* wip

* wip

* wip

* wip

* wip

* Make TestStore.send async (#1190)

* async teststore.send

* fix uikit tests

* Converted all tests to async

* clean up

* added docs

* Update Sources/ComposableArchitecture/TestStore.swift

Co-authored-by: Stephen Celis <stephen@stephencelis.com>

* Update Sources/ComposableArchitecture/TestStore.swift

Co-authored-by: Stephen Celis <stephen@stephencelis.com>

* docs and readme update

* Update README.md

* Update Tests/ComposableArchitectureTests/StoreTests.swift

Co-authored-by: Stephen Celis <stephen@stephencelis.com>

* fix

* Update Sources/ComposableArchitecture/TestStore.swift

Co-authored-by: Stephen Celis <stephen@stephencelis.com>

* Update Sources/ComposableArchitecture/TestStore.swift

Co-authored-by: Stephen Celis <stephen@stephencelis.com>

* Update Sources/ComposableArchitecture/TestStore.swift

Co-authored-by: Stephen Celis <stephen@stephencelis.com>

* clean up

Co-authored-by: Stephen Celis <stephen@stephencelis.com>

* wip

* wip

* wip

* make fetchNumber throwing and fix tests

* effect basics clean up

* use local state for isLoading in refreshable case study

* clean up

* fix test

* wip

* wip

* wip

* wip

* wip

* wip

* fixes

* clean up

* clean up

* Simplify

* wip

* clean up

* wip

* AsyncStream.finished()

* give Send a public initializer

* make send public

* temporarily make box public

* remove concurrency flag

* wip

* wip

* wip

* wip

* wip

* docs

* speech

* simplify

* clean up;

* unchecked sendable

* clean up

* clean up

* wip

* docs

* docs

* more docs

* lots of docs

* wip

* wip

* wip

* more docs for streamWithContinuation

* wip

* wip

* wip

* Make internal, too

* wip

* Remove sendability detection

It breaks things, like:

    let request = UncheckedSendable(
      SKProductsRequest(productIdentifiers: []
    )
    // UncheckedSendable<NSObject> // *not* _<SKProductsRequest>

* wip

* doc clean up;

* fixed some todos

* docs

* wip

* remove thread safety FAQ from readme

* fix test

* wip

* docs clean up

* docs clean up

* added a testing article and fixed some docs

* rearrange

* docs clean up

* wip

* Update Sources/ComposableArchitecture/Documentation.docc/Articles/Testing.md

Co-authored-by: Thomas Grapperon <35562418+tgrapperon@users.noreply.github.com>

* Update Sources/ComposableArchitecture/Effects/ConcurrencySupport.swift

Co-authored-by: Thomas Grapperon <35562418+tgrapperon@users.noreply.github.com>

* Update Sources/ComposableArchitecture/Effects/ConcurrencySupport.swift

Co-authored-by: Thomas Grapperon <35562418+tgrapperon@users.noreply.github.com>

* Update Sources/ComposableArchitecture/Effects/ConcurrencySupport.swift

Co-authored-by: Thomas Grapperon <35562418+tgrapperon@users.noreply.github.com>

* Update Sources/ComposableArchitecture/Effects/ConcurrencySupport.swift

Co-authored-by: Thomas Grapperon <35562418+tgrapperon@users.noreply.github.com>

* Update Sources/ComposableArchitecture/Documentation.docc/Articles/Testing.md

Co-authored-by: Thomas Grapperon <35562418+tgrapperon@users.noreply.github.com>

* Update Sources/ComposableArchitecture/Documentation.docc/Articles/Testing.md

Co-authored-by: Thomas Grapperon <35562418+tgrapperon@users.noreply.github.com>

* Update Sources/ComposableArchitecture/Documentation.docc/Articles/Testing.md

Co-authored-by: Thomas Grapperon <35562418+tgrapperon@users.noreply.github.com>

* Update Sources/ComposableArchitecture/Documentation.docc/Articles/Testing.md

Co-authored-by: Thomas Grapperon <35562418+tgrapperon@users.noreply.github.com>

* Update Sources/ComposableArchitecture/Documentation.docc/Articles/Testing.md

Co-authored-by: Thomas Grapperon <35562418+tgrapperon@users.noreply.github.com>

* Update Sources/ComposableArchitecture/Documentation.docc/Articles/Testing.md

Co-authored-by: Thomas Grapperon <35562418+tgrapperon@users.noreply.github.com>

* Update Sources/ComposableArchitecture/Documentation.docc/Articles/Testing.md

Co-authored-by: Thomas Grapperon <35562418+tgrapperon@users.noreply.github.com>

* wip

* wip

* wip

Co-authored-by: Stephen Celis <stephen@stephencelis.com>
Co-authored-by: Thomas Grapperon <35562418+tgrapperon@users.noreply.github.com>
2022-08-08 01:04:16 -04:00

194 lines
5.9 KiB
Swift

// https://github.com/CombineCommunity/CombineExt/blob/master/Sources/Operators/Create.swift
// Copyright (c) 2020 Combine Community, and/or Shai Mishali
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
import Combine
import Darwin
final class DemandBuffer<S: Subscriber>: @unchecked Sendable {
private var buffer = [S.Input]()
private let subscriber: S
private var completion: Subscribers.Completion<S.Failure>?
private var demandState = Demand()
private let lock: os_unfair_lock_t
init(subscriber: S) {
self.subscriber = subscriber
self.lock = os_unfair_lock_t.allocate(capacity: 1)
self.lock.initialize(to: os_unfair_lock())
}
deinit {
self.lock.deinitialize(count: 1)
self.lock.deallocate()
}
func buffer(value: S.Input) -> Subscribers.Demand {
precondition(
self.completion == nil, "How could a completed publisher sent values?! Beats me 🤷‍♂️")
switch demandState.requested {
case .unlimited:
return subscriber.receive(value)
default:
buffer.append(value)
return flush()
}
}
func complete(completion: Subscribers.Completion<S.Failure>) {
precondition(
self.completion == nil, "Completion have already occurred, which is quite awkward 🥺")
self.completion = completion
_ = flush()
}
func demand(_ demand: Subscribers.Demand) -> Subscribers.Demand {
flush(adding: demand)
}
private func flush(adding newDemand: Subscribers.Demand? = nil) -> Subscribers.Demand {
self.lock.sync {
if let newDemand = newDemand {
demandState.requested += newDemand
}
// If buffer isn't ready for flushing, return immediately
guard demandState.requested > 0 || newDemand == Subscribers.Demand.none else { return .none }
while !buffer.isEmpty && demandState.processed < demandState.requested {
demandState.requested += subscriber.receive(buffer.remove(at: 0))
demandState.processed += 1
}
if let completion = completion {
// Completion event was already sent
buffer = []
demandState = .init()
self.completion = nil
subscriber.receive(completion: completion)
return .none
}
let sentDemand = demandState.requested - demandState.sent
demandState.sent += sentDemand
return sentDemand
}
}
struct Demand {
var processed: Subscribers.Demand = .none
var requested: Subscribers.Demand = .none
var sent: Subscribers.Demand = .none
}
}
extension AnyPublisher {
private init(
_ callback: @escaping (Effect<Output, Failure>.Subscriber) -> Cancellable
) {
self = Publishers.Create(callback: callback).eraseToAnyPublisher()
}
static func create(
_ factory: @escaping (Effect<Output, Failure>.Subscriber) -> Cancellable
) -> AnyPublisher<Output, Failure> {
AnyPublisher(factory)
}
}
extension Publishers {
fileprivate class Create<Output, Failure: Swift.Error>: Publisher {
private let callback: (Effect<Output, Failure>.Subscriber) -> Cancellable
init(callback: @escaping (Effect<Output, Failure>.Subscriber) -> Cancellable) {
self.callback = callback
}
func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Failure {
subscriber.receive(subscription: Subscription(callback: callback, downstream: subscriber))
}
}
}
extension Publishers.Create {
fileprivate final class Subscription<Downstream: Subscriber>: Combine.Subscription
where Downstream.Input == Output, Downstream.Failure == Failure {
private let buffer: DemandBuffer<Downstream>
private var cancellable: Cancellable?
init(
callback: @escaping (Effect<Output, Failure>.Subscriber) -> Cancellable,
downstream: Downstream
) {
self.buffer = DemandBuffer(subscriber: downstream)
let cancellable = callback(
.init(
send: { [weak self] in _ = self?.buffer.buffer(value: $0) },
complete: { [weak self] in self?.buffer.complete(completion: $0) }
)
)
self.cancellable = cancellable
}
func request(_ demand: Subscribers.Demand) {
_ = self.buffer.demand(demand)
}
func cancel() {
self.cancellable?.cancel()
}
}
}
extension Publishers.Create.Subscription: CustomStringConvertible {
var description: String {
return "Create.Subscription<\(Output.self), \(Failure.self)>"
}
}
extension Effect {
public struct Subscriber {
private let _send: (Output) -> Void
private let _complete: (Subscribers.Completion<Failure>) -> Void
init(
send: @escaping (Output) -> Void,
complete: @escaping (Subscribers.Completion<Failure>) -> Void
) {
self._send = send
self._complete = complete
}
public func send(_ value: Output) {
self._send(value)
}
public func send(completion: Subscribers.Completion<Failure>) {
self._complete(completion)
}
}
}