mirror of
https://github.com/apple/swift.git
synced 2025-12-14 20:36:38 +01:00
Tracking this as a single bit is actually largely uninteresting to the runtime. To handle priority escalation properly, we really need to track this at a finer grain of detail: recording that the task is running on a specific thread, enqueued on a specific actor, or so on. But starting by tracking a single bit is important for two reasons: - First, it's more realistic about the performance overheads of tasks: we're going to be doing this tracking eventually, and the cost of that tracking will be dominated by the atomic access, so doing that access now sets the baseline about right. - Second, it ensures that we've actually got runtime involvement in all the right places to do this tracking. A propos of the latter: there was no runtime involvement with awaiting a continuation, which is a point at which the task potentially transitions from running to suspended. We must do the tracking as part of this transition, rather than recognizing in the run-loops that a task is still active and treating it as having suspended, because the latter point potentially races with the resumption of the task. To do this, I've had to introduce a runtime function, swift_continuation_await, to do this awaiting rather than inlining the atomic operation on the continuation. As part of doing this work, I've also fixed a bug where we failed to load-acquire in swift_task_escalate before walking the task status records to invoke escalation actions. I've also fixed several places where the handling of task statuses may have accidentally allowed the task to revert to uncancelled.
772 lines
27 KiB
C++
772 lines
27 KiB
C++
//===--- TaskStatus.cpp - Asynchronous task status tracking ---------------===//
|
|
//
|
|
// This source file is part of the Swift.org open source project
|
|
//
|
|
// Copyright (c) 2014 - 2020 Apple Inc. and the Swift project authors
|
|
// Licensed under Apache License v2.0 with Runtime Library Exception
|
|
//
|
|
// See https://swift.org/LICENSE.txt for license information
|
|
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
|
|
//
|
|
//===----------------------------------------------------------------------===//
|
|
//
|
|
// Routines for maintaining and interacting with the current state of a
|
|
// task, including tracking child tasks, deadlines, and cancellation.
|
|
//
|
|
//===----------------------------------------------------------------------===//
|
|
|
|
#include "../CompatibilityOverride/CompatibilityOverride.h"
|
|
#include "swift/Runtime/Concurrency.h"
|
|
#include "swift/Runtime/Mutex.h"
|
|
#include "swift/ABI/TaskStatus.h"
|
|
#include "TaskPrivate.h"
|
|
#include <atomic>
|
|
|
|
using namespace swift;
|
|
|
|
inline TaskStatusRecord *
|
|
ActiveTaskStatus::getStatusRecordParent(TaskStatusRecord *ptr) {
|
|
return ptr->getParent();
|
|
}
|
|
|
|
/**************************************************************************/
|
|
/************************* RECORD LOCK MANAGEMENT *************************/
|
|
/**************************************************************************/
|
|
|
|
/// A lock used to protect management of task-specific status
|
|
/// record locks.
|
|
static StaticConditionVariable::StaticMutex StatusRecordLockLock;
|
|
|
|
namespace {
|
|
|
|
/// A lock record which can be used to protect a task's active
|
|
/// status records.
|
|
///
|
|
/// For the most part, the active task status records of a task are
|
|
/// only accessed by the task itself. If that were always true,
|
|
/// no synchronization would be required to change them. However,
|
|
/// cancellation and escalation can occur asynchronously, and they
|
|
/// must be able to inspect the status records without worrying about
|
|
/// their concurrent modification or destruction of the records.
|
|
/// Therefore, these operations freeze the active status records
|
|
/// for their duration. They do this by (1) setting a bit in the
|
|
/// task's `Status` field state which says that the records are
|
|
/// locked and (2) creating a lock record as the new innermost
|
|
/// status record. When the operation is complete, it removes this
|
|
/// record and clears the lock bit, then notifies the lock record that
|
|
/// the locking operation is complete.
|
|
///
|
|
/// When a task wants to change its active status record, but
|
|
/// it sees that the locked bit is set in the `Status` field, it
|
|
/// must acquire the global status-record lock, find this record
|
|
/// (which should be the innermost record), and wait for an unlock.
|
|
class StatusRecordLockRecord : public TaskStatusRecord {
|
|
/// A lock held by the locking thread for the duration of some
|
|
/// operation. The real lock for the status record state is the
|
|
/// isLocked() bit in the active state; this lock is just a
|
|
/// mechanism to allow threads to wait for that lock. This is
|
|
/// rather unfortunately heavyweight, but we're willing make
|
|
/// locking expensive if it makes a task's normal record
|
|
/// manipulations as cheap as possible.
|
|
Mutex LockingThreadLock;
|
|
|
|
/// A condition variable that the locking thread waits for if
|
|
/// there are active unlock waiters when it tries to unlock.
|
|
ConditionVariable LockerQueue;
|
|
|
|
// These fields are protected by StatusRecordLockLock,
|
|
// not LockingThreadLock.
|
|
|
|
/// The number of threads waiting for Locked to become false.
|
|
size_t NumUnlockWaiters : CHAR_BIT * sizeof(size_t) - 1;
|
|
|
|
/// True if the lock has been cleared.
|
|
size_t Locked : 1;
|
|
|
|
public:
|
|
StatusRecordLockRecord(TaskStatusRecord *parent)
|
|
: TaskStatusRecord(TaskStatusRecordKind::Private_RecordLock, parent),
|
|
NumUnlockWaiters(0), Locked(true) {
|
|
// This is always initialized on the locking thread, and
|
|
// the private lock starts off locked.
|
|
LockingThreadLock.lock();
|
|
}
|
|
|
|
~StatusRecordLockRecord() {
|
|
// Unlock the lock before destroying it.
|
|
if (Locked) LockingThreadLock.unlock();
|
|
}
|
|
|
|
/// Wait on the queue until there's an unlock.
|
|
void
|
|
waitForUnlock(StaticConditionVariable::StaticMutex::ScopedLock &globalLock) {
|
|
assert(Locked);
|
|
|
|
// Flag that we're waiting, then drop the global lock.
|
|
NumUnlockWaiters++;
|
|
{
|
|
StaticConditionVariable::StaticMutex::ScopedUnlock globalUnlock(
|
|
StatusRecordLockLock);
|
|
|
|
// Attempt to acquire the locking-thread lock, thereby
|
|
// waiting until the locking thread unlocks the record.
|
|
{
|
|
Mutex::ScopedLock acquirePrivateLock(LockingThreadLock);
|
|
}
|
|
|
|
// Now reacquire the global lock.
|
|
}
|
|
|
|
// The record should always be unlocked now.
|
|
assert(!Locked);
|
|
|
|
// Remove ourselves from the count, and if the count is zero,
|
|
// wake the locking thread.
|
|
NumUnlockWaiters--;
|
|
if (NumUnlockWaiters == 0)
|
|
LockerQueue.notifyAll();
|
|
}
|
|
|
|
/// Wake up any threads that were waiting for unlock. Must be
|
|
/// called by the locking thread.
|
|
void unlock() {
|
|
StaticConditionVariable::StaticMutex::ScopedLock globalLock(
|
|
StatusRecordLockLock);
|
|
assert(Locked);
|
|
Locked = false;
|
|
|
|
// Unlock the locking-thread lock, balancing out the lock()
|
|
// call in the constructor. This allows any unlock waiters
|
|
// to wake up.
|
|
LockingThreadLock.unlock();
|
|
|
|
// As soon as we don't have any unlock waiters, we're done.
|
|
while (NumUnlockWaiters) {
|
|
// In the meantime, wait on the locker queue, temporarily
|
|
// releasing the global lock.
|
|
// FIXME: this is a priority inversion; we really want to
|
|
// escalate the priority of the waiting threads.
|
|
StatusRecordLockLock.wait(LockerQueue);
|
|
}
|
|
}
|
|
|
|
static bool classof(const TaskStatusRecord *record) {
|
|
return record->getKind() == TaskStatusRecordKind::Private_RecordLock;
|
|
}
|
|
};
|
|
|
|
}
|
|
|
|
/// Wait for a task's status record lock to be unlocked.
|
|
///
|
|
/// When this function returns, `oldStatus` will have been updated
|
|
/// to the last value read and `isLocked()` will be false.
|
|
/// Of course, another thread may still be concurrently trying
|
|
/// to acquire the record lock.
|
|
static void waitForStatusRecordUnlock(AsyncTask *task,
|
|
ActiveTaskStatus &oldStatus) {
|
|
assert(oldStatus.isLocked());
|
|
|
|
// Acquire the lock.
|
|
StaticConditionVariable::StaticMutex::ScopedLock globalLock(
|
|
StatusRecordLockLock);
|
|
|
|
while (true) {
|
|
// Check that oldStatus is still correct.
|
|
oldStatus = task->_private().Status.load(std::memory_order_acquire);
|
|
if (!oldStatus.isLocked())
|
|
return;
|
|
|
|
// The innermost entry should be a record lock record; wait
|
|
// for it to be unlocked.
|
|
auto record = oldStatus.getInnermostRecord();
|
|
auto recordLockRecord = cast<StatusRecordLockRecord>(record);
|
|
recordLockRecord->waitForUnlock(globalLock);
|
|
}
|
|
}
|
|
|
|
enum class LockContext {
|
|
/// The lock is being acquired from within the running task.
|
|
OnTask,
|
|
|
|
/// The lock is being acquired asynchronously in order to cancel the
|
|
/// task.
|
|
Cancellation,
|
|
|
|
/// The lock is being acquired asynchronously in order to read the
|
|
/// status records for some other reason.
|
|
OtherAsynchronous
|
|
};
|
|
|
|
/// Acquire a task's status record lock and return the
|
|
/// previous value of its status record state.
|
|
///
|
|
/// If `forCancellation` is true, the cancelled bit will be set in the
|
|
/// state, and the lock will not be acquired if the task is already
|
|
/// cancelled or can be cancelled without the lock. If this occurs,
|
|
/// `isCancelled()` will be true for the return value.
|
|
static ActiveTaskStatus
|
|
acquireStatusRecordLock(AsyncTask *task,
|
|
Optional<StatusRecordLockRecord> &recordLockRecord,
|
|
LockContext lockContext) {
|
|
auto loadOrdering = lockContext != LockContext::OnTask
|
|
? std::memory_order_acquire
|
|
: std::memory_order_relaxed;
|
|
bool forCancellation = lockContext == LockContext::Cancellation;
|
|
|
|
// Load the current state. We can use relaxed loads if this isn't
|
|
// for cancellation because (1) this operation should be synchronous
|
|
// with the task, so the only thing that can modify it asynchronously
|
|
// is a cancelling thread, and (2) we'll reload with acquire ordering
|
|
// if a cancelling thread forces us to wait for an unlock.
|
|
auto oldStatus = task->_private().Status.load(loadOrdering);
|
|
|
|
while (true) {
|
|
// Cancellation should be idempotent: if the task has already
|
|
// been cancelled (or is being cancelled concurrently), there
|
|
// shouldn't be any need to do this work again.
|
|
if (oldStatus.isCancelled() && forCancellation)
|
|
return oldStatus;
|
|
|
|
// If the old info says we're locked, wait for the lock to clear.
|
|
if (oldStatus.isLocked()) {
|
|
waitForStatusRecordUnlock(task, oldStatus);
|
|
continue;
|
|
}
|
|
|
|
// If we're cancelling and the task has no active status records,
|
|
// try to just set the cancelled bit and return.
|
|
auto oldRecord = oldStatus.getInnermostRecord();
|
|
if (!oldRecord && forCancellation) {
|
|
ActiveTaskStatus newStatus = oldStatus.withCancelled();
|
|
if (task->_private().Status.compare_exchange_weak(oldStatus, newStatus,
|
|
/*success*/ std::memory_order_relaxed,
|
|
/*failure*/ loadOrdering))
|
|
return newStatus;
|
|
|
|
// If that failed, just restart.
|
|
continue;
|
|
}
|
|
|
|
// Make (or reconfigure) a lock record.
|
|
if (!recordLockRecord) {
|
|
recordLockRecord.emplace(oldRecord);
|
|
} else {
|
|
recordLockRecord->resetParent(oldRecord);
|
|
}
|
|
|
|
// Install the lock record as the active cancellation info, or
|
|
// restart if that fails.
|
|
ActiveTaskStatus newStatus =
|
|
oldStatus.withLockingRecord(&*recordLockRecord);
|
|
if (forCancellation)
|
|
newStatus = newStatus.withCancelled();
|
|
if (task->_private().Status.compare_exchange_weak(oldStatus, newStatus,
|
|
/*success*/ std::memory_order_release,
|
|
/*failure*/ loadOrdering))
|
|
return oldStatus;
|
|
}
|
|
}
|
|
|
|
/// Release a task's status record lock that was previously
|
|
/// acquired on this thread.
|
|
static void releaseStatusRecordLock(AsyncTask *task,
|
|
ActiveTaskStatus newStatus,
|
|
Optional<StatusRecordLockRecord> &recordLockRecord) {
|
|
assert(!newStatus.isLocked());
|
|
|
|
// We can just unconditionally store because nobody can be modifying
|
|
// the state while we've locked it. The task shouldn't depend
|
|
// on memory-ordering with anything we've done, so we can use a
|
|
// relaxed store.
|
|
task->_private().Status.store(newStatus, std::memory_order_relaxed);
|
|
|
|
// Unlock the record lock.
|
|
recordLockRecord->unlock();
|
|
}
|
|
|
|
/**************************************************************************/
|
|
/*************************** RECORD MANAGEMENT ****************************/
|
|
/**************************************************************************/
|
|
|
|
SWIFT_CC(swift)
|
|
static bool swift_task_addStatusRecordImpl(TaskStatusRecord *newRecord) {
|
|
auto task = swift_task_getCurrent();
|
|
|
|
// Load the current state. We can use a relaxed load because we're
|
|
// synchronous with the task.
|
|
auto oldStatus = task->_private().Status.load(std::memory_order_relaxed);
|
|
|
|
while (true) {
|
|
// Wait for any active lock to be released.
|
|
if (oldStatus.isLocked())
|
|
waitForStatusRecordUnlock(task, oldStatus);
|
|
|
|
// Reset the parent of the new record.
|
|
newRecord->resetParent(oldStatus.getInnermostRecord());
|
|
|
|
// Set the record as the new innermost record.
|
|
// We have to use a release on success to make the initialization of
|
|
// the new record visible to the cancelling thread.
|
|
ActiveTaskStatus newStatus = oldStatus.withInnermostRecord(newRecord);
|
|
if (task->_private().Status.compare_exchange_weak(oldStatus, newStatus,
|
|
/*success*/ std::memory_order_release,
|
|
/*failure*/ std::memory_order_relaxed))
|
|
return !oldStatus.isCancelled();
|
|
}
|
|
}
|
|
|
|
SWIFT_CC(swift)
|
|
static bool swift_task_tryAddStatusRecordImpl(TaskStatusRecord *newRecord) {
|
|
auto task = swift_task_getCurrent();
|
|
|
|
// Load the current state. We can use a relaxed load because we're
|
|
// synchronous with the task.
|
|
auto oldStatus = task->_private().Status.load(std::memory_order_relaxed);
|
|
|
|
while (true) {
|
|
// If the old info is already cancelled, do nothing.
|
|
if (oldStatus.isCancelled())
|
|
return false;
|
|
|
|
// Wait for any active lock to be released.
|
|
if (oldStatus.isLocked()) {
|
|
waitForStatusRecordUnlock(task, oldStatus);
|
|
|
|
if (oldStatus.isCancelled())
|
|
return false;
|
|
}
|
|
|
|
// Reset the parent of the new record.
|
|
newRecord->resetParent(oldStatus.getInnermostRecord());
|
|
|
|
// Set the record as the new innermost record.
|
|
// We have to use a release on success to make the initialization of
|
|
// the new record visible to the cancelling thread.
|
|
ActiveTaskStatus newStatus = oldStatus.withInnermostRecord(newRecord);
|
|
if (task->_private().Status.compare_exchange_weak(oldStatus, newStatus,
|
|
/*success*/ std::memory_order_release,
|
|
/*failure*/ std::memory_order_relaxed))
|
|
return true;
|
|
}
|
|
}
|
|
|
|
SWIFT_CC(swift)
|
|
static bool swift_task_removeStatusRecordImpl(TaskStatusRecord *record) {
|
|
auto task = swift_task_getCurrent();
|
|
|
|
// Load the current state.
|
|
auto &status = task->_private().Status;
|
|
auto oldStatus = status.load(std::memory_order_relaxed);
|
|
|
|
while (true) {
|
|
// Wait for any active lock to be released.
|
|
if (oldStatus.isLocked())
|
|
waitForStatusRecordUnlock(task, oldStatus);
|
|
|
|
// If the record is the innermost record, try to just pop it off.
|
|
if (oldStatus.getInnermostRecord() == record) {
|
|
ActiveTaskStatus newStatus =
|
|
oldStatus.withInnermostRecord(record->getParent());
|
|
if (status.compare_exchange_weak(oldStatus, newStatus,
|
|
/*success*/ std::memory_order_release,
|
|
/*failure*/ std::memory_order_relaxed)) {
|
|
return !oldStatus.isCancelled();
|
|
}
|
|
|
|
// Otherwise, restart.
|
|
continue;
|
|
}
|
|
|
|
// If the record is not the innermost record, we need to acquire the
|
|
// record lock; there's no way to splice the record list safely with
|
|
// a thread that's attempting to acquire the lock.
|
|
break;
|
|
}
|
|
|
|
// Acquire the status record lock.
|
|
Optional<StatusRecordLockRecord> recordLockRecord;
|
|
oldStatus = acquireStatusRecordLock(task, recordLockRecord,
|
|
LockContext::OnTask);
|
|
assert(!oldStatus.isLocked());
|
|
|
|
// We can't observe the record to be the innermost record here because
|
|
// that would require some other thread to be concurrently structurally
|
|
// changing the set of status records, but we're running
|
|
// synchronously with the task.
|
|
auto cur = oldStatus.getInnermostRecord();
|
|
assert(cur != record);
|
|
|
|
// Splice the record out.
|
|
while (true) {
|
|
auto next = cur->getParent();
|
|
if (next == record) {
|
|
cur->spliceParent(record->getParent());
|
|
break;
|
|
}
|
|
}
|
|
|
|
// Release the lock. Since the record can't be the root, we don't
|
|
// have to worry about replacing the root, and oldStatus is always
|
|
// exactly what we want to restore.
|
|
releaseStatusRecordLock(task, oldStatus, recordLockRecord);
|
|
|
|
return !oldStatus.isCancelled();
|
|
}
|
|
|
|
SWIFT_CC(swift)
|
|
static bool swift_task_hasTaskGroupStatusRecordImpl() {
|
|
auto task = swift_task_getCurrent();
|
|
|
|
// a group must be in a task, so if we're not in a task...
|
|
// then, we certainly are not in a group either!
|
|
if (!task)
|
|
return false;
|
|
|
|
Optional<StatusRecordLockRecord> recordLockRecord;
|
|
|
|
// Acquire the status record lock.
|
|
auto oldStatus = acquireStatusRecordLock(task, recordLockRecord,
|
|
LockContext::OnTask);
|
|
assert(!oldStatus.isLocked());
|
|
|
|
// Scan for the task group record within all the active records.
|
|
auto foundTaskGroupRecord = false;
|
|
for (auto record: oldStatus.records()) {
|
|
if (record->getKind() == TaskStatusRecordKind::TaskGroup) {
|
|
foundTaskGroupRecord = true;
|
|
break; // out of the for loop
|
|
}
|
|
}
|
|
|
|
releaseStatusRecordLock(task, oldStatus, recordLockRecord);
|
|
|
|
return foundTaskGroupRecord;
|
|
}
|
|
|
|
/**************************************************************************/
|
|
/************************** CHILD TASK MANAGEMENT *************************/
|
|
/**************************************************************************/
|
|
|
|
// ==== Child tasks ------------------------------------------------------------
|
|
SWIFT_CC(swift)
|
|
static ChildTaskStatusRecord*
|
|
swift_task_attachChildImpl(AsyncTask *child) {
|
|
void *allocation = malloc(sizeof(swift::ChildTaskStatusRecord));
|
|
auto record = new (allocation) swift::ChildTaskStatusRecord(child);
|
|
swift_task_addStatusRecord(record);
|
|
return record;
|
|
}
|
|
|
|
SWIFT_CC(swift)
|
|
static void
|
|
swift_task_detachChildImpl(ChildTaskStatusRecord *record) {
|
|
swift_task_removeStatusRecord(record);
|
|
}
|
|
|
|
/****************************** CANCELLATION ******************************/
|
|
/**************************************************************************/
|
|
|
|
/// Perform any cancellation actions required by the given record.
|
|
static void performCancellationAction(TaskStatusRecord *record) {
|
|
switch (record->getKind()) {
|
|
// Deadlines don't require any special support.
|
|
case TaskStatusRecordKind::Deadline:
|
|
return;
|
|
|
|
// Child tasks need to be recursively cancelled.
|
|
case TaskStatusRecordKind::ChildTask: {
|
|
auto childRecord = cast<ChildTaskStatusRecord>(record);
|
|
for (AsyncTask *child: childRecord->children())
|
|
swift_task_cancel(child);
|
|
return;
|
|
}
|
|
|
|
case TaskStatusRecordKind::TaskGroup: {
|
|
auto childRecord = cast<TaskGroupTaskStatusRecord>(record);
|
|
for (AsyncTask *child: childRecord->children())
|
|
swift_task_cancel(child);
|
|
return;
|
|
}
|
|
|
|
// Cancellation notifications need to be called.
|
|
case TaskStatusRecordKind::CancellationNotification: {
|
|
auto notification =
|
|
cast<CancellationNotificationStatusRecord>(record);
|
|
notification->run();
|
|
return;
|
|
}
|
|
|
|
// Escalation notifications can be ignored.
|
|
case TaskStatusRecordKind::EscalationNotification:
|
|
return;
|
|
|
|
// Record locks shouldn't be found this way, but they don't have
|
|
// anything to do anyway.
|
|
case TaskStatusRecordKind::Private_RecordLock:
|
|
return;
|
|
}
|
|
|
|
// Other cases can fall through here and be ignored.
|
|
// FIXME: allow dynamic extension/correction?
|
|
}
|
|
|
|
/// Perform any cancellation actions required by the given record.
|
|
static void performGroupCancellationAction(TaskStatusRecord *record) {
|
|
switch (record->getKind()) {
|
|
// We only need to cancel specific GroupChildTasks, not arbitrary child tasks.
|
|
// A task may be parent to many tasks which are not part of a group after all.
|
|
case TaskStatusRecordKind::ChildTask:
|
|
return;
|
|
|
|
case TaskStatusRecordKind::TaskGroup: {
|
|
auto groupChildRecord = cast<TaskGroupTaskStatusRecord>(record);
|
|
// Since a task can only be running a single task group at the same time,
|
|
// we can always assume that the group record which we found is the one
|
|
// we're intended to cancel child tasks for.
|
|
//
|
|
// A group enforces that tasks can not "escape" it, and as such once the group
|
|
// returns, all its task have been completed.
|
|
for (AsyncTask *child: groupChildRecord->children()) {
|
|
swift_task_cancel(child);
|
|
}
|
|
return;
|
|
}
|
|
|
|
// All other kinds of records we handle the same way as in a normal cancellation
|
|
case TaskStatusRecordKind::Deadline:
|
|
case TaskStatusRecordKind::CancellationNotification:
|
|
case TaskStatusRecordKind::EscalationNotification:
|
|
case TaskStatusRecordKind::Private_RecordLock:
|
|
performCancellationAction(record);
|
|
return;
|
|
}
|
|
|
|
// Other cases can fall through here and be ignored.
|
|
// FIXME: allow dynamic extension/correction?
|
|
}
|
|
|
|
SWIFT_CC(swift)
|
|
static void swift_task_cancelImpl(AsyncTask *task) {
|
|
Optional<StatusRecordLockRecord> recordLockRecord;
|
|
|
|
// Acquire the status record lock.
|
|
auto oldStatus = acquireStatusRecordLock(task, recordLockRecord,
|
|
LockContext::Cancellation);
|
|
assert(!oldStatus.isLocked());
|
|
|
|
// Lock acquisition will fail for LockContext::Cancellation if
|
|
// the task is already cancelled. In this case, we have nothing
|
|
// to do, not even releasing the lock.
|
|
if (oldStatus.isCancelled()) {
|
|
return;
|
|
}
|
|
|
|
// Otherwise, we've installed the lock record and are now the
|
|
// locking thread.
|
|
|
|
// Carry out the cancellation operations associated with all
|
|
// the active records.
|
|
for (auto cur: oldStatus.records()) {
|
|
performCancellationAction(cur);
|
|
}
|
|
|
|
// Release the status record lock, being sure to flag that
|
|
// the task is now cancelled.
|
|
ActiveTaskStatus cancelledStatus = oldStatus.withCancelled();
|
|
releaseStatusRecordLock(task, cancelledStatus, recordLockRecord);
|
|
}
|
|
|
|
SWIFT_CC(swift)
|
|
static void swift_task_cancel_group_child_tasksImpl(TaskGroup *group) {
|
|
Optional<StatusRecordLockRecord> recordLockRecord;
|
|
|
|
// Acquire the status record lock.
|
|
//
|
|
// We purposefully DO NOT make this a cancellation by itself.
|
|
// We are cancelling the task group, and all tasks it contains.
|
|
// We are NOT cancelling the entire parent task though.
|
|
auto task = swift_task_getCurrent();
|
|
auto oldStatus = acquireStatusRecordLock(task, recordLockRecord,
|
|
LockContext::OnTask);
|
|
// Carry out the cancellation operations associated with all
|
|
// the active records.
|
|
for (auto cur: oldStatus.records()) {
|
|
performGroupCancellationAction(cur);
|
|
}
|
|
|
|
// Release the status record lock, restoring exactly the old status.
|
|
releaseStatusRecordLock(task, oldStatus, recordLockRecord);
|
|
}
|
|
|
|
/**************************************************************************/
|
|
/******************************* ESCALATION *******************************/
|
|
/**************************************************************************/
|
|
|
|
/// Perform any escalation actions required by the given record.
|
|
static void performEscalationAction(TaskStatusRecord *record,
|
|
JobPriority newPriority) {
|
|
switch (record->getKind()) {
|
|
// Deadlines don't require any special support.
|
|
case TaskStatusRecordKind::Deadline:
|
|
return;
|
|
|
|
// Child tasks need to be recursively escalated.
|
|
case TaskStatusRecordKind::ChildTask: {
|
|
auto childRecord = cast<ChildTaskStatusRecord>(record);
|
|
for (AsyncTask *child: childRecord->children())
|
|
swift_task_escalate(child, newPriority);
|
|
return;
|
|
}
|
|
case TaskStatusRecordKind::TaskGroup: {
|
|
auto childRecord = cast<TaskGroupTaskStatusRecord>(record);
|
|
for (AsyncTask *child: childRecord->children())
|
|
swift_task_escalate(child, newPriority);
|
|
return;
|
|
}
|
|
|
|
// Cancellation notifications can be ignore.
|
|
case TaskStatusRecordKind::CancellationNotification:
|
|
return;
|
|
|
|
// Escalation notifications need to be called.
|
|
case TaskStatusRecordKind::EscalationNotification: {
|
|
auto notification =
|
|
cast<EscalationNotificationStatusRecord>(record);
|
|
notification->run(newPriority);
|
|
return;
|
|
}
|
|
|
|
// Record locks shouldn't be found this way, but they don't have
|
|
// anything to do anyway.
|
|
case TaskStatusRecordKind::Private_RecordLock:
|
|
return;
|
|
}
|
|
|
|
// Other cases can fall through here and be ignored.
|
|
// FIXME: allow dynamic extension/correction?
|
|
}
|
|
|
|
SWIFT_CC(swift)
|
|
JobPriority
|
|
static swift_task_escalateImpl(AsyncTask *task, JobPriority newPriority) {
|
|
Optional<StatusRecordLockRecord> recordLockRecord;
|
|
|
|
// Fast path: check that the stored priority is already at least
|
|
// as high as the desired priority.
|
|
auto oldStatus = task->_private().Status.load(std::memory_order_relaxed);
|
|
if (oldStatus.getStoredPriority() >= newPriority)
|
|
return oldStatus.getStoredPriority();
|
|
|
|
// Acquire the status record lock. This has to do a load-acquire
|
|
// because we need to read the status records.
|
|
oldStatus = acquireStatusRecordLock(task, recordLockRecord,
|
|
LockContext::OtherAsynchronous);
|
|
assert(!oldStatus.isLocked());
|
|
|
|
// Now that we have the task's status lock, check again that the
|
|
// priority is still too low.
|
|
auto newStatus = oldStatus;
|
|
if (oldStatus.getStoredPriority() < newPriority) {
|
|
newStatus = oldStatus.withEscalatedPriority(newPriority);
|
|
|
|
// TODO: attempt to escalate the thread running the task, if it's
|
|
// currently running. This probably requires the task to be enqueued
|
|
// on a standard executor.
|
|
|
|
// Perform escalation operations for all the status records.
|
|
for (auto cur: oldStatus.records()) {
|
|
performEscalationAction(cur, newPriority);
|
|
}
|
|
}
|
|
|
|
// Release the status record lock, restoring the old status.
|
|
releaseStatusRecordLock(task, newStatus, recordLockRecord);
|
|
|
|
return newStatus.getStoredPriority();
|
|
}
|
|
|
|
void AsyncTask::flagAsRunning_slow() {
|
|
Optional<StatusRecordLockRecord> recordLockRecord;
|
|
|
|
auto oldStatus = acquireStatusRecordLock(this, recordLockRecord,
|
|
LockContext::OnTask);
|
|
assert(!oldStatus.isLocked());
|
|
assert(!oldStatus.isRunning());
|
|
|
|
auto newStatus = oldStatus.withRunning(true);
|
|
if (newStatus.isStoredPriorityEscalated()) {
|
|
newStatus = newStatus.withoutStoredPriorityEscalation();
|
|
Flags.setPriority(oldStatus.getStoredPriority());
|
|
}
|
|
|
|
releaseStatusRecordLock(this, newStatus, recordLockRecord);
|
|
}
|
|
|
|
void AsyncTask::flagAsSuspended_slow() {
|
|
Optional<StatusRecordLockRecord> recordLockRecord;
|
|
|
|
auto oldStatus = acquireStatusRecordLock(this, recordLockRecord,
|
|
LockContext::OnTask);
|
|
assert(!oldStatus.isLocked());
|
|
assert(oldStatus.isRunning());
|
|
|
|
auto newStatus = oldStatus.withRunning(false);
|
|
if (newStatus.isStoredPriorityEscalated()) {
|
|
newStatus = newStatus.withoutStoredPriorityEscalation();
|
|
Flags.setPriority(oldStatus.getStoredPriority());
|
|
}
|
|
|
|
releaseStatusRecordLock(this, newStatus, recordLockRecord);
|
|
}
|
|
|
|
/**************************************************************************/
|
|
/******************************** DEADLINE ********************************/
|
|
/**************************************************************************/
|
|
SWIFT_CC(swift)
|
|
static NearestTaskDeadline swift_task_getNearestDeadlineImpl(AsyncTask *task) {
|
|
// We don't have to worry about the deadline records being
|
|
// concurrently modified, so we can just walk the record chain,
|
|
// ignoring the possibility of a concurrent cancelling task.
|
|
|
|
// Load the current state.
|
|
auto &status = task->_private().Status;
|
|
auto oldStatus = status.load(std::memory_order_relaxed);
|
|
|
|
NearestTaskDeadline result;
|
|
|
|
// If it's already cancelled, we're done.
|
|
if (oldStatus.isCancelled()) {
|
|
result.ValueKind = NearestTaskDeadline::AlreadyCancelled;
|
|
return result;
|
|
}
|
|
|
|
// If it's locked, wait for the lock; we can't safely step through
|
|
// the RecordLockStatusRecord on a different thread.
|
|
if (oldStatus.isLocked()) {
|
|
waitForStatusRecordUnlock(task, oldStatus);
|
|
assert(!oldStatus.isLocked());
|
|
}
|
|
|
|
// Walk all the records looking for deadlines.
|
|
result.ValueKind = NearestTaskDeadline::None;
|
|
for (const auto *record: oldStatus.records()) {
|
|
auto deadlineRecord = dyn_cast<DeadlineStatusRecord>(record);
|
|
if (!deadlineRecord) continue;
|
|
auto recordDeadline = deadlineRecord->getDeadline();
|
|
|
|
// If we already have a deadline, pick the earlier.
|
|
if (result.ValueKind == NearestTaskDeadline::Active) {
|
|
if (recordDeadline < result.Value)
|
|
result.Value = recordDeadline;
|
|
} else {
|
|
result.Value = recordDeadline;
|
|
result.ValueKind = NearestTaskDeadline::Active;
|
|
}
|
|
}
|
|
return result;
|
|
}
|
|
|
|
#define OVERRIDE_TASK_STATUS COMPATIBILITY_OVERRIDE
|
|
#include COMPATIBILITY_OVERRIDE_INCLUDE_PATH
|