mirror of
https://github.com/apple/swift.git
synced 2025-12-14 20:36:38 +01:00
[Concurrency] Set thread base priority when running escalated Tasks (#84895)
This commit is contained in:
@@ -29,6 +29,13 @@
|
||||
#include "bitset"
|
||||
#include "queue" // TODO: remove and replace with our own mpsc
|
||||
|
||||
// Does the runtime integrate with libdispatch?
|
||||
#if defined(SWIFT_CONCURRENCY_USES_DISPATCH)
|
||||
#define SWIFT_CONCURRENCY_ENABLE_DISPATCH SWIFT_CONCURRENCY_USES_DISPATCH
|
||||
#else
|
||||
#define SWIFT_CONCURRENCY_ENABLE_DISPATCH 0
|
||||
#endif
|
||||
|
||||
// Does the runtime provide priority escalation support?
|
||||
#ifndef SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
|
||||
#if SWIFT_CONCURRENCY_ENABLE_DISPATCH && \
|
||||
@@ -422,7 +429,22 @@ public:
|
||||
///
|
||||
/// Generally this should be done immediately after updating
|
||||
/// ActiveTask.
|
||||
void flagAsRunning();
|
||||
///
|
||||
/// When Dispatch is used for the default executor:
|
||||
/// * If the return value is non-zero, it must be passed
|
||||
/// to swift_dispatch_thread_reset_override_self
|
||||
/// before returning to the executor.
|
||||
/// * If the return value is zero, it may be ignored or passed to
|
||||
/// the aforementioned function (which will ignore values of zero).
|
||||
/// The current implementation will always return zero
|
||||
/// if you call flagAsRunning again before calling
|
||||
/// swift_dispatch_thread_reset_override_self with the
|
||||
/// initial value. This supports suspending and immediately
|
||||
/// resuming a Task without returning up the callstack.
|
||||
///
|
||||
/// For all other default executors, flagAsRunning
|
||||
/// will return zero which may be ignored.
|
||||
uint32_t flagAsRunning();
|
||||
|
||||
/// Flag that this task is now suspended with information about what it is
|
||||
/// waiting on.
|
||||
|
||||
@@ -38,13 +38,6 @@
|
||||
#define SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL 0
|
||||
#endif
|
||||
|
||||
// Does the runtime integrate with libdispatch?
|
||||
#if defined(SWIFT_CONCURRENCY_USES_DISPATCH)
|
||||
#define SWIFT_CONCURRENCY_ENABLE_DISPATCH SWIFT_CONCURRENCY_USES_DISPATCH
|
||||
#else
|
||||
#define SWIFT_CONCURRENCY_ENABLE_DISPATCH 0
|
||||
#endif
|
||||
|
||||
namespace swift {
|
||||
class DefaultActor;
|
||||
class TaskOptionRecord;
|
||||
|
||||
@@ -48,6 +48,27 @@ swift_dispatch_thread_override_self(qos_class_t override_qos) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline uint32_t
|
||||
swift_dispatch_thread_override_self_with_base(qos_class_t override_qos, qos_class_t base_qos) {
|
||||
|
||||
if (__builtin_available(macOS 27.0, iOS 27.0, tvOS 27.0, watchOS 27.0, *)) {
|
||||
return dispatch_thread_override_self_with_base(override_qos, base_qos);
|
||||
} else if (__builtin_available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)) {
|
||||
// If we don't have the ability to set our base qos correctly, at least set the override
|
||||
// We want to return 0 here because we have nothing to reset in this case
|
||||
(void) dispatch_thread_override_self(override_qos);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline void
|
||||
swift_dispatch_thread_reset_override_self(uint32_t opaque) {
|
||||
if (__builtin_available(macOS 27.0, iOS 27.0, tvOS 27.0, watchOS 27.0, *)) {
|
||||
dispatch_thread_reset_override_self(opaque);
|
||||
}
|
||||
}
|
||||
|
||||
static inline int
|
||||
swift_dispatch_lock_override_start_with_debounce(dispatch_lock_t *lock_addr,
|
||||
dispatch_tid_t expected_thread, qos_class_t override_to_apply) {
|
||||
|
||||
@@ -237,12 +237,17 @@ void swift::runJobInEstablishedExecutorContext(Job *job) {
|
||||
// current thread. If the task suspends somewhere, it should
|
||||
// update the task status appropriately; we don't need to update
|
||||
// it afterwards.
|
||||
task->flagAsRunning();
|
||||
[[maybe_unused]]
|
||||
uint32_t dispatchOpaquePriority = task->flagAsRunning();
|
||||
|
||||
auto traceHandle = concurrency::trace::job_run_begin(job);
|
||||
task->runInFullyEstablishedContext();
|
||||
concurrency::trace::job_run_end(traceHandle);
|
||||
|
||||
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
|
||||
swift_dispatch_thread_reset_override_self(dispatchOpaquePriority);
|
||||
#endif
|
||||
|
||||
assert(ActiveTask::get() == nullptr &&
|
||||
"active task wasn't cleared before suspending?");
|
||||
if (oldTask) ActiveTask::set(oldTask);
|
||||
|
||||
@@ -124,7 +124,7 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
|
||||
|
||||
// NOTE: this acquire synchronizes with `completeFuture`.
|
||||
auto queueHead = fragment->waitQueue.load(std::memory_order_acquire);
|
||||
bool contextInitialized = false;
|
||||
bool suspendedWaiter = false;
|
||||
while (true) {
|
||||
switch (queueHead.getStatus()) {
|
||||
case Status::Error:
|
||||
@@ -132,7 +132,14 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
|
||||
SWIFT_TASK_DEBUG_LOG("task %p waiting on task %p, completed immediately",
|
||||
waitingTask, this);
|
||||
_swift_tsan_acquire(static_cast<Job *>(this));
|
||||
if (contextInitialized) waitingTask->flagAsRunning();
|
||||
if (suspendedWaiter) {
|
||||
// This will always return zero because we were just
|
||||
// running this Task so its BasePriority (which is
|
||||
// immutable) should've already been set on the thread.
|
||||
[[maybe_unused]]
|
||||
uint32_t opaque = waitingTask->flagAsRunning();
|
||||
assert(opaque == 0);
|
||||
}
|
||||
// The task is done; we don't need to wait.
|
||||
return queueHead.getStatus();
|
||||
|
||||
@@ -146,8 +153,8 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
|
||||
break;
|
||||
}
|
||||
|
||||
if (!contextInitialized) {
|
||||
contextInitialized = true;
|
||||
if (!suspendedWaiter) {
|
||||
suspendedWaiter = true;
|
||||
auto context =
|
||||
reinterpret_cast<TaskFutureWaitAsyncContext *>(waitingTaskContext);
|
||||
context->errorResult = nullptr;
|
||||
@@ -1659,8 +1666,11 @@ static void swift_continuation_awaitImpl(ContinuationAsyncContext *context) {
|
||||
// we try to tail-call.
|
||||
} while (false);
|
||||
#else
|
||||
// Restore the running state of the task and resume it.
|
||||
task->flagAsRunning();
|
||||
// This will always return zero because we were just running this Task so its
|
||||
// BasePriority (which is immutable) should've already been set on the thread.
|
||||
[[maybe_unused]]
|
||||
uint32_t opaque = task->flagAsRunning();
|
||||
assert(opaque == 0);
|
||||
#endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
|
||||
|
||||
if (context->isExecutorSwitchForced())
|
||||
|
||||
@@ -1822,7 +1822,12 @@ reevaluate_if_taskgroup_has_results:;
|
||||
// We're going back to running the task, so if we suspended before,
|
||||
// we need to flag it as running again.
|
||||
if (hasSuspended) {
|
||||
waitingTask->flagAsRunning();
|
||||
// This will always return zero because we were just
|
||||
// running this Task so its BasePriority (which is
|
||||
// immutable) should've already been set on the thread.
|
||||
[[maybe_unused]]
|
||||
uint32_t opaque = waitingTask->flagAsRunning();
|
||||
assert(opaque == 0);
|
||||
}
|
||||
|
||||
// Success! We are allowed to poll.
|
||||
|
||||
@@ -970,32 +970,40 @@ inline bool AsyncTask::isCancelled() const {
|
||||
.isCancelled();
|
||||
}
|
||||
|
||||
inline void AsyncTask::flagAsRunning() {
|
||||
inline uint32_t AsyncTask::flagAsRunning() {
|
||||
|
||||
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
|
||||
dispatch_thread_override_info_s threadOverrideInfo;
|
||||
threadOverrideInfo = swift_dispatch_thread_get_current_override_qos_floor();
|
||||
qos_class_t overrideFloor = threadOverrideInfo.override_qos_floor;
|
||||
qos_class_t basePriorityCeil = overrideFloor;
|
||||
qos_class_t taskBasePriority = (qos_class_t) _private().BasePriority;
|
||||
#endif
|
||||
|
||||
auto oldStatus = _private()._status().load(std::memory_order_relaxed);
|
||||
assert(!oldStatus.isRunning());
|
||||
assert(!oldStatus.isComplete());
|
||||
|
||||
uint32_t dispatchOpaquePriority = 0;
|
||||
if (!oldStatus.hasTaskDependency()) {
|
||||
SWIFT_TASK_DEBUG_LOG("%p->flagAsRunning() with no task dependency", this);
|
||||
assert(_private().dependencyRecord == nullptr);
|
||||
|
||||
while (true) {
|
||||
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
|
||||
// Task's priority is greater than the thread's - do a self escalation
|
||||
// If the base priority is not equal to the current override floor then
|
||||
// dispqatch may need to apply the base priority to the thread. If the
|
||||
// current priority is higher than the override floor, then dispatch may
|
||||
// need to apply a self-override. In either case, call into dispatch to
|
||||
// do this.
|
||||
qos_class_t maxTaskPriority = (qos_class_t) oldStatus.getStoredPriority();
|
||||
if (threadOverrideInfo.can_override && (maxTaskPriority > overrideFloor)) {
|
||||
SWIFT_TASK_DEBUG_LOG("[Override] Self-override thread with oq_floor %#x to match task %p's max priority %#x",
|
||||
overrideFloor, this, maxTaskPriority);
|
||||
if (threadOverrideInfo.can_override && (taskBasePriority != basePriorityCeil || maxTaskPriority > overrideFloor)) {
|
||||
SWIFT_TASK_DEBUG_LOG("[Override] Self-override thread with oq_floor %#x to match task %p's max priority %#x and base priority %#x",
|
||||
overrideFloor, this, maxTaskPriority, taskBasePriority);
|
||||
|
||||
(void) swift_dispatch_thread_override_self(maxTaskPriority);
|
||||
dispatchOpaquePriority = swift_dispatch_thread_override_self_with_base(maxTaskPriority, taskBasePriority);
|
||||
overrideFloor = maxTaskPriority;
|
||||
basePriorityCeil = taskBasePriority;
|
||||
}
|
||||
#endif
|
||||
// Set self as executor and remove escalation bit if any - the task's
|
||||
@@ -1024,14 +1032,19 @@ inline void AsyncTask::flagAsRunning() {
|
||||
ActiveTaskStatus& newStatus) {
|
||||
|
||||
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
|
||||
// Task's priority is greater than the thread's - do a self escalation
|
||||
// If the base priority is not equal to the current override floor then
|
||||
// dispqatch may need to apply the base priority to the thread. If the
|
||||
// current priority is higher than the override floor, then dispatch may
|
||||
// need to apply a self-override. In either case, call into dispatch to
|
||||
// do this.
|
||||
qos_class_t maxTaskPriority = (qos_class_t) oldStatus.getStoredPriority();
|
||||
if (threadOverrideInfo.can_override && (maxTaskPriority > overrideFloor)) {
|
||||
SWIFT_TASK_DEBUG_LOG("[Override] Self-override thread with oq_floor %#x to match task %p's max priority %#x",
|
||||
overrideFloor, this, maxTaskPriority);
|
||||
if (threadOverrideInfo.can_override && (taskBasePriority != basePriorityCeil || maxTaskPriority > overrideFloor)) {
|
||||
SWIFT_TASK_DEBUG_LOG("[Override] Self-override thread with oq_floor %#x to match task %p's max priority %#x and base priority %#x",
|
||||
overrideFloor, this, maxTaskPriority, taskBasePriority);
|
||||
|
||||
(void) swift_dispatch_thread_override_self(maxTaskPriority);
|
||||
dispatchOpaquePriority = swift_dispatch_thread_override_self_with_base(maxTaskPriority, taskBasePriority);
|
||||
overrideFloor = maxTaskPriority;
|
||||
basePriorityCeil = taskBasePriority;
|
||||
}
|
||||
#endif
|
||||
// Set self as executor and remove escalation bit if any - the task's
|
||||
@@ -1047,7 +1060,7 @@ inline void AsyncTask::flagAsRunning() {
|
||||
swift_task_enterThreadLocalContext(
|
||||
(char *)&_private().ExclusivityAccessSet[0]);
|
||||
}
|
||||
|
||||
return dispatchOpaquePriority;
|
||||
}
|
||||
|
||||
/// TODO (rokhinip): We need the handoff of the thread to the next executor to
|
||||
|
||||
@@ -322,6 +322,43 @@ actor Test {
|
||||
await task2.value // Escalate task2 which should be queued behind task1 on the actor
|
||||
}
|
||||
|
||||
// This test will only work properly on 27.0+
|
||||
if #available(macOS 27.0, iOS 27.0, tvOS 27.0, watchOS 27.0, *) {
|
||||
tests.test("Task escalation doesn't impact qos_class_self") {
|
||||
let task = Task(priority: .utility) {
|
||||
let initialQos = DispatchQoS(
|
||||
qosClass: DispatchQoS.QoSClass(rawValue: qos_class_self())!,
|
||||
relativePriority: 0)
|
||||
expectEqual(initialQos, DispatchQoS.utility)
|
||||
let childTask = Task {
|
||||
let qosBeforeEscalate = DispatchQoS(
|
||||
qosClass: DispatchQoS.QoSClass(rawValue: qos_class_self())!,
|
||||
relativePriority: 0)
|
||||
// Unstructured task should inherit utility priority
|
||||
expectEqual(qosBeforeEscalate, DispatchQoS.utility)
|
||||
// Escalate priority override, not base QoS
|
||||
withUnsafeCurrentTask {
|
||||
$0!.escalatePriority(to: .userInitiated)
|
||||
}
|
||||
let qosAfterEscalate = DispatchQoS(
|
||||
qosClass: DispatchQoS.QoSClass(rawValue: qos_class_self())!,
|
||||
relativePriority: 0)
|
||||
// qos_class_self should remain utility after escalation
|
||||
expectEqual(qosAfterEscalate, DispatchQoS.utility)
|
||||
await Task.yield()
|
||||
let qosAfterYield = DispatchQoS(
|
||||
qosClass: DispatchQoS.QoSClass(rawValue: qos_class_self())!,
|
||||
relativePriority: 0)
|
||||
// qos_class_self should remain utility after yield
|
||||
expectEqual(qosAfterYield, DispatchQoS.utility)
|
||||
}
|
||||
|
||||
await childTask.value
|
||||
}
|
||||
|
||||
await task.value
|
||||
}
|
||||
}
|
||||
}
|
||||
await runAllTestsAsync()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user