mirror of
https://github.com/apple/sourcekit-lsp.git
synced 2026-03-02 18:23:24 +01:00
694 lines
32 KiB
Swift
694 lines
32 KiB
Swift
//===----------------------------------------------------------------------===//
|
||
//
|
||
// This source file is part of the Swift.org open source project
|
||
//
|
||
// Copyright (c) 2014 - 2024 Apple Inc. and the Swift project authors
|
||
// Licensed under Apache License v2.0 with Runtime Library Exception
|
||
//
|
||
// See https://swift.org/LICENSE.txt for license information
|
||
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
|
||
//
|
||
//===----------------------------------------------------------------------===//
|
||
|
||
import Foundation
|
||
@_spi(SourceKitLSP) import LanguageServerProtocolExtensions
|
||
@_spi(SourceKitLSP) package import SKLogging
|
||
import SwiftExtensions
|
||
@_spi(SourceKitLSP) import ToolsProtocolsSwiftExtensions
|
||
|
||
/// See comment on ``TaskDescriptionProtocol/dependencies(to:taskPriority:)``
package enum TaskDependencyAction<TaskDescription: TaskDescriptionProtocol> {
  /// Wait for the given currently-executing task to finish before this task may execute. The dependency's priority is
  /// elevated to this task's priority to avoid priority inversion.
  case waitAndElevatePriorityOfDependency(TaskDescription)
  /// Cancel the given currently-executing task and re-schedule it for later so that this task can execute first.
  /// Only valid for idempotent dependencies — see `TaskDescriptionProtocol.isIdempotent`.
  case cancelAndRescheduleDependency(TaskDescription)
}
|
||
|
||
/// Logging subsystem used for all log messages emitted by the task scheduler.
private let taskSchedulerSubsystem = "org.swift.sourcekit-lsp.task-scheduler"
|
||
|
||
/// Describes a unit of work that can be scheduled on a `TaskScheduler`.
package protocol TaskDescriptionProtocol: Identifiable, Sendable, CustomLogStringConvertible {
  /// Execute the task.
  ///
  /// - Important: This should only be called from `TaskScheduler` and never be called manually.
  func execute() async

  /// When a new task is picked for execution, this determines how the task should behave with respect to the tasks
  /// that are already running.
  ///
  /// Options are the following (see doc comment on `TaskScheduler` for examples):
  /// 1. Not add any `TaskDependencyAction` for a currently executing task. This means that the two tasks can run in
  ///    parallel.
  /// 2. Declare a `waitAndElevatePriorityOfDependency` dependency. This will prevent execution of this task until
  ///    the other task has finished executing. It will elevate the priority of the dependency to the same priority as
  ///    this task. This ensures that we don't get into a priority inversion problem where a high-priority task is
  ///    waiting for a low-priority task.
  /// 3. Declare a `cancelAndRescheduleDependency`. If the task dependency is idempotent and has a priority that's not
  ///    higher than this task's priority, this causes the task dependency to be cancelled, so that this task can
  ///    execute. The canceled task will be scheduled to re-run at a later point.
  ///    - Declaring a `cancelAndRescheduleDependency` dependency on a task that is not idempotent will change the
  ///      dependency to a `waitAndElevatePriorityOfDependency` dependency and log a fault.
  ///      A `cancelAndRescheduleDependency` dependency should never be emitted for a task that's not idempotent.
  ///    - If the task that should be canceled and re-scheduled has a higher priority than this task, the
  ///      `cancelAndRescheduleDependency` dependency is changed to a `waitAndElevatePriorityOfDependency`
  ///      dependency. This is done to ensure that low-priority tasks can't interfere with the execution of
  ///      high-priority tasks.
  ///    - **Important**: The task that is canceled to be rescheduled must depend on this task, otherwise the two tasks
  ///      will fight each other for execution priority.
  func dependencies(to currentlyExecutingTasks: [Self]) -> [TaskDependencyAction<Self>]

  /// Whether executing this task twice produces the same results.
  ///
  /// This is required for the task to be canceled and re-scheduled
  /// (`TaskDependencyAction.cancelAndRescheduleDependency`).
  ///
  /// Tasks that are not idempotent should never be cancelled and rescheduled in the first place. This variable is just
  /// a safety net in case non-idempotent tasks are cancelled and rescheduled. It also ensures that tasks conforming to
  /// `TaskDescriptionProtocol` think about idempotency.
  var isIdempotent: Bool { get }

  /// The number of CPU cores this task is expected to use.
  ///
  /// If the `TaskScheduler` only allows 4 concurrent tasks and a task has `estimatedCPUCoreCount == 4`, this means
  /// that no other tasks will be scheduled while this task is executing. Note that the `TaskScheduler` might
  /// over-subscribe itself to start executing this task though, ie. it only needs to have one available execution
  /// slot even if this task will use 4 CPU cores. This ensures that we get to schedule a 4-core high-priority task in
  /// a 4 core scheduler if there are 100 low-priority 1-core tasks in the queue. Otherwise we would just keep
  /// executing those whenever a slot opens up and only have enough available slots to execute the 4-core
  /// high-priority task when all the low-priority tasks are done.
  ///
  /// For example, this is used by preparation tasks that are known to prepare multiple targets (or source files within
  /// one target) in parallel.
  var estimatedCPUCoreCount: Int { get }
}
|
||
|
||
/// Parameter that's passed to `executionStateChangedCallback` to indicate the new state of a scheduled task.
package enum TaskExecutionState {
  /// The task started executing.
  case executing

  /// The task was cancelled and will be re-scheduled for execution later. Will be followed by another call with
  /// `executing`.
  case cancelledToBeRescheduled

  /// The task has finished executing. No more state updates will come after this one.
  case finished
}
|
||
|
||
/// A task that has been scheduled on a `TaskScheduler` and may be waiting for execution, executing, or finished.
///
/// Wraps a `TaskDescription` and adds the bookkeeping the scheduler needs: priority tracking/elevation, cancellation,
/// and cancel-to-reschedule support.
package actor QueuedTask<TaskDescription: TaskDescriptionProtocol> {
  /// Result of `executionTask` / the tasks in `executionTaskCreatedContinuation`.
  /// See doc comment on `executionTask`.
  enum ExecutionTaskFinishStatus {
    case terminated
    case cancelledToBeRescheduled
  }

  /// The `TaskDescription` that defines what the queued task does.
  ///
  /// This is also used to determine dependencies between running tasks.
  nonisolated let description: TaskDescription

  /// The `Task` that produces the actual result of the `QueuedTask`. This is the task that is visible to clients.
  ///
  /// See initialization of this task to see how it works.
  ///
  /// - Note: Implicitly unwrapped optional so the task's closure can access `self`.
  /// - Note: `nonisolated(unsafe)` is fine because it will never get modified after being set in the initializer.
  nonisolated(unsafe) private(set) var resultTask: Task<Void, Never>! = nil

  /// After `execute` is called, the `executionTask` is a task that performs the computation defined by
  /// `description.execute`.
  ///
  /// The `resultTask` effectively waits for this task to be set (by watching for new values produced by
  /// `executionTaskCreatedContinuation`) and awaits its result. The task can terminate with two different statuses:
  /// - `terminated`: The task has finished executing and the `resultTask` is done.
  /// - `cancelledToBeRescheduled`: The `executionTask` was cancelled by calling `QueuedTask.cancelToBeRescheduled()`.
  ///   In this case the `TaskScheduler` is expected to call `execute` again, which will produce a new
  ///   `executionTask`. `resultTask` then awaits the creation of the new `executionTask` and then the result of that
  ///   `executionTask`.
  private var executionTask: Task<ExecutionTaskFinishStatus, Never>?

  /// Every time `execute` gets called, a new task is placed in this continuation. See comment on `executionTask`.
  private let executionTaskCreatedContinuation: AsyncStream<Task<ExecutionTaskFinishStatus, Never>>.Continuation

  // Raw storage for `priority`; atomic because the priority is read and written from `nonisolated` contexts.
  private let _priority: AtomicUInt8

  /// The latest known priority of the task.
  ///
  /// This starts off as the priority with which the task is being created. If higher priority tasks start depending on
  /// it, the priority may get elevated.
  nonisolated var priority: TaskPriority {
    get {
      TaskPriority(rawValue: _priority.value)
    }
    set {
      _priority.value = newValue.rawValue
    }
  }

  /// Whether `cancelToBeRescheduled` has been called on this `QueuedTask`.
  ///
  /// Gets reset every time `executionTask` finishes.
  private var cancelledToBeRescheduled: Bool = false

  /// Whether `resultTask` has been cancelled.
  private let resultTaskCancelled: AtomicBool = .init(initialValue: false)

  // Backing storage for `isExecuting`; atomic so it can be read from `nonisolated` contexts.
  private let _isExecuting: AtomicBool = .init(initialValue: false)

  /// Whether the task is currently executing or still queued to be executed later.
  package nonisolated var isExecuting: Bool {
    return _isExecuting.value
  }

  /// Cancel the `resultTask`, which propagates cancellation to the execution task (see the initializer's
  /// `withTaskCancellationHandler` and the check in `execute`).
  package nonisolated func cancel() {
    resultTask.cancel()
  }

  /// Wait for the task to finish.
  ///
  /// If the task that waits for this queued task to finish is cancelled, the QueuedTask will still continue
  /// executing.
  package func waitToFinish() async {
    return await resultTask.value
  }

  /// Wait for the task to finish.
  ///
  /// If the task that waits for this queued task to finish is cancelled, the QueuedTask will also be cancelled.
  /// This assumes that the caller of this method has unique control over the task and is the only one interested in
  /// its value.
  package func waitToFinishPropagatingCancellation() async {
    return await resultTask.valuePropagatingCancellation
  }

  /// A callback that will be called when the task starts executing, is cancelled to be rescheduled, or when it
  /// finishes execution.
  private let executionStateChangedCallback: (@Sendable (QueuedTask, TaskExecutionState) async -> Void)?

  /// Create a queued task.
  ///
  /// `fileprivate` because only `TaskScheduler.schedule` may create queued tasks.
  ///
  /// - Parameters:
  ///   - priority: The initial priority of the task; may later be elevated.
  ///   - description: What the task does and how it interacts with other tasks.
  ///   - taskPriorityChangedCallback: Invoked when the task's priority is elevated by the Swift runtime; used by the
  ///     scheduler to re-poke itself.
  ///   - executionStateChangedCallback: See the property of the same name.
  fileprivate init(
    priority: TaskPriority,
    description: TaskDescription,
    taskPriorityChangedCallback: @escaping @Sendable (_ newPriority: TaskPriority) -> Void,
    executionStateChangedCallback: (@Sendable (QueuedTask, TaskExecutionState) async -> Void)?
  ) async {
    self._priority = AtomicUInt8(initialValue: priority.rawValue)
    self.description = description
    self.executionStateChangedCallback = executionStateChangedCallback

    var executionTaskCreatedContinuation: AsyncStream<Task<ExecutionTaskFinishStatus, Never>>.Continuation!
    let executionTaskCreatedStream = AsyncStream {
      executionTaskCreatedContinuation = $0
    }
    self.executionTaskCreatedContinuation = executionTaskCreatedContinuation

    // The result task awaits every execution task that `execute` produces (delivered through
    // `executionTaskCreatedStream`) until one of them terminates for good.
    self.resultTask = Task.detached(priority: priority) {
      await withTaskCancellationHandler {
        await withTaskPriorityChangedHandler(initialPriority: self.priority) {
          for await task in executionTaskCreatedStream {
            switch await task.valuePropagatingCancellation {
            case .cancelledToBeRescheduled:
              // Break the switch and wait for a new `executionTask` to be placed into `executionTaskCreatedStream`.
              break
            case .terminated:
              // The task finished. We are done with this `QueuedTask`
              return
            }
          }
        } taskPriorityChanged: {
          withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
            logger.debug(
              "Updating priority of \(self.description.forLogging) from \(self.priority.rawValue) to \(Task.currentPriority.rawValue)"
            )
          }
          self.priority = Task.currentPriority
          taskPriorityChangedCallback(self.priority)
        }
      } onCancel: {
        self.resultTaskCancelled.value = true
      }
    }
  }

  /// Start executing the task.
  ///
  /// Execution might be canceled to be rescheduled, in which case this returns `.cancelledToBeRescheduled`. In that
  /// case the `TaskScheduler` is expected to call `execute` again.
  func execute() async -> ExecutionTaskFinishStatus {
    if cancelledToBeRescheduled {
      // `QueuedTask.execute` is called from a detached task in `TaskScheduler.poke` but we insert it into the
      // `currentlyExecutingTasks` queue beforehand. This leaves a short window in which we could cancel the task to
      // reschedule it before it actually starts executing.
      // If this happens, we don't have to do anything in `execute` and can immediately return. `execute` will be
      // called again when the task gets rescheduled.
      cancelledToBeRescheduled = false
      return .cancelledToBeRescheduled
    }
    precondition(executionTask == nil, "Task started twice")
    let task = Task.detached(priority: self.priority) {
      // Skip the actual work if cancellation already happened; still run `finalizeExecution` so state is reset.
      if !Task.isCancelled && !self.resultTaskCancelled.value {
        await self.description.execute()
      }
      return await self.finalizeExecution()
    }
    _isExecuting.value = true
    executionTask = task
    executionTaskCreatedContinuation.yield(task)
    if self.resultTaskCancelled.value {
      // The queued task might have been cancelled after the execution task was started but before the task was
      // yielded to `executionTaskCreatedContinuation`. In that case the result task will simply cancel the await on
      // the `executionTaskCreatedStream` and hence not call `valuePropagatingCancellation` on the execution task.
      // This means that the queued task cancellation wouldn't be propagated to the execution task. To address this,
      // check if `resultTaskCancelled` was set and, if so, explicitly cancel the execution task here.
      task.cancel()
    }
    await executionStateChangedCallback?(self, .executing)
    return await task.value
  }

  /// Implementation detail of `execute` that is called after `self.description.execute()` finishes.
  private func finalizeExecution() async -> ExecutionTaskFinishStatus {
    self.executionTask = nil
    _isExecuting.value = false
    if Task.isCancelled && self.cancelledToBeRescheduled {
      await executionStateChangedCallback?(self, .cancelledToBeRescheduled)
      // Reset so the next `execute` call actually runs instead of immediately returning `.cancelledToBeRescheduled`.
      self.cancelledToBeRescheduled = false
      return ExecutionTaskFinishStatus.cancelledToBeRescheduled
    } else {
      await executionStateChangedCallback?(self, .finished)
      return ExecutionTaskFinishStatus.terminated
    }
  }

  /// Cancel the task to be rescheduled later.
  ///
  /// If the task has not been started yet or has already finished execution, this is a no-op.
  func cancelToBeRescheduled() {
    self.cancelledToBeRescheduled = true
    guard let executionTask else {
      return
    }
    executionTask.cancel()
    self.executionTask = nil
  }

  /// If the priority of this task is less than `targetPriority`, elevate the priority to `targetPriority` by spawning
  /// a new task that depends on it. Otherwise a no-op.
  nonisolated func elevatePriority(to targetPriority: TaskPriority) {
    if priority < targetPriority {
      withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
        logger.debug(
          "Elevating priority of \(self.description.forLogging) from \(self.priority.rawValue) to \(targetPriority.rawValue)"
        )
      }
      // Awaiting the result task from a higher-priority task will eventually update `priority` through
      // `withTaskPriorityChangedHandler` but that might take a while because `withTaskPriorityChangedHandler` polls.
      // Since we know that the priority will be elevated, set it now. That way we don't try to elevate it again.
      self.priority = targetPriority
      Task(priority: targetPriority) {
        await self.resultTask.value
      }
    }
  }
}
|
||
|
||
/// Schedules an unordered list of tasks for execution.
///
/// The key features that `TaskScheduler` provides are:
/// - It allows the dynamic declaration of dependencies between tasks. A task can declare whether it can be executed
///   based on which other tasks are currently running. For example, this allows us to guarantee that only a single
///   preparation task is running at a time without enforcing any order in which the preparation tasks should run.
/// - It allows the maximum number of tasks to be limited at a given priority. This allows us to eg. only use half the
///   computer's cores for background indexing and using all cores if user interaction is depending on a set of files
///   being indexed without over-subscribing the CPU.
/// - It allows tasks to be canceled and rescheduled to make room for tasks that are faster to execute. For example,
///   this is used when we have a joint background index task for file `A`, `B` and `C` (which might be in the same
///   target) with low priority. We now request to index `A` with high priority separately because it's needed for
///   user interaction. This cancels the joint indexing of `A`, `B` and `C` so that `A` can be indexed as a standalone
///   file as quickly as possible. The joint indexing of `A`, `B` and `C` is then re-scheduled (again at low priority)
///   and will depend on `A` being indexed.
package actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
  /// The tasks that are currently being executed.
  ///
  /// All tasks in this queue are guaranteed to trigger a call to `poke` again once they finish. Thus, whenever there
  /// are items left in this array, we are guaranteed to get another call to `poke`.
  private var currentlyExecutingTasks: [QueuedTask<TaskDescription>] = []

  /// The queue of pending tasks that haven't been scheduled for execution yet.
  private var pendingTasks: [QueuedTask<TaskDescription>] = []

  /// Whether `shutDown` has been called on the `TaskScheduler` and it should thus not schedule any new tasks.
  private var isShutDown: Bool = false

  /// An ordered list of task priorities to the number of tasks that might execute concurrently at that or a lower
  /// priority. As the task priority is decreased, the number of concurrent tasks becomes more restricted.
  ///
  /// This list is normalized according to `normalize(maxConcurrentTasksByPriority:)`.
  ///
  /// The highest priority entry in this list restricts the total number of tasks that can be executed at any priority
  /// (including priorities higher than its entry).
  ///
  /// For example if you have
  /// ```swift
  /// [
  ///   (.medium, 4),
  ///   (.low, 2)
  /// ]
  /// ```
  ///
  /// Then we allow the following number of concurrent tasks at the following priorities
  /// - `.high`: 4 (because `.medium: 4` restricts the total number of tasks to 4)
  /// - `.medium`: 4
  /// - `.low`: 2
  /// - `.background`: 2
  ///
  /// When combining tasks with different priorities:
  /// - If we have 3 medium priority tasks, we can have at most 1 low priority task
  /// - If we have 1 medium priority task, we can still have 2 low priority tasks, but no more
  private var maxConcurrentTasksByPriority: [(priority: TaskPriority, maxConcurrentTasks: Int)] {
    didSet {
      // Note: assigning within `didSet` does not re-trigger the observer.
      maxConcurrentTasksByPriority = Self.normalize(maxConcurrentTasksByPriority: maxConcurrentTasksByPriority)

      if maxConcurrentTasksByPriority.count == oldValue.count,
        zip(maxConcurrentTasksByPriority, oldValue).allSatisfy(==)
      {
        // We didn't actually change anything, so we don't need to perform any validation or task processing.
        return
      }

      // Check whether we are over-subscribed in currently executing tasks by walking through all currently executing
      // tasks and checking if we could schedule them within the new execution limits. Cancel any tasks that do not
      // fit within the new limit to be rescheduled when we are within the limit again.
      var currentlyExecutingTaskDetails: [(priority: TaskPriority, estimatedCPUCoreCount: Int)] = []
      var tasksToCancelAndReschedule: [QueuedTask<TaskDescription>] = []
      for task in currentlyExecutingTasks.sorted(by: { $0.priority > $1.priority }) {
        let taskPriority = task.priority
        if Self.canScheduleTask(
          withPriority: taskPriority,
          maxConcurrentTasksByPriority: maxConcurrentTasksByPriority,
          currentlyExecutingTaskDetails: currentlyExecutingTaskDetails
        ) {
          currentlyExecutingTaskDetails.append((taskPriority, task.description.estimatedCPUCoreCount))
        } else {
          tasksToCancelAndReschedule.append(task)
        }
      }

      // Poke the scheduler to schedule new jobs if new execution slots became available.
      poke()

      // Cancel any tasks that didn't fit into the new execution slots anymore. Doing this on a separate task is fine
      // because even if we extend the number of execution slots before the task gets executed (which is unlikely), we
      // would cancel the tasks and then immediately reschedule it – while that's doing unnecessary work, it's still
      // correct.
      Task.detached(priority: .high) {
        for tasksToReschedule in tasksToCancelAndReschedule {
          await tasksToReschedule.cancelToBeRescheduled()
        }
      }
    }
  }

  /// Modify the number of tasks that are allowed to run concurrently at each priority level.
  ///
  /// If there are more tasks executing currently than fit within the new execution limits, tasks will be cancelled
  /// and rescheduled again when execution slots become available.
  package func setMaxConcurrentTasksByPriority(_ newValue: [(priority: TaskPriority, maxConcurrentTasks: Int)]) {
    self.maxConcurrentTasksByPriority = newValue
  }

  package init(maxConcurrentTasksByPriority: [(priority: TaskPriority, maxConcurrentTasks: Int)]) {
    // `didSet` is not called from the initializer, so normalize explicitly.
    self.maxConcurrentTasksByPriority = Self.normalize(maxConcurrentTasksByPriority: maxConcurrentTasksByPriority)
  }

  /// Enqueue a new task to be executed.
  ///
  /// - Important: A task that is scheduled by `TaskScheduler` must never be awaited from a task that runs on
  ///   `TaskScheduler`. Otherwise we might end up in deadlocks, eg. if the inner task cannot be scheduled because the
  ///   outer task is claiming all execution slots in the `TaskScheduler`.
  @discardableResult
  package func schedule(
    priority: TaskPriority? = nil,
    _ taskDescription: TaskDescription,
    @_inheritActorContext executionStateChangedCallback: (
      @Sendable (QueuedTask<TaskDescription>, TaskExecutionState) async -> Void
    )? = nil
  ) async -> QueuedTask<TaskDescription> {
    let queuedTask = await QueuedTask(
      priority: priority ?? Task.currentPriority,
      description: taskDescription,
      taskPriorityChangedCallback: { [weak self] (newPriority) in
        Task.detached(priority: newPriority) { [weak self] in
          // If the task's priority got elevated, there might be an execution slot for it now. Poke the scheduler
          // to run the task if possible.
          await self?.poke()
        }
      },
      executionStateChangedCallback: executionStateChangedCallback
    )
    pendingTasks.append(queuedTask)
    Task.detached(priority: priority ?? Task.currentPriority) { [weak self] in
      // Poke the `TaskScheduler` to execute a new task. If the `TaskScheduler` is already working at its capacity
      // limit, this will not do anything. If there are execution slots available, this will start executing the
      // freshly queued task.
      await self?.poke()
    }
    return queuedTask
  }

  /// Cancel all in-progress tasks and wait for them to finish, either by honoring the cancellation or finishing their
  /// work.
  ///
  /// After `shutDown` has been called, no more tasks will be executed on this `TaskScheduler`.
  package func shutDown() async {
    self.isShutDown = true
    await self.currentlyExecutingTasks.concurrentForEach { task in
      task.cancel()
      await task.waitToFinish()
    }
  }

  /// Bring `maxConcurrentTasksByPriority` into canonical form: sorted decreasingly by priority, non-empty, and with
  /// `maxConcurrentTasks` never increasing as the priority decreases. Violations are repaired and logged as faults.
  private static func normalize(
    maxConcurrentTasksByPriority: [(priority: TaskPriority, maxConcurrentTasks: Int)]
  ) -> [(priority: TaskPriority, maxConcurrentTasks: Int)] {
    var maxConcurrentTasksByPriority = maxConcurrentTasksByPriority

    // Ensure elements are sorted decreasingly by priority.
    maxConcurrentTasksByPriority = maxConcurrentTasksByPriority.sorted(by: { $0.priority > $1.priority })

    // Ensure array is not empty.
    if maxConcurrentTasksByPriority.isEmpty {
      logger.fault("Received empty maxConcurrentTasksByPriority. Allowing as many tasks as there are processor cores.")
      maxConcurrentTasksByPriority = [(.medium, ProcessInfo.processInfo.processorCount)]
    }

    // Ensure `maxConcurrentTasks` is not increasing with lower priority tasks.
    var lastMaxConcurrentTasks = maxConcurrentTasksByPriority.first!.maxConcurrentTasks
    for i in 1..<maxConcurrentTasksByPriority.count {
      if maxConcurrentTasksByPriority[i].maxConcurrentTasks > lastMaxConcurrentTasks {
        logger.fault("More tasks allowed for lower priority than for higher priority")
        maxConcurrentTasksByPriority[i].maxConcurrentTasks = lastMaxConcurrentTasks
      } else {
        lastMaxConcurrentTasks = maxConcurrentTasksByPriority[i].maxConcurrentTasks
      }
    }

    return maxConcurrentTasksByPriority
  }

  /// Returns `true` if we can schedule a task with the given priority, assuming that the currently executing tasks
  /// have the given priorities.
  package static func canScheduleTask(
    withPriority newTaskPriority: TaskPriority,
    maxConcurrentTasksByPriority: [(priority: TaskPriority, maxConcurrentTasks: Int)],
    currentlyExecutingTaskDetails: [(priority: TaskPriority, estimatedCPUCoreCount: Int)]
  ) -> Bool {
    // The highest-priority entry caps the total core usage regardless of the new task's priority.
    if currentlyExecutingTaskDetails.sum(of: \.estimatedCPUCoreCount)
      >= maxConcurrentTasksByPriority.first!.maxConcurrentTasks
    {
      return false
    }
    for (priority, maxConcurrentTasks) in maxConcurrentTasksByPriority {
      guard priority >= newTaskPriority else {
        // This limit does not affect the new task
        continue
      }
      // Only tasks at or below this limit's priority count against it.
      if currentlyExecutingTaskDetails.filter({ $0.priority <= priority }).sum(of: \.estimatedCPUCoreCount)
        >= maxConcurrentTasks
      {
        return false
      }
    }
    return true
  }

  /// Poke the execution of more tasks in the queue.
  ///
  /// This will continue calling itself until the queue is empty.
  private func poke() {
    if isShutDown {
      return
    }
    // Consider higher-priority pending tasks first.
    pendingTasks.sort(by: { $0.priority > $1.priority })
    for task in pendingTasks {
      guard
        Self.canScheduleTask(
          withPriority: task.priority,
          maxConcurrentTasksByPriority: maxConcurrentTasksByPriority,
          currentlyExecutingTaskDetails: currentlyExecutingTasks.map({
            ($0.priority, $0.description.estimatedCPUCoreCount)
          })
        )
      else {
        // We don't have any execution slots left. Thus, this poker has nothing to do and is done.
        // When the next task finishes, it calls `poke` again.
        // If a pending task's priority gets elevated, its `taskPriorityChangedCallback` will call `poke` again.
        return
      }
      let dependencies = task.description.dependencies(to: currentlyExecutingTasks.map(\.description))
      // Tasks that `task` must wait for before it can execute (possibly downgraded from cancel-and-reschedule).
      let waitForTasks = dependencies.compactMap { (taskDependency) -> QueuedTask<TaskDescription>? in
        switch taskDependency {
        case .cancelAndRescheduleDependency(let taskDescription):
          guard let dependency = self.currentlyExecutingTasks.first(where: { $0.description.id == taskDescription.id })
          else {
            withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
              logger.fault(
                "Cannot find task to wait for \(taskDescription.forLogging) in list of currently executing tasks"
              )
            }
            return nil
          }
          if !taskDescription.isIdempotent {
            // Safety net: non-idempotent tasks must not be rescheduled; wait for them instead.
            withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
              logger.fault("Cannot reschedule task '\(taskDescription.forLogging)' since it is not idempotent")
            }
            return dependency
          }
          if dependency.priority > task.priority {
            // Don't reschedule tasks that are more important than the new task we would like to schedule.
            return dependency
          }
          return nil
        case .waitAndElevatePriorityOfDependency(let taskDescription):
          guard let dependency = self.currentlyExecutingTasks.first(where: { $0.description.id == taskDescription.id })
          else {
            withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
              logger.fault(
                "Cannot find task to wait for '\(taskDescription.forLogging)' in list of currently executing tasks"
              )
            }
            return nil
          }
          return dependency
        }
      }
      if !waitForTasks.isEmpty {
        // This task is blocked by a task that's currently executing. Elevate the priorities of those tasks and
        // continue looking in the queue if there is another task we can execute.
        for waitForTask in waitForTasks {
          waitForTask.elevatePriority(to: task.priority)
        }
        continue
      }
      // Currently executing tasks that should be cancelled and rescheduled to make room for `task`.
      let rescheduleTasks = dependencies.compactMap { (taskDependency) -> QueuedTask<TaskDescription>? in
        switch taskDependency {
        case .cancelAndRescheduleDependency(let taskDescription):
          guard let task = self.currentlyExecutingTasks.first(where: { $0.description.id == taskDescription.id }) else {
            withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
              logger.fault(
                "Cannot find task to reschedule \(taskDescription.forLogging) in list of currently executing tasks"
              )
            }
            return nil
          }
          return task
        default:
          return nil
        }
      }
      if !rescheduleTasks.isEmpty {
        Task.detached(priority: task.priority) {
          for task in rescheduleTasks {
            withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
              logger.debug("Suspending \(task.description.forLogging)")
            }
            await task.cancelToBeRescheduled()
          }
        }
        // Don't go looking for other tasks to execute in this poker because we should be waiting for the rescheduled
        // tasks to finish (which will call `poke` again), and then actually schedule `task`.
        // If we did enqueue another task from the pending queue, that new task might introduce a new dependency for
        // `task`, which could delay its execution and render the suspension of previous tasks useless.
        return
      }

      currentlyExecutingTasks.append(task)
      pendingTasks.removeAll(where: { $0 === task })
      Task.detached(priority: task.priority) {
        // Await the task's return in a task so that this poker can continue checking if there are more execution
        // slots that can be filled with queued tasks.
        let finishStatus = await task.execute()
        await self.finalizeTaskExecution(task: task, finishStatus: finishStatus)
      }
    }
  }

  /// Implementation detail of `poke` to be called after `task.execute()` to ensure that `task.execute()` executes in
  /// a different isolation domain than `TaskScheduler`.
  private func finalizeTaskExecution(
    task: QueuedTask<TaskDescription>,
    finishStatus: QueuedTask<TaskDescription>.ExecutionTaskFinishStatus
  ) async {
    currentlyExecutingTasks.removeAll(where: { $0.description.id == task.description.id })
    switch finishStatus {
    case .terminated: break
    case .cancelledToBeRescheduled: pendingTasks.append(task)
    }
    // A slot freed up (or a task went back to pending); see if more work can be started.
    self.poke()
  }
}
|
||
|
||
// MARK: - Collection utilities
|
||
|
||
fileprivate extension Collection where Element: Comparable {
  /// Returns `true` if the elements are sorted in the requested order.
  ///
  /// Adjacent equal elements are considered sorted in either direction, and an empty or single-element collection is
  /// always sorted.
  ///
  /// - Parameter descending: If `true`, check for non-increasing order; otherwise check for non-decreasing order.
  func isSorted(descending: Bool) -> Bool {
    // Compare each adjacent pair. Only a strict violation of the requested order (an increase for `descending`, a
    // decrease otherwise) disproves sortedness.
    //
    // Note: The previous implementation seeded `previous` with `self.first` and then compared the first element to
    // itself, so `isSorted(descending: false)` returned `false` for every non-empty collection and equal neighbors
    // were rejected.
    for (previous, element) in zip(self, self.dropFirst()) {
      if descending ? previous < element : element < previous {
        return false
      }
    }
    return true
  }
}
|
||
|
||
fileprivate extension Collection {
  /// Returns the sum of `transform` applied to every element of the collection.
  ///
  /// Returns `0` for an empty collection.
  func sum(of transform: (Self.Element) -> Int) -> Int {
    return self.reduce(0) { partialSum, element in
      partialSum + transform(element)
    }
  }
}
|
||
|
||
/// Version of the `withTaskPriorityChangedHandler` where the body doesn't throw.
///
/// Forwards to the throwing overload and swallows the `CancellationError` that overload may throw, since a
/// non-throwing `operation` cannot produce any other error.
///
/// - Parameters:
///   - initialPriority: The priority the polling starts from; changes relative to this trigger `taskPriorityChanged`.
///   - pollingInterval: How often the current task priority is checked for changes.
///   - operation: The non-throwing work to run.
///   - taskPriorityChanged: Invoked when a priority change is detected while `operation` is running.
private func withTaskPriorityChangedHandler(
  initialPriority: TaskPriority = Task.currentPriority,
  pollingInterval: Duration = .seconds(0.1),
  @_inheritActorContext operation: @escaping @Sendable () async -> Void,
  taskPriorityChanged: @escaping @Sendable () -> Void
) async {
  do {
    // The `as` cast selects the throwing overload of `withTaskPriorityChangedHandler`.
    try await withTaskPriorityChangedHandler(
      initialPriority: initialPriority,
      pollingInterval: pollingInterval,
      operation: operation as @Sendable () async throws -> Void,
      taskPriorityChanged: taskPriorityChanged
    )
  } catch is CancellationError {
    // Cancellation is an expected way for the operation to end; nothing to report.
  } catch {
    // Since `operation` does not throw, the only error we expect `withTaskPriorityChangedHandler` to throw is a
    // `CancellationError`, in which case we can just return.
    logger.fault("Unexpected error thrown from withTaskPriorityChangedHandler: \(error.forLogging)")
  }
}
|