Files
swift-mirror/stdlib/public/Concurrency/TaskStatus.cpp
2021-02-22 13:26:27 +09:00

610 lines
22 KiB
C++

//===--- TaskStatus.cpp - Asynchronous task status tracking ---------------===//
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2014 - 2020 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//
//
// Routines for maintaining and interacting with the current state of a
// task, including tracking child tasks, deadlines, and cancellation.
//
//===----------------------------------------------------------------------===//
#include "swift/Runtime/Concurrency.h"
#include "swift/Runtime/Mutex.h"
#include "swift/ABI/TaskStatus.h"
#include <atomic>
using namespace swift;
/**************************************************************************/
/************************* RECORD LOCK MANAGEMENT *************************/
/**************************************************************************/
/// A lock used to protect management of task-specific status
/// record locks.
static StaticConditionVariable::StaticMutex StatusRecordLockLock;
namespace {
/// A lock record which can be used to protect a task's active
/// status records.
///
/// For the most part, the active task status records of a task are
/// only accessed by the task itself. If that were always true,
/// no synchronization would be required to change them. However,
/// cancellation and escalation can occur asynchronously, and they
/// must be able to inspect the status records without worrying about
/// their concurrent modification or destruction of the records.
/// Therefore, these operations freeze the active status records
/// for their duration. They do this by (1) setting a bit in the
/// task's `Status` field state which says that the records are
/// locked and (2) creating a lock record as the new innermost
/// status record. When the operation is complete, it removes this
/// record and clears the lock bit, then notifies the lock record that
/// the locking operation is complete.
///
/// When a task wants to change its active status record, but
/// it sees that the locked bit is set in the `Status` field, it
/// must acquire the global status-record lock, find this record
/// (which should be the innermost record), and wait for an unlock.
class StatusRecordLockRecord : public TaskStatusRecord {
  /// A lock held by the locking thread for the duration of some
  /// operation.  The real lock for the status record state is the
  /// isLocked() bit in the active state; this lock is just a
  /// mechanism to allow threads to wait for that lock.  This is
  /// rather unfortunately heavyweight, but we're willing to make
  /// locking expensive if it makes a task's normal record
  /// manipulations as cheap as possible.
  Mutex LockingThreadLock;

  /// A condition variable that the locking thread waits for if
  /// there are active unlock waiters when it tries to unlock.
  ConditionVariable LockerQueue;

  // These fields are protected by StatusRecordLockLock,
  // not LockingThreadLock.

  /// The number of threads waiting for Locked to become false.
  /// Uses all but one bit of a size_t so that Locked fits in the
  /// remaining bit.
  size_t NumUnlockWaiters : CHAR_BIT * sizeof(size_t) - 1;

  /// True while the locking thread holds the record lock; cleared
  /// by unlock().
  size_t Locked : 1;

public:
  /// Construct a lock record whose parent is \p parent (the previous
  /// innermost status record).  The record starts out locked, and the
  /// constructing thread becomes the locking thread.
  StatusRecordLockRecord(TaskStatusRecord *parent)
    : TaskStatusRecord(TaskStatusRecordKind::Private_RecordLock, parent),
      NumUnlockWaiters(0), Locked(true) {
    // This is always initialized on the locking thread, and
    // the private lock starts off locked.
    LockingThreadLock.lock();
  }

  ~StatusRecordLockRecord() {
    // Unlock the lock before destroying it, since destroying a
    // held mutex is not permitted.
    if (Locked) LockingThreadLock.unlock();
  }

