swift-mirror/stdlib/public/Concurrency/Task.cpp
Erik Eckstein ec1490d06e [concurrency] Implement the Task allocator as bump-pointer allocator.
Use the StackAllocator as task allocator.

TODO: we could pass an initial pre-allocated first slab to the allocator, which is allocated on the stack or with the parent task's allocator.

rdar://problem/71157018
2020-12-09 22:22:28 +01:00
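
The change itself lives in the StackAllocator that backs _swift_task_alloc_initialize/_swift_task_alloc_destroy; Task.cpp below only calls into it. As a rough sketch of the bump-pointer idea the commit message describes (the type names, slab size, and layout here are hypothetical, not the runtime's actual StackAllocator interface), per-task allocation could look like this:

#include <algorithm>
#include <cstddef>
#include <cstdlib>
#include <new>

// Hypothetical sketch only: each task owns a chain of slabs, allocation is a
// pointer bump within the current slab, and a new slab is malloc'd when the
// current one runs out of space.
struct alignas(16) TaskSlab {
  TaskSlab *previous;   // previously filled slab, if any
  size_t used;          // bytes already handed out from this slab
  size_t capacity;      // payload bytes available in this slab
  char *payload() { return reinterpret_cast<char *>(this + 1); }
};

struct TaskBumpAllocator {
  TaskSlab *current = nullptr;

  void *allocate(size_t size) {
    size = (size + 15) & ~size_t(15);  // keep 16-byte alignment (assumption)
    if (!current || current->used + size > current->capacity) {
      size_t capacity = std::max(size, size_t(1024));  // made-up slab size
      void *mem = std::malloc(sizeof(TaskSlab) + capacity);
      current = new (mem) TaskSlab{current, 0, capacity};
    }
    void *result = current->payload() + current->used;
    current->used += size;
    return result;
  }

  // Teardown frees whole slabs at once; there is no per-allocation free.
  ~TaskBumpAllocator() {
    while (current) {
      TaskSlab *previous = current->previous;
      std::free(current);
      current = previous;
    }
  }
};

The appeal of such a scheme is that the hot path is a bounds check plus a pointer bump, and the whole arena can be torn down in one call, which is how completeTask() below disposes of all task-local allocations at once.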

//===--- Task.cpp - Task object and management ----------------------------===//
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2014 - 2020 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//
//
// Object management routines for asynchronous task objects.
//
//===----------------------------------------------------------------------===//
#include "swift/Runtime/Concurrency.h"
#include "swift/ABI/Task.h"
#include "swift/ABI/Metadata.h"
#include "swift/Runtime/HeapObject.h"
#include "TaskPrivate.h"
// TODO: We shouldn't need this
#include <dispatch/dispatch.h>
using namespace swift;
using FutureFragment = AsyncTask::FutureFragment;
void FutureFragment::destroy() {
auto queueHead = waitQueue.load(std::memory_order_acquire);
switch (queueHead.getStatus()) {
case Status::Executing:
assert(false && "destroying a task that never completed");
case Status::Success:
resultType->vw_destroy(getStoragePtr());
break;
case Status::Error:
swift_unknownObjectRelease(reinterpret_cast<OpaqueValue *>(getError()));
break;
}
}
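
// Note on the future wait-queue protocol used below: `waitQueue` packs a
// status (Executing, Success, or Error) together with the head of a singly
// linked list of waiting tasks into a single atomic WaitQueueItem.  Waiters
// push themselves onto that list with a compare-exchange while the status is
// still Executing; completeFuture() publishes the result by exchanging the
// head with a terminal Success/Error status and then runs every waiter it
// captured from the old head.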
FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask) {
  using Status = FutureFragment::Status;
  using WaitQueueItem = FutureFragment::WaitQueueItem;

  assert(isFuture());
  auto fragment = futureFragment();

  auto queueHead = fragment->waitQueue.load(std::memory_order_acquire);
  while (true) {
    switch (queueHead.getStatus()) {
    case Status::Error:
    case Status::Success:
      // The task is done; we don't need to wait.
      return queueHead.getStatus();

    case Status::Executing:
      // The task is not complete yet; we'll need to add ourselves to the
      // wait queue.
      break;
    }

    // Put the waiting task at the beginning of the wait queue.
    waitingTask->getNextWaitingTask() = queueHead.getTask();
    auto newQueueHead = WaitQueueItem::get(Status::Executing, waitingTask);
    if (fragment->waitQueue.compare_exchange_weak(
            queueHead, newQueueHead, std::memory_order_release,
            std::memory_order_acquire)) {
      // Escalate the priority of this task based on the priority
      // of the waiting task.
      swift_task_escalate(this, waitingTask->Flags.getPriority());
      return FutureFragment::Status::Executing;
    }
  }
}

namespace {

/// An asynchronous context within a task that describes a general "future"
/// wait.
///
/// This type matches the ABI of a function `<T> () async throws -> T`, which
/// is the type used by `Task.runDetached` and `Task.group.add` to create
/// futures.
class TaskFutureWaitAsyncContext : public AsyncContext {
public:
  // Error result is always present.
  SwiftError *errorResult = nullptr;

  // No indirect results.
  TaskFutureWaitResult result;

  // FIXME: Currently, this is always here, but it isn't technically
  // necessary.
  void *Self;

  // Arguments.
  AsyncTask *task;

  using AsyncContext::AsyncContext;
};

}

/// Run the given task, providing it with the result of the future.
static void runTaskWithFutureResult(
    AsyncTask *waitingTask, ExecutorRef executor,
    FutureFragment *futureFragment, bool hadErrorResult) {
  auto waitingTaskContext =
      static_cast<TaskFutureWaitAsyncContext *>(waitingTask->ResumeContext);

  waitingTaskContext->result.hadErrorResult = hadErrorResult;
  if (hadErrorResult) {
    waitingTaskContext->result.storage =
        reinterpret_cast<OpaqueValue *>(futureFragment->getError());
  } else {
    waitingTaskContext->result.storage = futureFragment->getStoragePtr();
  }

  // TODO: schedule this task on the executor rather than running it
  // directly.
  waitingTask->run(executor);
}

void AsyncTask::completeFuture(AsyncContext *context, ExecutorRef executor) {
  using Status = FutureFragment::Status;
  using WaitQueueItem = FutureFragment::WaitQueueItem;

  assert(isFuture());
  auto fragment = futureFragment();

  // If an error was thrown, save it in the future fragment.
  auto futureContext = static_cast<FutureAsyncContext *>(context);
  bool hadErrorResult = false;
  if (auto errorObject = futureContext->errorResult) {
    fragment->getError() = errorObject;
    hadErrorResult = true;
  }

  // Update the status to signal completion.
  auto newQueueHead = WaitQueueItem::get(
      hadErrorResult ? Status::Error : Status::Success,
      nullptr);
  auto queueHead = fragment->waitQueue.exchange(
      newQueueHead, std::memory_order_acquire);
  assert(queueHead.getStatus() == Status::Executing);

  // Schedule every waiting task on the executor.
  auto waitingTask = queueHead.getTask();
  while (waitingTask) {
    // Find the next waiting task.
    auto nextWaitingTask = waitingTask->getNextWaitingTask();

    // Run the task.
    runTaskWithFutureResult(waitingTask, executor, fragment, hadErrorResult);

    // Move to the next task.
    waitingTask = nextWaitingTask;
  }
}

SWIFT_CC(swift)
static void destroyTask(SWIFT_CONTEXT HeapObject *obj) {
  auto task = static_cast<AsyncTask*>(obj);

  // For a future, destroy the result.
  if (task->isFuture()) {
    task->futureFragment()->destroy();
  }

  // The task execution itself should always hold a reference to it, so
  // if we get here, we know the task has finished running, which means
  // swift_task_complete should have been run, which will have torn down
  // the task-local allocator. There's actually nothing else to clean up
  // here.
  free(task);
}

/// Heap metadata for an asynchronous task.
static FullMetadata<HeapMetadata> taskHeapMetadata = {
  {
    {
      &destroyTask
    },
    {
      /*value witness table*/ nullptr
    }
  },
  {
    MetadataKind::Task
  }
};

/// The function that we put in the context of a simple task
/// to handle the final return.
SWIFT_CC(swift)
static void completeTask(AsyncTask *task, ExecutorRef executor,
                         AsyncContext *context) {
  // Tear down the task-local allocator immediately; there's no need
  // to wait for the object to be destroyed.
  _swift_task_alloc_destroy(task);

  // Complete the future.
  if (task->isFuture()) {
    task->completeFuture(context, executor);
  }

  // TODO: set something in the status?
  // TODO: notify the parent somehow?
  // TODO: remove this task from the child-task chain?

  // Release the task, balancing the retain that a running task
  // has on itself.
  swift_release(task);
}

AsyncTaskAndContext
swift::swift_task_create(JobFlags flags, AsyncTask *parent,
                         const AsyncFunctionPointer<void()> *function) {
  return swift_task_create_f(flags, parent, function->Function.get(),
                             function->ExpectedContextSize);
}

AsyncTaskAndContext
swift::swift_task_create_f(JobFlags flags, AsyncTask *parent,
                           AsyncFunctionType<void()> *function,
                           size_t initialContextSize) {
  return swift_task_create_future_f(
      flags, parent, nullptr, function, initialContextSize);
}

AsyncTaskAndContext swift::swift_task_create_future(
    JobFlags flags, AsyncTask *parent, const Metadata *futureResultType,
    const AsyncFunctionPointer<void()> *function) {
  return swift_task_create_future_f(
      flags, parent, futureResultType, function->Function.get(),
      function->ExpectedContextSize);
}
AsyncTaskAndContext swift::swift_task_create_future_f(
    JobFlags flags, AsyncTask *parent, const Metadata *futureResultType,
    AsyncFunctionType<void()> *function, size_t initialContextSize) {
  assert((futureResultType != nullptr) == flags.task_isFuture());
  assert(!flags.task_isFuture() ||
         initialContextSize >= sizeof(FutureAsyncContext));
  assert((parent != nullptr) == flags.task_isChildTask());

  // Figure out the size of the header.
  size_t headerSize = sizeof(AsyncTask);
  if (parent) headerSize += sizeof(AsyncTask::ChildFragment);
  if (futureResultType) {
    headerSize += FutureFragment::fragmentSize(futureResultType);
  }
  headerSize = llvm::alignTo(headerSize, llvm::Align(alignof(AsyncContext)));

  // Allocate the initial context together with the job.
  // This means that we never get rid of this allocation.
  size_t amountToAllocate = headerSize + initialContextSize;
  assert(amountToAllocate % MaximumAlignment == 0);
  void *allocation = malloc(amountToAllocate);

  AsyncContext *initialContext =
      reinterpret_cast<AsyncContext*>(
          reinterpret_cast<char*>(allocation) + headerSize);

  // Initialize the task so that resuming it will run the given
  // function on the initial context.
  AsyncTask *task =
      new(allocation) AsyncTask(&taskHeapMetadata, flags,
                                function, initialContext);

  // Initialize the child fragment if applicable.
  // TODO: propagate information from the parent?
  if (parent) {
    auto childFragment = task->childFragment();
    new (childFragment) AsyncTask::ChildFragment(parent);
  }

  // Initialize the future fragment if applicable.
  if (futureResultType) {
    auto futureFragment = task->futureFragment();
    new (futureFragment) FutureFragment(futureResultType);

    // Set up the context for the future so there is no error, and a
    // successful result will be written into the future fragment's storage.
    auto futureContext = static_cast<FutureAsyncContext *>(initialContext);
    futureContext->errorResult = nullptr;
    futureContext->indirectResult = futureFragment->getStoragePtr();
  }

  // Configure the initial context.
  //
  // FIXME: if we store a null pointer here using the standard ABI for
  // signed null pointers, then we'll have to authenticate context pointers
  // as if they might be null, even though the only time they ever might
  // be is the final hop. Store a signed null instead.
  initialContext->Parent = nullptr;
  initialContext->ResumeParent = &completeTask;
  initialContext->ResumeParentExecutor = ExecutorRef::generic();
  initialContext->Flags = AsyncContextKind::Ordinary;
  initialContext->Flags.setShouldNotDeallocateInCallee(true);

  // Initialize the task-local allocator.
  // TODO: consider providing an initial pre-allocated first slab to the
  // allocator.
  _swift_task_alloc_initialize(task);

  return {task, initialContext};
}

void swift::swift_task_future_wait(
    AsyncTask *waitingTask, ExecutorRef executor,
    AsyncContext *rawContext) {
  // Suspend the waiting task.
  waitingTask->ResumeTask = rawContext->ResumeParent;
  waitingTask->ResumeContext = rawContext;

  // Wait on the future.
  auto context = static_cast<TaskFutureWaitAsyncContext *>(rawContext);
  auto task = context->task;
  assert(task->isFuture());
  switch (task->waitFuture(waitingTask)) {
  case FutureFragment::Status::Executing:
    // The waiting task has been queued on the future.
    return;

  case FutureFragment::Status::Success:
    // Run the task with a successful result.
    // FIXME: Want to guarantee a tail call here
    runTaskWithFutureResult(
        waitingTask, executor, task->futureFragment(),
        /*hadErrorResult=*/false);
    return;

  case FutureFragment::Status::Error:
    // Run the task with an error result.
    // FIXME: Want to guarantee a tail call here
    runTaskWithFutureResult(
        waitingTask, executor, task->futureFragment(),
        /*hadErrorResult=*/true);
    return;
  }
}

// TODO: Remove this hack.
void swift::swift_task_run(AsyncTask *taskToRun) {
  taskToRun->run(ExecutorRef::generic());
}

size_t swift::swift_task_getJobFlags(AsyncTask *task) {
  return task->Flags.getOpaqueValue();
}

namespace {

/// Structure that gets filled in when a task is suspended by
/// `withUnsafeContinuation`.
struct AsyncContinuationContext {
  // These fields are unnecessary for resuming a continuation.
  void *Unused1;
  void *Unused2;

  // Storage slot for the error result, if any.
  SwiftError *ErrorResult;

  // Pointer to where to store a normal result.
  OpaqueValue *NormalResult;

  // Executor on which to resume execution.
  ExecutorRef ResumeExecutor;
};

static void resumeTaskAfterContinuation(AsyncTask *task,
                                        AsyncContinuationContext *context) {
#if __APPLE__
  // TODO: Enqueue the task on the specific executor in the continuation
  // context.
  //
  // For now, just enqueue the task resumption on the global concurrent queue
  // so that we're able to return back to the caller of resume.
  dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0),
                 ^{
                   task->run(context->ResumeExecutor);
                 });
#else
  swift_unreachable("not implemented");
#endif
}

}
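
// In the entry points below, the `continuation` handle is the suspended
// AsyncTask itself: each resume function casts it back to an AsyncTask,
// stores the (+1) result or error into the task's AsyncContinuationContext,
// and then re-enqueues the task via resumeTaskAfterContinuation.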
SWIFT_CC(swift)
void swift::swift_continuation_resume(/* +1 */ OpaqueValue *result,
                                      void *continuation,
                                      const Metadata *resumeType) {
  auto task = reinterpret_cast<AsyncTask*>(continuation);
  auto context =
      reinterpret_cast<AsyncContinuationContext*>(task->ResumeContext);
  resumeType->vw_initializeWithTake(context->NormalResult, result);
  resumeTaskAfterContinuation(task, context);
}

SWIFT_CC(swift)
void swift::swift_continuation_throwingResume(/* +1 */ OpaqueValue *result,
                                              void *continuation,
                                              const Metadata *resumeType) {
  return swift_continuation_resume(result, continuation, resumeType);
}

SWIFT_CC(swift)
void swift::swift_continuation_throwingResumeWithError(/* +1 */ SwiftError *error,
                                                        void *continuation,
                                                        const Metadata *resumeType) {
  auto task = reinterpret_cast<AsyncTask*>(continuation);
  auto context =
      reinterpret_cast<AsyncContinuationContext*>(task->ResumeContext);
  context->ErrorResult = error;
  resumeTaskAfterContinuation(task, context);
}