mirror of
https://github.com/apple/swift.git
synced 2025-12-14 20:36:38 +01:00
We were detaching the child by just modifying the list, but the cancellation path was assuming that that would not be done without holding the task status lock. This patch just fixes the current runtime; the back-deployment side is complicated. Fixes rdar://88398824
1046 lines
39 KiB
C++
//===--- TaskGroup.cpp - Task Groups --------------------------------------===//
|
|
//
|
|
// This source file is part of the Swift.org open source project
|
|
//
|
|
// Copyright (c) 2014 - 2021 Apple Inc. and the Swift project authors
|
|
// Licensed under Apache License v2.0 with Runtime Library Exception
|
|
//
|
|
// See https://swift.org/LICENSE.txt for license information
|
|
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
|
|
//
|
|
//===----------------------------------------------------------------------===//
|
|
//
|
|
// Object management for child tasks that are children of a task group.
|
|
//
|
|
//===----------------------------------------------------------------------===//
|
|
|
|
#include "../CompatibilityOverride/CompatibilityOverride.h"
|
|
|
|
#include "Debug.h"
|
|
#include "TaskGroupPrivate.h"
|
|
#include "TaskPrivate.h"
|
|
#include "bitset"
|
|
#include "queue" // TODO: remove and replace with usage of our mpsc queue
|
|
#include "string"
|
|
#include "swift/ABI/HeapObject.h"
|
|
#include "swift/ABI/Metadata.h"
|
|
#include "swift/ABI/Task.h"
|
|
#include "swift/ABI/TaskGroup.h"
|
|
#include "swift/Basic/RelativePointer.h"
|
|
#include "swift/Basic/STLExtras.h"
|
|
#include "swift/Runtime/Concurrency.h"
|
|
#include "swift/Runtime/Config.h"
|
|
#include "swift/Runtime/HeapObject.h"
|
|
#include "swift/Threading/Mutex.h"
|
|
#include <atomic>
|
|
#include <new>
|
|
|
|
#if !SWIFT_STDLIB_SINGLE_THREADED_CONCURRENCY
|
|
#include <mutex>
|
|
#endif
|
|
|
|
#include <assert.h>
|
|
#if SWIFT_CONCURRENCY_ENABLE_DISPATCH
|
|
#include <dispatch/dispatch.h>
|
|
#endif
|
|
|
|
#if !defined(_WIN32) && !defined(__wasi__) && __has_include(<dlfcn.h>)
|
|
#include <dlfcn.h>
|
|
#endif
|
|
|
|
using namespace swift;
|
|
|
|
/******************************************************************************/
|
|
/*************************** TASK GROUP ***************************************/
|
|
/******************************************************************************/
|
|
|
|
using FutureFragment = AsyncTask::FutureFragment;
|
|
|
|
namespace {
|
|
class TaskStatusRecord;
|
|
|
|
/// Runtime implementation of a Swift task group.
///
/// The object is constructed in place over the ABI-reserved storage of a
/// `TaskGroup` (see `swift_taskGroup_initializeImpl`), and it *is* the
/// task-status record attached to the owning task (`getTaskRecord` simply
/// reinterprets `this`).
///
/// All bookkeeping lives in a single 64-bit `status` word (see `GroupStatus`
/// for the bit layout) plus a ready queue of completed tasks and a one-slot
/// wait queue for the single task allowed to await `group.next()`.
class TaskGroupImpl: public TaskGroupTaskStatusRecord {
public:
  /// Describes the status of the group.
  enum class ReadyStatus : uintptr_t {
    /// The task group is empty, no tasks are pending.
    /// Return immediately, there is no point in suspending.
    ///
    /// The storage is not accessible.
    Empty = 0b00,

    // not used: 0b01; same value as the PollStatus MustWait,
    // which does not make sense for the ReadyStatus

    /// The future has completed with result (of type \c resultType).
    Success = 0b10,

    /// The future has completed by throwing an error (an \c Error
    /// existential).
    Error = 0b11,
  };

  /// Outcome of a single `poll()` as observed by the waiting task.
  /// Low two bits are shared with `ReadyStatus` so both fit in the same
  /// low-bit tag of a `ReadyQueueItem`.
  enum class PollStatus : uintptr_t {
    /// The group is known to be empty and we can immediately return nil.
    Empty = 0b00,

    /// The task has been enqueued to the groups wait queue.
    MustWait = 0b01,

    /// The task has completed with result (of type \c resultType).
    Success = 0b10,

    /// The task has completed by throwing an error (an \c Error existential).
    Error = 0b11,
  };

  /// The result of waiting on the TaskGroupImpl.
  struct PollResult {
    PollStatus status; // TODO: pack it into storage pointer or not worth it?

    /// Storage for the result of the future.
    ///
    /// When the future completed normally, this is a pointer to the storage
    /// of the result value, which lives inside the future task itself.
    ///
    /// When the future completed by throwing an error, this is the error
    /// object itself.
    OpaqueValue *storage;

    /// Metadata of the group's success (element) type.
    const Metadata *successType;

    /// The completed task, if necessary to keep alive until consumed by next().
    ///
    /// # Important: swift_release
    /// If a task is returned here, the task MUST be swift_released
    /// once we are done with it, to balance out the retain made before
    /// when the task was enqueued into the ready queue to keep it alive
    /// until a next() call eventually picks it up.
    AsyncTask *retainedTask;

    /// True when `storage` may be read: any state except MustWait.
    bool isStorageAccessible() {
      return status == PollStatus::Success ||
             status == PollStatus::Error ||
             status == PollStatus::Empty;
    }

    /// Build a PollResult from a completed future task, selecting the
    /// error-object pointer or the success-value storage accordingly.
    static PollResult get(AsyncTask *asyncTask, bool hadErrorResult) {
      auto fragment = asyncTask->futureFragment();
      return PollResult{
        /*status*/ hadErrorResult ?
            PollStatus::Error :
            PollStatus::Success,
        /*storage*/ hadErrorResult ?
            reinterpret_cast<OpaqueValue *>(fragment->getError()) :
            fragment->getStoragePtr(),
        /*successType*/fragment->getResultType(),
        /*task*/ asyncTask
      };
    }
  };