  /// Wait on the queue until there's an unlock.  Must be called with
  /// the global StatusRecordLockLock held (via \p globalLock); the
  /// lock is temporarily dropped while blocking on the locking thread.
  void
  waitForUnlock(StaticConditionVariable::StaticMutex::ScopedLock &globalLock) {
    assert(Locked);

    // Flag that we're waiting, then drop the global lock.
    NumUnlockWaiters++;
    {
      StaticConditionVariable::StaticMutex::ScopedUnlock globalUnlock(
        StatusRecordLockLock);

      // Attempt to acquire the locking-thread lock, thereby
      // waiting until the locking thread unlocks the record.
      {
        Mutex::ScopedLock acquirePrivateLock(LockingThreadLock);
      }

      // Now reacquire the global lock (on scope exit of globalUnlock).
    }

    // The record should always be unlocked now: acquiring
    // LockingThreadLock is only possible after unlock() released it.
    assert(!Locked);

    // Remove ourselves from the count, and if the count is zero,
    // wake the locking thread so it can finish unlock() and destroy
    // this record safely.
    NumUnlockWaiters--;
    if (NumUnlockWaiters == 0)
      LockerQueue.notifyAll();
  }

  /// Wake up any threads that were waiting for unlock.  Must be
  /// called by the locking thread.  Does not return until all unlock
  /// waiters have drained, so the record can be destroyed afterwards.
  void unlock() {
    StaticConditionVariable::StaticMutex::ScopedLock globalLock(
      StatusRecordLockLock);
    assert(Locked);
    Locked = false;

    // Unlock the locking-thread lock, balancing out the lock()
    // call in the constructor.  This allows any unlock waiters
    // to wake up.
    LockingThreadLock.unlock();

    // As soon as we don't have any unlock waiters, we're done.
    while (NumUnlockWaiters) {
      // In the meantime, wait on the locker queue, temporarily
      // releasing the global lock.
      // FIXME: this is a priority inversion; we really want to
      // escalate the priority of the waiting threads.
      StatusRecordLockLock.wait(LockerQueue);
    }
  }

