mirror of
https://github.com/apple/swift.git
synced 2025-12-14 20:36:38 +01:00
496 lines
17 KiB
Swift
496 lines
17 KiB
Swift
// RUN: %target-typecheck-verify-swift -strict-concurrency=complete -disable-availability-checking -parse-as-library
|
|
// RUN: %target-run-simple-swift( -Xfrontend -disable-availability-checking -parse-as-library)
|
|
|
|
// REQUIRES: concurrency
|
|
// REQUIRES: executable_test
|
|
// REQUIRES: concurrency_runtime
|
|
|
|
// rdar://78109470
|
|
// UNSUPPORTED: back_deployment_runtime
|
|
// UNSUPPORTED: freestanding
|
|
|
|
import _Concurrency
|
|
import StdlibUnittest
|
|
|
|
struct SomeError: Error, Equatable {
|
|
var value = Int.random(in: 0..<100)
|
|
}
|
|
|
|
class NotSendable {}
|
|
|
|
@MainActor func testWarnings() {
|
|
var x = 0
|
|
_ = AsyncStream {
|
|
x += 1 // expected-warning {{mutation of captured var 'x' in concurrently-executing code}}
|
|
return 0
|
|
}
|
|
|
|
_ = AsyncThrowingStream {
|
|
x += 1 // expected-warning {{mutation of captured var 'x' in concurrently-executing code}}
|
|
return
|
|
}
|
|
}
|
|
|
|
@MainActor var tests = TestSuite("AsyncStream")
|
|
|
|
@main struct Main {
|
|
static func main() async {
|
|
if #available(SwiftStdlib 5.5, *) {
|
|
final class Expectation: @unchecked Sendable {
|
|
var fulfilled = false
|
|
}
|
|
|
|
tests.test("factory method") {
|
|
let (stream, continuation) = AsyncStream.makeStream(of: String.self)
|
|
continuation.yield("hello")
|
|
|
|
var iterator = stream.makeAsyncIterator()
|
|
expectEqual(await iterator.next(isolation: #isolation), "hello")
|
|
}
|
|
|
|
tests.test("throwing factory method") {
|
|
let (stream, continuation) = AsyncThrowingStream.makeStream(of: String.self, throwing: Error.self)
|
|
continuation.yield("hello")
|
|
|
|
var iterator = stream.makeAsyncIterator()
|
|
do {
|
|
expectEqual(try await iterator.next(isolation: #isolation), "hello")
|
|
} catch {
|
|
expectUnreachable("unexpected error thrown")
|
|
}
|
|
}
|
|
|
|
tests.test("yield with no awaiting next") {
|
|
_ = AsyncStream(String.self) { continuation in
|
|
continuation.yield("hello")
|
|
}
|
|
}
|
|
|
|
tests.test("yield with no awaiting next throwing") {
|
|
_ = AsyncThrowingStream(String.self) { continuation in
|
|
continuation.yield("hello")
|
|
}
|
|
}
|
|
|
|
tests.test("yield with awaiting next") {
|
|
let series = AsyncStream(String.self) { continuation in
|
|
continuation.yield("hello")
|
|
}
|
|
var iterator = series.makeAsyncIterator()
|
|
expectEqual(await iterator.next(isolation: #isolation), "hello")
|
|
}
|
|
|
|
tests.test("yield with awaiting next throwing") {
|
|
let series = AsyncThrowingStream(String.self) { continuation in
|
|
continuation.yield("hello")
|
|
}
|
|
var iterator = series.makeAsyncIterator()
|
|
do {
|
|
expectEqual(try await iterator.next(isolation: #isolation), "hello")
|
|
} catch {
|
|
expectUnreachable("unexpected error thrown")
|
|
}
|
|
}
|
|
|
|
tests.test("yield with awaiting next 2") {
|
|
let series = AsyncStream(String.self) { continuation in
|
|
continuation.yield("hello")
|
|
continuation.yield("world")
|
|
}
|
|
var iterator = series.makeAsyncIterator()
|
|
expectEqual(await iterator.next(isolation: #isolation), "hello")
|
|
expectEqual(await iterator.next(isolation: #isolation), "world")
|
|
}
|
|
|
|
tests.test("yield with awaiting next 2 throwing") {
|
|
let series = AsyncThrowingStream(String.self) { continuation in
|
|
continuation.yield("hello")
|
|
continuation.yield("world")
|
|
}
|
|
var iterator = series.makeAsyncIterator()
|
|
do {
|
|
expectEqual(try await iterator.next(isolation: #isolation), "hello")
|
|
expectEqual(try await iterator.next(isolation: #isolation), "world")
|
|
} catch {
|
|
expectUnreachable("unexpected error thrown")
|
|
}
|
|
}
|
|
|
|
tests.test("yield with awaiting next 2 and finish") {
|
|
let series = AsyncStream(String.self) { continuation in
|
|
continuation.yield("hello")
|
|
continuation.yield("world")
|
|
continuation.finish()
|
|
}
|
|
var iterator = series.makeAsyncIterator()
|
|
expectEqual(await iterator.next(isolation: #isolation), "hello")
|
|
expectEqual(await iterator.next(isolation: #isolation), "world")
|
|
expectEqual(await iterator.next(isolation: #isolation), nil)
|
|
}
|
|
|
|
tests.test("yield with awaiting next 2 and finish throwing") {
|
|
let series = AsyncThrowingStream(String.self) { continuation in
|
|
continuation.yield("hello")
|
|
continuation.yield("world")
|
|
continuation.finish()
|
|
}
|
|
var iterator = series.makeAsyncIterator()
|
|
do {
|
|
expectEqual(try await iterator.next(isolation: #isolation), "hello")
|
|
expectEqual(try await iterator.next(isolation: #isolation), "world")
|
|
expectEqual(try await iterator.next(isolation: #isolation), nil)
|
|
} catch {
|
|
expectUnreachable("unexpected error thrown")
|
|
}
|
|
}
|
|
|
|
tests.test("yield with awaiting next 2 and throw") {
|
|
let thrownError = SomeError()
|
|
let series = AsyncThrowingStream(String.self) { continuation in
|
|
continuation.yield("hello")
|
|
continuation.yield("world")
|
|
continuation.finish(throwing: thrownError)
|
|
}
|
|
var iterator = series.makeAsyncIterator()
|
|
do {
|
|
expectEqual(try await iterator.next(isolation: #isolation), "hello")
|
|
expectEqual(try await iterator.next(isolation: #isolation), "world")
|
|
_ = try await iterator.next(isolation: #isolation)
|
|
expectUnreachable("expected thrown error")
|
|
} catch {
|
|
if let failure = error as? SomeError {
|
|
expectEqual(failure, thrownError)
|
|
} else {
|
|
expectUnreachable("unexpected error type")
|
|
}
|
|
}
|
|
}
|
|
|
|
tests.test("yield with no awaiting next detached") {
|
|
_ = AsyncStream(String.self) { continuation in
|
|
detach {
|
|
continuation.yield("hello")
|
|
}
|
|
}
|
|
}
|
|
|
|
tests.test("yield with no awaiting next detached throwing") {
|
|
_ = AsyncThrowingStream(String.self) { continuation in
|
|
detach {
|
|
continuation.yield("hello")
|
|
}
|
|
}
|
|
}
|
|
|
|
tests.test("yield with awaiting next detached") {
|
|
let series = AsyncStream(String.self) { continuation in
|
|
detach {
|
|
continuation.yield("hello")
|
|
}
|
|
}
|
|
var iterator = series.makeAsyncIterator()
|
|
expectEqual(await iterator.next(isolation: #isolation), "hello")
|
|
}
|
|
|
|
tests.test("yield with awaiting next detached throwing") {
|
|
let series = AsyncThrowingStream(String.self) { continuation in
|
|
detach {
|
|
continuation.yield("hello")
|
|
}
|
|
}
|
|
var iterator = series.makeAsyncIterator()
|
|
do {
|
|
expectEqual(try await iterator.next(isolation: #isolation), "hello")
|
|
} catch {
|
|
expectUnreachable("unexpected error thrown")
|
|
}
|
|
}
|
|
|
|
tests.test("yield with awaiting next 2 detached") {
|
|
let series = AsyncStream(String.self) { continuation in
|
|
detach {
|
|
continuation.yield("hello")
|
|
continuation.yield("world")
|
|
}
|
|
}
|
|
var iterator = series.makeAsyncIterator()
|
|
expectEqual(await iterator.next(isolation: #isolation), "hello")
|
|
expectEqual(await iterator.next(isolation: #isolation), "world")
|
|
}
|
|
|
|
tests.test("yield with awaiting next 2 detached throwing") {
|
|
let series = AsyncThrowingStream(String.self) { continuation in
|
|
detach {
|
|
continuation.yield("hello")
|
|
continuation.yield("world")
|
|
}
|
|
}
|
|
var iterator = series.makeAsyncIterator()
|
|
do {
|
|
expectEqual(try await iterator.next(isolation: #isolation), "hello")
|
|
expectEqual(try await iterator.next(isolation: #isolation), "world")
|
|
} catch {
|
|
expectUnreachable("unexpected error thrown")
|
|
}
|
|
}
|
|
|
|
tests.test("yield with awaiting next 2 and finish detached") {
|
|
let series = AsyncStream(String.self) { continuation in
|
|
detach {
|
|
continuation.yield("hello")
|
|
continuation.yield("world")
|
|
continuation.finish()
|
|
}
|
|
}
|
|
var iterator = series.makeAsyncIterator()
|
|
expectEqual(await iterator.next(isolation: #isolation), "hello")
|
|
expectEqual(await iterator.next(isolation: #isolation), "world")
|
|
expectEqual(await iterator.next(isolation: #isolation), nil)
|
|
}
|
|
|
|
tests.test("yield with awaiting next 2 and finish detached throwing") {
|
|
let series = AsyncThrowingStream(String.self) { continuation in
|
|
detach {
|
|
continuation.yield("hello")
|
|
continuation.yield("world")
|
|
continuation.finish()
|
|
}
|
|
}
|
|
var iterator = series.makeAsyncIterator()
|
|
do {
|
|
expectEqual(try await iterator.next(isolation: #isolation), "hello")
|
|
expectEqual(try await iterator.next(isolation: #isolation), "world")
|
|
expectEqual(try await iterator.next(isolation: #isolation), nil)
|
|
} catch {
|
|
expectUnreachable("unexpected error thrown")
|
|
}
|
|
}
|
|
|
|
tests.test("yield with awaiting next 2 and throw detached") {
|
|
let thrownError = SomeError()
|
|
let series = AsyncThrowingStream(String.self) { continuation in
|
|
detach {
|
|
continuation.yield("hello")
|
|
continuation.yield("world")
|
|
continuation.finish(throwing: thrownError)
|
|
}
|
|
}
|
|
var iterator = series.makeAsyncIterator()
|
|
do {
|
|
expectEqual(try await iterator.next(isolation: #isolation), "hello")
|
|
expectEqual(try await iterator.next(isolation: #isolation), "world")
|
|
_ = try await iterator.next(isolation: #isolation)
|
|
expectUnreachable("expected thrown error")
|
|
} catch {
|
|
if let failure = error as? SomeError {
|
|
expectEqual(failure, thrownError)
|
|
} else {
|
|
expectUnreachable("unexpected error type")
|
|
}
|
|
}
|
|
}
|
|
|
|
tests.test("yield with awaiting next 2 and finish detached with value after finish") {
|
|
let series = AsyncStream(String.self) { continuation in
|
|
detach {
|
|
continuation.yield("hello")
|
|
continuation.yield("world")
|
|
continuation.finish()
|
|
continuation.yield("This should not be emitted")
|
|
}
|
|
}
|
|
var iterator = series.makeAsyncIterator()
|
|
expectEqual(await iterator.next(isolation: #isolation), "hello")
|
|
expectEqual(await iterator.next(isolation: #isolation), "world")
|
|
expectEqual(await iterator.next(isolation: #isolation), nil)
|
|
expectEqual(await iterator.next(isolation: #isolation), nil)
|
|
}
|
|
|
|
tests.test("yield with awaiting next 2 and finish detached with value after finish throwing") {
|
|
let series = AsyncThrowingStream(String.self) { continuation in
|
|
detach {
|
|
continuation.yield("hello")
|
|
continuation.yield("world")
|
|
continuation.finish()
|
|
continuation.yield("This should not be emitted")
|
|
}
|
|
}
|
|
var iterator = series.makeAsyncIterator()
|
|
do {
|
|
expectEqual(try await iterator.next(isolation: #isolation), "hello")
|
|
expectEqual(try await iterator.next(isolation: #isolation), "world")
|
|
expectEqual(try await iterator.next(isolation: #isolation), nil)
|
|
expectEqual(try await iterator.next(isolation: #isolation), nil)
|
|
} catch {
|
|
expectUnreachable("unexpected error thrown")
|
|
}
|
|
}
|
|
|
|
tests.test("yield with awaiting next 2 and finish detached with throw after finish throwing") {
|
|
let thrownError = SomeError()
|
|
let series = AsyncThrowingStream(String.self) { continuation in
|
|
detach {
|
|
continuation.yield("hello")
|
|
continuation.yield("world")
|
|
continuation.finish()
|
|
continuation.finish(throwing: thrownError)
|
|
}
|
|
}
|
|
var iterator = series.makeAsyncIterator()
|
|
do {
|
|
expectEqual(try await iterator.next(isolation: #isolation), "hello")
|
|
expectEqual(try await iterator.next(isolation: #isolation), "world")
|
|
expectEqual(try await iterator.next(isolation: #isolation), nil)
|
|
expectEqual(try await iterator.next(isolation: #isolation), nil)
|
|
} catch {
|
|
expectUnreachable("unexpected error thrown")
|
|
}
|
|
}
|
|
|
|
tests.test("yield with awaiting next 2 and finish with throw after finish throwing") {
|
|
let thrownError = SomeError()
|
|
let series = AsyncThrowingStream(String.self) { continuation in
|
|
continuation.yield("hello")
|
|
continuation.yield("world")
|
|
continuation.finish()
|
|
continuation.finish(throwing: thrownError)
|
|
}
|
|
var iterator = series.makeAsyncIterator()
|
|
do {
|
|
expectEqual(try await iterator.next(isolation: #isolation), "hello")
|
|
expectEqual(try await iterator.next(isolation: #isolation), "world")
|
|
expectEqual(try await iterator.next(isolation: #isolation), nil)
|
|
expectEqual(try await iterator.next(isolation: #isolation), nil)
|
|
} catch {
|
|
expectUnreachable("unexpected error thrown")
|
|
}
|
|
}
|
|
|
|
tests.test("cancellation behavior on deinit with no values being awaited") {
|
|
let expectation = Expectation()
|
|
|
|
func scopedLifetime(_ expectation: Expectation) {
|
|
_ = AsyncStream(String.self) { continuation in
|
|
continuation.onTermination = { @Sendable _ in expectation.fulfilled = true }
|
|
}
|
|
}
|
|
|
|
scopedLifetime(expectation)
|
|
|
|
expectTrue(expectation.fulfilled)
|
|
}
|
|
|
|
tests.test("termination behavior on deinit with no values being awaited") {
|
|
let expectation = Expectation()
|
|
|
|
func scopedLifetime(_ expectation: Expectation) {
|
|
_ = AsyncStream(String.self) { continuation in
|
|
continuation.onTermination = { @Sendable _ in expectation.fulfilled = true }
|
|
continuation.finish()
|
|
}
|
|
}
|
|
|
|
scopedLifetime(expectation)
|
|
|
|
expectTrue(expectation.fulfilled)
|
|
}
|
|
|
|
tests.test("cancellation behavior on deinit with no values being awaited") {
|
|
let expectation = Expectation()
|
|
|
|
func scopedLifetime(_ expectation: Expectation) {
|
|
_ = AsyncStream(String.self) { continuation in
|
|
continuation.onTermination = { @Sendable terminal in
|
|
switch terminal {
|
|
case .cancelled:
|
|
expectation.fulfilled = true
|
|
default: break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
scopedLifetime(expectation)
|
|
|
|
expectTrue(expectation.fulfilled)
|
|
}
|
|
|
|
tests.test("cancellation behavior on deinit with no values being awaited throwing") {
|
|
let expectation = Expectation()
|
|
|
|
func scopedLifetime(_ expectation: Expectation) {
|
|
_ = AsyncThrowingStream(String.self) { continuation in
|
|
continuation.onTermination = { @Sendable terminal in
|
|
switch terminal {
|
|
case .cancelled:
|
|
expectation.fulfilled = true
|
|
default: break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
scopedLifetime(expectation)
|
|
|
|
expectTrue(expectation.fulfilled)
|
|
}
|
|
|
|
tests.test("continuation equality") {
|
|
let (_, continuation1) = AsyncStream<Int>.makeStream()
|
|
let (_, continuation2) = AsyncStream<Int>.makeStream()
|
|
expectTrue(continuation1 == continuation1)
|
|
expectTrue(continuation1 != continuation2)
|
|
expectTrue(continuation1.hashValue == continuation1.hashValue)
|
|
expectTrue(continuation1.hashValue != continuation2.hashValue)
|
|
}
|
|
|
|
tests.test("throwing continuation equality") {
|
|
let (_, continuation1) = AsyncThrowingStream<Int, Error>.makeStream()
|
|
let (_, continuation2) = AsyncThrowingStream<Int, Error>.makeStream()
|
|
expectTrue(continuation1 == continuation1)
|
|
expectTrue(continuation1 != continuation2)
|
|
expectTrue(continuation1.hashValue == continuation1.hashValue)
|
|
expectTrue(continuation1.hashValue != continuation2.hashValue)
|
|
}
|
|
|
|
// MARK: - Multiple consumers
|
|
|
|
tests.test("finish behavior with multiple consumers") {
|
|
let (stream, continuation) = AsyncStream<Int>.makeStream()
|
|
let (controlStream, controlContinuation) = AsyncStream<Int>.makeStream()
|
|
var controlIterator = controlStream.makeAsyncIterator()
|
|
|
|
func makeConsumingTaskWithIndex(_ index: Int) -> Task<Void, Never> {
|
|
Task { @MainActor in
|
|
controlContinuation.yield(index)
|
|
for await i in stream {
|
|
controlContinuation.yield(i)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Set up multiple consumers
|
|
let consumer1 = makeConsumingTaskWithIndex(1)
|
|
expectEqual(await controlIterator.next(isolation: #isolation), 1)
|
|
|
|
let consumer2 = makeConsumingTaskWithIndex(2)
|
|
expectEqual(await controlIterator.next(isolation: #isolation), 2)
|
|
|
|
// Ensure the iterators are suspended
|
|
await MainActor.run {}
|
|
|
|
// Terminate the stream
|
|
continuation.finish()
|
|
|
|
// Ensure the consuming Tasks both complete
|
|
_ = await consumer1.value
|
|
_ = await consumer2.value
|
|
}
|
|
|
|
await runAllTestsAsync()
|
|
}
|
|
}
|
|
}
|
|
|
|
|