//===----------------------------------------------------------------------===// // // This source file is part of the Swift.org open source project // // Copyright (c) 2014 - 2018 Apple Inc. and the Swift project authors // Licensed under Apache License v2.0 with Runtime Library Exception // // See https://swift.org/LICENSE.txt for license information // See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors // //===----------------------------------------------------------------------===// import Foundation /// Abstraction layer so we can store a heterogeneous collection of tasks in an /// array. private protocol AnyTask: Sendable { func waitForCompletion() async func cancel() } extension Task: AnyTask { func waitForCompletion() async { _ = try? await value } } /// A type that is able to track dependencies between tasks. package protocol DependencyTracker: Sendable { /// Which tasks need to finish before a task described by `self` may start executing. /// `pendingTasks` is sorted in the order in which the tasks were enqueued to `AsyncQueue`. func dependencies(in pendingTasks: [PendingTask]) -> [PendingTask] } /// A dependency tracker where each task depends on every other, i.e. a serial /// queue. package struct Serial: DependencyTracker { package func dependencies(in pendingTasks: [PendingTask]) -> [PendingTask] { if let lastTask = pendingTasks.last { return [lastTask] } return [] } } package struct PendingTask: Sendable { /// The task that is pending. fileprivate let task: any AnyTask package let metadata: TaskMetadata /// A unique value used to identify the task. This allows tasks to get /// removed from `pendingTasks` again after they finished executing. fileprivate let id: UUID } /// A list of pending tasks that can be sent across actor boundaries and is guarded by a lock. /// /// - Note: Unchecked sendable because the tasks are being protected by a lock. private class PendingTasks: @unchecked Sendable { /// Lock guarding `pendingTasks`. private let lock = NSLock() /// Pending tasks that have not finished execution yet. /// /// - Important: This must only be accessed while `lock` has been acquired. private var tasks: [PendingTask] = [] init() { self.lock.name = "AsyncQueue" } /// Capture a lock and execute the closure, which may modify the pending tasks. func withLock(_ body: (_ pendingTasks: inout [PendingTask]) throws -> T) rethrows -> T { try lock.withLock { try body(&tasks) } } } /// A queue that allows the execution of asynchronous blocks of code. package final class AsyncQueue: Sendable { private let pendingTasks: PendingTasks = PendingTasks() package init() {} /// Schedule a new closure to be executed on the queue. /// /// If this is a serial queue, all previously added tasks are guaranteed to /// finished executing before this closure gets executed. /// /// If this is a barrier, all previously scheduled tasks are guaranteed to /// finish execution before the barrier is executed and all tasks that are /// added later will wait until the barrier finishes execution. @discardableResult package func async( priority: TaskPriority? = nil, metadata: TaskMetadata, @_inheritActorContext operation: @escaping @Sendable () async -> Success ) -> Task { let throwingTask = asyncThrowing(priority: priority, metadata: metadata, operation: operation) return Task(priority: priority) { do { return try await throwingTask.valuePropagatingCancellation } catch { // We know this can never happen because `operation` does not throw. preconditionFailure("Executing a task threw an error even though the operation did not throw") } } } /// Same as ``AsyncQueue/async(priority:barrier:operation:)`` but allows the /// operation to throw. /// /// - Important: The caller is responsible for handling any errors thrown from /// the operation by awaiting the result of the returned task. package func asyncThrowing( priority: TaskPriority? = nil, metadata: TaskMetadata, @_inheritActorContext operation: @escaping @Sendable () async throws -> Success ) -> Task { let id = UUID() return pendingTasks.withLock { tasks in // Build the list of tasks that need to finished execution before this one // can be executed let dependencies = metadata.dependencies(in: tasks) // Schedule the task. let task = Task(priority: priority) { [pendingTasks] in // IMPORTANT: The only throwing call in here must be the call to // operation. Otherwise the assumption that the task will never throw // if `operation` does not throw, which we are making in `async` does // not hold anymore. for dependency in dependencies { await dependency.task.waitForCompletion() } let result = try await operation() pendingTasks.withLock { tasks in tasks.removeAll(where: { $0.id == id }) } return result } tasks.append(PendingTask(task: task, metadata: metadata, id: id)) return task } } } /// Convenience overloads for serial queues. extension AsyncQueue where TaskMetadata == Serial { /// Same as ``async(priority:operation:)`` but specialized for serial queues /// that don't specify any metadata. @discardableResult package func async( priority: TaskPriority? = nil, @_inheritActorContext operation: @escaping @Sendable () async -> Success ) -> Task { return self.async(priority: priority, metadata: Serial(), operation: operation) } /// Same as ``asyncThrowing(priority:metadata:operation:)`` but specialized /// for serial queues that don't specify any metadata. package func asyncThrowing( priority: TaskPriority? = nil, @_inheritActorContext operation: @escaping @Sendable () async throws -> Success ) -> Task { return self.asyncThrowing(priority: priority, metadata: Serial(), operation: operation) } }