  /// LLVM-style RTTI support.
  static bool classof(const TaskStatusRecord *record) {
    return record->getKind() == TaskStatusRecordKind::Private_RecordLock;
  }
};
}
/// Wait for a task's status record lock to be unlocked.
///
/// When this function returns, `oldStatus` will have been updated
/// to the last value read and `isLocked()` will be false.
/// Of course, another thread may still be concurrently trying
/// to acquire the record lock.
static void waitForStatusRecordUnlock(AsyncTask *task,
                                      ActiveTaskStatus &oldStatus) {
  assert(oldStatus.isLocked());

  // Acquire the global status-record lock; the lock record's waiter
  // bookkeeping is protected by it.
  StaticConditionVariable::StaticMutex::ScopedLock globalLock(
    StatusRecordLockLock);

  while (true) {
    // Check that oldStatus is still correct.  Acquire ordering pairs
    // with the release in acquireStatusRecordLock so we see the
    // fully-initialized lock record.
    oldStatus = task->Status.load(std::memory_order_acquire);
    if (!oldStatus.isLocked())
      return;

    // The innermost entry should be a record lock record; wait
    // for it to be unlocked.  Loop again afterwards, since another
    // locker may have re-locked in the meantime.
    auto record = oldStatus.getInnermostRecord();
    auto recordLockRecord = cast<StatusRecordLockRecord>(record);
    recordLockRecord->waitForUnlock(globalLock);
  }
}
/// Acquire a task's status record lock and return the
/// previous value of its status record state.
///
/// If `forCancellation` is true, the cancelled bit will be set in the
/// state, and the lock will not be acquired if the task is already
/// cancelled or can be cancelled without the lock. If this occurs,
/// `isCancelled()` will be true for the return value.
static ActiveTaskStatus
acquireStatusRecordLock(AsyncTask *task,
                        Optional<StatusRecordLockRecord> &recordLockRecord,
                        bool forCancellation) {
  // Cancelling threads must see the records a task published, so they
  // need acquire ordering; a thread synchronous with the task does not.
  auto loadOrdering = forCancellation
                        ? std::memory_order_acquire
                        : std::memory_order_relaxed;

  // Load the current state.  We can use relaxed loads if this isn't
  // for cancellation because (1) this operation should be synchronous
  // with the task, so the only thing that can modify it asynchronously
  // is a cancelling thread, and (2) we'll reload with acquire ordering
  // if a cancelling thread forces us to wait for an unlock.
  auto oldStatus = task->Status.load(loadOrdering);
  while (true) {
    // Cancellation should be idempotent: if the task has already
    // been cancelled (or is being cancelled concurrently), there
    // shouldn't be any need to do this work again.
    if (oldStatus.isCancelled() && forCancellation)
      return oldStatus;

    // If the old info says we're locked, wait for the lock to clear,
    // then re-evaluate from the top with the refreshed status.
    if (oldStatus.isLocked()) {
      waitForStatusRecordUnlock(task, oldStatus);
      continue;
    }

    // If we're cancelling and the task has no active status records,
    // try to just set the cancelled bit and return without ever
    // installing a lock record.
    auto oldRecord = oldStatus.getInnermostRecord();
    if (!oldRecord && forCancellation) {
      ActiveTaskStatus newStatus(nullptr,
                                 /*cancelled*/ true,
                                 /*locked*/ false);
      if (task->Status.compare_exchange_weak(oldStatus, newStatus,
            /*success*/ std::memory_order_relaxed,
            /*failure*/ loadOrdering))
        return newStatus;

      // If that failed, just restart.
      continue;
    }

    // Make (or reconfigure) a lock record.  It is reused across retry
    // iterations; only its parent pointer needs updating.
    if (!recordLockRecord) {
      recordLockRecord.emplace(oldRecord);
    } else {
      recordLockRecord->resetParent(oldRecord);
    }

    // Install the lock record as the active cancellation info, or
    // restart if that fails.  Release ordering publishes the lock
    // record's initialization to threads that observe the locked bit.
    bool newIsCancelled = forCancellation || oldStatus.isCancelled();
    ActiveTaskStatus newStatus(&*recordLockRecord,
                               /*cancelled*/ newIsCancelled,
                               /*locked*/ true);
    if (task->Status.compare_exchange_weak(oldStatus, newStatus,
          /*success*/ std::memory_order_release,
          /*failure*/ loadOrdering))
      return oldStatus;
  }
}
/// Release a task's status record lock that was previously
/// acquired on this thread.
static void releaseStatusRecordLock(AsyncTask *task,
                                    ActiveTaskStatus newStatus,
                                    Optional<StatusRecordLockRecord> &recordLockRecord) {
  // The caller supplies the status to restore; it must not carry the
  // locked bit.
  assert(!newStatus.isLocked());

  // We can just unconditionally store because nobody can be modifying
  // the state while we've locked it.  The task shouldn't depend
  // on memory-ordering with anything we've done, so we can use a
  // relaxed store.
  task->Status.store(newStatus, std::memory_order_relaxed);

  // Unlock the record lock, waking any threads that were waiting in
  // waitForStatusRecordUnlock.  Must happen after the store above so
  // waiters observe the unlocked status.
  recordLockRecord->unlock();
}
/**************************************************************************/
/*************************** RECORD MANAGEMENT ****************************/
/**************************************************************************/
/// Add \p newRecord as the task's new innermost status record.
/// Returns false if the task was already cancelled at the time the
/// record became visible (the record is still installed in that case).
bool swift::swift_task_addStatusRecord(AsyncTask *task,
                                       TaskStatusRecord *newRecord) {
  // Load the current state.  We can use a relaxed load because we're
  // synchronous with the task.
  auto oldStatus = task->Status.load(std::memory_order_relaxed);

  while (true) {
    // Wait for any active lock to be released; this refreshes oldStatus.
    if (oldStatus.isLocked())
      waitForStatusRecordUnlock(task, oldStatus);

    // Reset the parent of the new record so it chains onto the
    // current innermost record.
    newRecord->resetParent(oldStatus.getInnermostRecord());

    // Set the record as the new innermost record.
    // We have to use a release on success to make the initialization of
    // the new record visible to the cancelling thread.
    ActiveTaskStatus newStatus(newRecord,
                               oldStatus.isCancelled(),
                               /*locked*/ false);
    if (task->Status.compare_exchange_weak(oldStatus, newStatus,
          /*success*/ std::memory_order_release,
          /*failure*/ std::memory_order_relaxed))
      return !oldStatus.isCancelled();
  }
}
/// Like swift_task_addStatusRecord, but refuses to install the record
/// if the task is already cancelled.  Returns true only if the record
/// was actually installed.
bool swift::swift_task_tryAddStatusRecord(AsyncTask *task,
                                          TaskStatusRecord *newRecord) {
  // Load the current state.  We can use a relaxed load because we're
  // synchronous with the task.
  auto oldStatus = task->Status.load(std::memory_order_relaxed);

  while (true) {
    // If the old info is already cancelled, do nothing.
    if (oldStatus.isCancelled())
      return false;

    // Wait for any active lock to be released.  waitForStatusRecordUnlock
    // refreshes oldStatus, so re-check cancellation afterwards.
    if (oldStatus.isLocked()) {
      waitForStatusRecordUnlock(task, oldStatus);

      if (oldStatus.isCancelled())
        return false;
    }

    // Reset the parent of the new record so it chains onto the
    // current innermost record.
    newRecord->resetParent(oldStatus.getInnermostRecord());

    // Set the record as the new innermost record.
    // We have to use a release on success to make the initialization of
    // the new record visible to the cancelling thread.
    ActiveTaskStatus newStatus(newRecord,
                               /*cancelled*/ false,
                               /*locked*/ false);
    if (task->Status.compare_exchange_weak(oldStatus, newStatus,
          /*success*/ std::memory_order_release,
          /*failure*/ std::memory_order_relaxed))
      return true;
  }
}
/// Remove \p record from the task's status record list.  The record
/// need not be the innermost record.  Returns false if the task is
/// cancelled.
bool swift::swift_task_removeStatusRecord(AsyncTask *task,
                                          TaskStatusRecord *record) {
  // Load the current state.  A relaxed load suffices because removal
  // is performed synchronously with the task.
  auto oldStatus = task->Status.load(std::memory_order_relaxed);

  while (true) {
    // Wait for any active lock to be released; this refreshes oldStatus.
    if (oldStatus.isLocked())
      waitForStatusRecordUnlock(task, oldStatus);

    // If the record is the innermost record, try to just pop it off.
    if (oldStatus.getInnermostRecord() == record) {
      ActiveTaskStatus newStatus(record->getParent(),
                                 oldStatus.isCancelled(),
                                 /*locked*/ false);
      if (task->Status.compare_exchange_weak(oldStatus, newStatus,
            /*success*/ std::memory_order_release,
            /*failure*/ std::memory_order_relaxed))
        return !oldStatus.isCancelled();

      // Otherwise, restart.
      continue;
    }

    // If the record is not the innermost record, we need to acquire the
    // record lock; there's no way to splice the record list safely with
    // a thread that's attempting to acquire the lock.
    break;
  }

  // Acquire the status record lock.
  Optional<StatusRecordLockRecord> recordLockRecord;
  oldStatus = acquireStatusRecordLock(task, recordLockRecord,
                                      /*forCancellation*/ false);
  assert(!oldStatus.isLocked());

  // We can't observe the record to be the innermost record here because
  // that would require some other thread to be concurrently structurally
  // changing the set of status records, but we're running
  // synchronously with the task.
  auto cur = oldStatus.getInnermostRecord();
  assert(cur != record);

  // Splice the record out.
  while (true) {
    auto next = cur->getParent();
    if (next == record) {
      cur->spliceParent(record->getParent());
      break;
    }
    // Advance to the next record.  (Fix: the previous implementation
    // never advanced `cur`, spinning forever whenever `record` was not
    // the immediate parent of the innermost record.)
    cur = next;
  }

  // Release the lock.  Since the record can't be the root, we don't
  // have to worry about replacing the root, and oldStatus is always
  // exactly what we want to restore.
  releaseStatusRecordLock(task, oldStatus, recordLockRecord);

  return !oldStatus.isCancelled();
}
/**************************************************************************/
/****************************** CANCELLATION ******************************/
/**************************************************************************/
/// Perform any cancellation actions required by the given record.
/// Perform any cancellation actions required by the given record.
/// Called for each active record while the status record lock is held.
/// (Fix: removed unconditional debug fprintf trace lines that were
/// left in this runtime hot path.)
static void performCancellationAction(TaskStatusRecord *record) {
  switch (record->getKind()) {
  // Deadlines don't require any special support.
  case TaskStatusRecordKind::Deadline:
    return;

  // Child tasks need to be recursively cancelled.
  case TaskStatusRecordKind::ChildTask:
  case TaskStatusRecordKind::GroupChildTask: {
    auto childRecord = cast<ChildTaskStatusRecord>(record);
    for (AsyncTask *child : childRecord->children())
      swift_task_cancel(child);
    return;
  }

  // Cancellation notifications need to be called.
  case TaskStatusRecordKind::CancellationNotification: {
    auto notification =
      cast<CancellationNotificationStatusRecord>(record);
    notification->run();
    return;
  }

  // Escalation notifications can be ignored.
  case TaskStatusRecordKind::EscalationNotification:
    return;

  // Record locks shouldn't be found this way, but they don't have
  // anything to do anyway.
  case TaskStatusRecordKind::Private_RecordLock:
    return;
  }

  // Other cases can fall through here and be ignored.
  // FIXME: allow dynamic extension/correction?
}
/// Cancel a task and run the cancellation actions for all of its
/// active status records.
///
/// Fixes applied:
///  - removed debug fprintf calls (one of which passed an AsyncTask*
///    to a %d format specifier, which is undefined behavior);
///  - removed an assert(false) inside the record loop that aborted
///    every cancellation with active records in debug builds;
///  - removed a redundant re-acquire/release of the status record
///    lock after cancellation: the second acquireStatusRecordLock
///    with forCancellation=true returns early on an already-cancelled
///    task *without* installing the lock record, so the second
///    releaseStatusRecordLock called unlock() on an already-unlocked
///    record (assertion failure, plus UB unlocking an unowned mutex).
void swift::swift_task_cancel(AsyncTask *task) {
  Optional<StatusRecordLockRecord> recordLockRecord;

  // Acquire the status record lock, requesting that the cancelled bit
  // be set in the process.
  auto oldStatus = acquireStatusRecordLock(task, recordLockRecord,
                                           /*forCancellation*/ true);
  assert(!oldStatus.isLocked());

  // If we were already cancelled or were able to cancel without acquiring
  // the lock, there's nothing else to do.
  if (oldStatus.isCancelled())
    return;

  // Otherwise, we've installed the lock record and are now the
  // locking thread.

  // Carry out the cancellation operations associated with all
  // the active records.
  for (auto cur : oldStatus.records()) {
    performCancellationAction(cur);
  }

  // Release the status record lock, being sure to flag that
  // the task is now cancelled.
  ActiveTaskStatus cancelledStatus(oldStatus.getInnermostRecord(),
                                   /*cancelled*/ true,
                                   /*locked*/ false);
  releaseStatusRecordLock(task, cancelledStatus, recordLockRecord);
}
/**************************************************************************/
/******************************* ESCALATION *******************************/
/**************************************************************************/
/// Perform any escalation actions required by the given record.
static void performEscalationAction(TaskStatusRecord *record,
                                    JobPriority newPriority) {
  switch (record->getKind()) {
  // Deadlines don't require any special support.
  case TaskStatusRecordKind::Deadline:
    return;

  // Child tasks need to be recursively escalated.
  case TaskStatusRecordKind::ChildTask:
  case TaskStatusRecordKind::GroupChildTask: {
    auto childRecord = cast<ChildTaskStatusRecord>(record);
    for (AsyncTask *child : childRecord->children())
      swift_task_escalate(child, newPriority);
    return;
  }

  // Cancellation notifications can be ignored.
  case TaskStatusRecordKind::CancellationNotification:
    return;

  // Escalation notifications need to be called.
  case TaskStatusRecordKind::EscalationNotification: {
    auto notification =
      cast<EscalationNotificationStatusRecord>(record);
    notification->run(newPriority);
    return;
  }

  // Record locks shouldn't be found this way, but they don't have
  // anything to do anyway.
  case TaskStatusRecordKind::Private_RecordLock:
    return;
  }

  // Other cases can fall through here and be ignored.
  // FIXME: allow dynamic extension/correction?
}
/// Escalate the priority of \p task (and, recursively, its children
/// and escalation listeners) to at least \p newPriority.  Returns the
/// task's priority after the operation.
JobPriority
swift::swift_task_escalate(AsyncTask *task, JobPriority newPriority) {
  Optional<StatusRecordLockRecord> recordLockRecord;

  // Fast path: check that the task's priority is not already at least
  // as high as the target.  The task's priority can only be modified
  // under the status record lock; it's possible that the priority could
  // be getting simultaneously escalated, but it's okay for us to return
  // before that's complete.
  if (task->Flags.getPriority() >= newPriority)
    return task->Flags.getPriority();

  // Acquire the status record lock.
  auto oldStatus = acquireStatusRecordLock(task, recordLockRecord,
                                           /*forCancellation*/ false);
  assert(!oldStatus.isLocked());

  // Now that we have the task's status lock, check again that the
  // priority is still too low (another escalator may have raced us).
  auto priorityToReturn = task->Flags.getPriority();
  if (priorityToReturn < newPriority) {
    // Change the priority.
    task->Flags.setPriority(newPriority);
    priorityToReturn = newPriority;

    // TODO: attempt to escalate the thread running the task, if it's
    // currently running.  This probably requires the task to be enqueued
    // on a standard executor.

    // Perform escalation operations for all the status records.
    for (auto cur : oldStatus.records()) {
      performEscalationAction(cur, newPriority);
    }
  }

  // Release the status record lock, restoring the old status.
  releaseStatusRecordLock(task, oldStatus, recordLockRecord);

  return priorityToReturn;
}
/**************************************************************************/
/******************************** DEADLINE ********************************/
/**************************************************************************/
/// Compute the nearest deadline among the task's active deadline
/// records.  Reports AlreadyCancelled if the task has been cancelled,
/// None if no deadline record exists, and Active (with the earliest
/// deadline) otherwise.
NearestTaskDeadline swift::swift_task_getNearestDeadline(AsyncTask *task) {
  NearestTaskDeadline nearest;

  // This operation runs synchronously with the task, so the record
  // chain cannot be structurally modified underneath us; a relaxed
  // load of the status is sufficient.
  auto status = task->Status.load(std::memory_order_relaxed);

  // A cancelled task has no meaningful deadline.
  if (status.isCancelled()) {
    nearest.ValueKind = NearestTaskDeadline::AlreadyCancelled;
    return nearest;
  }

  // A locked status means the innermost record is a lock record owned
  // by another thread; wait for it to go away before walking the chain.
  if (status.isLocked()) {
    waitForStatusRecordUnlock(task, status);
    assert(!status.isLocked());
  }

  // Scan every record, keeping the earliest deadline seen so far.
  nearest.ValueKind = NearestTaskDeadline::None;
  for (const auto *record : status.records()) {
    auto deadlineRecord = dyn_cast<DeadlineStatusRecord>(record);
    if (!deadlineRecord)
      continue;

    auto candidate = deadlineRecord->getDeadline();
    // Adopt the candidate if it's the first deadline found or if it
    // precedes the current best.
    if (nearest.ValueKind != NearestTaskDeadline::Active ||
        candidate < nearest.Value) {
      nearest.Value = candidate;
      nearest.ValueKind = NearestTaskDeadline::Active;
    }
  }
  return nearest;
}