  /// An item within the message queue of a group.
  /// Packs an `AsyncTask *` with a `ReadyStatus` in the low two
  /// (alignment-free) bits of the pointer.
  struct ReadyQueueItem {
    /// Mask used for the low status bits in a message queue item.
    static const uintptr_t statusMask = 0x03;

    uintptr_t storage;

    ReadyStatus getStatus() const {
      return static_cast<ReadyStatus>(storage & statusMask);
    }

    AsyncTask *getTask() const {
      return reinterpret_cast<AsyncTask *>(storage & ~statusMask);
    }

    static ReadyQueueItem get(ReadyStatus status, AsyncTask *task) {
      assert(task == nullptr || task->isFuture());
      return ReadyQueueItem{
        reinterpret_cast<uintptr_t>(task) | static_cast<uintptr_t>(status)};
    }
  };

  /// An item within the pending queue.
  struct PendingQueueItem {
    uintptr_t storage;

    AsyncTask *getTask() const {
      return reinterpret_cast<AsyncTask *>(storage);
    }

    // NOTE(review): returns ReadyQueueItem, not PendingQueueItem — looks
    // like a copy/paste oversight; confirm against callers before changing.
    static ReadyQueueItem get(AsyncTask *task) {
      assert(task == nullptr || task->isFuture());
      return ReadyQueueItem{reinterpret_cast<uintptr_t>(task)};
    }
  };

  /// Value-type view over the packed 64-bit status word.
  ///
  /// Bit layout (MSB first):
  ///   bit 63      — group cancelled flag
  ///   bit 62      — "has waiting task" flag
  ///   bits 31..61 — ready-task counter (31 bits)
  ///   bits 0..30  — pending-task counter (31 bits)
  struct GroupStatus {
    static const uint64_t cancelled = 0b1000000000000000000000000000000000000000000000000000000000000000;
    static const uint64_t waiting   = 0b0100000000000000000000000000000000000000000000000000000000000000;

    // 31 bits for ready tasks counter
    static const uint64_t maskReady      = 0b0011111111111111111111111111111110000000000000000000000000000000;
    static const uint64_t oneReadyTask   = 0b0000000000000000000000000000000010000000000000000000000000000000;

    // 31 bits for pending tasks counter
    static const uint64_t maskPending    = 0b0000000000000000000000000000000001111111111111111111111111111111;
    static const uint64_t onePendingTask = 0b0000000000000000000000000000000000000000000000000000000000000001;

    uint64_t status;

    bool isCancelled() {
      return (status & cancelled) > 0;
    }

    bool hasWaitingTask() {
      return (status & waiting) > 0;
    }

    unsigned int readyTasks() {
      return (status & maskReady) >> 31;
    }

    unsigned int pendingTasks() {
      return (status & maskPending);
    }

    /// Empty means no *pending* tasks; ready count is not consulted here.
    bool isEmpty() {
      return pendingTasks() == 0;
    }

    /// Status value decrementing the Ready, Pending and Waiting counters by one.
    GroupStatus completingPendingReadyWaiting() {
      assert(pendingTasks() &&
             "can only complete waiting task when pending tasks available");
      assert(readyTasks() &&
             "can only complete waiting task when ready tasks available");
      assert(hasWaitingTask() &&
             "can only complete waiting task when waiting task available");
      return GroupStatus{status - waiting - oneReadyTask - onePendingTask};
    }

    /// Status value decrementing only the Ready and Pending counters.
    GroupStatus completingPendingReady() {
      assert(pendingTasks() &&
             "can only complete waiting task when pending tasks available");
      assert(readyTasks() &&
             "can only complete waiting task when ready tasks available");
      return GroupStatus{status - oneReadyTask - onePendingTask};
    }

    /// Pretty prints the status, as follows:
    /// GroupStatus{ P:{pending tasks} W:{waiting tasks} {binary repr} }
    std::string to_string() {
      std::string str;
      str.append("GroupStatus{ ");
      str.append("C:"); // cancelled
      str.append(isCancelled() ? "y " : "n ");
      str.append("W:"); // has waiting task
      str.append(hasWaitingTask() ? "y " : "n ");
      str.append("R:"); // ready
      str.append(std::to_string(readyTasks()));
      str.append(" P:"); // pending
      str.append(std::to_string(pendingTasks()));
      str.append(" " + std::bitset<64>(status).to_string());
      str.append(" }");
      return str;
    }

    /// Initially there are no waiting and no pending tasks.
    static const GroupStatus initial() {
      return GroupStatus{0};
    };
  };

  /// Minimal FIFO wrapper around std::queue.
  // TODO comment elsewhere suggests replacing this with an mpsc queue;
  // all uses are under the group lock today.
  template<typename T>
  class NaiveQueue {
    std::queue <T> queue;

  public:
    NaiveQueue() = default;

    NaiveQueue(const NaiveQueue<T> &) = delete;

    NaiveQueue &operator=(const NaiveQueue<T> &) = delete;

    NaiveQueue(NaiveQueue<T> &&other) {
      queue = std::move(other.queue);
    }

    // NOTE(review): virtual destructor on a non-polymorphic helper appears
    // unnecessary; confirm no subclassing before removing.
    virtual ~NaiveQueue() {}

    /// Pop the front element into `output`; returns false when empty.
    bool dequeue(T &output) {
      if (queue.empty()) {
        return false;
      }
      output = queue.front();
      queue.pop();
      return true;
    }

    void enqueue(const T item) {
      queue.push(item);
    }
  };

private:
#if SWIFT_STDLIB_SINGLE_THREADED_CONCURRENCY || SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
  // Synchronization is simple here. In a single threaded mode, all swift tasks
  // run on a single thread so no coordination is needed. In a task-to-thread
  // model, only the parent task which created the task group can
  //
  // (a) add child tasks to a group
  // (b) run the child tasks
  //
  // So we shouldn't need to worry about coordinating between child tasks and
  // parents in a task group
  void lock() const {}
  void unlock() const {}
#else
  // TODO: move to lockless via the status atomic (make readyQueue an mpsc_queue_t<ReadyQueueItem>)
  mutable std::mutex mutex_;

