[TaskGroup] Towards ABI stability of groups

Konrad `ktoso` Malawski
2021-03-01 11:55:44 +09:00
parent d26b1f051d
commit aedbbe615d
12 changed files with 642 additions and 493 deletions

View File

@@ -46,6 +46,9 @@ enum {
/// The number of words (in addition to the heap-object header)
/// in a default actor.
NumWords_DefaultActor = 10,
/// The number of words in a task group.
NumWords_TaskGroup = 32,
};
struct InProcess;
@@ -121,6 +124,9 @@ const size_t MaximumAlignment = 16;
/// The alignment of a DefaultActor.
const size_t Alignment_DefaultActor = MaximumAlignment;
/// The alignment of a TaskGroup.
const size_t Alignment_TaskGroup = MaximumAlignment;
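Together with NumWords_TaskGroup above, this fixes the ABI footprint of a task group: 32 pointer-sized words of opaque storage at MaximumAlignment. A minimal sketch of what that reservation works out to on a 64-bit target; the struct name below is purely illustrative and not part of the runtime:

// Hypothetical mirror of the reserved storage, for illustration only.
struct alignas(16) OpaqueTaskGroupStorage {
  void *words[32];                       // NumWords_TaskGroup opaque words
};

// 32 pointer-sized words (256 bytes on a 64-bit target); the concrete
// TaskGroupImpl in the runtime must fit within this footprint (see the
// static_assert added in TaskGroup.cpp further down in this commit).
static_assert(sizeof(OpaqueTaskGroupStorage) == 32 * sizeof(void *),
              "32 opaque words");
static_assert(alignof(OpaqueTaskGroupStorage) == 16, "MaximumAlignment");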
/// Flags stored in the value-witness table.
template <typename int_type>
class TargetValueWitnessFlags {

View File

@@ -23,370 +23,23 @@
#include "swift/Runtime/Config.h"
#include "swift/Basic/RelativePointer.h"
#include "swift/Basic/STLExtras.h"
#include "bitset"
#include "string"
#include "queue" // TODO: remove and replace with usage of our mpsc queue
#include <atomic>
#include <assert.h>
namespace swift {
class TaskGroupTaskStatusRecord;
class AsyncTask;
// ==== TaskGroup ------------------------------------------------------------
/// The task group is responsible for maintaining dynamically created child tasks.
class alignas(Alignment_TaskGroup) TaskGroup {
public:
// These constructors do not initialize the group instance, and the
// destructor does not destroy the group instance; you must call
// swift_taskGroup_{initialize,destroy} yourself.
constexpr TaskGroup()
: PrivateData{} {}
class TaskGroup {
public:
/// Describes the status of the group.
enum class ReadyStatus : uintptr_t {
/// The task group is empty, no tasks are pending.
/// Return immediately, there is no point in suspending.
///
/// The storage is not accessible.
Empty = 0b00,
void *PrivateData[NumWords_TaskGroup];
/// The future has completed with result (of type \c resultType).
Success = 0b10,
/// The future has completed by throwing an error (an \c Error
/// existential).
Error = 0b11,
};
enum class PollStatus : uintptr_t {
/// The group is known to be empty and we can immediately return nil.
Empty = 0,
/// The task has been enqueued to the group's wait queue.
MustWait = 1,
/// The task has completed with result (of type \c resultType).
Success = 2,
/// The task has completed by throwing an error (an \c Error
/// existential).
Error = 3,
};
/// The result of waiting on the TaskGroup.
struct PollResult {
PollStatus status; // TODO: pack it into storage pointer or not worth it?
/// Storage for the result of the future.
///
/// When the future completed normally, this is a pointer to the storage
/// of the result value, which lives inside the future task itself.
///
/// When the future completed by throwing an error, this is the error
/// object itself.
OpaqueValue *storage;
/// The completed task, if necessary to keep alive until consumed by next().
///
/// # Important: swift_release
/// If a task is returned here, the task MUST be swift_released
/// once we are done with it, to balance out the retain made before
/// when the task was enqueued into the ready queue to keep it alive
/// until a next() call eventually picks it up.
AsyncTask *retainedTask;
bool isStorageAccessible() {
return status == PollStatus::Success ||
status == PollStatus::Error ||
status == PollStatus::Empty;
}
static PollResult get(AsyncTask *asyncTask, bool hadErrorResult) {
auto fragment = asyncTask->futureFragment();
return PollResult{
/*status*/ hadErrorResult ?
TaskGroup::PollStatus::Error :
TaskGroup::PollStatus::Success,
/*storage*/ hadErrorResult ?
reinterpret_cast<OpaqueValue *>(fragment->getError()) :
fragment->getStoragePtr(),
/*task*/ asyncTask
};
}
};
/// An item within the message queue of a group.
struct ReadyQueueItem {
/// Mask used for the low status bits in a message queue item.
static const uintptr_t statusMask = 0x03;
uintptr_t storage;
ReadyStatus getStatus() const {
return static_cast<ReadyStatus>(storage & statusMask);
}
AsyncTask *getTask() const {
return reinterpret_cast<AsyncTask *>(storage & ~statusMask);
}
static ReadyQueueItem get(ReadyStatus status, AsyncTask *task) {
assert(task == nullptr || task->isFuture());
return ReadyQueueItem{
reinterpret_cast<uintptr_t>(task) | static_cast<uintptr_t>(status)};
}
};
/// An item within the pending queue.
struct PendingQueueItem {
uintptr_t storage;
AsyncTask *getTask() const {
return reinterpret_cast<AsyncTask *>(storage);
}
static ReadyQueueItem get(AsyncTask *task) {
assert(task == nullptr || task->isFuture());
return ReadyQueueItem{ reinterpret_cast<uintptr_t>(task) };
}
};
struct GroupStatus {
static const uint64_t cancelled = 0b1000000000000000000000000000000000000000000000000000000000000000;
static const uint64_t waiting = 0b0100000000000000000000000000000000000000000000000000000000000000;
// 31 bits for ready tasks counter
static const uint64_t maskReady = 0b0011111111111111111111111111111110000000000000000000000000000000;
static const uint64_t oneReadyTask = 0b0000000000000000000000000000000010000000000000000000000000000000;
// 31 bits for pending tasks counter
static const uint64_t maskPending = 0b0000000000000000000000000000000001111111111111111111111111111111;
static const uint64_t onePendingTask = 0b0000000000000000000000000000000000000000000000000000000000000001;
uint64_t status;
bool isCancelled() {
return (status & cancelled) > 0;
}
bool hasWaitingTask() {
return (status & waiting) > 0;
}
unsigned int readyTasks() {
return (status & maskReady) >> 31;
}
unsigned int pendingTasks() {
return (status & maskPending);
}
bool isEmpty() {
return pendingTasks() == 0;
}
/// Status value decrementing the Ready, Pending and Waiting counters by one.
GroupStatus completingPendingReadyWaiting() {
assert(pendingTasks() && "can only complete waiting task when pending tasks available");
assert(readyTasks() && "can only complete waiting task when ready tasks available");
assert(hasWaitingTask() && "can only complete waiting task when waiting task available");
return GroupStatus{status - waiting - oneReadyTask - onePendingTask};
}
GroupStatus completingPendingReady() {
assert(pendingTasks() && "can only complete waiting task when pending tasks available");
assert(readyTasks() && "can only complete waiting task when ready tasks available");
return GroupStatus{status - oneReadyTask - onePendingTask};
}
/// Pretty prints the status, as follows:
/// GroupStatus{ C:{cancelled} W:{waiting} R:{ready tasks} P:{pending tasks} {binary repr} }
std::string to_string() {
std::string str;
str.append("GroupStatus{ ");
str.append("C:"); // cancelled
str.append(isCancelled() ? "y " : "n ");
str.append("W:"); // has waiting task
str.append(hasWaitingTask() ? "y " : "n ");
str.append("R:"); // ready
str.append(std::to_string(readyTasks()));
str.append(" P:"); // pending
str.append(std::to_string(pendingTasks()));
str.append(" " + std::bitset<64>(status).to_string());
str.append(" }");
return str;
}
/// Initially there are no waiting and no pending tasks.
static const GroupStatus initial() {
return GroupStatus{0};
};
};
template<typename T>
class NaiveQueue {
std::queue<T> queue;
public:
NaiveQueue() = default;
NaiveQueue(const NaiveQueue<T> &) = delete ;
NaiveQueue& operator=(const NaiveQueue<T> &) = delete ;
NaiveQueue(NaiveQueue<T>&& other) {
queue = std::move(other.queue);
}
virtual ~NaiveQueue() { }
bool dequeue(T &output) {
if (queue.empty()) {
return false;
}
output = queue.front();
queue.pop();
return true;
}
void enqueue(const T item) {
queue.push(item);
}
};
private:
// // TODO: move to lockless via the status atomic
mutable std::mutex mutex;
/// Used for queue management, counting number of waiting and ready tasks
std::atomic<uint64_t> status;
/// TaskStatusRecord that is attached to the task running the group.
///
/// Because we must remove it from the task as we exit/destroy the group,
/// we have to keep this pointer here so we know which record to remove then.
TaskGroupTaskStatusRecord* Record;
/// Queue containing completed tasks offered into this group.
///
/// The low bits contain the status, the rest of the pointer is the
/// AsyncTask.
NaiveQueue<ReadyQueueItem> readyQueue;
// mpsc_queue_t<ReadyQueueItem> readyQueue; // TODO: can we get away with an MPSC queue here once actor executors land?
/// Single waiting `AsyncTask` currently waiting on `group.next()`,
/// or `nullptr` if no task is currently waiting.
std::atomic<AsyncTask*> waitQueue;
friend class AsyncTask;
public:
explicit TaskGroup(TaskGroupTaskStatusRecord* record)
: status(GroupStatus::initial().status),
Record(record),
readyQueue(),
// readyQueue(ReadyQueueItem::get(ReadyStatus::Empty, nullptr)),
waitQueue(nullptr) {}
/// Destroy the storage associated with the group.
void destroy(AsyncTask *task);
bool isEmpty() {
auto oldStatus = GroupStatus { status.load(std::memory_order_relaxed) };
return oldStatus.pendingTasks() == 0;
}
bool isCancelled() {
auto oldStatus = GroupStatus { status.load(std::memory_order_relaxed) };
return oldStatus.isCancelled();
}
TaskGroupTaskStatusRecord* getTaskRecord() const {
return Record;
}
/// Cancel the task group and all tasks within it.
///
/// Returns `true` if this is the first time cancelling the group, false otherwise.
bool cancelAll(AsyncTask *task);
GroupStatus statusCancel() {
auto old = status.fetch_or(GroupStatus::cancelled, std::memory_order_relaxed);
return GroupStatus { old };
}
/// Returns *assumed* new status, with the waiting bit just set.
GroupStatus statusMarkWaitingAssumeAcquire() {
auto old = status.fetch_or(GroupStatus::waiting, std::memory_order_acquire);
return GroupStatus{old | GroupStatus::waiting};
}
GroupStatus statusRemoveWaiting() {
auto old = status.fetch_and(~GroupStatus::waiting, std::memory_order_release);
return GroupStatus{old};
}
/// Returns *assumed* new status, including the just performed +1.
GroupStatus statusAddReadyAssumeAcquire() {
auto old = status.fetch_add(GroupStatus::oneReadyTask, std::memory_order_acquire);
auto s = GroupStatus {old + GroupStatus::oneReadyTask };
assert(s.readyTasks() <= s.pendingTasks());
return s;
}
/// Add a single pending task to the status counter.
/// This is used to implement next() properly, as we need to know if there
/// are pending tasks worth suspending/waiting for or not.
///
/// Note that the group does *not* store child tasks at all, as they are
/// stored in the `TaskGroupTaskStatusRecord` inside the task that is
/// currently executing the group. Here we only need the counts of
/// pending/ready tasks.
///
/// Returns *assumed* new status, including the just performed +1.
GroupStatus statusAddPendingTaskRelaxed() {
auto old = status.fetch_add(GroupStatus::onePendingTask, std::memory_order_relaxed);
auto s = GroupStatus {old + GroupStatus::onePendingTask };
if (s.isCancelled()) {
// revert that add, it was meaningless
auto o = status.fetch_sub(GroupStatus::onePendingTask, std::memory_order_relaxed);
s = GroupStatus {o - GroupStatus::onePendingTask };
}
return s;
}
GroupStatus statusLoadRelaxed() {
return GroupStatus{status.load(std::memory_order_relaxed)};
}
/// Compare-and-set old status to a status derived from the old one,
/// by simultaneously decrementing one Pending and one Waiting tasks.
///
/// This is used to atomically perform a waiting task completion.
bool statusCompletePendingReadyWaiting(GroupStatus& old) {
return status.compare_exchange_weak(
old.status, old.completingPendingReadyWaiting().status,
/*success*/ std::memory_order_relaxed,
/*failure*/ std::memory_order_relaxed);
}
bool statusCompletePendingReady(GroupStatus& old) {
return status.compare_exchange_weak(
old.status, old.completingPendingReady().status,
/*success*/ std::memory_order_relaxed,
/*failure*/ std::memory_order_relaxed);
}
/// Offer result of a task into this task group.
///
/// If an existing task is already waiting on next(), this will schedule it
/// immediately with the offered result. If not, the result is enqueued and
/// will be picked up the next time a task calls next().
void offer(AsyncTask *completed, AsyncContext *context, ExecutorRef executor);
/// Attempt to dequeue ready tasks and complete the waitingTask.
///
/// If the waiting task cannot be completed immediately (with a readily
/// available completed task), this either returns a `PollStatus::Empty`
/// result if it is known that there are no pending tasks in the group,
/// or a `PollStatus::MustWait` result if there are tasks in flight
/// and the waitingTask will eventually be woken up by a completion.
TaskGroup::PollResult poll(AsyncTask *waitingTask);
};
/// Upon a future task's completion, offer it to the task group it belongs to.
void offer(AsyncTask *completed, AsyncContext *context, ExecutorRef executor);
};
} // end namespace swift

View File

@@ -164,6 +164,8 @@ public:
/// A status record which states that a task has a task group.
///
/// A record is always a specific `TaskGroupImpl`.
///
/// The child tasks are stored as an intrusive singly-linked list, starting
/// from `FirstChild` and continuing through the `NextChild` pointers of all
/// the linked children.
@@ -177,21 +179,18 @@ public:
/// Group child tasks DO NOT have their own `ChildTaskStatusRecord` entries,
/// and are only tracked by their respective `TaskGroupTaskStatusRecord`.
class TaskGroupTaskStatusRecord : public TaskStatusRecord {
TaskGroup *Group;
AsyncTask *FirstChild;
public:
TaskGroupTaskStatusRecord(TaskGroup *group)
TaskGroupTaskStatusRecord()
: TaskStatusRecord(TaskStatusRecordKind::TaskGroup),
Group(group),
FirstChild(nullptr) {}
TaskGroupTaskStatusRecord(TaskGroup *group, AsyncTask *child)
TaskGroupTaskStatusRecord(AsyncTask *child)
: TaskStatusRecord(TaskStatusRecordKind::TaskGroup),
Group(group),
FirstChild(child) {}
TaskGroup* getGroup() const {
return Group;
TaskGroup* getGroup() {
return reinterpret_cast<TaskGroup *>(this);
}
/// Return the first child linked by this record. This may be null;
@@ -205,7 +204,7 @@ public:
void attachChild(AsyncTask *child) {
assert(child->groupChildFragment());
assert(child->hasGroupChildFragment());
assert(child->groupChildFragment()->getGroup() == Group);
assert(child->groupChildFragment()->getGroup() == getGroup());
if (!FirstChild) {
// This is the first child we ever attach, so store it as FirstChild.
@@ -214,12 +213,10 @@ public:
}
// We need to traverse the siblings to find the last one and add the child there.
// FIXME: just set prepend to the current head, no need to traverse.
auto cur = FirstChild;
auto i = 0;
while (cur) {
i++;
// no need to check hasChildFragment, all tasks we store here have them.
auto fragment = cur->childFragment();
if (auto next = fragment->getNextChild()) {

View File

@@ -170,80 +170,105 @@ using TaskGroupFutureWaitThrowingSignature =
/// This can be called from any thread. Its Swift signature is
///
/// \code
/// func swift_task_group_wait_next_throwing(
/// func swift_taskGroup_wait_next_throwing(
/// waitingTask: Builtin.NativeObject, // current task
/// group: UnsafeRawPointer,
/// group: UnsafeRawPointer
/// ) async -> T
/// \endcode
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swiftasync)
TaskGroupFutureWaitThrowingSignature::FunctionType
swift_task_group_wait_next_throwing;
swift_taskGroup_wait_next_throwing;
/// Create a new `TaskGroup` using the task's allocator.
/// Create a new `TaskGroup`.
/// The caller is responsible for retaining and managing the group's lifecycle.
///
/// Its Swift signature is
///
/// \code
/// func swift_task_group_create(
/// func swift_taskGroup_create(
/// _ task: Builtin.NativeObject
/// ) -> Builtin.NativeObject
/// ) -> Builtin.RawPointer
/// \endcode
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
swift::TaskGroup* swift_task_group_create(AsyncTask *task);
TaskGroup* swift_taskGroup_create(AsyncTask *task); // TODO: probably remove this call, and just use the initialize always
/// Attach a child task to the parent task's task group record.
/// Initialize a `TaskGroup` in the passed `group` memory location.
/// The caller is responsible for retaining and managing the group's lifecycle.
///
/// Its Swift signature is
///
/// \code
/// func swift_task_group_attachChild(
/// group: UnsafeRawPointer,
/// func swift_taskGroup_initialize(
/// _ task: Builtin.NativeObject,
/// group: Builtin.RawPointer,
/// )
/// \endcode
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
void swift_taskGroup_initialize(AsyncTask *task, TaskGroup *group);
/// Attach a child task to the parent task's task group record.
///
/// This function MUST be called from the AsyncTask running the task group.
///
/// Since the group (or rather, its record) is attached to the parent task at
/// creation time, we do not need the parent task here; the group is already
/// attached to it.
/// Its Swift signature is
///
/// \code
/// func swift_taskGroup_attachChild(
/// group: Builtin.RawPointer,
/// parent: Builtin.NativeObject,
/// child: Builtin.NativeObject
/// )
/// \endcode
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
void swift_task_group_attachChild(TaskGroup *group,
AsyncTask *parent, AsyncTask *child);
void swift_taskGroup_attachChild(TaskGroup *group, AsyncTask *child);
/// Its Swift signature is
///
/// This function MUST be called from the AsyncTask running the task group.
///
/// \code
/// func swift_task_group_destroy(
/// func swift_taskGroup_destroy(
/// _ task: Builtin.NativeObject,
/// _ group: UnsafeRawPointer
/// )
/// \endcode
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
void swift_task_group_destroy(AsyncTask *task, TaskGroup *group);
void swift_taskGroup_destroy(AsyncTask *task, TaskGroup *group);
/// Before starting a task group child task, inform the group that there is one
/// more 'pending' child to account for.
///
/// This can be called from any thread. Its Swift signature is
/// This function SHOULD be called from the AsyncTask running the task group;
/// however, it is generally thread-safe as it only operates on the group status.
///
/// Its Swift signature is
///
/// \code
/// func swift_task_group_add_pending(
/// group: UnsafeRawPointer
/// func swift_taskGroup_addPending(
/// group: Builtin.RawPointer
/// ) -> Bool
/// \endcode
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
bool swift_task_group_add_pending(TaskGroup *group);
bool swift_taskGroup_addPending(TaskGroup *group);
/// Cancel all tasks in the group.
/// This also prevents new tasks from being added.
///
/// This can be called from any thread. Its Swift signature is
/// This can be called from any thread.
///
/// Its Swift signature is
///
/// \code
/// func swift_task_group_cancel_all(
/// func swift_taskGroup_cancelAll(
/// task: Builtin.NativeObject,
/// group: UnsafeRawPointer
/// )
/// \endcode
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
void swift_task_group_cancel_all(AsyncTask *task, TaskGroup *group);
void swift_taskGroup_cancelAll(AsyncTask *task, TaskGroup *group);
/// Check ONLY if the group was explicitly cancelled, e.g. by `cancelAll`.
///
@@ -253,25 +278,25 @@ void swift_task_group_cancel_all(AsyncTask *task, TaskGroup *group);
/// This can be called from any thread. Its Swift signature is
///
/// \code
/// func swift_task_group_is_cancelled(
/// func swift_taskGroup_isCancelled(
/// task: Builtin.NativeObject,
/// group: UnsafeRawPointer
/// )
/// \endcode
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
bool swift_task_group_is_cancelled(AsyncTask *task, TaskGroup *group);
bool swift_taskGroup_isCancelled(AsyncTask *task, TaskGroup *group);
/// Check the readyQueue of a task group, return true if it has no pending tasks.
///
/// This can be called from any thread. Its Swift signature is
///
/// \code
/// func swift_task_group_is_empty(
/// _ group: UnsafeRawPointer
/// func swift_taskGroup_isEmpty(
/// _ group: Builtin.RawPointer
/// ) -> Bool
/// \endcode
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
bool swift_task_group_is_empty(TaskGroup *group);
bool swift_taskGroup_isEmpty(TaskGroup *group);
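Taken together, the renamed entry points above form the group's lifecycle. The sketch below shows one plausible ordering, roughly what compiler-emitted code for a `withTaskGroup`-style body would drive; it assumes the declarations from this header plus swift_task_alloc/swift_task_dealloc, and it is not meant to run standalone since `task` must be the task currently executing the group:

#include "swift/ABI/TaskGroup.h"
#include "swift/Runtime/Concurrency.h"

using namespace swift;

void taskGroupLifecycleSketch(AsyncTask *task, AsyncTask *childTask) {
  // Reserve the opaque storage in the task-local allocator (or a frame slot).
  auto *group =
      static_cast<TaskGroup *>(swift_task_alloc(task, sizeof(TaskGroup)));
  swift_taskGroup_initialize(task, group);   // placement-init + attach the record

  if (swift_taskGroup_addPending(group)) {   // account for one pending child
    swift_taskGroup_attachChild(group, childTask);
    // ... enqueue childTask; on completion it offers its result to the group ...
  }

  // ... results are consumed via swift_taskGroup_wait_next_throwing (async) ...

  if (!swift_taskGroup_isEmpty(group))
    swift_taskGroup_cancelAll(task, group);  // e.g. on early exit; remaining
                                             // children are still awaited before
                                             // tear-down in the real flow

  swift_taskGroup_destroy(task, group);      // detach record, drain ready queue
  swift_task_dealloc(task, group);
}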
/// Add a status record to a task. The record should not be
/// modified while it is registered with a task.

View File

@@ -1564,6 +1564,22 @@ FUNCTION(DefaultActorDestroy,
ARGS(RefCountedPtrTy),
ATTRS(NoUnwind))
//// void swift_taskGroup_initialize(AsyncTask *task, TaskGroup *group);
//FUNCTION(TaskGroupInitialize,
// swift_taskGroup_initialize, SwiftCC,
// ConcurrencyAvailability,
// RETURNS(VoidTy),
// ARGS(RefCountedPtrTy, RefCountedPtrTy),
// ATTRS(NoUnwind))
//
//// void swift_taskGroup_destroy(AsyncTask *task, TaskGroup *group);
//FUNCTION(TaskGroupDestroy,
// swift_taskGroup_destroy, SwiftCC,
// ConcurrencyAvailability,
// RETURNS(VoidTy),
// ARGS(RefCountedPtrTy, RefCountedPtrTy),
// ATTRS(NoUnwind))
// AutoDiffLinearMapContext *swift_autoDiffCreateLinearMapContext(size_t);
FUNCTION(AutoDiffCreateLinearMapContext,
swift_autoDiffCreateLinearMapContext, SwiftCC,

View File

@@ -2399,7 +2399,7 @@ static FunctionPointer::Kind classifyFunctionPointerKind(SILFunction *fn) {
return SpecialKind::TaskFutureWait;
if (name.equals("swift_task_future_wait_throwing"))
return SpecialKind::TaskFutureWaitThrowing;
if (name.equals("swift_task_group_wait_next_throwing"))
if (name.equals("swift_taskGroup_wait_next_throwing"))
return SpecialKind::TaskGroupWaitNext;
}

View File

@@ -20,6 +20,7 @@
#include "swift/ABI/Metadata.h"
#include "swift/Runtime/Mutex.h"
#include "swift/Runtime/HeapObject.h"
#include "TaskGroupPrivate.h"
#include "TaskPrivate.h"
#include "AsyncCall.h"
#include "Debug.h"

View File

@@ -15,15 +15,24 @@
//===----------------------------------------------------------------------===//
#include "swift/ABI/TaskGroup.h"
#include "swift/Runtime/Concurrency.h"
#include "swift/ABI/Task.h"
#include "swift/ABI/Metadata.h"
#include "swift/ABI/HeapObject.h"
#include "TaskPrivate.h"
#include "TaskGroupPrivate.h"
#include "swift/Basic/RelativePointer.h"
#include "swift/Basic/STLExtras.h"
#include "swift/Runtime/Concurrency.h"
#include "swift/Runtime/Config.h"
#include "swift/Runtime/Mutex.h"
#include "swift/Runtime/HeapObject.h"
#include "TaskPrivate.h"
#include "AsyncCall.h"
#include "Debug.h"
#include "bitset"
#include "string"
#include "queue" // TODO: remove and replace with usage of our mpsc queue
#include <atomic>
#include <assert.h>
#include <dispatch/dispatch.h>
#if !defined(_WIN32)
@@ -31,60 +40,461 @@
#endif
using namespace swift;
/******************************************************************************/
/*************************** TASK GROUP ***************************************/
/******************************************************************************/
using FutureFragment = AsyncTask::FutureFragment;
using ReadyQueueItem = TaskGroup::ReadyQueueItem;
using ReadyStatus = TaskGroup::ReadyStatus;
using PollResult = TaskGroup::PollResult;
namespace {
class TaskStatusRecord;
class TaskGroupImpl: public TaskGroupTaskStatusRecord {
public:
/// Describes the status of the group.
enum class ReadyStatus : uintptr_t {
/// The task group is empty, no tasks are pending.
/// Return immediately, there is no point in suspending.
///
/// The storage is not accessible.
Empty = 0b00,
// not used: 0b01; same value as the PollStatus MustWait,
// which does not make sense for the ReadyStatus
/// The future has completed with result (of type \c resultType).
Success = 0b10,
/// The future has completed by throwing an error (an \c Error
/// existential).
Error = 0b11,
};
enum class PollStatus : uintptr_t {
/// The group is known to be empty and we can immediately return nil.
Empty = 0b00,
/// The task has been enqueued to the group's wait queue.
MustWait = 0b01,
/// The task has completed with result (of type \c resultType).
Success = 0b10,
/// The task has completed by throwing an error (an \c Error existential).
Error = 0b11,
};
/// The result of waiting on the TaskGroupImpl.
struct PollResult {
PollStatus status; // TODO: pack it into storage pointer or not worth it?
/// Storage for the result of the future.
///
/// When the future completed normally, this is a pointer to the storage
/// of the result value, which lives inside the future task itself.
///
/// When the future completed by throwing an error, this is the error
/// object itself.
OpaqueValue *storage;
/// The completed task, if necessary to keep alive until consumed by next().
///
/// # Important: swift_release
/// If a task is returned here, the task MUST be swift_released
/// once we are done with it, to balance out the retain made before
/// when the task was enqueued into the ready queue to keep it alive
/// until a next() call eventually picks it up.
AsyncTask *retainedTask;
bool isStorageAccessible() {
return status == PollStatus::Success ||
status == PollStatus::Error ||
status == PollStatus::Empty;
}
static PollResult get(AsyncTask *asyncTask, bool hadErrorResult) {
auto fragment = asyncTask->futureFragment();
return PollResult{
/*status*/ hadErrorResult ?
PollStatus::Error :
PollStatus::Success,
/*storage*/ hadErrorResult ?
reinterpret_cast<OpaqueValue *>(fragment->getError()) :
fragment->getStoragePtr(),
/*task*/ asyncTask
};
}
};
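The retain/release contract called out in the comment above is easy to get wrong, so here is a reduced, self-contained model of consuming a poll result: MustWait and Empty carry nothing, while Success and Error hand back storage plus a retained task that the consumer must release. Names like MiniResult and releaseTask are stand-ins, not runtime API:

#include <cassert>

enum class MiniStatus { Empty, MustWait, Success, Error };
struct MiniTask { int refs = 1; };
static void releaseTask(MiniTask *t) { if (t) --t->refs; }  // models swift_release

struct MiniResult {
  MiniStatus status;
  const void *storage;      // value or error, owned by the completed task
  MiniTask *retainedTask;   // non-null only when storage is accessible
};

static void consume(const MiniResult &r) {
  switch (r.status) {
  case MiniStatus::MustWait:                  // waiter parked; nothing to read yet
  case MiniStatus::Empty:                     // next() will simply yield nil
    return;
  case MiniStatus::Success:
  case MiniStatus::Error:
    // ... copy the value / rethrow the error out of r.storage ...
    releaseTask(r.retainedTask);              // balance the retain taken at offer()
    return;
  }
}

int main() {
  MiniTask t;
  int value = 42;
  consume(MiniResult{MiniStatus::Success, &value, &t});
  assert(t.refs == 0);
  return 0;
}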
/// An item within the message queue of a group.
struct ReadyQueueItem {
/// Mask used for the low status bits in a message queue item.
static const uintptr_t statusMask = 0x03;
uintptr_t storage;
ReadyStatus getStatus() const {
return static_cast<ReadyStatus>(storage & statusMask);
}
AsyncTask *getTask() const {
return reinterpret_cast<AsyncTask *>(storage & ~statusMask);
}
static ReadyQueueItem get(ReadyStatus status, AsyncTask *task) {
assert(task == nullptr || task->isFuture());
return ReadyQueueItem{
reinterpret_cast<uintptr_t>(task) | static_cast<uintptr_t>(status)};
}
};
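ReadyQueueItem works because AsyncTask pointers are at least 4-byte aligned, leaving the two low bits free to carry the ReadyStatus. A stand-alone sketch of the same tagging trick, using generic names rather than the runtime types:

#include <cassert>
#include <cstdint>

enum class Tag : uintptr_t { Empty = 0b00, Success = 0b10, Error = 0b11 };

struct TaggedPtr {
  static const uintptr_t tagMask = 0x03;
  uintptr_t storage;

  Tag getTag() const { return static_cast<Tag>(storage & tagMask); }
  void *getPtr() const { return reinterpret_cast<void *>(storage & ~tagMask); }

  static TaggedPtr get(Tag tag, void *ptr) {
    // Requires the pointee to be at least 4-byte aligned (low bits are zero).
    assert((reinterpret_cast<uintptr_t>(ptr) & tagMask) == 0);
    return TaggedPtr{reinterpret_cast<uintptr_t>(ptr) |
                     static_cast<uintptr_t>(tag)};
  }
};

int main() {
  alignas(4) static int slot = 0;
  auto item = TaggedPtr::get(Tag::Success, &slot);
  assert(item.getTag() == Tag::Success && item.getPtr() == &slot);
  return 0;
}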
/// An item within the pending queue.
struct PendingQueueItem {
uintptr_t storage;
AsyncTask *getTask() const {
return reinterpret_cast<AsyncTask *>(storage);
}
static ReadyQueueItem get(AsyncTask *task) {
assert(task == nullptr || task->isFuture());
return ReadyQueueItem{reinterpret_cast<uintptr_t>(task)};
}
};
struct GroupStatus {
static const uint64_t cancelled = 0b1000000000000000000000000000000000000000000000000000000000000000;
static const uint64_t waiting = 0b0100000000000000000000000000000000000000000000000000000000000000;
// 31 bits for ready tasks counter
static const uint64_t maskReady = 0b0011111111111111111111111111111110000000000000000000000000000000;
static const uint64_t oneReadyTask = 0b0000000000000000000000000000000010000000000000000000000000000000;
// 31 bits for pending tasks counter
static const uint64_t maskPending = 0b0000000000000000000000000000000001111111111111111111111111111111;
static const uint64_t onePendingTask = 0b0000000000000000000000000000000000000000000000000000000000000001;
uint64_t status;
bool isCancelled() {
return (status & cancelled) > 0;
}
bool hasWaitingTask() {
return (status & waiting) > 0;
}
unsigned int readyTasks() {
return (status & maskReady) >> 31;
}
unsigned int pendingTasks() {
return (status & maskPending);
}
bool isEmpty() {
return pendingTasks() == 0;
}
/// Status value decrementing the Ready, Pending and Waiting counters by one.
GroupStatus completingPendingReadyWaiting() {
assert(pendingTasks() &&
"can only complete waiting task when pending tasks available");
assert(readyTasks() &&
"can only complete waiting task when ready tasks available");
assert(hasWaitingTask() &&
"can only complete waiting task when waiting task available");
return GroupStatus{status - waiting - oneReadyTask - onePendingTask};
}
GroupStatus completingPendingReady() {
assert(pendingTasks() &&
"can only complete waiting task when pending tasks available");
assert(readyTasks() &&
"can only complete waiting task when ready tasks available");
return GroupStatus{status - oneReadyTask - onePendingTask};
}
/// Pretty prints the status, as follows:
/// GroupStatus{ C:{cancelled} W:{waiting} R:{ready tasks} P:{pending tasks} {binary repr} }
std::string to_string() {
std::string str;
str.append("GroupStatus{ ");
str.append("C:"); // cancelled
str.append(isCancelled() ? "y " : "n ");
str.append("W:"); // has waiting task
str.append(hasWaitingTask() ? "y " : "n ");
str.append("R:"); // ready
str.append(std::to_string(readyTasks()));
str.append(" P:"); // pending
str.append(std::to_string(pendingTasks()));
str.append(" " + std::bitset<64>(status).to_string());
str.append(" }");
return str;
}
/// Initially there are no waiting and no pending tasks.
static const GroupStatus initial() {
return GroupStatus{0};
};
};
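A quick, self-contained check of how those masks decompose a status word: bit 63 is "cancelled", bit 62 is "has waiting task", bits 31-61 count ready tasks, and bits 0-30 count pending tasks. The concrete values below are chosen only for illustration:

#include <cassert>
#include <cstdint>

int main() {
  const uint64_t waiting        = 1ull << 62;
  const uint64_t oneReadyTask   = 1ull << 31;
  const uint64_t onePendingTask = 1ull;
  const uint64_t maskReady      = 0x7FFFFFFFull << 31;   // 31 bits
  const uint64_t maskPending    = 0x7FFFFFFFull;         // 31 bits

  // A group with one waiter, 2 completed-but-unconsumed tasks, 5 pending tasks.
  uint64_t status = waiting + 2 * oneReadyTask + 5 * onePendingTask;

  assert(((status & maskReady) >> 31) == 2);   // readyTasks()
  assert((status & maskPending) == 5);         // pendingTasks()
  assert((status & waiting) != 0);             // hasWaitingTask()
  assert((status & (1ull << 63)) == 0);        // not cancelled
  return 0;
}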
template<typename T>
class NaiveQueue {
std::queue <T> queue;
public:
NaiveQueue() = default;
NaiveQueue(const NaiveQueue<T> &) = delete;
NaiveQueue &operator=(const NaiveQueue<T> &) = delete;
NaiveQueue(NaiveQueue<T> &&other) {
queue = std::move(other.queue);
}
virtual ~NaiveQueue() {}
bool dequeue(T &output) {
if (queue.empty()) {
return false;
}
output = queue.front();
queue.pop();
return true;
}
void enqueue(const T item) {
queue.push(item);
}
};
private:
// // TODO: move to lockless via the status atomic
mutable std::mutex mutex;
/// Used for queue management, counting number of waiting and ready tasks
std::atomic <uint64_t> status;
/// Queue containing completed tasks offered into this group.
///
/// The low bits contain the status, the rest of the pointer is the
/// AsyncTask.
NaiveQueue<ReadyQueueItem> readyQueue;
// mpsc_queue_t<ReadyQueueItem> readyQueue; // TODO: can we get away with an MPSC queue here once actor executors land?
/// Single waiting `AsyncTask` currently waiting on `group.next()`,
/// or `nullptr` if no task is currently waiting.
std::atomic<AsyncTask *> waitQueue;
friend class AsyncTask;
public:
explicit TaskGroupImpl()
: TaskGroupTaskStatusRecord(),
status(GroupStatus::initial().status),
readyQueue(),
// readyQueue(ReadyQueueItem::get(ReadyStatus::Empty, nullptr)),
waitQueue(nullptr) {}
TaskGroupTaskStatusRecord *getTaskRecord() {
return reinterpret_cast<TaskGroupTaskStatusRecord *>(this);
}
/// Destroy the storage associated with the group.
void destroy(AsyncTask *task);
bool isEmpty() {
auto oldStatus = GroupStatus{status.load(std::memory_order_relaxed)};
return oldStatus.pendingTasks() == 0;
}
bool isCancelled() {
auto oldStatus = GroupStatus{status.load(std::memory_order_relaxed)};
return oldStatus.isCancelled();
}
/// Cancel the task group and all tasks within it.
///
/// Returns `true` if this is the first time cancelling the group, false otherwise.
bool cancelAll(AsyncTask *task);
GroupStatus statusCancel() {
auto old = status.fetch_or(GroupStatus::cancelled,
std::memory_order_relaxed);
return GroupStatus{old};
}
/// Returns *assumed* new status, with the waiting bit just set.
GroupStatus statusMarkWaitingAssumeAcquire() {
auto old = status.fetch_or(GroupStatus::waiting, std::memory_order_acquire);
return GroupStatus{old | GroupStatus::waiting};
}
GroupStatus statusRemoveWaiting() {
auto old = status.fetch_and(~GroupStatus::waiting,
std::memory_order_release);
return GroupStatus{old};
}
/// Returns *assumed* new status, including the just performed +1.
GroupStatus statusAddReadyAssumeAcquire() {
auto old = status.fetch_add(GroupStatus::oneReadyTask,
std::memory_order_acquire);
auto s = GroupStatus{old + GroupStatus::oneReadyTask};
assert(s.readyTasks() <= s.pendingTasks());
return s;
}
/// Add a single pending task to the status counter.
/// This is used to implement next() properly, as we need to know if there
/// are pending tasks worth suspending/waiting for or not.
///
/// Note that the group does *not* store child tasks at all, as they are
/// stored in the `TaskGroupTaskStatusRecord` inside the task that is
/// currently executing the group. Here we only need the counts of
/// pending/ready tasks.
///
/// Returns *assumed* new status, including the just performed +1.
GroupStatus statusAddPendingTaskRelaxed() {
auto old = status.fetch_add(GroupStatus::onePendingTask,
std::memory_order_relaxed);
auto s = GroupStatus{old + GroupStatus::onePendingTask};
if (s.isCancelled()) {
// revert that add, it was meaningless
auto o = status.fetch_sub(GroupStatus::onePendingTask,
std::memory_order_relaxed);
s = GroupStatus{o - GroupStatus::onePendingTask};
}
return s;
}
GroupStatus statusLoadRelaxed() {
return GroupStatus{status.load(std::memory_order_relaxed)};
}
/// Compare-and-set old status to a status derived from the old one,
/// by simultaneously decrementing one Pending and one Waiting tasks.
///
/// This is used to atomically perform a waiting task completion.
bool statusCompletePendingReadyWaiting(GroupStatus &old) {
return status.compare_exchange_weak(
old.status, old.completingPendingReadyWaiting().status,
/*success*/ std::memory_order_relaxed,
/*failure*/ std::memory_order_relaxed);
}
bool statusCompletePendingReady(GroupStatus &old) {
return status.compare_exchange_weak(
old.status, old.completingPendingReady().status,
/*success*/ std::memory_order_relaxed,
/*failure*/ std::memory_order_relaxed);
}
/// Offer result of a task into this task group.
///
/// If an existing task is already waiting on next(), this will schedule it
/// immediately with the offered result. If not, the result is enqueued and
/// will be picked up the next time a task calls next().
void offer(AsyncTask *completed, AsyncContext *context, ExecutorRef executor);
/// Attempt to dequeue ready tasks and complete the waitingTask.
///
/// If the waiting task cannot be completed immediately (with a readily
/// available completed task), this either returns a `PollStatus::Empty`
/// result if it is known that there are no pending tasks in the group,
/// or a `PollStatus::MustWait` result if there are tasks in flight
/// and the waitingTask will eventually be woken up by a completion.
PollResult poll(AsyncTask *waitingTask);
};
} // end anonymous namespace
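For orientation, poll()'s first-order decision (implemented further below) depends only on the counters in the status word: no pending tasks means next() returns nil, ready tasks mean one can be handed over right away, and otherwise the waiter has to park until offer() wakes it. A simplified decision function capturing just that branch structure, with all queueing, locking, and retain/release elided:

#include <cassert>

enum class Decision { Empty, TakeReady, MustWait };

// Assumed simplified view of GroupStatus, for illustration only.
struct MiniStatus { unsigned ready; unsigned pending; };

Decision decide(MiniStatus s) {
  if (s.pending == 0) return Decision::Empty;     // nothing in flight: next() == nil
  if (s.ready > 0)    return Decision::TakeReady; // a completed child is queued
  return Decision::MustWait;                      // park until offer() resumes us
}

int main() {
  assert(decide({0, 0}) == Decision::Empty);
  assert(decide({1, 3}) == Decision::TakeReady);
  assert(decide({0, 3}) == Decision::MustWait);
  return 0;
}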
/******************************************************************************/
/************************ TASK GROUP IMPLEMENTATION ***************************/
/******************************************************************************/
using ReadyQueueItem = TaskGroupImpl::ReadyQueueItem;
using ReadyStatus = TaskGroupImpl::ReadyStatus;
using PollResult = TaskGroupImpl::PollResult;
using PollStatus = TaskGroupImpl::PollStatus;
static_assert(sizeof(TaskGroupImpl) <= sizeof(TaskGroup) &&
alignof(TaskGroupImpl) <= alignof(TaskGroup),
"TaskGroupImpl doesn't fit in TaskGroup");
static TaskGroupImpl *asImpl(TaskGroup *group) {
return reinterpret_cast<TaskGroupImpl*>(group);
}
static TaskGroup *asAbstract(TaskGroupImpl *group) {
return reinterpret_cast<TaskGroup*>(group);
}
// =============================================================================
// ==== initialize -------------------------------------------------------------
// Initializes an actual TaskGroupImpl into the preallocated `group` memory.
void swift::swift_taskGroup_initialize(AsyncTask *task, TaskGroup *group) {
// // nasty trick, but we want to keep the record inside the group as we'll need
// // to remove it from the task as the group is destroyed, as well as interact
// // with it every time we add child tasks; so it is useful to pre-create it here
// // and store it in the group.
// //
// // The record won't be used by anyone until we're done constructing and setting
// // up the group anyway.
// void *recordAllocation = swift_task_alloc(task, sizeof(TaskGroupTaskStatusRecord));
// auto record = new (recordAllocation)
// TaskGroupTaskStatusRecord(reinterpret_cast<TaskGroupImpl*>(_group));
// TODO: this becomes less weird once we make the fragment BE the group
TaskGroupImpl *impl = new (group) TaskGroupImpl();
auto record = impl->getTaskRecord();
assert(impl == record && "the group IS the task record");
// ok, now that the group actually is initialized: attach it to the task
swift_task_addStatusRecord(task, record);
}
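The "the group IS the task record" assertion relies on TaskGroupImpl having TaskGroupTaskStatusRecord as its first and only base, with no virtual functions, so the base subobject sits at offset 0 and the same address can be handed out as both. A reduced sketch of that layout assumption; Record and Impl are stand-ins, not the runtime types:

#include <cassert>
#include <new>

struct Record { int kind = 0; };                 // stand-in for the status record
struct Impl : Record { long payload[4] = {}; };  // stand-in for TaskGroupImpl

int main() {
  alignas(Impl) unsigned char storage[sizeof(Impl)]; // caller-provided memory
  Impl *impl = new (storage) Impl();                 // like swift_taskGroup_initialize
  Record *record = impl;                             // the group IS the record
  assert(static_cast<void *>(record) == static_cast<void *>(impl));
  impl->~Impl();
  return 0;
}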
// =============================================================================
// ==== create -----------------------------------------------------------------
TaskGroup* swift::swift_task_group_create(AsyncTask *task) {
TaskGroup* swift::swift_taskGroup_create(AsyncTask *task) {
// TODO: John suggested we should rather create from a builtin, which would allow us to optimize allocations even more?
void *allocation = swift_task_alloc(task, sizeof(TaskGroup));
// nasty trick, but we want to keep the record inside the group as we'll need
// to remove it from the task as the group is destroyed, as well as interact
// with it every time we add child tasks; so it is useful to pre-create it here
// and store it in the group.
//
// The record won't be used by anyone until we're done constructing and setting
// up the group anyway.
void *recordAllocation = swift_task_alloc(task, sizeof(TaskGroupTaskStatusRecord));
auto record = new (recordAllocation)
TaskGroupTaskStatusRecord(reinterpret_cast<TaskGroup*>(allocation));
TaskGroup *group = new (allocation) TaskGroup(record);
// ok, now that the group actually is initialized: attach it to the task
swift_task_addStatusRecord(task, record);
auto group = reinterpret_cast<TaskGroup *>(allocation);
swift_taskGroup_initialize(task, group);
return group;
}
// =============================================================================
// ==== add / attachChild -----------------------------------------------------------------
// ==== add / attachChild ------------------------------------------------------
void swift::swift_task_group_attachChild(TaskGroup *group,
AsyncTask *parent, AsyncTask *child) {
auto groupRecord = group->getTaskRecord();
assert(groupRecord->getGroup() == group);
void swift::swift_taskGroup_attachChild(TaskGroup *group, AsyncTask *child) {
auto groupRecord = asImpl(group)->getTaskRecord();
return groupRecord->attachChild(child);
}
// =============================================================================
// ==== destroy ----------------------------------------------------------------
void swift::swift_task_group_destroy(AsyncTask *task, TaskGroup *group) {
group->destroy(task);
void swift::swift_taskGroup_destroy(AsyncTask *task, TaskGroup *group) {
asImpl(group)->destroy(task);
}
void TaskGroup::destroy(AsyncTask *task) {
void TaskGroupImpl::destroy(AsyncTask *task) {
// First, remove the group from the task and deallocate the record
swift_task_removeStatusRecord(task, Record);
swift_task_dealloc(task, Record);
swift_task_removeStatusRecord(task, getTaskRecord());
mutex.lock(); // TODO: remove fragment lock, and use status for synchronization
mutex.lock(); // TODO: remove lock, and use status for synchronization
// Release all ready tasks, which are kept retained; the group is being
// destroyed, so no other task will ever await on them anymore.
ReadyQueueItem item;
@@ -102,19 +512,24 @@ void TaskGroup::destroy(AsyncTask *task) {
// =============================================================================
// ==== offer ------------------------------------------------------------------
void TaskGroup::offer(AsyncTask *completedTask, AsyncContext *context,
ExecutorRef completingExecutor) {
asImpl(this)->offer(completedTask, context, completingExecutor);
}
static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
TaskGroup::PollResult result) {
PollResult result) {
/// Fill in the result value
switch (result.status) {
case TaskGroup::PollStatus::MustWait:
case PollStatus::MustWait:
assert(false && "filling a waiting status?");
return;
case TaskGroup::PollStatus::Error:
case PollStatus::Error:
context->fillWithError(reinterpret_cast<SwiftError*>(result.storage));
return;
case TaskGroup::PollStatus::Success: {
case PollStatus::Success: {
// Initialize the result as an Optional<Success>.
const Metadata *successType = context->successType;
OpaqueValue *destPtr = context->successResultPointer;
@@ -126,7 +541,7 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
return;
}
case TaskGroup::PollStatus::Empty: {
case PollStatus::Empty: {
// Initialize the result as a nil Optional<Success>.
const Metadata *successType = context->successType;
OpaqueValue *destPtr = context->successResultPointer;
@@ -136,13 +551,13 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
}
}
void TaskGroup::offer(AsyncTask *completedTask, AsyncContext *context,
ExecutorRef completingExecutor) {
void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context,
ExecutorRef completingExecutor) {
assert(completedTask);
assert(completedTask->isFuture());
assert(completedTask->hasChildFragment());
assert(completedTask->hasGroupChildFragment());
assert(completedTask->groupChildFragment()->getGroup() == this);
assert(completedTask->groupChildFragment()->getGroup() == asAbstract(this));
// We retain the completed task, because we will either:
// - (a) schedule the waiter to resume on the next() that it is waiting on, or
@@ -192,8 +607,8 @@ void TaskGroup::offer(AsyncTask *completedTask, AsyncContext *context,
mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
auto waitingContext =
static_cast<TaskFutureWaitAsyncContext *>(
waitingTask->ResumeContext);
static_cast<TaskFutureWaitAsyncContext *>(
waitingTask->ResumeContext);
fillGroupNextResult(waitingContext, result);
// TODO: allow the caller to suggest an executor
@@ -229,7 +644,7 @@ void TaskGroup::offer(AsyncTask *completedTask, AsyncContext *context,
// ==== group.next() implementation (wait_next and groupPoll) ------------------
SWIFT_CC(swiftasync)
void swift::swift_task_group_wait_next_throwing(
void swift::swift_taskGroup_wait_next_throwing(
AsyncTask *waitingTask,
ExecutorRef executor,
SWIFT_ASYNC_CONTEXT AsyncContext *rawContext) {
@@ -238,26 +653,26 @@ void swift::swift_task_group_wait_next_throwing(
auto context = static_cast<TaskFutureWaitAsyncContext *>(rawContext);
auto task = context->task;
auto group = context->group;
auto group = asImpl(context->group);
assert(waitingTask == task && "attempted to wait on group.next() from other task, which is illegal!");
assert(group && "swift_task_group_wait_next_throwing was passed context without group!");
assert(group && "swift_taskGroup_wait_next_throwing was passed context without group!");
TaskGroup::PollResult polled = group->poll(waitingTask);
PollResult polled = group->poll(waitingTask);
switch (polled.status) {
case TaskGroup::PollStatus::MustWait:
case PollStatus::MustWait:
// The waiting task has been queued on the channel,
// there were pending tasks so it will be woken up eventually.
return;
case TaskGroup::PollStatus::Empty:
case TaskGroup::PollStatus::Error:
case TaskGroup::PollStatus::Success:
case PollStatus::Empty:
case PollStatus::Error:
case PollStatus::Success:
fillGroupNextResult(context, polled);
return waitingTask->runInFullyEstablishedContext(executor);
}
}
TaskGroup::PollResult TaskGroup::poll(AsyncTask *waitingTask) {
PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {
mutex.lock(); // TODO: remove group lock, and use status for synchronization
auto assumed = statusMarkWaitingAssumeAcquire();
@@ -271,7 +686,7 @@ TaskGroup::PollResult TaskGroup::poll(AsyncTask *waitingTask) {
// was issued, and if we parked here we'd potentially never be woken up.
// Bail out and return `nil` from `group.next()`.
statusRemoveWaiting();
result.status = TaskGroup::PollStatus::Empty;
result.status = PollStatus::Empty;
mutex.unlock(); // TODO: remove group lock, and use status for synchronization
return result;
}
@@ -281,7 +696,7 @@ TaskGroup::PollResult TaskGroup::poll(AsyncTask *waitingTask) {
// ==== 2) Ready task was polled, return with it immediately -----------------
if (assumed.readyTasks()) {
auto assumedStatus = assumed.status;
auto newStatus = TaskGroup::GroupStatus{assumedStatus};
auto newStatus = TaskGroupImpl::GroupStatus{assumedStatus};
if (status.compare_exchange_weak(
assumedStatus, newStatus.completingPendingReadyWaiting().status,
/*success*/ std::memory_order_relaxed,
@@ -291,7 +706,7 @@ TaskGroup::PollResult TaskGroup::poll(AsyncTask *waitingTask) {
ReadyQueueItem item;
bool taskDequeued = readyQueue.dequeue(item);
if (!taskDequeued) {
result.status = TaskGroup::PollStatus::MustWait;
result.status = PollStatus::MustWait;
result.storage = nullptr;
result.retainedTask = nullptr;
mutex.unlock(); // TODO: remove group lock, and use status for synchronization
@@ -308,7 +723,7 @@ TaskGroup::PollResult TaskGroup::poll(AsyncTask *waitingTask) {
switch (item.getStatus()) {
case ReadyStatus::Success:
// Immediately return the polled value
result.status = TaskGroup::PollStatus::Success;
result.status = PollStatus::Success;
result.storage = futureFragment->getStoragePtr();
assert(result.retainedTask && "polled a task, it must be not null");
mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
@@ -316,7 +731,7 @@ TaskGroup::PollResult TaskGroup::poll(AsyncTask *waitingTask) {
case ReadyStatus::Error:
// Immediately return the polled value
result.status = TaskGroup::PollStatus::Error;
result.status = PollStatus::Error;
result.storage =
reinterpret_cast<OpaqueValue *>(futureFragment->getError());
assert(result.retainedTask && "polled a task, it must be not null");
@@ -324,7 +739,7 @@ TaskGroup::PollResult TaskGroup::poll(AsyncTask *waitingTask) {
return result;
case ReadyStatus::Empty:
result.status = TaskGroup::PollStatus::Empty;
result.status = PollStatus::Empty;
result.storage = nullptr;
result.retainedTask = nullptr;
mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
@@ -344,7 +759,7 @@ TaskGroup::PollResult TaskGroup::poll(AsyncTask *waitingTask) {
/*failure*/ std::memory_order_acquire)) {
mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
// no ready tasks, so we must wait.
result.status = TaskGroup::PollStatus::MustWait;
result.status = PollStatus::MustWait;
return result;
} // else, try again
}
@@ -354,25 +769,25 @@ TaskGroup::PollResult TaskGroup::poll(AsyncTask *waitingTask) {
// =============================================================================
// ==== isEmpty ----------------------------------------------------------------
bool swift::swift_task_group_is_empty(TaskGroup *group) {
return group->isEmpty();
bool swift::swift_taskGroup_isEmpty(TaskGroup *group) {
return asImpl(group)->isEmpty();
}
// =============================================================================
// ==== isCancelled ------------------------------------------------------------
bool swift::swift_task_group_is_cancelled(AsyncTask *task, TaskGroup *group) {
return group->isCancelled();
bool swift::swift_taskGroup_isCancelled(AsyncTask *task, TaskGroup *group) {
return asImpl(group)->isCancelled();
}
// =============================================================================
// ==== cancelAll --------------------------------------------------------------
void swift::swift_task_group_cancel_all(AsyncTask *task, TaskGroup *group) {
group->cancelAll(task);
void swift::swift_taskGroup_cancelAll(AsyncTask *task, TaskGroup *group) {
asImpl(group)->cancelAll(task);
}
bool TaskGroup::cancelAll(AsyncTask *task) {
bool TaskGroupImpl::cancelAll(AsyncTask *task) {
// store the cancelled bit
auto old = statusCancel();
if (old.isCancelled()) {
@@ -381,15 +796,14 @@ bool TaskGroup::cancelAll(AsyncTask *task) {
}
// cancel all existing tasks within the group
swift_task_cancel_group_child_tasks(task, this);
swift_task_cancel_group_child_tasks(task, asAbstract(this));
return true;
}
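cancelAll is idempotent because statusCancel uses fetch_or: every caller sets the cancelled bit, but only the one that observed it previously clear returns true and goes on to cancel the child tasks. A minimal illustration of that property:

#include <atomic>
#include <cassert>
#include <cstdint>

int main() {
  const uint64_t cancelled = 1ull << 63;
  std::atomic<uint64_t> status{0};

  bool first  = (status.fetch_or(cancelled) & cancelled) == 0;  // flips the bit
  bool second = (status.fetch_or(cancelled) & cancelled) == 0;  // already set

  assert(first && !second);   // only the first call reports "newly cancelled"
  return 0;
}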
// =============================================================================
// ==== internal ---------------------------------------------------------------
// ==== addPending -------------------------------------------------------------
bool swift::swift_task_group_add_pending(TaskGroup *group) {
return !group->statusAddPendingTaskRelaxed().isCancelled();
bool swift::swift_taskGroup_addPending(TaskGroup *group) {
auto assumedStatus = asImpl(group)->statusAddPendingTaskRelaxed();
return !assumedStatus.isCancelled();
}

View File

@@ -146,7 +146,7 @@ extension Task {
flags.bits, _task, _group, operation)
// Attach it to the group's task record in the current task.
_taskGroupAttachChild(group: _group, parent: _task, child: childTask)
_taskGroupAttachChild(group: _group, child: childTask)
// Enqueue the resulting job.
_enqueueJobGlobal(Builtin.convertTaskToJob(childTask))
@@ -242,14 +242,20 @@ extension Task {
/// This function may be called even from within child (or any other) tasks,
/// and will reliably cause the group to become cancelled.
///
/// - SeeAlso: `Task.addCancellationHandler`
/// - SeeAlso: `Task.checkCancelled`
/// - SeeAlso: `Task.isCancelled`
/// - SeeAlso: `TaskGroup.isCancelled`
public func cancelAll() {
_taskGroupCancelAll(task: _task, group: _group)
}
// Returns `true` if the group was cancelled, e.g. by `cancelAll`.
/// Returns `true` if the group was cancelled, e.g. by `cancelAll`.
///
/// If the task currently running this group was cancelled, the group will
/// also be implicitly cancelled, which will be reflected in the return
/// value of this function as well.
///
/// - Returns: `true` if the group (or its parent task) was cancelled,
/// `false` otherwise.
public var isCancelled: Bool {
return _taskIsCancelled(_task) ||
_taskGroupIsCancelled(task: _task, group: _group)
@@ -335,31 +341,30 @@ func _swiftRelease(
_ object: Builtin.NativeObject
)
@_silgen_name("swift_task_group_create")
@_silgen_name("swift_taskGroup_create")
func _taskGroupCreate(
task: Builtin.NativeObject
) -> Builtin.RawPointer
/// Attach a child task to the task group's record in the current task.
@_silgen_name("swift_task_group_attachChild")
@_silgen_name("swift_taskGroup_attachChild")
func _taskGroupAttachChild(
group: Builtin.RawPointer,
parent: Builtin.NativeObject,
child: Builtin.NativeObject
) -> UnsafeRawPointer /*ChildTaskStatusRecord*/
@_silgen_name("swift_task_group_destroy")
@_silgen_name("swift_taskGroup_destroy")
func _taskGroupDestroy(
task: Builtin.NativeObject,
group: __owned Builtin.RawPointer
)
@_silgen_name("swift_task_group_add_pending")
@_silgen_name("swift_taskGroup_addPending")
func _taskGroupAddPendingTask(
group: Builtin.RawPointer
) -> Bool
@_silgen_name("swift_task_group_cancel_all")
@_silgen_name("swift_taskGroup_cancelAll")
func _taskGroupCancelAll(
task: Builtin.NativeObject,
group: Builtin.RawPointer
@@ -367,13 +372,13 @@ func _taskGroupCancelAll(
/// Checks ONLY if the group was specifically cancelled.
/// The task itself being cancelled must be checked separately.
@_silgen_name("swift_task_group_is_cancelled")
@_silgen_name("swift_taskGroup_isCancelled")
func _taskGroupIsCancelled(
task: Builtin.NativeObject,
group: Builtin.RawPointer
) -> Bool
@_silgen_name("swift_task_group_wait_next_throwing")
@_silgen_name("swift_taskGroup_wait_next_throwing")
func _taskGroupWaitNext<T>(
waitingTask: Builtin.NativeObject,
group: Builtin.RawPointer
@@ -386,7 +391,7 @@ enum PollStatus: Int {
case error = 3
}
@_silgen_name("swift_task_group_is_empty")
@_silgen_name("swift_taskGroup_isEmpty")
func _taskGroupIsEmpty(
_ group: Builtin.RawPointer
) -> Bool

View File

@@ -0,0 +1,34 @@
//===--- TaskGroupPrivate.h - TaskGroup internal interface -*- C++ -*-===//
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2014 - 2020 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//
//
// Internal functions for the concurrency library.
//
//===----------------------------------------------------------------------===//
#ifndef SWIFT_CONCURRENCY_TASKGROUPPRIVATE_H
#define SWIFT_CONCURRENCY_TASKGROUPPRIVATE_H
#include "swift/ABI/Task.h"
#include "swift/ABI/TaskGroup.h"
namespace swift {
//void _swift_taskgroup_offer(AsyncTask *completed,
// AsyncContext *context,
// ExecutorRef executor);
//
//void _swift_taskgroup_cancelAll(AsyncTask *completed,
// TaskGroup *group);
} // end namespace swift
#endif

View File

@@ -79,9 +79,9 @@ public:
// Arguments.
AsyncTask *task;
// Only in swift_task_group_wait_next_throwing.
// Only in swift_taskGroup_wait_next_throwing.
TaskGroup *group;
// Only in swift_task_group_wait_next_throwing.
// Only in swift_taskGroup_wait_next_throwing.
const Metadata *successType;
using AsyncContext::AsyncContext;

View File

@@ -460,8 +460,7 @@ static void performCancellationAction(TaskStatusRecord *record) {
}
/// Perform any cancellation actions required by the given record.
static void performGroupCancellationAction(TaskStatusRecord *record,
TaskGroup *group) {
static void performGroupCancellationAction(TaskStatusRecord *record) {
switch (record->getKind()) {
// We only need to cancel specific GroupChildTasks, not arbitrary child tasks.
// A task may be parent to many tasks which are not part of a group after all.
@@ -470,13 +469,12 @@ static void performGroupCancellationAction(TaskStatusRecord *record,
case TaskStatusRecordKind::TaskGroup: {
auto groupChildRecord = cast<TaskGroupTaskStatusRecord>(record);
// since a task can only be running a single task group at the same time,
// we do not need to `group == groupChildRecord->getGroup()` filter here,
// however we assert this for good measure.
// Since a task can only be running a single task group at the same time,
// we can always assume that the group record which we found is the one
// we're intended to cancel child tasks for.
//
// A group enforces that tasks cannot "escape" it, and as such, once the group
// returns, all its tasks have been completed.
assert(group == groupChildRecord->getGroup());
for (AsyncTask *child: groupChildRecord->children()) {
swift_task_cancel(child);
}
@@ -540,7 +538,7 @@ void swift::swift_task_cancel_group_child_tasks(AsyncTask *task, TaskGroup *grou
// Carry out the cancellation operations associated with all
// the active records.
for (auto cur: oldStatus.records()) {
performGroupCancellationAction(cur, group);
performGroupCancellationAction(cur);
}
// Release the status record lock, being sure to flag that