Merge pull request #40718 from apple/rokhinip/86100376-change-task-propagation-rules

Change task priority propagation rules
This commit is contained in:
Rokhini Prabhu
2022-01-25 12:13:11 -08:00
committed by GitHub
10 changed files with 322 additions and 66 deletions

View File

@@ -2014,12 +2014,12 @@ enum class JobKind : size_t {
enum class JobPriority : size_t {
// This is modelled off of Dispatch.QoS, and the values are directly
// stolen from there.
UserInteractive = 0x21,
UserInitiated = 0x19,
Default = 0x15,
Utility = 0x11,
Background = 0x09,
Unspecified = 0x00,
UserInteractive = 0x21, /* UI */
UserInitiated = 0x19, /* IN */
Default = 0x15, /* DEF */
Utility = 0x11, /* UT */
Background = 0x09, /* BG */
Unspecified = 0x00, /* UN */
};
/// A tri-valued comparator which orders higher priorities first.
@@ -2028,12 +2028,18 @@ inline int descendingPriorityOrder(JobPriority lhs,
return (lhs == rhs ? 0 : lhs > rhs ? -1 : 1);
}
inline JobPriority withUserInteractivePriorityDowngrade(JobPriority priority) {
return (priority == JobPriority::UserInteractive) ? JobPriority::UserInitiated
: priority;
}
/// Flags for task creation.
class TaskCreateFlags : public FlagSet<size_t> {
public:
enum {
Priority = 0,
Priority_width = 8,
// Priority that user specified while creating the task
RequestedPriority = 0,
RequestedPriority_width = 8,
Task_IsChildTask = 8,
// bit 9 is unused
@@ -2046,8 +2052,9 @@ public:
explicit constexpr TaskCreateFlags(size_t bits) : FlagSet(bits) {}
constexpr TaskCreateFlags() {}
FLAGSET_DEFINE_FIELD_ACCESSORS(Priority, Priority_width, JobPriority,
getPriority, setPriority)
FLAGSET_DEFINE_FIELD_ACCESSORS(RequestedPriority, RequestedPriority_width,
JobPriority, getRequestedPriority,
setRequestedPriority)
FLAGSET_DEFINE_FLAG_ACCESSORS(Task_IsChildTask,
isChildTask,
setIsChildTask)

View File

@@ -222,9 +222,9 @@ public:
void *Storage[14];
/// Initialize this storage during the creation of a task.
void initialize(AsyncTask *task);
void initializeWithSlab(AsyncTask *task,
void *slab, size_t slabCapacity);
void initialize(JobPriority basePri);
void initializeWithSlab(JobPriority basePri, void *slab,
size_t slabCapacity);
/// React to the completion of the enclosing task's execution.
void complete(AsyncTask *task);

View File

@@ -481,6 +481,19 @@ size_t swift_task_getJobFlags(AsyncTask* task);
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
bool swift_task_isCancelled(AsyncTask* task);
/// Returns the current priority of the task which is >= base priority of the
/// task. This function does not exist in the base ABI of this library and must
/// be deployment limited
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
JobPriority
swift_task_currentPriority(AsyncTask *task);
/// Returns the base priority of the task. This function does not exist in the
/// base ABI of this library and must be deployment limited.
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
JobPriority
swift_task_basePriority(AsyncTask *task);
/// Create and add an cancellation record to the task.
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
CancellationNotificationStatusRecord*

View File

@@ -413,7 +413,7 @@ static void completeTaskWithClosure(SWIFT_ASYNC_CONTEXT AsyncContext *context,
reinterpret_cast<char *>(context) - sizeof(AsyncContextPrefix));
swift_release((HeapObject *)asyncContextPrefix->closureContext);
// Clean up the rest of the task.
return completeTaskAndRelease(context, error);
}
@@ -470,6 +470,32 @@ const void *AsyncTask::getResumeFunctionForLogging() {
return reinterpret_cast<const void *>(ResumeTask);
}
JobPriority swift::swift_task_currentPriority(AsyncTask *task)
{
// This is racey but this is to be used in an API is inherently racey anyways.
auto oldStatus = task->_private().Status.load(std::memory_order_relaxed);
return oldStatus.getStoredPriority();
}
JobPriority swift::swift_task_basePriority(AsyncTask *task)
{
JobPriority pri = task->_private().BasePriority;
SWIFT_TASK_DEBUG_LOG("Task %p has base priority = %zu", task, pri);
return pri;
}
static inline bool isUnspecified(JobPriority priority) {
return priority == JobPriority::Unspecified;
}
static inline bool taskIsUnstructured(JobFlags jobFlags) {
return !jobFlags.task_isAsyncLetTask() && !jobFlags.task_isGroupChildTask();
}
static inline bool taskIsDetached(TaskCreateFlags createFlags, JobFlags jobFlags) {
return taskIsUnstructured(jobFlags) && !createFlags.copyTaskLocals();
}
/// Implementation of task creation.
SWIFT_CC(swift)
static AsyncTaskAndContext swift_task_create_commonImpl(
@@ -478,10 +504,11 @@ static AsyncTaskAndContext swift_task_create_commonImpl(
const Metadata *futureResultType,
TaskContinuationFunction *function, void *closureContext,
size_t initialContextSize) {
TaskCreateFlags taskCreateFlags(rawTaskCreateFlags);
JobFlags jobFlags(JobKind::Task, JobPriority::Unspecified);
// Propagate task-creation flags to job flags as appropriate.
JobFlags jobFlags(JobKind::Task, taskCreateFlags.getPriority());
jobFlags.task_setIsChildTask(taskCreateFlags.isChildTask());
if (futureResultType) {
jobFlags.task_setIsFuture(true);
@@ -520,7 +547,7 @@ static AsyncTaskAndContext swift_task_create_commonImpl(
// of in a FutureFragment.
hasAsyncLetResultBuffer = true;
assert(asyncLet && "Missing async let storage");
jobFlags.task_setIsAsyncLetTask(true);
jobFlags.task_setIsChildTask(true);
break;
@@ -534,32 +561,85 @@ static AsyncTaskAndContext swift_task_create_commonImpl(
}
AsyncTask *parent = nullptr;
AsyncTask *currentTask = swift_task_getCurrent();
if (jobFlags.task_isChildTask()) {
parent = swift_task_getCurrent();
parent = currentTask;
assert(parent != nullptr && "creating a child task with no active task");
}
// Inherit the priority of the currently-executing task if unspecified and
// we want to inherit.
if (jobFlags.getPriority() == JobPriority::Unspecified &&
(jobFlags.task_isChildTask() || taskCreateFlags.inheritContext())) {
AsyncTask *currentTask = parent;
if (!currentTask)
currentTask = swift_task_getCurrent();
// Start with user specified priority at creation time (if any)
JobPriority basePriority = (taskCreateFlags.getRequestedPriority());
if (currentTask)
jobFlags.setPriority(currentTask->getPriority());
else if (taskCreateFlags.inheritContext())
jobFlags.setPriority(swift_task_getCurrentThreadPriority());
if (taskIsDetached(taskCreateFlags, jobFlags)) {
SWIFT_TASK_DEBUG_LOG("Creating a detached task from %p", currentTask);
// Case 1: No priority specified
// Base priority = UN
// Escalated priority = UN
// Case 2: Priority specified
// Base priority = user specified priority
// Escalated priority = UN
//
// Task will be created with max priority = max(base priority, UN) = base
// priority. We shouldn't need to do any additional manipulations here since
// basePriority should already be the right value
} else if (taskIsUnstructured(jobFlags)) {
SWIFT_TASK_DEBUG_LOG("Creating an unstructured task from %p", currentTask);
if (isUnspecified(basePriority)) {
// Case 1: No priority specified
// Base priority = Base priority of parent with a UI -> IN downgrade
// Escalated priority = UN
if (currentTask) {
basePriority = currentTask->_private().BasePriority;
} else {
basePriority = swift_task_getCurrentThreadPriority();
}
basePriority = withUserInteractivePriorityDowngrade(basePriority);
} else {
// Case 2: User specified a priority
// Base priority = user specified priority
// Escalated priority = UN
}
// Task will be created with max priority = max(base priority, UN) = base
// priority
} else {
// Is a structured concurrency child task. Must have a parent.
assert((asyncLet || group) && parent);
SWIFT_TASK_DEBUG_LOG("Creating an structured concurrency task from %p", currentTask);
if (isUnspecified(basePriority)) {
// Case 1: No priority specified
// Base priority = Base priority of parent with a UI -> IN downgrade
// Escalated priority = Escalated priority of parent with a UI -> IN
// downgrade
JobPriority parentBasePri = parent->_private().BasePriority;
basePriority = withUserInteractivePriorityDowngrade(parentBasePri);
} else {
// Case 2: User priority specified
// Base priority = User specified priority
// Escalated priority = Escalated priority of parent with a UI -> IN
// downgrade
}
// Task will be created with escalated priority = base priority. We will
// update the escalated priority with the right rules in
// updateNewChildWithParentAndGroupState when we link the child into
// the parent task/task group since we'll have the right
// synchronization then.
}
// Adjust user-interactive priorities down to user-initiated.
if (jobFlags.getPriority() == JobPriority::UserInteractive)
jobFlags.setPriority(JobPriority::UserInitiated);
if (isUnspecified(basePriority)) {
basePriority = JobPriority::Default;
}
// If there is still no job priority, use the default priority.
if (jobFlags.getPriority() == JobPriority::Unspecified)
jobFlags.setPriority(JobPriority::Default);
SWIFT_TASK_DEBUG_LOG("Task's base priority = %d", basePriority);
// TODO (rokhinip): Figure out the semantics of the job priority and where
// it ought to be set conclusively - seems like it ought to be at enqueue
// time. For now, maintain current semantics of setting jobPriority as well.
jobFlags.setPriority(basePriority);
// Figure out the size of the header.
size_t headerSize = sizeof(AsyncTask);
@@ -591,14 +671,14 @@ static AsyncTaskAndContext swift_task_create_commonImpl(
void *allocation = nullptr;
if (asyncLet) {
assert(parent);
// If there isn't enough room in the fixed async let allocation to
// set up the initial context, then we'll have to allocate more space
// from the parent.
if (asyncLet->getSizeOfPreallocatedSpace() < amountToAllocate) {
hasAsyncLetResultBuffer = false;
}
// DEPRECATED. This is separated from the above condition because we
// also have to handle an older async let ABI that did not provide
// space for the initial slab in the compiler-generated preallocation.
@@ -653,7 +733,7 @@ static AsyncTaskAndContext swift_task_create_commonImpl(
// Initialize the task so that resuming it will run the given
// function on the initial context.
AsyncTask *task = nullptr;
bool captureCurrentVoucher = taskCreateFlags.copyTaskLocals() || jobFlags.task_isChildTask();
bool captureCurrentVoucher = !taskIsDetached(taskCreateFlags, jobFlags);
if (asyncLet) {
// Initialize the refcount bits to "immortal", so that
// ARC operations don't have any effect on the task.
@@ -678,7 +758,7 @@ static AsyncTaskAndContext swift_task_create_commonImpl(
auto groupChildFragment = task->groupChildFragment();
new (groupChildFragment) AsyncTask::GroupChildFragment(group);
}
// Initialize the future fragment if applicable.
if (futureResultType) {
assert(task->isFuture());
@@ -694,7 +774,7 @@ static AsyncTaskAndContext swift_task_create_commonImpl(
futureAsyncContextPrefix->indirectResult = futureFragment->getStoragePtr();
}
SWIFT_TASK_DEBUG_LOG("creating task %p with parent %p", task, parent);
SWIFT_TASK_DEBUG_LOG("creating task %p with parent %p at base pri %zu", task, parent, basePriority);
// Initialize the task-local allocator.
initialContext->ResumeParent = reinterpret_cast<TaskContinuationFunction *>(
@@ -704,9 +784,10 @@ static AsyncTaskAndContext swift_task_create_commonImpl(
if (asyncLet && initialSlabSize > 0) {
assert(parent);
void *initialSlab = (char*)allocation + amountToAllocate;
task->Private.initializeWithSlab(task, initialSlab, initialSlabSize);
task->Private.initializeWithSlab(basePriority, initialSlab,
initialSlabSize);
} else {
task->Private.initialize(task);
task->Private.initialize(basePriority);
}
// Perform additional linking between parent and child task.
@@ -940,7 +1021,7 @@ static AsyncTask *swift_continuation_initImpl(ContinuationAsyncContext *context,
// must happen-after this call.
context->AwaitSynchronization.store(flags.isPreawaited()
? ContinuationStatus::Awaited
: ContinuationStatus::Pending,
: ContinuationStatus::Pending,
std::memory_order_relaxed);
AsyncTask *task;

View File

@@ -289,14 +289,28 @@ extension Task where Success == Never, Failure == Never {
public static var currentPriority: TaskPriority {
withUnsafeCurrentTask { task in
// If we are running on behalf of a task, use that task's priority.
if let task = task {
return task.priority
if let unsafeTask = task {
return TaskPriority(rawValue: _taskCurrentPriority(unsafeTask._task))
}
// Otherwise, query the system.
return TaskPriority(rawValue: UInt8(_getCurrentThreadPriority()))
}
}
/// The current task's base priority.
///
/// If you access this property outside of any task, this returns nil
@available(SwiftStdlib 9999, *)
public static var basePriority: TaskPriority? {
withUnsafeCurrentTask { task in
// If we are running on behalf of a task, use that task's priority.
if let unsafeTask = task {
return TaskPriority(rawValue: _taskBasePriority(unsafeTask._task))
}
return nil
}
}
}
@available(SwiftStdlib 5.1, *)
@@ -752,8 +766,7 @@ public struct UnsafeCurrentTask {
/// - SeeAlso: `TaskPriority`
/// - SeeAlso: `Task.currentPriority`
public var priority: TaskPriority {
getJobFlags(_task).priority ?? TaskPriority(
rawValue: UInt8(_getCurrentThreadPriority()))
TaskPriority(rawValue: _taskCurrentPriority(_task))
}
/// Cancel the current task.
@@ -850,6 +863,12 @@ func _taskCancel(_ task: Builtin.NativeObject)
@usableFromInline
func _taskIsCancelled(_ task: Builtin.NativeObject) -> Bool
@_silgen_name("swift_task_currentPriority")
internal func _taskCurrentPriority(_ task: Builtin.NativeObject) -> UInt8
@_silgen_name("swift_task_basePriority")
internal func _taskBasePriority(_ task: Builtin.NativeObject) -> UInt8
@available(SwiftStdlib 5.1, *)
@_silgen_name("swift_task_createNullaryContinuationJob")
func _taskCreateNullaryContinuationJob(priority: Int, continuation: Builtin.RawUnsafeContinuation) -> Builtin.Job

View File

@@ -488,7 +488,7 @@ static void swift_taskGroup_initializeImpl(TaskGroup *group, const Metadata *T)
// ==== add / attachChild ------------------------------------------------------
void TaskGroup::addChildTask(AsyncTask *child) {
SWIFT_TASK_DEBUG_LOG("attach child task = %p to group = %p", child, group);
SWIFT_TASK_DEBUG_LOG("attach child task = %p to group = %p", child, this);
// The counterpart of this (detachChild) is performed by the group itself,
// when it offers the completed (child) task's value to a waiting task -

View File

@@ -169,7 +169,7 @@ public:
/// The current state of a task's status records.
class alignas(sizeof(void*) * 2) ActiveTaskStatus {
enum : uintptr_t {
/// The current running priority of the task.
/// The max priority of the task. This is always >= basePriority in the task
PriorityMask = 0xFF,
/// Has the task been cancelled?
@@ -206,8 +206,8 @@ public:
ActiveTaskStatus() = default;
#endif
constexpr ActiveTaskStatus(JobFlags flags)
: Record(nullptr), Flags(uintptr_t(flags.getPriority())) {}
constexpr ActiveTaskStatus(JobPriority priority)
: Record(nullptr), Flags(uintptr_t(priority)) {}
/// Is the task currently cancelled?
bool isCancelled() const { return Flags & IsCancelled; }
@@ -317,12 +317,23 @@ struct AsyncTask::PrivateStorage {
/// The top 32 bits of the task ID. The bottom 32 bits are in Job::Id.
uint32_t Id;
PrivateStorage(JobFlags flags)
: Status(ActiveTaskStatus(flags)), Local(TaskLocal::Storage()) {}
/// Base priority of Task - set only at creation time of task.
/// Current max priority of task is ActiveTaskStatus.
///
/// TODO (rokhinip): Only 8 bits of the full size_t are used. Change this into
/// flagset thing so that remaining bits are available for other non-changing
/// task status stuff
JobPriority BasePriority;
PrivateStorage(JobFlags flags, void *slab, size_t slabCapacity)
: Status(ActiveTaskStatus(flags)), Allocator(slab, slabCapacity),
Local(TaskLocal::Storage()) {}
// Always create an async task with max priority in ActiveTaskStatus = base
// priority. It will be updated later if needed.
PrivateStorage(JobPriority basePri)
: Status(ActiveTaskStatus(basePri)), Local(TaskLocal::Storage()),
BasePriority(basePri) {}
PrivateStorage(JobPriority basePri, void *slab, size_t slabCapacity)
: Status(ActiveTaskStatus(basePri)), Allocator(slab, slabCapacity),
Local(TaskLocal::Storage()), BasePriority(basePri) {}
void complete(AsyncTask *task) {
// Destroy and deallocate any remaining task local items.
@@ -347,14 +358,12 @@ inline const AsyncTask::PrivateStorage &
AsyncTask::OpaquePrivateStorage::get() const {
return reinterpret_cast<const PrivateStorage &>(*this);
}
inline void AsyncTask::OpaquePrivateStorage::initialize(AsyncTask *task) {
new (this) PrivateStorage(task->Flags);
inline void AsyncTask::OpaquePrivateStorage::initialize(JobPriority basePri) {
new (this) PrivateStorage(basePri);
}
inline void
AsyncTask::OpaquePrivateStorage::initializeWithSlab(AsyncTask *task,
void *slab,
size_t slabCapacity) {
new (this) PrivateStorage(task->Flags, slab, slabCapacity);
inline void AsyncTask::OpaquePrivateStorage::initializeWithSlab(
JobPriority basePri, void *slab, size_t slabCapacity) {
new (this) PrivateStorage(basePri, slab, slabCapacity);
}
inline void AsyncTask::OpaquePrivateStorage::complete(AsyncTask *task) {
get().complete(task);

View File

@@ -411,7 +411,8 @@ void swift::updateNewChildWithParentAndGroupState(AsyncTask *child,
// Propagate max priority of parent to child task's active status and the Job
// header
JobPriority pri = parentStatus.getStoredPriority();
newChildTaskStatus = newChildTaskStatus.withNewPriority(pri);
newChildTaskStatus =
newChildTaskStatus.withNewPriority(withUserInteractivePriorityDowngrade(pri));
child->Flags.setPriority(pri);
child->_private().Status.store(newChildTaskStatus, std::memory_order_relaxed);

View File

@@ -0,0 +1,126 @@
// RUN: %target-run-simple-swift( -Xfrontend -disable-availability-checking %import-libdispatch -parse-as-library)
// REQUIRES: executable_test
// REQUIRES: concurrency
// REQUIRES: libdispatch
// rdar://76038845
// REQUIRES: concurrency_runtime
// UNSUPPORTED: back_deployment_runtime
import StdlibUnittest
import Darwin
import Dispatch
func loopUntil(priority: TaskPriority) async {
while (Task.currentPriority != priority) {
await Task.sleep(1_000_000_000)
}
}
func print(_ s: String = "") {
fputs("\(s)\n", stderr)
}
func expectedBasePri(priority: TaskPriority) -> TaskPriority {
let basePri = Task.basePriority!
print("Testing basePri matching expected pri - \(basePri) == \(priority)")
expectEqual(basePri, priority)
return basePri
}
func expectedCurrentPri(priority: TaskPriority) -> TaskPriority {
let curPri = Task.currentPriority
print("Testing curPri matching expected pri - \(curPri) == \(priority)")
expectEqual(curPri, priority)
return curPri
}
func testNestedTaskPriority(basePri: TaskPriority, curPri: TaskPriority) async {
let _ = expectedBasePri(priority: basePri)
let _ = expectedCurrentPri(priority: curPri)
}
@main struct Main {
static func main() async {
let top_level = detach { /* To detach from main actor when running work */
let tests = TestSuite("Task base priority")
if #available(SwiftStdlib 5.1, *) {
tests.test("Structured concurrency base priority propagation") {
let task = Task(priority: .background) {
await loopUntil(priority: .default)
let basePri = expectedBasePri(priority: .background)
let curPri = expectedCurrentPri(priority: .default)
// Structured concurrency via async let, escalated priority of
// parent should propagate
print("Testing propagation for async let structured concurrency child")
async let child = testNestedTaskPriority(basePri: basePri, curPri: curPri)
await child
let dispatchGroup = DispatchGroup()
// Structured concurrency via task groups, escalated priority should
// propagate
await withTaskGroup(of: Void.self, returning: Void.self) { group in
dispatchGroup.enter()
group.addTask {
print("Testing propagation for task group regular child")
let _ = await testNestedTaskPriority(basePri: basePri, curPri: curPri)
dispatchGroup.leave()
return
}
dispatchGroup.enter()
group.addTask(priority: .utility) {
print("Testing propagation for task group child with specified priority")
let _ = await testNestedTaskPriority(basePri: .utility, curPri: curPri)
dispatchGroup.leave()
return
}
// Wait for child tasks to finish running, don't await since that
// will escalate them
dispatchGroup.wait()
}
}
await task.value // Escalate task BG->DEF
}
tests.test("Unstructured base priority propagation") {
let task = Task(priority : .background) {
await loopUntil(priority: .default)
let basePri = expectedBasePri(priority: .background)
let _ = expectedCurrentPri(priority: .default)
let group = DispatchGroup()
// Create an unstructured task
group.enter()
let _ = Task {
let _ = await testNestedTaskPriority(basePri: basePri, curPri: basePri)
group.leave()
return
}
// Wait for unstructured task to finish running, don't await it
// since that will escalate
group.wait()
}
await task.value // Escalate task BG->DEF
}
}
await runAllTestsAsync()
}
await top_level.value
}
}

View File

@@ -140,9 +140,9 @@ static std::pair<AsyncTask*, Context*>
createTaskWithContext(JobPriority priority, Fn &&fn) {
auto invoke =
TaskContinuationFromLambda<Fn, Context>::get(std::move(fn));
TaskCreateFlags flags;
flags.setPriority(priority);
auto pair = swift_task_create_common(flags.getOpaqueValue(),
TaskCreateFlags createFlags;
createFlags.setRequestedPriority(priority);
auto pair = swift_task_create_common(createFlags.getOpaqueValue(),
nullptr,
nullptr,
invoke,