  void lock() const { mutex_.lock(); }
  void unlock() const { mutex_.unlock(); }
#endif

  /// Used for queue management, counting number of waiting and ready tasks
  std::atomic <uint64_t> status;

  /// Queue containing completed tasks offered into this group.
  ///
  /// The low bits contain the status, the rest of the pointer is the
  /// AsyncTask.
  NaiveQueue<ReadyQueueItem> readyQueue;

  /// The task currently waiting on `group.next()`. Since only the owning
  /// task can ever be waiting on a group, this is just either a reference
  /// to that task or null.
  std::atomic<AsyncTask *> waitQueue;

  /// Metadata of the group's element (success) type, captured at creation.
  const Metadata *successType;

  friend class ::swift::AsyncTask;

public:
  explicit TaskGroupImpl(const Metadata *T)
    : TaskGroupTaskStatusRecord(),
      status(GroupStatus::initial().status),
      readyQueue(),
      waitQueue(nullptr), successType(T) {}

  /// The group object doubles as its own task-status record.
  TaskGroupTaskStatusRecord *getTaskRecord() {
    return reinterpret_cast<TaskGroupTaskStatusRecord *>(this);
  }

  /// Destroy the storage associated with the group.
  void destroy();

  bool isEmpty() {
    auto oldStatus = GroupStatus{status.load(std::memory_order_relaxed)};
    return oldStatus.pendingTasks() == 0;
  }

  bool isCancelled() {
    auto oldStatus = GroupStatus{status.load(std::memory_order_relaxed)};
    return oldStatus.isCancelled();
  }

  /// Cancel the task group and all tasks within it.
  ///
  /// Returns `true` if this is the first time cancelling the group, false otherwise.
  bool cancelAll();

  /// Atomically set the cancelled bit; returns the *previous* status.
  GroupStatus statusCancel() {
    auto old = status.fetch_or(GroupStatus::cancelled,
                               std::memory_order_relaxed);
    return GroupStatus{old};
  }

  /// Returns *assumed* new status, including the just performed +1.
  GroupStatus statusMarkWaitingAssumeAcquire() {
    auto old = status.fetch_or(GroupStatus::waiting, std::memory_order_acquire);
    return GroupStatus{old | GroupStatus::waiting};
  }

  /// Clear the waiting bit; returns the *previous* status.
  GroupStatus statusRemoveWaiting() {
    auto old = status.fetch_and(~GroupStatus::waiting,
                                std::memory_order_release);
    return GroupStatus{old};
  }

  /// Returns *assumed* new status, including the just performed +1.
  GroupStatus statusAddReadyAssumeAcquire() {
    auto old = status.fetch_add(GroupStatus::oneReadyTask,
                                std::memory_order_acquire);
    auto s = GroupStatus{old + GroupStatus::oneReadyTask};
    assert(s.readyTasks() <= s.pendingTasks());
    return s;
  }

  /// Add a single pending task to the status counter.
  /// This is used to implement next() properly, as we need to know if there
  /// are pending tasks worth suspending/waiting for or not.
  ///
  /// Note that the group does *not* store child tasks at all, as they are
  /// stored in the `TaskGroupTaskStatusRecord` inside the current task, that
  /// is currently executing the group. Here we only need the counts of
  /// pending/ready tasks.
  ///
  /// If the `unconditionally` parameter is `true` the operation always successfully
  /// adds a pending task, even if the group is cancelled. If the unconditionally
  /// flag is `false`, the added pending count will be *reverted* before returning.
  /// This is because we will NOT add a task to a cancelled group, unless doing
  /// so unconditionally.
  ///
  /// Returns *assumed* new status, including the just performed +1.
  GroupStatus statusAddPendingTaskRelaxed(bool unconditionally) {
    auto old = status.fetch_add(GroupStatus::onePendingTask,
                                std::memory_order_relaxed);
    auto s = GroupStatus{old + GroupStatus::onePendingTask};

    if (!unconditionally && s.isCancelled()) {
      // revert that add, it was meaningless
      auto o = status.fetch_sub(GroupStatus::onePendingTask,
                                std::memory_order_relaxed);
      s = GroupStatus{o - GroupStatus::onePendingTask};
    }

    return s;
  }

  GroupStatus statusLoadRelaxed() {
    return GroupStatus{status.load(std::memory_order_relaxed)};
  }

  /// Compare-and-set old status to a status derived from the old one,
  /// by simultaneously decrementing one Pending and one Waiting tasks.
  ///
  /// This is used to atomically perform a waiting task completion.
  /// On CAS failure `old` is updated with the freshly observed status.
  bool statusCompletePendingReadyWaiting(GroupStatus &old) {
    return status.compare_exchange_strong(
      old.status, old.completingPendingReadyWaiting().status,
      /*success*/ std::memory_order_relaxed,
      /*failure*/ std::memory_order_relaxed);
  }

  /// As above, but leaves the Waiting bit untouched.
  bool statusCompletePendingReady(GroupStatus &old) {
    return status.compare_exchange_strong(
      old.status, old.completingPendingReady().status,
      /*success*/ std::memory_order_relaxed,
      /*failure*/ std::memory_order_relaxed);
  }


  /// Offer result of a task into this task group.
  ///
  /// If possible, and an existing task is already waiting on next(), this will
  /// schedule it immediately. If not, the result is enqueued and will be picked
  /// up whenever a task calls next() the next time.
  void offer(AsyncTask *completed, AsyncContext *context);

