getting there with assuming that poll is a single task

This commit is contained in:
Konrad `ktoso` Malawski
2021-02-08 22:18:59 +09:00
parent a226259d84
commit a100424b4a
8 changed files with 410 additions and 375 deletions

View File

@@ -72,7 +72,7 @@ namespace swift {
};
/// The result of waiting on a Channel (TaskGroup).
struct GroupPollResult {
struct PollResult {
GroupPollStatus status; // TODO: pack it into storage pointer or not worth it?
/// Storage for the result of the future.
@@ -99,10 +99,10 @@ namespace swift {
status == GroupPollStatus::Empty;
}
static GroupPollResult get(AsyncTask *asyncTask, bool hadErrorResult,
static PollResult get(AsyncTask *asyncTask, bool hadErrorResult,
bool needsSwiftRelease) {
auto fragment = asyncTask->futureFragment();
return GroupPollResult{
return PollResult{
/*status*/ hadErrorResult ?
TaskGroup::GroupPollStatus::Error :
TaskGroup::GroupPollStatus::Success,
@@ -152,39 +152,17 @@ namespace swift {
}
};
/// An item within the wait queue, which includes the status and the
/// head of the list of tasks.
struct WaitQueueItem { // TODO: reuse the future's wait queue instead?
/// Mask used for the low status bits in a wait queue item.
static const uintptr_t statusMask = 0x03;
uintptr_t storage;
WaitStatus getStatus() const {
return static_cast<WaitStatus>(storage & statusMask);
}
AsyncTask *getTask() const {
return reinterpret_cast<AsyncTask *>(storage & ~statusMask);
}
static WaitQueueItem get(WaitStatus status, AsyncTask *task) {
return WaitQueueItem{
reinterpret_cast<uintptr_t>(task) | static_cast<uintptr_t>(status)};
}
};
struct GroupStatus {
static const uint64_t cancelled = 0x01000000000000000ll;
static const uint64_t cancelled = 0b1000000000000000000000000000000000000000000000000000000000000000;
static const uint64_t waiting = 0b0100000000000000000000000000000000000000000000000000000000000000;
static const uint64_t maskReady = 0x00FFFFF0000000000ll;
static const uint64_t oneReadyTask = 0x00000010000000000ll;
// 31 bits for ready tasks counter
static const uint64_t maskReady = 0b0011111111111111111111111111111110000000000000000000000000000000;
static const uint64_t oneReadyTask = 0b0000000000000000000000000000000010000000000000000000000000000000;
static const uint64_t maskPending = 0x0000000FFFFF00000ll;
static const uint64_t onePendingTask = 0x00000000000100000ll;
static const uint64_t maskWaiting = 0x000000000000FFFFFll;
static const uint64_t oneWaitingTask = 0x00000000000000001ll;
// 31 bits for pending tasks counter
static const uint64_t maskPending = 0b0000000000000000000000000000000001111111111111111111111111111111;
static const uint64_t onePendingTask = 0b0000000000000000000000000000000000000000000000000000000000000001;
uint64_t status;
@@ -192,16 +170,16 @@ namespace swift {
return (status & cancelled) > 0;
}
bool hasWaitingTask() {
return (status & waiting) > 0;
}
unsigned int readyTasks() {
return (status & maskReady) >> 40;
return (status & maskReady) >> 31;
}
unsigned int pendingTasks() {
return (status & maskPending) >> 20;
}
unsigned int waitingTasks() {
return status & maskWaiting;
return (status & maskPending);
}
bool isEmpty() {
@@ -209,12 +187,16 @@ namespace swift {
}
/// Status value decrementing the Ready, Pending and Waiting counters by one.
GroupStatus completingReadyPendingWaitingTask() {
assert(pendingTasks() > 0 && "can only complete waiting tasks when pending tasks available");
assert(readyTasks() > 0 && "can only complete waiting tasks when ready tasks available");
assert(waitingTasks() > 0 && "can only complete waiting tasks when waiting tasks available");
// FIXME take into account cancelled !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
return GroupStatus { status - oneReadyTask - oneWaitingTask - onePendingTask };
GroupStatus completingPendingReadyWaiting() {
assert(pendingTasks() && "can only complete waiting task when pending tasks available");
assert(readyTasks() && "can only complete waiting task when ready tasks available");
assert(hasWaitingTask() && "can only complete waiting task when waiting task available");
return GroupStatus{status - waiting - oneReadyTask - onePendingTask};
}
GroupStatus completingPendingReady() {
assert(pendingTasks() && "can only complete waiting task when pending tasks available");
assert(readyTasks() && "can only complete waiting task when ready tasks available");
return GroupStatus{status - oneReadyTask - onePendingTask};
}
/// Pretty prints the status, as follows:
@@ -223,13 +205,13 @@ namespace swift {
std::string str;
str.append("GroupStatus{ ");
str.append("C:"); // cancelled
str.append(isCancelled() ? "y" : "n");
str.append("R:");
str.append(isCancelled() ? "y " : "n ");
str.append("W:"); // has waiting task
str.append(hasWaitingTask() ? "y " : "n ");
str.append("R:"); // ready
str.append(std::to_string(readyTasks()));
str.append(" P:");
str.append(" P:"); // pending
str.append(std::to_string(pendingTasks()));
str.append(" W:");
str.append(std::to_string(waitingTasks()));
str.append(" " + std::bitset<64>(status).to_string());
str.append(" }");
return str;
@@ -237,7 +219,7 @@ namespace swift {
/// Initially there are no waiting and no pending tasks.
static const GroupStatus initial() {
return GroupStatus { 0 };
return GroupStatus{0};
};
};
@@ -268,7 +250,6 @@ namespace swift {
void enqueue(const T item) {
queue.push(item);
}
};
private:
@@ -289,14 +270,9 @@ namespace swift {
// /// Queue containing all pending tasks.
// NaiveQueue<PendingQueueItem> pendingQueue;
/// Queue containing all of the tasks that are waiting in `get()`.
///
/// A task group is also a future, and awaits on the group's result *itself*
/// are enqueued on its future fragment.
///
/// The low bits contain the status, the rest of the pointer is the
/// AsyncTask.
std::atomic<WaitQueueItem> waitQueue;
/// Single waiting `AsyncTask` currently waiting on `group.next()`,
/// or `nullptr` if no task is currently waiting.
std::atomic<AsyncTask*> waitQueue;
friend class AsyncTask;
@@ -305,7 +281,7 @@ namespace swift {
: status(GroupStatus::initial().status),
readyQueue(),
// readyQueue(ReadyQueueItem::get(ReadyStatus::Empty, nullptr)),
waitQueue(WaitQueueItem::get(WaitStatus::Waiting, nullptr)) {}
waitQueue(nullptr) {}
/// Destroy the storage associated with the channel.
void destroy(AsyncTask *task);
@@ -325,8 +301,23 @@ namespace swift {
/// Returns `true` if this is the first time cancelling the group, false otherwise.
bool cancelAll(AsyncTask *task);
GroupStatus statusCancel() {
auto old = status.fetch_or(GroupStatus::cancelled, std::memory_order_relaxed);
return GroupStatus { old };
}
/// Returns *assumed* new status, including the just performed +1.
GroupStatus statusAddReadyTaskAcquire() {
GroupStatus statusMarkWaitingAssumeAcquire() {
auto old = status.fetch_or(GroupStatus::waiting, std::memory_order_acquire);
return GroupStatus{old | GroupStatus::waiting};
}
GroupStatus statusRemoveWaiting() {
auto old = status.fetch_and(~GroupStatus::waiting, std::memory_order_release);
return GroupStatus{old};
}
/// Returns *assumed* new status, including the just performed +1.
GroupStatus statusAddReadyAssumeAcquire() {
auto old = status.fetch_add(GroupStatus::oneReadyTask, std::memory_order_acquire);
auto s = GroupStatus {old + GroupStatus::oneReadyTask };
assert(s.readyTasks() <= s.pendingTasks());
@@ -334,36 +325,57 @@ namespace swift {
}
/// Returns *assumed* new status, including the just performed +1.
GroupStatus statusAddPendingTaskRelaxed(AsyncTask* pendingTask) {
assert(pendingTask->isFuture());
GroupStatus statusAddPendingTaskRelaxed(
// AsyncTask* pendingTask
) {
// assert(pendingTask->isFuture());
auto old = status.fetch_add(GroupStatus::onePendingTask, std::memory_order_relaxed);
auto s = GroupStatus {old + GroupStatus::onePendingTask };
if (s.isCancelled()) {
// revert that add, it was meaningless
auto o = status.fetch_sub(GroupStatus::onePendingTask, std::memory_order_relaxed);
s = GroupStatus {o - GroupStatus::onePendingTask };
}
fprintf(stderr, "[%s:%d] (%s): status %s\n", __FILE__, __LINE__, __FUNCTION__, s.to_string().c_str());
// // FIXME: we won't need the +1 in the status, just the queue?
// pendingQueue.enqueue(PendingQueueItem::get(pendingTask))
return GroupStatus {old + GroupStatus::onePendingTask };
return s;
}
/// Returns *assumed* new status, including the just performed +1.
GroupStatus statusAddWaitingTaskAcquire() {
auto old = status.fetch_add(GroupStatus::oneWaitingTask, std::memory_order_acquire);
return GroupStatus { old + GroupStatus::oneWaitingTask };
GroupStatus statusLoadRelaxed() {
return GroupStatus{status.load(std::memory_order_relaxed)};
}
/// Remove waiting task, without taking any pending task.
GroupStatus statusRemoveWaitingTask() {
return GroupStatus {
status.fetch_sub(GroupStatus::oneWaitingTask, std::memory_order_relaxed)
};
}
// /// Returns *assumed* new status, including the just performed +1.
// GroupStatus statusAddWaitingTaskAcquire() {
// auto old = status.fetch_add(GroupStatus::oneWaitingTask, std::memory_order_acquire);
// return GroupStatus { old + GroupStatus::oneWaitingTask };
// }
// /// Remove waiting task, without taking any pending task.
// GroupStatus statusRemoveWaitingTask() {
// return GroupStatus {
// status.fetch_sub(GroupStatus::oneWaitingTask, std::memory_order_relaxed)
// };
// }
/// Compare-and-set old status to a status derived from the old one,
/// by simultaneously decrementing one Pending and one Waiting tasks.
///
/// This is used to atomically perform a waiting task completion.
bool statusCompleteReadyPendingWaitingTasks(GroupStatus& old) {
bool statusCompletePendingReadyWaiting(GroupStatus& old) {
return status.compare_exchange_weak(
old.status, old.completingReadyPendingWaitingTask().status,
old.status, old.completingPendingReadyWaiting().status,
/*success*/ std::memory_order_relaxed,
/*failure*/ std::memory_order_relaxed);
}
bool statusCompletePendingReady(GroupStatus& old) {
return status.compare_exchange_weak(
old.status, old.completingPendingReady().status,
/*success*/ std::memory_order_relaxed,
/*failure*/ std::memory_order_relaxed);
}
@@ -380,23 +392,10 @@ namespace swift {
/// result if it is known that no pending tasks in the group,
/// or a `GroupPollStatus::Waiting` result if there are tasks in flight
/// and the waitingTask eventually be woken up by a completion.
TaskGroup::GroupPollResult poll(AsyncTask *waitingTask);
TaskGroup::PollResult poll(AsyncTask *waitingTask);
};
// /// Offer result of a task into this channel.
// /// The value is enqueued at the end of the channel.
// void groupOffer(AsyncTask *completed, AsyncContext *context, ExecutorRef executor);
//
// /// Attempt to dequeue ready tasks and complete the waitingTask.
// ///
// /// If unable to complete the waiting task immediately (with an readily
// /// available completed task), either returns an `GroupPollStatus::Empty`
// /// result if it is known that no pending tasks in the group,
// /// or a `GroupPollStatus::Waiting` result if there are tasks in flight
// /// and the waitingTask eventually be woken up by a completion.
// TaskGroup::GroupPollResult groupPoll(AsyncTask *waitingTask);
} // end namespace swift
#endif

View File

@@ -203,16 +203,14 @@ void swift_task_group_destroy(AsyncTask *task, TaskGroup *group);
///
/// \code
/// func swift_task_group_add_pending(
/// pending: Builtin.NativeObject,
/// group: Builtin.NativeObject
/// )
/// ) -> Bool
/// \endcode
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
void swift_task_group_add_pending(AsyncTask* pendingTask, TaskGroup *group);
bool swift_task_group_add_pending(TaskGroup *group);
/// Cancel all tasks in the group.
///
/// TODO: also stop accepting new ones perhaps? We don't do this today.
/// This also prevents new tasks from being added.
///
/// This can be called from any thread. Its Swift signature is
///
@@ -225,6 +223,22 @@ void swift_task_group_add_pending(AsyncTask* pendingTask, TaskGroup *group);
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
void swift_task_group_cancel_all(AsyncTask *task, TaskGroup *group);
/// Check ONLY if the group was explicitly cancelled, e.g. by `cancelAll`.
///
/// This check DOES NOT take into account the task in which the group is running
/// being cancelled or not.
///
/// This can be called from any thread. Its Swift signature is
///
/// \code
/// func swift_task_group_is_cancelled(
/// task: Builtin.NativeObject,
/// group: Builtin.NativeObject
/// )
/// \endcode
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
bool swift_task_group_is_cancelled(AsyncTask *task, TaskGroup *group);
/// Check the readyQueue of a task group, return true if it has no pending tasks.
///
/// This can be called from any thread. Its Swift signature is
@@ -237,18 +251,6 @@ void swift_task_group_cancel_all(AsyncTask *task, TaskGroup *group);
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
bool swift_task_group_is_empty(TaskGroup *group);
/// Check if the group was cancelled.
///
/// This can be called from any thread. Its Swift signature is
///
/// \code
/// func swift_task_group_is_cancelled(
/// _ group: Builtin.NativeObject
/// ) -> Bool
/// \endcode
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
bool swift_task_group_is_cancelled(TaskGroup *group);
/// Add a status record to a task. The record should not be
/// modified while it is registered with a task.
///

View File

@@ -37,7 +37,7 @@ using FutureFragment = AsyncTask::FutureFragment;
using ReadyQueueItem = TaskGroup::ReadyQueueItem;
using ReadyStatus = TaskGroup::ReadyStatus;
using GroupPollResult = TaskGroup::GroupPollResult;
using PollResult = TaskGroup::PollResult;
// =============================================================================
// ==== create -----------------------------------------------------------------
@@ -84,20 +84,26 @@ void TaskGroup::destroy(AsyncTask *task) {
// ==== offer ------------------------------------------------------------------
void TaskGroup::offer(AsyncTask *completedTask, AsyncContext *context,
ExecutorRef executor) {
ExecutorRef completingExecutor) {
assert(completedTask);
assert(completedTask->isFuture());
assert(completedTask->hasChildFragment());
assert(completedTask->hasGroupChildFragment());
assert(completedTask->groupChildFragment()->getGroup() == this);
fprintf(stderr, "[%s:%d] (%s): offer %d\n", __FILE__, __LINE__, __FUNCTION__, completedTask);
mutex.lock(); // TODO: remove fragment lock, and use status for synchronization
// Immediately increment ready count and acquire the status
// Examples:
// R:0 P:1 W:0 -> R:1 P:1 W:0 //
// R:0 P:1 W:1 -> R:1 P:1 W:1 // complete immediately
auto assumed = statusAddReadyTaskAcquire();
// W:n R:0 P:3 -> W:n R:1 P:3 // no waiter, 2 more pending tasks
// W:n R:0 P:1 -> W:n R:1 P:1 // no waiter, no more pending tasks
// W:n R:0 P:1 -> W:y R:1 P:1 // complete immediately
// W:n R:0 P:1 -> W:y R:1 P:3 // complete immediately, 2 more pending tasks
auto assumed = statusAddReadyAssumeAcquire();
fprintf(stderr, "[%s:%d] (%s): offer %s\n", __FILE__, __LINE__, __FUNCTION__, assumed.to_string().c_str());
// If an error was thrown, save it in the future fragment.
auto futureContext = static_cast<FutureAsyncContext *>(context);
@@ -107,80 +113,65 @@ void TaskGroup::offer(AsyncTask *completedTask, AsyncContext *context,
hadErrorResult = true;
}
if (assumed.waitingTasks() == 0) {
// ==== a) enqueue message -----------------------------------------------
//
// no-one was waiting (yet), so we have to instead enqueue to the message queue
// when a task polls during next() it will notice that we have a value ready
// for it, and will process it immediately without suspending.
// ==== a) has waiting task, so let us complete it right away
if (assumed.hasWaitingTask()) {
auto waitingTask = waitQueue.load(std::memory_order_acquire);
fprintf(stderr, "[%s:%d] (%s): has waiter! waiter:%d %s\n", __FILE__, __LINE__, __FUNCTION__, waitingTask, assumed.to_string().c_str());
while (true) {
fprintf(stderr, "[%s:%d] (%s): run waiting task directly!!!!!\n", __FILE__, __LINE__, __FUNCTION__);
// ==== a) run waiting task directly -------------------------------------
assert(assumed.hasWaitingTask());
assert(assumed.pendingTasks() && "offered to group with no pending tasks!");
// We are the "first" completed task to arrive,
// and since there is a task waiting we immediately claim and complete it.
if (waitQueue.compare_exchange_weak(
waitingTask, nullptr,
/*success*/ std::memory_order_release,
/*failure*/ std::memory_order_acquire) &&
statusCompletePendingReadyWaiting(assumed)) {
fprintf(stderr, "[%s:%d] (%s): offer, claimed task!\n", __FILE__, __LINE__, __FUNCTION__);
fprintf(stderr, "[%s:%d] (%s): status now! assumed: %s\n", __FILE__, __LINE__, __FUNCTION__, assumed.to_string().c_str());
fprintf(stderr, "[%s:%d] (%s): status now! load: %s\n", __FILE__, __LINE__, __FUNCTION__, statusLoadRelaxed().to_string().c_str());
// Run the task.
auto result = PollResult::get(
completedTask, hadErrorResult, /*needsRelease*/ false);
// Retain the task while it is in the queue;
// it must remain alive until the task group is alive.
swift_retain(completedTask);
auto readyItem = ReadyQueueItem::get(
hadErrorResult ? ReadyStatus::Error : ReadyStatus::Success,
completedTask
);
mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
fprintf(stderr, "[%s:%d] (%s): RUN waiting:%d with result completed:%d\n", __FILE__, __LINE__, __FUNCTION__, waitingTask, completedTask);
swift::runTaskWithPollResult(waitingTask, completingExecutor, result);
return;
} // else, try again
assert(completedTask == readyItem.getTask());
assert(readyItem.getTask()->isFuture());
readyQueue.enqueue(readyItem);
mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
return;
assert(false && "why should this have to try again ever?"); // FIXME
}
}
while (true) {
// Loop until we either:
// a) no waiters available, and we enqueued the completed task to readyQueue
// b) successfully claim a waiter to complete with this task
assert(assumed.pendingTasks() && "offered to group with no pending tasks!");
if (statusCompleteReadyPendingWaitingTasks(assumed)) {
// ==== b) run waiter --------------------------------------------------
// We are the "first" completed task to arrive, since old status had zero
//
// If old status had no tasks, it means we are the first to arrive,
// and as such may directly get and signal the first waiting task.
// We only signal *one* waiter and relink the waiter queue.
auto waitHead = waitQueue.load(std::memory_order_acquire);
while (auto waitingTask = waitHead.getTask()) {
// Find the next waiting task.
auto nextWaitingTask = waitingTask->getNextWaitingTask();
auto nextWaitQueueItem = TaskGroup::WaitQueueItem::get(
TaskGroup::WaitStatus::Waiting,
nextWaitingTask
);
// ==== b) enqueue completion ------------------------------------------------
//
// else, no-one was waiting (yet), so we have to instead enqueue to the message
// queue when a task polls during next() it will notice that we have a value
// ready for it, and will process it immediately without suspending.
assert(!waitQueue.load(std::memory_order_relaxed));
// Attempt to claim it, we are the future that is going to complete it.
// TODO: there may be other futures trying to do the same right now? FIXME: not really because the status right?
if (waitQueue.compare_exchange_weak(
waitHead, nextWaitQueueItem,
/*success*/ std::memory_order_release,
/*failure*/ std::memory_order_acquire)) {
// Run the task.
auto result = GroupPollResult::get(
completedTask, hadErrorResult, /*needsRelease*/ false);
// Retain the task while it is in the queue;
// it must remain alive until the task group is alive.
swift_retain(completedTask);
auto readyItem = ReadyQueueItem::get(
hadErrorResult ? ReadyStatus::Error : ReadyStatus::Success,
completedTask
);
mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
swift::runTaskWithGroupPollResult(waitingTask, executor, result);
return;
} else {
waitingTask = waitHead.getTask();
}
// DO NOT move to the next task, one element is only signalled *once*.
// E.g. if we somehow had two next() registered, each should get
// individual elements, not the same element after all (!).
// Move to the next task.
// waitingTask = nextWaitingTask;
}
} // else, status-cas failed and we need to try again
}
llvm_unreachable("offer must successfully complete it's cas-loop enqueue!");
assert(completedTask == readyItem.getTask());
assert(readyItem.getTask()->isFuture());
fprintf(stderr, "[%s:%d] (%s): enqueue in ready queue %d\n", __FILE__, __LINE__, __FUNCTION__, completedTask);
readyQueue.enqueue(readyItem);
mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
return;
}
// =============================================================================
// ==== group.next() implementation (wait_next and groupPoll) ------------------
SWIFT_CC(swiftasync)
void swift::swift_task_group_wait_next(
AsyncTask *waitingTask,
@@ -190,86 +181,76 @@ void swift::swift_task_group_wait_next(
waitingTask->ResumeContext = rawContext;
auto context = static_cast<TaskGroupNextWaitAsyncContext *>(rawContext);
// fprintf(stderr, "[%s:%d] (%s): context %d\n", __FILE__, __LINE__, __FUNCTION__, context);
fprintf(stderr, "[%s:%d] (%s): context %d\n", __FILE__, __LINE__, __FUNCTION__, context);
auto task = context->task;
auto group = context->group;
// fprintf(stderr, "[%s:%d] (%s): task %d\n", __FILE__, __LINE__, __FUNCTION__, task);
// fprintf(stderr, "[%s:%d] (%s): waitingTask %d\n", __FILE__, __LINE__, __FUNCTION__, waitingTask);
// fprintf(stderr, "[%s:%d] (%s): group %d\n", __FILE__, __LINE__, __FUNCTION__, group);
GroupPollResult polled = group->poll(waitingTask);
fprintf(stderr, "[%s:%d] (%s): task %d\n", __FILE__, __LINE__, __FUNCTION__, task);
fprintf(stderr, "[%s:%d] (%s): waitingTask %d\n", __FILE__, __LINE__, __FUNCTION__, waitingTask);
fprintf(stderr, "[%s:%d] (%s): group %d\n", __FILE__, __LINE__, __FUNCTION__, group);
TaskGroup::PollResult polled = group->poll(waitingTask);
fprintf(stderr, "[%s:%d] (%s): group polled: %d\n", __FILE__, __LINE__, __FUNCTION__, polled.status);
if (polled.status == TaskGroup::GroupPollStatus::Waiting) {
fprintf(stderr, "[%s:%d] (%s): group polled: WAITING\n", __FILE__, __LINE__, __FUNCTION__);
// The waiting task has been queued on the channel,
// there were pending tasks so it will be woken up eventually.
return;
}
runTaskWithGroupPollResult(waitingTask, executor, polled);
if (polled.status == TaskGroup::GroupPollStatus::Empty)
fprintf(stderr, "[%s:%d] (%s): group polled: RUN EMPTY\n", __FILE__, __LINE__, __FUNCTION__);
if (polled.status == TaskGroup::GroupPollStatus::Success)
fprintf(stderr, "[%s:%d] (%s): group polled: RUN SUCCESS\n", __FILE__, __LINE__, __FUNCTION__);
if (polled.status == TaskGroup::GroupPollStatus::Error)
fprintf(stderr, "[%s:%d] (%s): group polled: RUN ERROR\n", __FILE__, __LINE__, __FUNCTION__);
runTaskWithPollResult(waitingTask, executor, polled);
}
TaskGroup::GroupPollResult TaskGroup::poll(AsyncTask *waitingTask) {
auto fragment = this;
fragment->mutex.lock(); // TODO: remove fragment lock, and use status for synchronization
TaskGroup::PollResult TaskGroup::poll(AsyncTask *waitingTask) {
mutex.lock(); // TODO: remove group lock, and use status for synchronization
auto assumed = statusMarkWaitingAssumeAcquire();
// immediately update the status counter
auto assumed = fragment->statusAddWaitingTaskAcquire();
GroupPollResult result;
PollResult result;
result.storage = nullptr;
result.retainedTask = nullptr;
fprintf(stderr, "[%s:%d] (%s): status %s\n", __FILE__, __LINE__, __FUNCTION__, assumed.to_string().c_str());
// ==== 1) bail out early if no tasks are pending ----------------------------
if (assumed.isEmpty()) {
// 1) No tasks in flight, we know no tasks were submitted before this poll
// was issued, and if we parked here we'd potentially never be woken up.
// Bail out and return `nil` from `group.next()`.
fragment->statusRemoveWaitingTask(); // "revert" our eager +1 we just did
// No tasks in flight, we know no tasks were submitted before this poll
// was issued, and if we parked here we'd potentially never be woken up.
// Bail out and return `nil` from `group.next()`.
fprintf(stderr, "[%s:%d] (%s): empty! assumed: %s\n", __FILE__, __LINE__, __FUNCTION__, assumed.to_string().c_str());
statusRemoveWaiting();
fprintf(stderr, "[%s:%d] (%s): debug now: %s\n", __FILE__, __LINE__, __FUNCTION__, statusLoadRelaxed().to_string().c_str());
result.status = TaskGroup::GroupPollStatus::Empty;
fragment->mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
mutex.unlock(); // TODO: remove group lock, and use status for synchronization
return result;
}
// ==== Add to wait queue ----------------------------------------------------
while (true) {
// Put the waiting task at the beginning of the wait queue.
auto waitHead = fragment->waitQueue.load(std::memory_order_acquire);
waitingTask->getNextWaitingTask() = waitHead.getTask();
auto newWaitHead = TaskGroup::WaitQueueItem::get(
TaskGroup::WaitStatus::Waiting, waitingTask);
auto waitHead = waitQueue.load(std::memory_order_acquire);
if (fragment->waitQueue.compare_exchange_weak(
waitHead, newWaitHead,
/*success*/ std::memory_order_release,
/*failure*/ std::memory_order_acquire)) {
// While usually the waiting task will be the group task,
// we can attempt to escalate group task here. // TODO: does this make sense?
// Note that we cannot escalate the specific future (child) task we'd
// like to complete, since we don't know which one that might be.
// swift_task_escalate(this, waitingTask->Flags.getPriority()); // FIXME!!!!!!!!!!
result.status = TaskGroup::GroupPollStatus::Waiting;
// return result;
break;
} // else, try again
}
// ==== 2) Ready task was polled, return with it immediately -----------------
if (assumed.readyTasks()) {
fprintf(stderr, "[%s:%d] (%s): there are ready tasks... %s\n", __FILE__, __LINE__, __FUNCTION__, statusLoadRelaxed().to_string().c_str());
// ==== 3) Ready task was polled, return with it immediately -----------------
auto assumedStatus = assumed.status;
while (assumed.readyTasks()) {
auto assumedStatus = assumed.status;
auto newStatus = TaskGroup::GroupStatus{assumedStatus};
if (fragment->status.compare_exchange_weak(
assumedStatus, newStatus.completingReadyPendingWaitingTask().status,
if (status.compare_exchange_weak(
assumedStatus, newStatus.completingPendingReadyWaiting().status,
/*success*/ std::memory_order_relaxed,
/*failure*/ std::memory_order_acquire)) {
// Success! We are allowed to poll.
ReadyQueueItem item;
bool taskDequeued = fragment->readyQueue.dequeue(item);
bool taskDequeued = readyQueue.dequeue(item);
if (!taskDequeued) {
result.status = TaskGroup::GroupPollStatus::Waiting;
result.storage = nullptr;
result.retainedTask = nullptr;
fragment->mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
mutex.unlock(); // TODO: remove group lock, and use status for synchronization
return result;
}
@@ -280,6 +261,7 @@ TaskGroup::GroupPollResult TaskGroup::poll(AsyncTask *waitingTask) {
// be swift_release'd; we kept it alive while it was in the readyQueue by
// an additional retain issued as we enqueued it there.
result.retainedTask = item.getTask();
fprintf(stderr, "[%s:%d] (%s): about to return: %d\n", __FILE__, __LINE__, __FUNCTION__, result.retainedTask);
switch (item.getStatus()) {
case ReadyStatus::Success:
@@ -287,7 +269,7 @@ TaskGroup::GroupPollResult TaskGroup::poll(AsyncTask *waitingTask) {
result.status = TaskGroup::GroupPollStatus::Success;
result.storage = futureFragment->getStoragePtr();
assert(result.retainedTask && "polled a task, it must be not null");
fragment->mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
return result;
case ReadyStatus::Error:
@@ -296,24 +278,36 @@ TaskGroup::GroupPollResult TaskGroup::poll(AsyncTask *waitingTask) {
result.storage =
reinterpret_cast<OpaqueValue *>(futureFragment->getError());
assert(result.retainedTask && "polled a task, it must be not null");
fragment->mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
return result;
case ReadyStatus::Empty:
result.status = TaskGroup::GroupPollStatus::Empty;
result.storage = nullptr;
result.retainedTask = nullptr;
fragment->mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
return result;
}
assert(false && "must return result when status compare-and-swap was successful");
} // else, we failed status-cas (some other waiter claimed a ready pending task, try again)
} // no more ready tasks
}
// no ready tasks, so we must wait.
result.status = TaskGroup::GroupPollStatus::Waiting;
fragment->mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
return result;
// ==== 3) Add to wait queue -------------------------------------------------
assert(assumed.readyTasks() == 0);
while (true) {
// Put the waiting task at the beginning of the wait queue.
if (waitQueue.compare_exchange_weak(
waitHead, waitingTask,
/*success*/ std::memory_order_release,
/*failure*/ std::memory_order_acquire)) {
fprintf(stderr, "[%s:%d] (%s): added to wait queue\n", __FILE__, __LINE__, __FUNCTION__);
mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
// no ready tasks, so we must wait.
result.status = TaskGroup::GroupPollStatus::Waiting;
return result;
} // else, try again
}
assert(false && "must successfully compare exchange the waiting task.");
}
// =============================================================================
@@ -324,9 +318,9 @@ bool swift::swift_task_group_is_empty(TaskGroup *group) {
}
// =============================================================================
// ==== isEmpty ----------------------------------------------------------------
// ==== isCancelled ------------------------------------------------------------
bool swift::swift_task_group_is_cancelled(TaskGroup *group) {
bool swift::swift_task_group_is_cancelled(AsyncTask *task, TaskGroup *group) {
return group->isCancelled();
}
@@ -339,10 +333,9 @@ void swift::swift_task_group_cancel_all(AsyncTask *task, TaskGroup *group) {
bool TaskGroup::cancelAll(AsyncTask *task) {
// store the cancelled bit
auto old = status.fetch_or(GroupStatus::cancelled, std::memory_order_acquire);
auto o = GroupStatus { old };
auto old = statusCancel();
if (o.isCancelled()) {
if (old.isCancelled()) {
// already was cancelled previously, nothing to do?
return false;
}
@@ -355,6 +348,6 @@ bool TaskGroup::cancelAll(AsyncTask *task) {
// =============================================================================
// ==== internal ---------------------------------------------------------------
void swift::swift_task_group_add_pending(AsyncTask *pendingTask, TaskGroup *group) {
group->statusAddPendingTaskRelaxed(pendingTask);
bool swift::swift_task_group_add_pending(TaskGroup *group) {
return !group->statusAddPendingTaskRelaxed().isCancelled();
}

View File

@@ -72,9 +72,9 @@ extension Task {
// startingChildTasksOn executor: ExecutorRef? = nil, // TODO: actually respect it
body: (inout Task.Group<TaskResult>) async throws -> BodyResult
) async throws -> BodyResult {
let parent = Builtin.getCurrentAsyncTask()
let _group = _taskGroupCreate(task: parent)
var group: Task.Group<TaskResult>! = Task.Group(group: _group)
let task = Builtin.getCurrentAsyncTask()
let _group = _taskGroupCreate(task: task)
var group: Task.Group<TaskResult>! = Task.Group(task: task, group: _group)
// This defer handles both return/throw cases in the following way:
// - body throws: all tasks must be cancelled immediately as we re-throw
@@ -87,12 +87,13 @@ extension Task {
let result = try await body(&group)
await group._tearDown()
group = nil
_taskGroupDestroy(task: parent, group: _group)
_taskGroupDestroy(task: task, group: _group)
return result
} catch {
group.cancelAll()
await group._tearDown()
group = nil
_taskGroupDestroy(task: parent, group: _group)
_taskGroupDestroy(task: task, group: _group)
throw error
}
}
@@ -102,12 +103,14 @@ extension Task {
/// Its intended use is with the `Task.withGroup` function.
/* @unmoveable */
public struct Group<TaskResult> {
private let _task: Builtin.NativeObject
/// Group task into which child tasks offer their results,
/// and the `next()` function polls those results from.
private let _group: Builtin.NativeObject
/// No public initializers
init(group: Builtin.NativeObject) {
init(task: Builtin.NativeObject, group: Builtin.NativeObject) {
self._task = task
self._group = group
}
@@ -130,19 +133,32 @@ extension Task {
public mutating func add(
overridingPriority priorityOverride: Priority? = nil,
operation: @concurrent @escaping () async throws -> TaskResult
) async -> Task.Handle<TaskResult, Error> {
// FIXME: first create the child, then run it
let childTask = await _runGroupChildTask(
overridingPriority: priorityOverride,
group: _group,
operation: operation
)
) async -> Bool {
let canAdd = _taskGroupAddPendingTask(group: _group)
_taskGroupAddPendingTask(pending: childTask, group: _group)
guard canAdd else {
// the group is cancelled and is not accepting any new work
return false
}
// TODO: only NOW run the child task
// Set up the job flags for a new task.
var flags = Task.JobFlags()
flags.kind = .task
flags.priority = priorityOverride ?? getJobFlags(_task).priority
flags.isFuture = true
flags.isChildTask = true
flags.isGroupChildTask = true
return Handle<TaskResult, Error>(childTask)
// Create the asynchronous task future.
let (childTask, _) = Builtin.createAsyncTaskGroupFuture(
flags.bits, _task, _group, operation)
// Enqueue the resulting job.
_enqueueJobGlobal(Builtin.convertTaskToJob(childTask))
// TODO: need to store task in the group too
return true
}
/// Wait for the a child task that was added to the group to complete,
@@ -167,6 +183,11 @@ extension Task {
/// Awaiting on an empty group results in the immediate return of a `nil`
/// value, without the group task having to suspend.
///
/// ### Thread-safety
/// Please note that the `group` object MUST NOT escape into another task.
/// The `group.next()` MUST be awaited from the task that had originally
/// created the group. It is not allowed to escape the group reference.
///
/// ### Ordering
/// Order of values returned by next() is *completion order*, and not
/// submission order. I.e. if tasks are added to the group one after another:
@@ -184,8 +205,17 @@ extension Task {
/// It is possible to directly rethrow such error out of a `withGroup` body
/// function's body, causing all remaining tasks to be implicitly cancelled.
public mutating func next() async throws -> TaskResult? {
let task = Builtin.getCurrentAsyncTask()
let rawResult = await _taskGroupWaitNext(waitingTask: task, group: _group)
#if NDEBUG
let callingTask = Builtin.getCurrentAsyncTask() // can't inline into the assert sadly
assert(unsafeBitCast(callingTask, to: size_t.self) ==
unsafeBitCast(_task, to: size_t.self),
"""
group.next() invoked from task other than the task which created the group! \
This means the group must have illegally escaped the withGroup{} scope.
""")
#endif
let rawResult = await _taskGroupWaitNext(waitingTask: _task, group: _group)
if rawResult.hadErrorResult {
// Throw the result on error.
@@ -226,9 +256,14 @@ extension Task {
/// - SeeAlso: `Task.addCancellationHandler`
/// - SeeAlso: `Task.checkCancelled`
/// - SeeAlso: `Task.isCancelled`
public mutating func cancelAll() async { // FIXME SHOULD NOT BE ASYNC (!!!!!!)
let task = Builtin.getCurrentAsyncTask() // FIXME: needs the task
_taskGroupCancelAll(task: task, group: _group)
public mutating func cancelAll() {
_taskGroupCancelAll(task: _task, group: _group)
}
// Returns `true` if the group was cancelled, e.g. by `cancelAll`.
public var isCancelled: Bool {
return _taskIsCancelled(_task) ||
_taskGroupIsCancelled(task: _task, group: _group)
}
}
}
@@ -251,8 +286,6 @@ extension Task.Group {
/// If tasks should be cancelled before returning this must be done by an
/// explicit `group.cancelAll()` call within the `withGroup`'s function body.
mutating func _tearDown() async {
await self.cancelAll()
// Drain any not next() awaited tasks if the group wasn't cancelled
// If any of these tasks were to throw
//
@@ -264,39 +297,13 @@ extension Task.Group {
// where one may have various decisions depending on use cases...
continue // keep awaiting on all pending tasks
}
self.cancelAll() // for good measure
}
}
/// ==== -----------------------------------------------------------------------
func _runGroupChildTask<T>(
overridingPriority priorityOverride: Task.Priority?,
group: Builtin.NativeObject,
// startingOn executor: ExecutorRef, // TODO: allow picking executor
operation: @concurrent @escaping () async throws -> T
) async -> Builtin.NativeObject {
let currentTask = Builtin.getCurrentAsyncTask()
// Set up the job flags for a new task.
var flags = Task.JobFlags()
flags.kind = .task
flags.priority = priorityOverride ?? getJobFlags(currentTask).priority
flags.isFuture = true
flags.isChildTask = true
flags.isGroupChildTask = true
// Create the asynchronous task future.
let (task, _) = Builtin.createAsyncTaskGroupFuture(
flags.bits, currentTask, group, operation)
// Enqueue the resulting job.
_enqueueJobGlobal(Builtin.convertTaskToJob(task))
return task
}
/// ==== -----------------------------------------------------------------------
@_silgen_name("swift_retain")
func _swiftRetain(
_ object: Builtin.NativeObject
@@ -320,9 +327,8 @@ func _taskGroupDestroy(
@_silgen_name("swift_task_group_add_pending")
func _taskGroupAddPendingTask(
pending pendingTask: Builtin.NativeObject,
group: Builtin.NativeObject
)
) -> Bool
@_silgen_name("swift_task_group_cancel_all")
func _taskGroupCancelAll(
@@ -330,11 +336,13 @@ func _taskGroupCancelAll(
group: Builtin.NativeObject
)
@_silgen_name("swift_task_group_offer")
func _taskGroupOffer(
group: Builtin.NativeObject,
completedTask: Builtin.NativeObject
)
/// Checks ONLY if the group was specifically cancelled.
/// The task itself being cancelled must be checked separately.
@_silgen_name("swift_task_group_is_cancelled")
func _taskGroupIsCancelled(
task: Builtin.NativeObject,
group: Builtin.NativeObject
) -> Bool
@_silgen_name("swift_task_group_wait_next")
func _taskGroupWaitNext(

View File

@@ -120,9 +120,9 @@ static void runTaskWithFutureResult(
}
/// Run the given task, providing it with the result of the future.
static void runTaskWithGroupPollResult(
static void runTaskWithPollResult(
AsyncTask *waitingTask, ExecutorRef executor,
TaskGroup::GroupPollResult result) {
TaskGroup::PollResult result) {
auto waitingTaskContext =
static_cast<TaskFutureWaitAsyncContext *>(waitingTask->ResumeContext);
@@ -148,7 +148,8 @@ static void runTaskWithGroupPollResult(
}
// TODO: schedule this task on the executor rather than running it directly.
waitingTask->run(executor);
// FIXME: or waitingTask->run(executor); ?
waitingTask->run(waitingTaskContext->ResumeParentExecutor);
// TODO: Not entirely sure when to release; we synchronously run the code above so we can't before
// if we need to, release the now completed task so it can be destroyed

View File

@@ -1,73 +0,0 @@
// RUN: %target-run-simple-swift(-Xfrontend -enable-experimental-concurrency -parse-as-library) | %FileCheck %s --dump-input=always
// REQUIRES: executable_test
// REQUIRES: concurrency
// XFAIL: windows
// XFAIL: linux
// XFAIL: openbsd
import Dispatch
import Darwin
func completeImmediately(n: Int) async -> Int {
print("complete group.add { \(n) }")
return n
}
func completeSlowly(n: Int) async -> Int {
sleep(3)
print("complete group.add { \(n) }")
return n
}
/// Tasks complete AFTER they are next() polled.
func test_sum_nextOnPending() async {
let sum = try! await Task.withGroup(resultType: Int.self) { (group) async -> Int in
let firstHandle = await group.add {
let res = await completeSlowly(n: 1)
return res
}
let secondHandle = await group.add {
let res = await completeImmediately(n: 2)
return res
}
let thirdHandle = await group.add {
let res = await completeSlowly(n: 3)
return res
}
let first = try! await firstHandle.get()
print("firstHandle.get(): \(first)")
let second = try! await secondHandle.get()
print("secondHandle.get(): \(second)")
let third = try! await thirdHandle.get()
print("thirdHandle.get(): \(third)")
var sum = 0
print("before group.next(), sum: \(sum)")
while let n = try! await group.next() {
assert(n <= 3, "Unexpected value: \(n)! Expected <= 3")
print("next: \(n)")
sum += n
print("before group.next(), sum: \(sum)")
}
print("task group returning: \(sum)")
return sum
}
// CHECK: firstHandle.get(): 1
// CHECK: secondHandle.get(): 2
// CHECK: thirdHandle.get(): 3
// CHECK: task group returning: 6
// CHECK: result: 6
print("result: \(sum)")
assert(sum == 6, "Expected \(6) but got \(sum)")
}
@main struct Main {
static func main() async {
await test_sum_nextOnPending()
}
}

View File

@@ -0,0 +1,105 @@
// RUN: %target-run-simple-swift(-Xfrontend -enable-experimental-concurrency -parse-as-library) | %FileCheck %s --dump-input always
// REQUIRES: executable_test
// REQUIRES: concurrency
// XFAIL: windows
// XFAIL: linux
// XFAIL: openbsd
import Dispatch
#if canImport(Darwin)
import Darwin
#elseif canImport(Glibc)
import Glibc
#endif
func asyncEcho(_ value: Int) async -> Int {
value
}
func pprint(_ m: String, file: String = #file, line: UInt = #line) {
fputs("[\(file):\(line)] \(m)\n", stderr)
print(m)
}
//func test_taskGroup_cancel_then_add() async {
// // COM: CHECK: test_taskGroup_cancel_then_add
// pprint("\(#function)")
// let result: Int = try! await Task.withGroup(resultType: Int.self) { group in
//
// let addedFirst = await group.add { 1 }
// pprint("added first: \(addedFirst)") // COM: CHECK: added first: true
//
// let one = try! await group.next()!
// pprint("next first: \(one)") // COM: CHECK: next first: 1
//
// await group.cancelAll() // FIXME: dont make it async
// pprint("cancelAll")
//
// let addedSecond = await group.add { 1 }
// pprint("added second: \(addedSecond)") // COM: CHECK: added second: false
//
// let none = try! await group.next()
// pprint("next second: \(none)") // COM: CHECK: next second: nil
//
// return (one ?? 0) + (none ?? 0)
// }
//
// pprint("result: \(result)") // COM: CHECK: result: 1
//}
func test_taskGroup_cancel_then_completions() async {
// CHECK: test_taskGroup_cancel_then_completions
pprint("before \(#function)")
// async let outer: Bool = {
// sleep(6)
// return await Task.isCancelled()
// }()
let result: Int = try! await Task.withGroup(resultType: (Int, Bool).self) { group in
pprint("group cancelled: \(group.isCancelled)") // CHECK: group cancelled: false
let addedFirst = await group.add {
pprint("start first")
sleep(1)
pprint("done first")
return (1, await Task.isCancelled())
}
pprint("added first: \(addedFirst)") // CHECK: added first: true
assert(addedFirst)
let addedSecond = await group.add {
pprint("start second")
sleep(3)
pprint("done second")
return (2, await Task.isCancelled())
}
pprint("added second: \(addedSecond)") // CHECK: added second: true
assert(addedSecond)
group.cancelAll() // FIXME: dont make it async
pprint("cancelAll") // CHECK: cancelAll
// let outerCancelled = await outer // should not be cancelled
// pprint("outer cancelled: \(outerCancelled)") // COM: CHECK: outer cancelled: false
// pprint("group cancelled: \(group.isCancelled)") // COM: CHECK: outer cancelled: false
let one = try! await group.next()
pprint("first: \(one)") // CHECK: first: Optional((1,
let two = try! await group.next()
pprint("second: \(two)") // CHECK: second: Optional((2,
let none = try! await group.next()
pprint("none: \(none)") // CHECK: none: nil
return (one?.0 ?? 0) + (two?.0 ?? 0) + (none?.0 ?? 0)
}
pprint("result: \(result)") // CHECK: result: 3
}
@main struct Main {
static func main() async {
// await test_taskGroup_cancel_then_add()
await test_taskGroup_cancel_then_completions()
}
}

View File

@@ -19,7 +19,7 @@ func asyncEcho(_ value: Int) async -> Int {
func test_taskGroup_isEmpty() async {
do {
pprint("before all")
print("before all")
let result = try await Task.withGroup(resultType: Int.self) {
(group) async -> Int in
// CHECK: before add: isEmpty=true