Files
swift-mirror/test/Concurrency/Runtime/startImmediately.swift
Pavel Yaskevich 95d4ef5756 [Concurrency] Remove deprecated Task.startSynchronously API
This is the original spelling which was not accepted. We kept
it for a bit to give adopters time to switch but it's time to
remove it now.
2025-07-25 13:15:02 -07:00

563 lines
22 KiB
Swift

// RUN: %empty-directory(%t)
// RUN: %target-build-swift -plugin-path %swift-plugin-dir -Xfrontend -disable-availability-checking %s %import-libdispatch -swift-version 6 -o %t/a.out
// RUN: %target-codesign %t/a.out
// RUN: %target-run %t/a.out | %FileCheck %s --dump-input=always
// REQUIRES: executable_test
// REQUIRES: concurrency
// REQUIRES: concurrency_runtime
// REQUIRES: libdispatch
// UNSUPPORTED: back_deployment_runtime
// UNSUPPORTED: back_deploy_concurrency
// UNSUPPORTED: use_os_stdlib
// UNSUPPORTED: freestanding
import _Concurrency
import Dispatch
enum CompareHow {
case equal
case notEqual
func check(_ wasEqual: Bool) -> Bool {
switch self {
case .equal: wasEqual
case .notEqual: !wasEqual
}
}
}
#if canImport(Darwin)
import Darwin
typealias ThreadID = pthread_t
func getCurrentThreadID() -> ThreadID { pthread_self() }
func compareThreadIDs(_ a: ThreadID, _ how: CompareHow, _ b: ThreadID) -> Bool {
how.check(pthread_equal(a, b) != 0)
}
#elseif canImport(Glibc)
import Glibc
typealias ThreadID = pthread_t
func getCurrentThreadID() -> ThreadID { pthread_self() }
func compareThreadIDs(_ a: ThreadID, _ how: CompareHow, _ b: ThreadID) -> Bool {
how.check(pthread_equal(a, b) != 0)
}
#elseif os(Windows)
import WinSDK
typealias ThreadID = UInt32
func getCurrentThreadID() -> ThreadID { GetCurrentThreadId() }
func compareThreadIDs(_ a: ThreadID, _ how: CompareHow, _ b: ThreadID) -> Bool {
how.check(a == b)
}
#elseif os(WASI)
typealias ThreadID = UInt32
func getCurrentThreadID() -> ThreadID { 0 }
func compareThreadIDs(_ a: ThreadID, _ how: CompareHow, _ b: ThreadID) -> Bool {
how.check(a == b)
}
#endif
extension ThreadID: @unchecked Sendable {}
@globalActor
actor MyGlobalActor {
static let shared: MyGlobalActor = MyGlobalActor()
@MyGlobalActor
static func test() {}
}
final class NaiveQueueExecutor: SerialExecutor, TaskExecutor {
let queue: DispatchQueue
init(queue: DispatchQueue) {
self.queue = queue
}
public func enqueue(_ job: consuming ExecutorJob) {
let unowned = UnownedJob(job)
print("NaiveQueueExecutor(\(self.queue.label)) enqueue [thread:\(getCurrentThreadID())]")
queue.async {
unowned.runSynchronously(on: self.asUnownedSerialExecutor())
}
}
}
@globalActor
actor DifferentGlobalActor {
static let queue = DispatchQueue(label: "DifferentGlobalActor-queue")
let executor: NaiveQueueExecutor
nonisolated let unownedExecutor: UnownedSerialExecutor
init() {
self.executor = NaiveQueueExecutor(queue: DifferentGlobalActor.queue)
self.unownedExecutor = executor.asUnownedSerialExecutor()
}
static let shared: DifferentGlobalActor = DifferentGlobalActor()
@DifferentGlobalActor
static func test() {}
}
// Test on all platforms
func syncOnMyGlobalActor() -> [Task<Void, Never>] {
MyGlobalActor.shared.preconditionIsolated("Should be executing on the global actor here")
print("Confirmed to be on @MyGlobalActor")
// This task must be guaranteed to happen AFTER 'tt' because we are already on this actor
// so this enqueue must happen after we give up the actor.
print("schedule Task { @MyGlobalActor }, before immediate [thread:\(getCurrentThreadID())] @ :\(#line)")
let t1 = Task { @MyGlobalActor in
print("inside Task { @MyGlobalActor }, after sleep [thread:\(getCurrentThreadID())] @ :\(#line)")
}
print("before immediate [thread:\(getCurrentThreadID())] @ :\(#line)")
let outerTID = getCurrentThreadID()
let tt = Task.immediate { @MyGlobalActor in
let innerTID = getCurrentThreadID()
print("inside immediate, outer thread = \(outerTID)")
print("inside immediate, inner thread = \(innerTID)")
if (compareThreadIDs(outerTID, .notEqual, innerTID)) {
print("ERROR! Outer Thread ID must be equal Thread ID inside runSynchronously synchronous part!")
}
print("inside immediate, sleep now [thread:\(getCurrentThreadID())] @ :\(#line)")
_ = try? await Task.sleep(for: .seconds(1))
print("after sleep, inside immediate [thread:\(getCurrentThreadID())] @ :\(#line)")
}
return [t1, tt]
}
func syncOnMyGlobalActorHopToDifferentActor() -> [Task<Void, Never>] {
MyGlobalActor.shared.preconditionIsolated("Should be executing on the global actor here")
print("Confirmed to be on @MyGlobalActor")
// This task must be guaranteed to happen AFTER 'tt' because we are already on this actor
// so this enqueue must happen after we give up the actor.
print("schedule Task { @DifferentGlobalActor }, before immediate [thread:\(getCurrentThreadID())] @ :\(#line)")
let t1 = Task { @DifferentGlobalActor in
print("inside Task { @DifferentGlobalActor } [thread:\(getCurrentThreadID())] @ :\(#line)")
DifferentGlobalActor.shared.preconditionIsolated("Expected Task{} to be on DifferentGlobalActor")
}
print("before immediate [thread:\(getCurrentThreadID())] @ :\(#line)")
let outerTID = getCurrentThreadID()
let tt = Task.immediate { @DifferentGlobalActor in
let innerTID = getCurrentThreadID()
print("inside immediate, outer thread = \(outerTID)")
print("inside immediate, inner thread = \(innerTID)")
if (compareThreadIDs(outerTID, .equal, innerTID)) {
// This case specifically is NOT synchronously run because we specified a different isolation for the closure
// and FORCED a hop to the DifferentGlobalActor executor.
print("ERROR! Outer Thread ID must NOT equal Thread ID inside runSynchronously synchronous part!")
}
// We crucially need to see this task be enqueued on the different global actor,
// so it did not execute "synchronously" after all - it had to hop to the other actor.
dispatchPrecondition(condition: .onQueue(DifferentGlobalActor.queue))
DifferentGlobalActor.shared.preconditionIsolated("Expected Task.immediate { @DifferentGlobalActor in } to be on DifferentGlobalActor")
print("inside immediate, sleep now [thread:\(getCurrentThreadID())] @ :\(#line)")
_ = try? await Task.sleep(for: .milliseconds(100))
print("inside immediate, after sleep [thread:\(getCurrentThreadID())] @ :\(#line)")
dispatchPrecondition(condition: .onQueue(DifferentGlobalActor.queue))
DifferentGlobalActor.shared.preconditionIsolated("Expected Task.immediate { @DifferentGlobalActor in } to be on DifferentGlobalActor")
// do something here
await MyGlobalActor.test()
DifferentGlobalActor.test()
}
return [t1, tt]
}
func syncOnNonTaskThread(synchronousTask behavior: SynchronousTaskBehavior) {
let sem1 = DispatchSemaphore(value: 0)
let sem2 = DispatchSemaphore(value: 0)
let queue = DispatchQueue(label: "CustomQueue")
queue.async {
// This is in order so we don't have a "current task" nor any "current executor"
print("before immediate [thread:\(getCurrentThreadID())] @ :\(#line)")
let outerTID = getCurrentThreadID()
let tt = Task.immediate {
dispatchPrecondition(condition: .onQueue(queue))
let innerTID = getCurrentThreadID()
if compareThreadIDs(outerTID, .notEqual, innerTID) {
print("inside immediate, outer thread = \(outerTID)")
print("inside immediate, inner thread = \(innerTID)")
print("ERROR! Outer Thread ID must be equal Thread ID inside runSynchronously synchronous part!")
}
print("inside immediate [thread:\(getCurrentThreadID())] @ :\(#line)")
switch behavior {
case .suspend:
// sleep until woken up by outer task; i.e. actually suspend
print("inside immediate, before sleep [thread:\(getCurrentThreadID())] @ :\(#line)")
_ = try? await Task.sleep(for: .seconds(10))
print("inside immediate, after sleep [thread:\(getCurrentThreadID())] @ :\(#line)")
case .dontSuspend:
print("inside immediate, done [thread:\(getCurrentThreadID())] @ :\(#line)")
()
}
sem1.signal()
}
print("after immediate, outside; cancel (wakeup) the synchronous task! [thread:\(getCurrentThreadID())] @ :\(#line)")
tt.cancel() // wake up the sleep
sem1.wait()
sem2.signal()
}
sem2.wait()
}
enum SynchronousTaskBehavior {
case suspend
case dontSuspend
}
print("\n\n==== ------------------------------------------------------------------")
print("syncOnMyGlobalActor()")
await Task { @MyGlobalActor in
MyGlobalActor.shared.preconditionIsolated("Should be executing on the global actor here")
for t in syncOnMyGlobalActor() {
await t.value
}
}.value
// CHECK-LABEL: syncOnMyGlobalActor()
// CHECK: Confirmed to be on @MyGlobalActor
// CHECK: schedule Task { @MyGlobalActor }, before immediate [thread:[[CALLING_THREAD:.*]]]
// CHECK: before immediate [thread:[[CALLING_THREAD]]]
// CHECK-NOT: ERROR!
// CHECK: inside immediate, sleep now
// CHECK: inside Task { @MyGlobalActor }, after sleep
// resume on some other thread
// CHECK: after sleep, inside immediate
print("\n\n==== ------------------------------------------------------------------")
print("syncOnMyGlobalActorHopToDifferentActor()")
await Task { @MyGlobalActor in
MyGlobalActor.shared.preconditionIsolated("Should be executing on the global actor here")
for t in syncOnMyGlobalActorHopToDifferentActor() {
await t.value
}
}.value
// Assertion Notes: We expect the task to be on the specified queue as we force the Task.immediate
// task to enqueue on the DifferentGlobalActor, however we CANNOT use threads to verify this behavior,
// because dispatch may still pull tricks and reuse threads. We can only verify that we're on the right
// queue, and that the `enqueue` calls on the target executor happen when we expect them to.
//
// CHECK: syncOnMyGlobalActorHopToDifferentActor()
// CHECK: Confirmed to be on @MyGlobalActor
// CHECK: before immediate
// This IS actually enqueueing on the target actor (not synchronous), as expected:
// CHECK: NaiveQueueExecutor(DifferentGlobalActor-queue) enqueue
// CHECK: inside immediate, sleep now
// After the sleep we get back onto the specified executor as expected
// CHECK: NaiveQueueExecutor(DifferentGlobalActor-queue) enqueue
// CHECK: inside immediate, after sleep
print("\n\n==== ------------------------------------------------------------------")
var behavior: SynchronousTaskBehavior = .suspend
print("syncOnNonTaskThread(synchronousTask: \(behavior))")
syncOnNonTaskThread(synchronousTask: behavior)
// CHECK-LABEL: syncOnNonTaskThread(synchronousTask: suspend)
// No interleaving allowed between "before" and "inside":
// CHECK-NEXT: before immediate [thread:[[CALLING_THREAD2:.*]]]
// CHECK-NOT: ERROR!
// CHECK-NEXT: inside immediate [thread:[[CALLING_THREAD2]]]
// CHECK-NEXT: inside immediate, before sleep [thread:[[CALLING_THREAD2]]]
// CHECK-NEXT: after immediate, outside; cancel (wakeup) the synchronous task! [thread:[[CALLING_THREAD2]]]
// CHECK-NEXT: inside immediate, after sleep
print("\n\n==== ------------------------------------------------------------------")
behavior = .dontSuspend
print("syncOnNonTaskThread(synchronousTask: \(behavior))")
syncOnNonTaskThread(synchronousTask: behavior)
// CHECK-LABEL: syncOnNonTaskThread(synchronousTask: dontSuspend)
// CHECK-NEXT: before immediate [thread:[[CALLING_THREAD3:.*]]]
// CHECK-NOT: ERROR!
// CHECK-NEXT: inside immediate [thread:[[CALLING_THREAD3]]]
// CHECK: inside immediate, done [thread:[[CALLING_THREAD3]]]
// CHECK: after immediate, outside; cancel (wakeup) the synchronous task! [thread:[[CALLING_THREAD3]]]
print("\n\n==== ------------------------------------------------------------------")
print("callActorFromStartSynchronousTask() - not on specific queue")
callActorFromStartSynchronousTask(recipient: .recipient(Recipient()))
// CHECK: callActorFromStartSynchronousTask()
// No interleaving allowed between "before" and "inside":
// CHECK: before immediate [thread:[[CALLING_THREAD4:.*]]]
// CHECK-NEXT: inside immediate [thread:[[CALLING_THREAD4]]]
// It is important that as we suspend on the actor call, the 'after' immediate gets to run
// CHECK-NEXT: inside immediate, call rec.sync() [thread:[[CALLING_THREAD4]]]
// CHECK: after immediate
// CHECK-NOT: ERROR!
// CHECK: inside immediate, call rec.sync() done
// CHECK-NOT: ERROR!
// CHECK: inside immediate, done
/// Don't want to involve protocol calls to not confuse the test with additional details,
/// so we use concrete types here.
enum TargetActorToCall {
case recipient(Recipient)
case recipientOnQueue(RecipientOnQueue)
}
protocol RecipientProtocol where Self: Actor {
func callAndSuspend(syncTaskThreadID: ThreadID) async
}
// default actor, must not declare an 'unownedExecutor'
actor Recipient: RecipientProtocol {
func callAndSuspend(syncTaskThreadID: ThreadID) async {
self.preconditionIsolated()
print("\(Recipient.self)/\(#function) Current actor thread id = \(getCurrentThreadID()) @ :\(#line)")
if compareThreadIDs(syncTaskThreadID, .equal, getCurrentThreadID()) {
print("NOTICE: Actor must not run on the synchronous task's thread :\(#line)")
}
try? await Task.sleep(for: .milliseconds(100))
}
}
func callActorFromStartSynchronousTask(recipient rec: TargetActorToCall) {
let sem1 = DispatchSemaphore(value: 0)
let sem2 = DispatchSemaphore(value: 0)
let queue = DispatchQueue(label: "CustomQueue")
queue.async {
let outerTID = getCurrentThreadID()
print("before immediate [thread:\(outerTID)] @ :\(#line)")
let tt = Task.immediate {
dispatchPrecondition(condition: .onQueue(queue))
let innerTID = getCurrentThreadID()
precondition(compareThreadIDs(outerTID, .equal, innerTID), "Outer Thread ID must be equal Thread ID inside runSynchronously synchronous part!")
print("inside immediate [thread:\(getCurrentThreadID())] @ :\(#line)")
for i in 1..<10 {
queue.async {
print("- async work on queue")
}
}
print("inside immediate, call rec.sync() [thread:\(getCurrentThreadID())] @ :\(#line)")
switch rec {
case .recipient(let recipient): await recipient.callAndSuspend(syncTaskThreadID: innerTID)
case .recipientOnQueue(let recipient): await recipient.callAndSuspend(syncTaskThreadID: innerTID)
}
print("inside immediate, call rec.sync() done [thread:\(getCurrentThreadID())] @ :\(#line)")
// after suspension we are supposed to hop off to the global pool,
// thus the thread IDs cannot be the same anymore
print("Inner thread id = \(innerTID)")
print("Current thread id = \(getCurrentThreadID())")
// Dispatch may end up reusing the thread used to service the queue so we
// cannot truly assert exact thread identity in such tests.
// Usually this will be on a different thread by now though.
if compareThreadIDs(innerTID, .equal, getCurrentThreadID()) {
print("NOTICE: Task resumed on same thread as it entered the synchronous task!")
}
print("inside immediate, done [thread:\(getCurrentThreadID())] @ :\(#line)")
sem1.signal()
}
print("after immediate [thread:\(getCurrentThreadID())] @ :\(#line)")
sem2.signal()
}
sem1.wait()
sem2.wait()
}
print("\n\n==== ------------------------------------------------------------------")
print("callActorFromStartSynchronousTask() - actor in custom executor with its own queue")
let actorQueue = DispatchQueue(label: "recipient-actor-queue")
callActorFromStartSynchronousTask(recipient: .recipientOnQueue(RecipientOnQueue(queue: actorQueue)))
// CHECK-LABEL: callActorFromStartSynchronousTask() - actor in custom executor with its own queue
// No interleaving allowed between "before" and "inside":
// CHECK: before immediate [thread:[[CALLING_THREAD4:.*]]]
// CHECK-NEXT: inside immediate [thread:[[CALLING_THREAD4]]]
// As we call into an actor, we must enqueue to its custom executor;
// Make sure the enqueue happens as expected and only then do we give up the calling thread
// allowing the 'after immediate' to run.
//
// CHECK-NEXT: inside immediate, call rec.sync() [thread:[[CALLING_THREAD4]]]
// CHECK: after immediate
// CHECK-NOT: ERROR!
// CHECK: inside immediate, call rec.sync() done
// CHECK-NOT: ERROR!
// CHECK: inside immediate, done
actor RecipientOnQueue: RecipientProtocol {
let executor: NaiveQueueExecutor
nonisolated let unownedExecutor: UnownedSerialExecutor
init(queue: DispatchQueue) {
self.executor = NaiveQueueExecutor(queue: queue)
self.unownedExecutor = executor.asUnownedSerialExecutor()
}
func callAndSuspend(syncTaskThreadID: ThreadID) async {
self.preconditionIsolated()
dispatchPrecondition(condition: .onQueue(self.executor.queue))
print("\(Recipient.self)/\(#function) Current actor thread id = \(getCurrentThreadID()) @ :\(#line)")
if compareThreadIDs(syncTaskThreadID, .equal, getCurrentThreadID()) {
print("NOTICE: Actor must not run on the synchronous task's thread :\(#line)")
}
try? await Task.sleep(for: .milliseconds(100))
}
}
print("\n\n==== ------------------------------------------------------------------")
print("call_taskImmediate_insideActor()")
actor A {
func f() {
Task.immediate(name: "hello") { print("Task.immediate (\(Task.name!))") }
Task.immediate() { print("Task.immediate") }
Task.immediate(name: "hello") { @MainActor in print("Task.immediate { @MainActor } (\(Task.name!))") }
Task.immediate() { @MainActor in print("Task.immediate { @MainActor }") }
}
}
func call_immediate_insideActor() async {
await A().f()
}
await call_immediate_insideActor()
print("\n\n==== ------------------------------------------------------------------")
print("call_taskImmediate_taskExecutor()")
@TaskLocal
nonisolated(unsafe) var niceTaskLocalValueYouGotThere: String = ""
// FIXME: rdar://155596073 Task executors execution may not always hop as expected
func call_taskImmediate_taskExecutor(taskExecutor: NaiveQueueExecutor) async {
await Task.immediate(executorPreference: taskExecutor) {
print("Task.immediate(executorPreference:)")
dispatchPrecondition(condition: .notOnQueue(taskExecutor.queue))
await Task.yield()
dispatchPrecondition(condition: .onQueue(taskExecutor.queue))
}.value
await Task.immediate(executorPreference: taskExecutor) { @MainActor in
print("Task.immediate(executorPreference:) { @MainActor }")
dispatchPrecondition(condition: .notOnQueue(taskExecutor.queue)) // since @MainActor requirement > preference
await Task.yield()
dispatchPrecondition(condition: .notOnQueue(taskExecutor.queue)) // since @MainActor requirement > preference
}.value
await $niceTaskLocalValueYouGotThere.withValue("value") {
assert(niceTaskLocalValueYouGotThere == "value")
// Task.immediate copies task locals
await Task.immediate(executorPreference: taskExecutor) {
assert(niceTaskLocalValueYouGotThere == "value")
}.value
// Task.immediateDetached does not copy task locals
await Task.immediateDetached(executorPreference: taskExecutor) {
assert(niceTaskLocalValueYouGotThere == "")
}.value
}
await withTaskGroup { group in
print("withTaskGroup { group.addTask(executorPreference:) { ... } }")
group.addImmediateTask(executorPreference: taskExecutor) {
print("addImmediateTask")
dispatchPrecondition(condition: .notOnQueue(taskExecutor.queue)) // "immediately" on caller
await Task.yield()
dispatchPrecondition(condition: .onQueue(taskExecutor.queue)) // hopped to preferred executor
}
let _: Bool = group.addImmediateTaskUnlessCancelled(executorPreference: taskExecutor) {
print("addImmediateTaskUnlessCancelled")
dispatchPrecondition(condition: .notOnQueue(taskExecutor.queue)) // "immediately" on caller
await Task.yield()
dispatchPrecondition(condition: .onQueue(taskExecutor.queue)) // hopped to preferred executor
}
}
await withThrowingTaskGroup { group in
print("withThrowingTaskGroup { group.addTask(executorPreference:) { ... } }")
group.addImmediateTask(executorPreference: taskExecutor) {
print("addImmediateTask")
dispatchPrecondition(condition: .notOnQueue(taskExecutor.queue)) // "immediately" on caller
await Task.yield()
dispatchPrecondition(condition: .onQueue(taskExecutor.queue)) // hopped to preferred executor
}
let _: Bool = group.addImmediateTaskUnlessCancelled(executorPreference: taskExecutor) {
print("addImmediateTaskUnlessCancelled")
dispatchPrecondition(condition: .notOnQueue(taskExecutor.queue)) // "immediately" on caller
await Task.yield()
dispatchPrecondition(condition: .onQueue(taskExecutor.queue)) // hopped to preferred executor
}
}
await withDiscardingTaskGroup { group in
print("withDiscardingTaskGroup { group.addTask(executorPreference:) { ... } }")
group.addImmediateTask(executorPreference: taskExecutor) {
print("addImmediateTask")
dispatchPrecondition(condition: .notOnQueue(taskExecutor.queue)) // "immediately" on caller
await Task.yield()
dispatchPrecondition(condition: .onQueue(taskExecutor.queue)) // hopped to preferred executor
}
let _: Bool = group.addImmediateTaskUnlessCancelled(executorPreference: taskExecutor) {
print("addImmediateTaskUnlessCancelled")
dispatchPrecondition(condition: .notOnQueue(taskExecutor.queue)) // "immediately" on caller
await Task.yield()
dispatchPrecondition(condition: .onQueue(taskExecutor.queue)) // hopped to preferred executor
}
}
await withDiscardingTaskGroup { group in
print("withDiscardingTaskGroup { group.addTask(executorPreference:) { ... } }")
group.addImmediateTask(executorPreference: taskExecutor) {
print("addImmediateTask")
dispatchPrecondition(condition: .notOnQueue(taskExecutor.queue)) // "immediately" on caller
await Task.yield()
dispatchPrecondition(condition: .onQueue(taskExecutor.queue)) // hopped to preferred executor
}
let _: Bool = group.addImmediateTaskUnlessCancelled(executorPreference: taskExecutor) {
print("addImmediateTaskUnlessCancelled")
dispatchPrecondition(condition: .notOnQueue(taskExecutor.queue)) // "immediately" on caller
await Task.yield()
dispatchPrecondition(condition: .onQueue(taskExecutor.queue)) // hopped to preferred executor
}
}
}
// FIXME: rdar://155596073 task executors can be somewhat racy it seems and not always hop as we'd want them to
// await call_taskImmediate_taskExecutor(
// taskExecutor: NaiveQueueExecutor(queue: DispatchQueue(label: "my-queue")))
print("DONE!") // CHECK: DONE!