  /// Attempt to dequeue ready tasks and complete the waitingTask.
  ///
  /// If unable to complete the waiting task immediately (with an readily
  /// available completed task), either returns an `PollStatus::Empty`
  /// result if it is known that no pending tasks in the group,
  /// or a `PollStatus::MustWait` result if there are tasks in flight
  /// and the waitingTask eventually be woken up by a completion.
  PollResult poll(AsyncTask *waitingTask);

private:
  // Enqueue the completed task onto ready queue if there are no waiting tasks
  // yet
  void enqueueCompletedTask(AsyncTask *completedTask, bool hadErrorResult);
};
|
|
|
|
} // end anonymous namespace
|
|
|
|
/******************************************************************************/
|
|
/************************ TASK GROUP IMPLEMENTATION ***************************/
|
|
/******************************************************************************/
|
|
|
|
// Convenience aliases so the free functions below can use the nested
// implementation types without qualification.
using ReadyQueueItem = TaskGroupImpl::ReadyQueueItem;
using ReadyStatus = TaskGroupImpl::ReadyStatus;
using PollResult = TaskGroupImpl::PollResult;
using PollStatus = TaskGroupImpl::PollStatus;

// TaskGroupImpl is placement-new'ed into the ABI TaskGroup's reserved
// storage (see swift_taskGroup_initializeImpl), so it must fit and be
// no more strictly aligned.
static_assert(sizeof(TaskGroupImpl) <= sizeof(TaskGroup) &&
              alignof(TaskGroupImpl) <= alignof(TaskGroup),
              "TaskGroupImpl doesn't fit in TaskGroup");
|
|
|
|
/// View the ABI-stable TaskGroup handle as the runtime implementation type.
/// Valid because the impl object is constructed in place over the
/// TaskGroup's storage (see the static_assert above).
static TaskGroupImpl *asImpl(TaskGroup *group) {
  auto *impl = reinterpret_cast<TaskGroupImpl *>(group);
  return impl;
}
|
|
|
|
/// Inverse of asImpl: view the implementation object as the ABI-stable
/// TaskGroup handle that user code and the ABI surface pass around.
static TaskGroup *asAbstract(TaskGroupImpl *group) {
  auto *abstract = reinterpret_cast<TaskGroup *>(group);
  return abstract;
}
|
|
|
|
/// Expose the group's underlying task-status record (the impl object
/// itself doubles as that record).
TaskGroupTaskStatusRecord * TaskGroup::getTaskRecord() {
  auto *impl = asImpl(this);
  return impl->getTaskRecord();
}
|
|
|
|
// =============================================================================
|
|
// ==== initialize -------------------------------------------------------------
|
|
|
|
// Initializes into the preallocated _group an actual TaskGroupImpl.
//
// `group` is caller-provided (ABI-sized) storage; `T` is the metadata of the
// group's element (success) type. The group is then attached to the current
// task as a status record so cancellation/escalation can reach its children.
SWIFT_CC(swift)
static void swift_taskGroup_initializeImpl(TaskGroup *group, const Metadata *T) {
  SWIFT_TASK_DEBUG_LOG("creating task group = %p", group);

  // Placement-new the implementation object directly into the ABI storage.
  TaskGroupImpl *impl = ::new (group) TaskGroupImpl(T);
  auto record = impl->getTaskRecord();
  assert(impl == record && "the group IS the task record");

  // ok, now that the group actually is initialized: attach it to the task
  addStatusRecord(record, [&](ActiveTaskStatus parentStatus) {
    // If the task has already been cancelled, reflect that immediately in
    // the group's status.
    if (parentStatus.isCancelled()) {
      impl->statusCancel();
    }
    // Returning true keeps the record attached regardless of cancellation.
    return true;
  });
}
|
|
|
|
// =============================================================================
|
|
// ==== child task management --------------------------------------------------
|
|
|
|
/// Attach a newly created child task to this group's task record.
///
/// Must be called while the owning task's status record lock is held
/// (see the comment below); the group's own lock is NOT required.
void TaskGroup::addChildTask(AsyncTask *child) {
  SWIFT_TASK_DEBUG_LOG("attach child task = %p to group = %p", child, this);

  // Add the child task to this task group. The corresponding removal
  // won't happen until the parent task successfully polls for this child
  // task, either synchronously in poll (if a task is available
  // synchronously) or asynchronously in offer (otherwise). In either
  // case, the work ends up being non-concurrent with the parent task.

  // The task status record lock is held during this operation, which
  // prevents us from racing with cancellation or escalation. We don't
  // need to acquire the task group lock because the child list is only
  // accessed under the task status record lock.
  auto groupRecord = asImpl(this)->getTaskRecord();
  groupRecord->attachChild(child);
}
|
|
|
|
/// Detach a completed child task from this group's task record.
///
/// Like addChildTask, this must run under the owning task's status record
/// lock — detaching without it is exactly the race fixed by rdar://88398824.
void TaskGroup::removeChildTask(AsyncTask *child) {
  SWIFT_TASK_DEBUG_LOG("detach child task = %p from group = %p", child, this);

  auto groupRecord = asImpl(this)->getTaskRecord();

  // The task status record lock is held during this operation, which
  // prevents us from racing with cancellation or escalation. We don't
  // need to acquire the task group lock because the child list is only
  // accessed under the task status record lock.
  groupRecord->detachChild(child);
}
|
|
|
|
// =============================================================================
|
|
// ==== destroy ----------------------------------------------------------------
|
|
/// ABI entry point: tear down a task group created by
/// swift_taskGroup_initialize; simply forwards to the impl's destroy().
SWIFT_CC(swift)
static void swift_taskGroup_destroyImpl(TaskGroup *group) {
  auto *impl = asImpl(group);
  impl->destroy();
}
|
|
|
|
/// Detach the group from its owning task and run the impl's destructor.
///
/// Does not free memory: the object lives inside the ABI TaskGroup's
/// storage, so only the destructor is invoked in place.
void TaskGroupImpl::destroy() {
  SWIFT_TASK_DEBUG_LOG("destroying task group = %p", this);

  // First, remove the group from the task and deallocate the record
  removeStatusRecord(getTaskRecord());

  // No need to drain our queue here, as by the time we call destroy,
  // all tasks inside the group must have been awaited on already.
  // This is done in Swift's withTaskGroup function explicitly.

  // destroy the group's storage
  this->~TaskGroupImpl();
}
|
|
|
|
// =============================================================================
|
|
// ==== offer ------------------------------------------------------------------
|
|
|
|
/// Forward a completed child task's result into the group (see
/// TaskGroupImpl::offer for the full ownership discussion).
void TaskGroup::offer(AsyncTask *completedTask, AsyncContext *context) {
  auto *impl = asImpl(this);
  impl->offer(completedTask, context);
}
|
|
|
|
bool TaskGroup::isCancelled() {
|
|
return asImpl(this)->isCancelled();
|
|
}
|
|
|
|
/// Write the outcome of a poll into the waiting task's continuation context,
/// producing an `Optional<Success>` (tag 0 = some, tag 1 = none) or filling
/// the error slot. MustWait must never reach here.
static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
                                PollResult result) {
  /// Fill in the result value
  switch (result.status) {
  case PollStatus::MustWait:
    assert(false && "filling a waiting status?");
    return;

  case PollStatus::Error: {
    // storage holds the error object itself in the Error case.
    context->fillWithError(reinterpret_cast<SwiftError *>(result.storage));
    return;
  }

  case PollStatus::Success: {
    // Initialize the result as an Optional<Success>.
    const Metadata *successType = result.successType;
    OpaqueValue *destPtr = context->successResultPointer;
    // TODO: figure out a way to try to optimistically take the
    // value out of the finished task's future, if there are no
    // remaining references to it.
    successType->vw_initializeWithCopy(destPtr, result.storage);
    successType->vw_storeEnumTagSinglePayload(destPtr, 0, 1);
    return;
  }

  case PollStatus::Empty: {
    // Initialize the result as a nil Optional<Success>.
    const Metadata *successType = result.successType;
    OpaqueValue *destPtr = context->successResultPointer;
    successType->vw_storeEnumTagSinglePayload(destPtr, 1, 1);
    return;
  }
  }
}
|
|
|
|
// TaskGroup is locked upon entry and exit.
//
// Stash a completed child task on the ready queue, tagged with whether it
// finished with a value or an error, for a later next() to consume.
void TaskGroupImpl::enqueueCompletedTask(AsyncTask *completedTask, bool hadErrorResult) {
  // Retain the task while it is in the queue; it must remain alive until
  // it is found by poll. This retain will be balanced by the release in poll.
  swift_retain(completedTask);

  // Pack the task pointer with its Success/Error tag in the low bits.
  auto readyItem = ReadyQueueItem::get(
      hadErrorResult ? ReadyStatus::Error : ReadyStatus::Success,
      completedTask
  );

  assert(completedTask == readyItem.getTask());
  assert(readyItem.getTask()->isFuture());
  readyQueue.enqueue(readyItem);
}
|
|
|
|
/// Offer a completed child task's result into the group.
///
/// Two outcomes: (a) a task is parked in next() — claim it via CAS on the
/// wait queue and resume it directly with this result; (b) nobody is
/// waiting — retain the task and park the result on the ready queue.
/// The child is detached from the group's child list only AFTER the group
/// lock is dropped, while the (not-yet-resumed) parent is still quiescent,
/// so the status-record lock can be taken safely (see comment inline).
void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
  assert(completedTask);
  assert(completedTask->isFuture());
  assert(completedTask->hasChildFragment());
  assert(completedTask->hasGroupChildFragment());
  assert(completedTask->groupChildFragment()->getGroup() == asAbstract(this));
  SWIFT_TASK_DEBUG_LOG("offer task %p to group %p", completedTask, this);

  // The current ownership convention is that we are *not* given ownership
  // of a retain on completedTask; we're called from the task completion
  // handler, and the task will release itself. So if we need the task
  // to survive this call (e.g. because there isn't an immediate waiting
  // task), we will need to retain it, which we do in enqueueCompletedTask.
  // This is wasteful, and the task completion function should be fixed to
  // transfer ownership of a retain into this function, in which case we
  // will need to release in the other path.

  lock(); // TODO: remove fragment lock, and use status for synchronization

  // Immediately increment ready count and acquire the status
  // Examples:
  //   W:n R:0 P:3 -> W:n R:1 P:3 // no waiter, 2 more pending tasks
  //   W:n R:0 P:1 -> W:n R:1 P:1 // no waiter, no more pending tasks
  //   W:n R:0 P:1 -> W:y R:1 P:1 // complete immediately
  //   W:n R:0 P:1 -> W:y R:1 P:3 // complete immediately, 2 more pending tasks
  auto assumed = statusAddReadyAssumeAcquire();

  // The error result (if any) lives in the prefix allocated just before
  // the async context.
  auto asyncContextPrefix = reinterpret_cast<FutureAsyncContextPrefix *>(
      reinterpret_cast<char *>(context) - sizeof(FutureAsyncContextPrefix));
  bool hadErrorResult = false;
  auto errorObject = asyncContextPrefix->errorResult;
  if (errorObject) {
    // instead, we need to enqueue this result:
    hadErrorResult = true;
  }

  // ==== a) has waiting task, so let us complete it right away
  if (assumed.hasWaitingTask()) {
    auto waitingTask = waitQueue.load(std::memory_order_acquire);
    SWIFT_TASK_DEBUG_LOG("group has waiting task = %p, complete with = %p",
                         waitingTask, completedTask);
    while (true) {
      // ==== a) run waiting task directly -------------------------------------
      assert(assumed.hasWaitingTask());
      assert(assumed.pendingTasks() && "offered to group with no pending tasks!");
      // We are the "first" completed task to arrive,
      // and since there is a task waiting we immediately claim and complete it.
      if (waitQueue.compare_exchange_strong(
          waitingTask, nullptr,
          /*success*/ std::memory_order_release,
          /*failure*/ std::memory_order_acquire)) {

#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
        // In the task-to-thread model, child tasks are always actually
        // run synchronously on the parent task's thread. For task groups
        // specifically, this means that poll() will pick a child task
        // that was added to the group and run it to completion as a
        // subroutine. Therefore, when we enter offer(), we know that
        // the parent task is waiting and we can just return to it.

        // The task-to-thread logic in poll() currently expects the child
        // task to enqueue itself instead of just filling in the result in
        // the waiting task. This is a little wasteful; there's no reason
        // we can't just have the parent task set itself up as a waiter.
        // But since it's what we're doing, we basically take the same
        // path as we would if there wasn't a waiter.
        enqueueCompletedTask(completedTask, hadErrorResult);
        unlock(); // TODO: remove fragment lock, and use status for synchronization
        return;

#else /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
        if (statusCompletePendingReadyWaiting(assumed)) {
          // Run the task.
          auto result = PollResult::get(completedTask, hadErrorResult);

          unlock(); // TODO: remove fragment lock, and use status for synchronization

          // Remove the child from the task group's running tasks list.
          // The parent task isn't currently running (we're about to wake
          // it up), so we're still synchronous with it. We can safely
          // acquire our parent's status record lock here (which would
          // ordinarily run the risk of deadlock, since e.g. cancellation
          // does a parent -> child traversal while recursively holding
          // locks) because we know that the child task is completed and
          // we can't be holding its locks ourselves.
          _swift_taskGroup_detachChild(asAbstract(this), completedTask);

          auto waitingContext =
              static_cast<TaskFutureWaitAsyncContext *>(
                  waitingTask->ResumeContext);

          fillGroupNextResult(waitingContext, result);

          _swift_tsan_acquire(static_cast<Job *>(waitingTask));
          // TODO: allow the caller to suggest an executor
          waitingTask->flagAsAndEnqueueOnExecutor(ExecutorRef::generic());

          // completedTask will be released by the remainder of its
          // completion function.
          return;
        } // else, try again
#endif
      }
    }
    llvm_unreachable("should have enqueued and returned.");
  } else {
    // ==== b) enqueue completion ------------------------------------------------
    //
    // else, no-one was waiting (yet), so we have to instead enqueue to the message
    // queue when a task polls during next() it will notice that we have a value
    // ready for it, and will process it immediately without suspending.
    assert(!waitQueue.load(std::memory_order_relaxed));

    SWIFT_TASK_DEBUG_LOG("group has no waiting tasks, RETAIN and store ready task = %p",
                         completedTask);
    enqueueCompletedTask(completedTask, hadErrorResult);
    unlock(); // TODO: remove fragment lock, and use status for synchronization
  }

  return;
}
|
|
|
|
/// Resume-function adapter installed as the waiting task's ResumeTask:
/// re-casts the stored parent continuation to the throwing entry-point
/// shape and forwards the context's error slot to it.
SWIFT_CC(swiftasync)
static void
task_group_wait_resume_adapter(SWIFT_ASYNC_CONTEXT AsyncContext *_context) {

  auto context = static_cast<TaskFutureWaitAsyncContext *>(_context);
  // ResumeParent was stored as a plain TaskContinuationFunction; recover
  // the error-carrying signature the caller actually passed in.
  auto resumeWithError =
      reinterpret_cast<AsyncVoidClosureResumeEntryPoint *>(context->ResumeParent);
  return resumeWithError(context->Parent, context->errorResult);
}
|
|
|
|
#ifdef __ARM_ARCH_7K__
|
|
// armv7k-only workaround: a noinline function whose empty volatile asm
// keeps the compiler from tail-call-folding the MustWait return path.
// The parameters are referenced by the asm input list solely so their
// values stay live across the call.
__attribute__((noinline))
SWIFT_CC(swiftasync) static void workaround_function_swift_taskGroup_wait_next_throwingImpl(
    OpaqueValue *result, SWIFT_ASYNC_CONTEXT AsyncContext *callerContext,
    TaskGroup *_group,
    ThrowingTaskFutureWaitContinuationFunction resumeFunction,
    AsyncContext *callContext) {
  // Make sure we don't eliminate calls to this function.
  asm volatile("" // Do nothing.
               : // Output list, empty.
               : "r"(result), "r"(callerContext), "r"(_group) // Input list.
               : // Clobber list, empty.
  );
  return;
}
|
|
#endif
|
|
|
|
// =============================================================================
|
|
// ==== group.next() implementation (wait_next and groupPoll) ------------------
|
|
|
|
/// ABI entry point for `group.next()` in a throwing group.
///
/// Prepares the current task as the group's waiter (resume adapter +
/// continuation context), then polls the group:
///  - MustWait: the task was parked on the wait queue; just return —
///    offer() will resume it when a child completes.
///  - Empty/Error/Success: fill the result into the context, detach and
///    release the consumed child (if any), and resume the caller inline.
SWIFT_CC(swiftasync)
static void swift_taskGroup_wait_next_throwingImpl(
    OpaqueValue *resultPointer, SWIFT_ASYNC_CONTEXT AsyncContext *callerContext,
    TaskGroup *_group,
    ThrowingTaskFutureWaitContinuationFunction *resumeFunction,
    AsyncContext *rawContext) {
  auto waitingTask = swift_task_getCurrent();
  // If we end up suspending, this is how offer() will resume us.
  waitingTask->ResumeTask = task_group_wait_resume_adapter;
  waitingTask->ResumeContext = rawContext;

  // Populate the wait context the adapter (or poll) will read back.
  auto context = static_cast<TaskFutureWaitAsyncContext *>(rawContext);
  context->ResumeParent =
      reinterpret_cast<TaskContinuationFunction *>(resumeFunction);
  context->Parent = callerContext;
  context->errorResult = nullptr;
  context->successResultPointer = resultPointer;

  auto group = asImpl(_group);
  assert(group && "swift_taskGroup_wait_next_throwing was passed context without group!");

  PollResult polled = group->poll(waitingTask);
  switch (polled.status) {
  case PollStatus::MustWait:
    SWIFT_TASK_DEBUG_LOG("poll group = %p, no ready tasks, waiting task = %p",
                         group, waitingTask);
    // The waiting task has been queued on the channel,
    // there were pending tasks so it will be woken up eventually.
#ifdef __ARM_ARCH_7K__
    return workaround_function_swift_taskGroup_wait_next_throwingImpl(
        resultPointer, callerContext, _group, resumeFunction, rawContext);
#else /* __ARM_ARCH_7K__ */
    return;
#endif /* __ARM_ARCH_7K__ */

  case PollStatus::Empty:
  case PollStatus::Error:
  case PollStatus::Success:
    SWIFT_TASK_DEBUG_LOG("poll group = %p, task = %p, ready task available = %p",
                         group, waitingTask, polled.retainedTask);
    fillGroupNextResult(context, polled);
    if (auto completedTask = polled.retainedTask) {
      // Remove the child from the task group's running tasks list.
      _swift_taskGroup_detachChild(asAbstract(group), completedTask);

      // Balance the retain done by enqueueCompletedTask.
      swift_release(completedTask);
    }

    return waitingTask->runInFullyEstablishedContext();
  }
}
|
|
|
|
/// Poll the task group for the next available result on behalf of
/// \p waitingTask (the task running `group.next()`).
///
/// Returns one of:
///  - PollStatus::Empty    — no children pending; `next()` should return nil.
///  - PollStatus::Success  — a child completed successfully; result carries
///                           its storage, result type, and a retained task.
///  - PollStatus::Error    — a child threw; result carries the error.
///  - PollStatus::MustWait — no ready child yet; the waiting task has been
///                           enqueued on the wait queue and will be resumed
///                           by offer() when a child completes.
///
/// When a retained task is returned, the caller is responsible for
/// balancing the retain that was taken when the task was enqueued on the
/// ready queue.
PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {
  lock(); // TODO: remove group lock, and use status for synchronization
  SWIFT_TASK_DEBUG_LOG("poll group = %p", this);

  PollResult result;
  result.storage = nullptr;
  result.successType = nullptr;
  result.retainedTask = nullptr;

  // Have we suspended the task?
  bool hasSuspended = false;
  // Only used in the task-to-thread model: set after we have run a child
  // inline, in which case re-evaluation must find a ready task.
  bool haveRunOneChildTaskInline = false;

reevaluate_if_taskgroup_has_results:;
  // Atomically mark the group as having a waiter, and load the status we
  // will reason about below (acquire pairs with releases in offer()).
  auto assumed = statusMarkWaitingAssumeAcquire();
  if (haveRunOneChildTaskInline) {
    // We just ran a child to completion on this thread, so its result must
    // have been enqueued before control returned to us.
    assert(assumed.readyTasks());
  }

  // ==== 1) bail out early if no tasks are pending ----------------------------
  if (assumed.isEmpty()) {
    SWIFT_TASK_DEBUG_LOG("poll group = %p, group is empty, no pending tasks", this);
    // No tasks in flight, we know no tasks were submitted before this poll
    // was issued, and if we parked here we'd potentially never be woken up.
    // Bail out and return `nil` from `group.next()`.
    statusRemoveWaiting();
    result.status = PollStatus::Empty;
    result.successType = this->successType;
    unlock(); // TODO: remove group lock, and use status for synchronization
    return result;
  }

  // Snapshot the wait-queue head for the CAS loop in step 3.
  auto waitHead = waitQueue.load(std::memory_order_acquire);

  // ==== 2) Ready task was polled, return with it immediately -----------------
  if (assumed.readyTasks()) {
    SWIFT_TASK_DEBUG_LOG("poll group = %p, group has ready tasks = %d",
                         this, assumed.readyTasks());

    // Try to claim one ready+pending "slot" via CAS on the status word; if
    // another waiter races us to it, we fall through and re-examine.
    auto assumedStatus = assumed.status;
    auto newStatus = TaskGroupImpl::GroupStatus{assumedStatus};
    if (status.compare_exchange_strong(
        assumedStatus, newStatus.completingPendingReadyWaiting().status,
        /*success*/ std::memory_order_relaxed,
        /*failure*/ std::memory_order_acquire)) {

      // We're going back to running the task, so if we suspended before,
      // we need to flag it as running again.
      if (hasSuspended) {
        waitingTask->flagAsRunning();
      }

      // Success! We are allowed to poll.
      ReadyQueueItem item;
      bool taskDequeued = readyQueue.dequeue(item);
      assert(taskDequeued); (void) taskDequeued;

      assert(item.getTask()->isFuture());
      auto futureFragment = item.getTask()->futureFragment();

      // Store the task in the result, so after we're done processing it may
      // be swift_release'd; we kept it alive while it was in the readyQueue by
      // an additional retain issued as we enqueued it there.
      result.retainedTask = item.getTask();

      // Note that the task was detached from the task group when it
      // completed, so we don't need to do that bit of record-keeping here.

      switch (item.getStatus()) {
      case ReadyStatus::Success:
        // Immediately return the polled value
        result.status = PollStatus::Success;
        result.storage = futureFragment->getStoragePtr();
        result.successType = futureFragment->getResultType();
        assert(result.retainedTask && "polled a task, it must be not null");
        // Pairs with the tsan release performed by the completing child.
        _swift_tsan_acquire(static_cast<Job *>(result.retainedTask));
        unlock(); // TODO: remove fragment lock, and use status for synchronization
        return result;

      case ReadyStatus::Error:
        // Immediately return the polled value
        result.status = PollStatus::Error;
        result.storage =
            reinterpret_cast<OpaqueValue *>(futureFragment->getError());
        result.successType = nullptr;
        assert(result.retainedTask && "polled a task, it must be not null");
        _swift_tsan_acquire(static_cast<Job *>(result.retainedTask));
        unlock(); // TODO: remove fragment lock, and use status for synchronization
        return result;

      case ReadyStatus::Empty:
        result.status = PollStatus::Empty;
        result.storage = nullptr;
        result.retainedTask = nullptr;
        result.successType = this->successType;
        unlock(); // TODO: remove fragment lock, and use status for synchronization
        return result;
      }
      assert(false && "must return result when status compare-and-swap was successful");
    } // else, we failed status-cas (some other waiter claimed a ready pending task, try again)
  }

  // ==== 3) Add to wait queue -------------------------------------------------
  assert(assumed.readyTasks() == 0);
  _swift_tsan_release(static_cast<Job *>(waitingTask));
  while (true) {
    if (!hasSuspended) {
      hasSuspended = true;
      waitingTask->flagAsSuspended();
    }
    // Put the waiting task at the beginning of the wait queue.
    if (waitQueue.compare_exchange_strong(
        waitHead, waitingTask,
        /*success*/ std::memory_order_release,
        /*failure*/ std::memory_order_acquire)) {
      unlock(); // TODO: remove fragment lock, and use status for synchronization
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
      // The logic here is paired with the logic in TaskGroupImpl::offer:
      // having parked the waiting task, we run one child task inline on this
      // very thread and then re-evaluate the group for results.
      auto oldTask = _swift_task_clearCurrent();
      assert(oldTask == waitingTask);

      auto childTask = getTaskRecord()->getFirstChild();
      assert(childTask != NULL);

      SWIFT_TASK_DEBUG_LOG("[RunInline] Switching away from running %p to now running %p", oldTask, childTask);
      // Run the new task on the same thread now - this should run the new task to
      // completion. All swift tasks in task-to-thread model run on generic
      // executor
      swift_job_run(childTask, ExecutorRef::generic());
      haveRunOneChildTaskInline = true;

      SWIFT_TASK_DEBUG_LOG("[RunInline] Switching back from running %p to now running %p", childTask, oldTask);
      // We are back to being the parent task and now that we've run the child
      // task, we should reevaluate parent task
      _swift_task_setCurrent(oldTask);
      goto reevaluate_if_taskgroup_has_results;
#endif
      // no ready tasks, so we must wait.
      result.status = PollStatus::MustWait;
      _swift_task_clearCurrent();
      return result;
    } // else, try again
  }
}
|
|
|
|
// =============================================================================
|
|
// ==== isEmpty ----------------------------------------------------------------
|
|
SWIFT_CC(swift)
static bool swift_taskGroup_isEmptyImpl(TaskGroup *group) {
  // Runtime entry point backing `TaskGroup.isEmpty`; simply forwards the
  // query to the implementation type.
  auto impl = asImpl(group);
  return impl->isEmpty();
}
|
|
|
|
// =============================================================================
|
|
// ==== isCancelled ------------------------------------------------------------
|
|
SWIFT_CC(swift)
static bool swift_taskGroup_isCancelledImpl(TaskGroup *group) {
  // Runtime entry point backing `TaskGroup.isCancelled`; forwards the
  // query to the implementation type.
  auto impl = asImpl(group);
  return impl->isCancelled();
}
|
|
|
|
// =============================================================================
|
|
// ==== cancelAll --------------------------------------------------------------
|
|
|
|
SWIFT_CC(swift)
static void swift_taskGroup_cancelAllImpl(TaskGroup *group) {
  // Runtime entry point backing `TaskGroup.cancelAll()`; the boolean
  // "did we perform the cancellation" result is not surfaced here.
  asImpl(group)->cancelAll();
}
|
|
|
|
bool TaskGroupImpl::cancelAll() {
|
|
SWIFT_TASK_DEBUG_LOG("cancel all tasks in group = %p", this);
|
|
|
|
// Flag the task group itself as cancelled. If this was already
|
|
// done, any existing child tasks should already have been cancelled,
|
|
// and cancellation should automatically flow to any new child tasks,
|
|
// so there's nothing else for us to do.
|
|
auto old = statusCancel();
|
|
if (old.isCancelled()) {
|
|
return false;
|
|
}
|
|
|
|
// Cancel all the child tasks. TaskGroup is not a Sendable type,
|
|
// so cancelAll() can only be called from the owning task. This
|
|
// satisfies the precondition on cancelAllChildren().
|
|
_swift_taskGroup_cancelAllChildren(asAbstract(this));
|
|
|
|
return true;
|
|
}
|
|
|
|
SWIFT_CC(swift)
static void swift_task_cancel_group_child_tasksImpl(TaskGroup *group) {
  // This operation is not currently exposed in the public API, and since
  // TaskGroup is not a Sendable type it can only ever be invoked from the
  // owning task — which is exactly the precondition cancelAllChildren()
  // requires.
  _swift_taskGroup_cancelAllChildren(group);
}
|
|
|
|
/// Cancel all the children of the given task group.
|
|
///
|
|
/// The caller must guarantee that this is either called from the
|
|
/// owning task of the task group or while holding the owning task's
|
|
/// status record lock.
|
|
void swift::_swift_taskGroup_cancelAllChildren(TaskGroup *group) {
|
|
// Because only the owning task of the task group can modify the
|
|
// child list of a task group status record, and it can only do so
|
|
// while holding the owning task's status record lock, we do not need
|
|
// any additional synchronization within this function.
|
|
for (auto childTask: group->getTaskRecord()->children())
|
|
swift_task_cancel(childTask);
|
|
}
|
|
|
|
// =============================================================================
|
|
// ==== addPending -------------------------------------------------------------
|
|
SWIFT_CC(swift)
static bool swift_taskGroup_addPendingImpl(TaskGroup *group, bool unconditionally) {
  // Bump the group's pending-task count; the returned status is the value
  // observed before the update. Its cancelled bit tells the caller whether
  // the new child may actually be added (false once the group is cancelled).
  auto priorStatus = asImpl(group)->statusAddPendingTaskRelaxed(unconditionally);
  return !priorStatus.isCancelled();
}
|
|
|
|
#define OVERRIDE_TASK_GROUP COMPATIBILITY_OVERRIDE
|
|
#include COMPATIBILITY_OVERRIDE_INCLUDE_PATH
|