mirror of
https://github.com/apple/swift.git
synced 2025-12-21 12:14:44 +01:00
Implement continuations in task-to-thread model.
This is done using a condition variable upon which the awaiting thread will block if the continuation has not be resumed by the point of await. The resuming thread will signal this condition variable, thereby unblocking the awaiting thread. Rdar://99977665
This commit is contained in:
@@ -25,6 +25,8 @@
|
||||
#include "swift/Runtime/Config.h"
|
||||
#include "swift/Runtime/VoucherShims.h"
|
||||
#include "swift/Basic/STLExtras.h"
|
||||
#include "swift/Threading/ConditionVariable.h"
|
||||
#include "swift/Threading/Mutex.h"
|
||||
#include "bitset"
|
||||
#include "queue" // TODO: remove and replace with our own mpsc
|
||||
|
||||
@@ -711,6 +713,16 @@ public:
|
||||
/// Public ABI.
|
||||
ExecutorRef ResumeToExecutor;
|
||||
|
||||
#if defined(SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY)
|
||||
/// In a task-to-thread model, instead of voluntarily descheduling the task
|
||||
/// from the thread, we will block the thread (and therefore task).
|
||||
/// This condition variable is lazily allocated on the stack only if the
|
||||
/// continuation has not been resumed by the point of await. The mutex in the
|
||||
/// condition variable is therefore not really protecting any state as all
|
||||
/// coordination is done via the AwaitSynchronization atomic
|
||||
ConditionVariable *Cond;
|
||||
#endif
|
||||
|
||||
void setErrorResult(SwiftError *error) {
|
||||
ErrorResult = error;
|
||||
}
|
||||
|
||||
@@ -690,14 +690,28 @@ IRGenModule::IRGenModule(IRGenerator &irgen,
|
||||
*this, "swift.async_task_and_context",
|
||||
{ SwiftTaskPtrTy, SwiftContextPtrTy });
|
||||
|
||||
ContinuationAsyncContextTy = createStructType(
|
||||
*this, "swift.continuation_context",
|
||||
{SwiftContextTy, // AsyncContext header
|
||||
SizeTy, // flags
|
||||
SizeTy, // await synchronization
|
||||
ErrorPtrTy, // error result pointer
|
||||
OpaquePtrTy, // normal result address
|
||||
SwiftExecutorTy}); // resume to executor
|
||||
if (Context.LangOpts.isConcurrencyModelTaskToThread()) {
|
||||
ContinuationAsyncContextTy = createStructType(
|
||||
*this, "swift.continuation_context",
|
||||
{SwiftContextTy, // AsyncContext header
|
||||
SizeTy, // flags
|
||||
SizeTy, // await synchronization
|
||||
ErrorPtrTy, // error result pointer
|
||||
OpaquePtrTy, // normal result address
|
||||
SwiftExecutorTy, // resume to executor
|
||||
SizeTy // pointer to condition variable
|
||||
});
|
||||
} else {
|
||||
ContinuationAsyncContextTy = createStructType(
|
||||
*this, "swift.continuation_context",
|
||||
{SwiftContextTy, // AsyncContext header
|
||||
SizeTy, // flags
|
||||
SizeTy, // await synchronization
|
||||
ErrorPtrTy, // error result pointer
|
||||
OpaquePtrTy, // normal result address
|
||||
SwiftExecutorTy // resume to executor
|
||||
});
|
||||
}
|
||||
ContinuationAsyncContextPtrTy =
|
||||
ContinuationAsyncContextTy->getPointerTo(DefaultAS);
|
||||
|
||||
|
||||
@@ -845,7 +845,7 @@ class DefaultActorImpl : public HeapObject {
|
||||
// the future
|
||||
alignas(sizeof(ActiveActorStatus)) char StatusStorage[sizeof(ActiveActorStatus)];
|
||||
#endif
|
||||
// TODO(rokhinip): Make this a flagset
|
||||
// TODO (rokhinip): Make this a flagset
|
||||
bool isDistributedRemoteActor;
|
||||
|
||||
public:
|
||||
|
||||
@@ -1311,7 +1311,9 @@ static AsyncTask *swift_continuation_initImpl(ContinuationAsyncContext *context,
|
||||
? ContinuationStatus::Awaited
|
||||
: ContinuationStatus::Pending,
|
||||
std::memory_order_relaxed);
|
||||
|
||||
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
|
||||
context->Cond = nullptr;
|
||||
#endif
|
||||
AsyncTask *task;
|
||||
|
||||
// A preawait immediately suspends the task.
|
||||
@@ -1351,8 +1353,7 @@ static void swift_continuation_awaitImpl(ContinuationAsyncContext *context) {
|
||||
"awaiting a corrupt or already-awaited continuation");
|
||||
|
||||
// If the status is already Resumed, we can resume immediately.
|
||||
// Comparing against Pending may be very slightly more compact.
|
||||
if (oldStatus != ContinuationStatus::Pending) {
|
||||
if (oldStatus == ContinuationStatus::Resumed) {
|
||||
if (context->isExecutorSwitchForced())
|
||||
return swift_task_switch(context, context->ResumeParent,
|
||||
context->ResumeToExecutor);
|
||||
@@ -1364,19 +1365,57 @@ static void swift_continuation_awaitImpl(ContinuationAsyncContext *context) {
|
||||
auto task = swift_task_getCurrent();
|
||||
#endif
|
||||
|
||||
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
|
||||
// In the task to thread model, we do not suspend the task that is waiting on
|
||||
// the continuation resumption. Instead we simply block the thread on a
|
||||
// condition variable keep the task alive on the thread.
|
||||
//
|
||||
// This condition variable can be allocated on the stack of the blocking
|
||||
// thread - with the address of it published to the resuming thread via the
|
||||
// context.
|
||||
ConditionVariable Cond;
|
||||
|
||||
context->Cond = &Cond;
|
||||
#else /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
|
||||
// Flag the task as suspended.
|
||||
task->flagAsSuspended();
|
||||
#endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
|
||||
|
||||
// Try to transition to Awaited.
|
||||
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
|
||||
// If the cmpxchg is successful, the store release also publishes the write to
|
||||
// the Cond in the ContinuationAsyncContext to any concurrent accessing
|
||||
// thread.
|
||||
//
|
||||
// If it failed, then someone concurrently resumed the continuation in which
|
||||
// case, we don't care about publishing the Cond in the
|
||||
// ContinuationAsyncContext anyway.
|
||||
#endif
|
||||
// Try to transition to Awaited
|
||||
bool success =
|
||||
sync.compare_exchange_strong(oldStatus, ContinuationStatus::Awaited,
|
||||
/*success*/ std::memory_order_release,
|
||||
/*failure*/ std::memory_order_acquire);
|
||||
|
||||
// If that succeeded, we have nothing to do.
|
||||
if (success) {
|
||||
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
|
||||
// This lock really protects nothing but we need to hold it
|
||||
// while calling the condition wait
|
||||
Cond.lock();
|
||||
|
||||
// Condition variables can have spurious wakeups so we need to check this in
|
||||
// a do-while loop.
|
||||
do {
|
||||
Cond.wait();
|
||||
oldStatus = sync.load(std::memory_order_relaxed);
|
||||
} while (oldStatus != ContinuationStatus::Resumed);
|
||||
|
||||
Cond.unlock();
|
||||
#else
|
||||
// If that succeeded, we have nothing to do since we've successfully
|
||||
// suspended the task
|
||||
_swift_task_clearCurrent();
|
||||
return;
|
||||
#endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
|
||||
}
|
||||
|
||||
// If it failed, it should be because someone concurrently resumed
|
||||
@@ -1384,8 +1423,14 @@ static void swift_continuation_awaitImpl(ContinuationAsyncContext *context) {
|
||||
assert(oldStatus == ContinuationStatus::Resumed &&
|
||||
"continuation was concurrently corrupted or awaited");
|
||||
|
||||
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
|
||||
// Since the condition variable is stack allocated, we don't need to do
|
||||
// anything here to clean up
|
||||
#else
|
||||
// Restore the running state of the task and resume it.
|
||||
task->flagAsRunning();
|
||||
#endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
|
||||
|
||||
if (context->isExecutorSwitchForced())
|
||||
return swift_task_switch(context, context->ResumeParent,
|
||||
context->ResumeToExecutor);
|
||||
@@ -1397,6 +1442,7 @@ static void resumeTaskAfterContinuation(AsyncTask *task,
|
||||
continuationChecking::willResume(context);
|
||||
|
||||
auto &sync = context->AwaitSynchronization;
|
||||
|
||||
auto status = sync.load(std::memory_order_acquire);
|
||||
assert(status != ContinuationStatus::Resumed &&
|
||||
"continuation was already resumed");
|
||||
@@ -1405,27 +1451,41 @@ static void resumeTaskAfterContinuation(AsyncTask *task,
|
||||
// restarting.
|
||||
_swift_tsan_release(static_cast<Job *>(task));
|
||||
|
||||
// The status should be either Pending or Awaited. If it's Awaited,
|
||||
// which is probably the most likely option, then we should immediately
|
||||
// enqueue; we don't need to update the state because there shouldn't
|
||||
// be a racing attempt to resume the continuation. If it's Pending,
|
||||
// we need to set it to Resumed; if that fails (with a strong cmpxchg),
|
||||
// it should be because the original thread concurrently set it to
|
||||
// Awaited, and so we need to enqueue.
|
||||
// The status should be either Pending or Awaited.
|
||||
//
|
||||
// Case 1: Status is Pending
|
||||
// No one has awaited us, we just need to set it to Resumed; if that fails
|
||||
// (with a strong cmpxchg), it should be because the original thread
|
||||
// concurrently set it to Awaited, in which case, we fall into Case 2.
|
||||
//
|
||||
// Case 2: Status is Awaited
|
||||
// This is probably the more frequently hit case.
|
||||
// In task-to-thread model, we update status to be Resumed and signal the
|
||||
// waiting thread. In regular model, we immediately enqueue the task and can
|
||||
// skip updates to the continuation state since there shouldn't be a racing
|
||||
// attempt to resume the continuation.
|
||||
if (status == ContinuationStatus::Pending &&
|
||||
sync.compare_exchange_strong(status, ContinuationStatus::Resumed,
|
||||
/*success*/ std::memory_order_release,
|
||||
/*failure*/ std::memory_order_relaxed)) {
|
||||
/*failure*/ std::memory_order_acquire)) {
|
||||
return;
|
||||
}
|
||||
assert(status == ContinuationStatus::Awaited &&
|
||||
"detected concurrent attempt to resume continuation");
|
||||
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
|
||||
// If we see status == ContinuationStatus::Awaited, then we should also be
|
||||
// seeing a pointer to the cond var since we're doing a load acquire on sync
|
||||
// which pairs with the store release in swift_continuation_awaitImpl
|
||||
assert(context->Cond != nullptr);
|
||||
|
||||
sync.store(ContinuationStatus::Resumed, std::memory_order_relaxed);
|
||||
context->Cond->signal();
|
||||
#else
|
||||
// TODO: maybe in some mode we should set the status to Resumed here
|
||||
// to make a stronger best-effort attempt to catch racing attempts to
|
||||
// resume the continuation?
|
||||
|
||||
task->flagAsAndEnqueueOnExecutor(context->ResumeToExecutor);
|
||||
#endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
|
||||
}
|
||||
|
||||
SWIFT_CC(swift)
|
||||
|
||||
66
test/Concurrency/blocking_continuations.swift
Normal file
66
test/Concurrency/blocking_continuations.swift
Normal file
@@ -0,0 +1,66 @@
|
||||
// RUN: %target-run-simple-swift(-parse-as-library -Xfrontend -disable-availability-checking -Xfrontend -concurrency-model=task-to-thread -g -Xlinker -object_path_lto -Xlinker /tmp/abc.o)
|
||||
// REQUIRES: concurrency
|
||||
// REQUIRES: executable_test
|
||||
// REQUIRES: concurrency_runtime
|
||||
// REQUIRES: freestanding
|
||||
// UNSUPPORTED: threading_none
|
||||
|
||||
@_spi(_TaskToThreadModel) import _Concurrency
|
||||
import StdlibUnittest
|
||||
import Darwin
|
||||
|
||||
var globalContinuation : CheckedContinuation<Int, Never>? = nil
|
||||
|
||||
func waitOnContinuation(_unused : UnsafeMutableRawPointer) -> UnsafeMutableRawPointer? {
|
||||
Task.runInline {
|
||||
let result = await withCheckedContinuation { continuation in
|
||||
globalContinuation = continuation
|
||||
}
|
||||
print("Continuation successfully resumed")
|
||||
expectEqual(result, 10)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func resumeContinuation(_unused : UnsafeMutableRawPointer) -> UnsafeMutableRawPointer? {
|
||||
Task.runInline {
|
||||
while (globalContinuation == nil) {}
|
||||
globalContinuation!.resume(returning: 10)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@main struct Main {
|
||||
static func main() {
|
||||
|
||||
let tests = TestSuite("Continuations in task-to-thread")
|
||||
tests.test("Basic continuations - no blocking") {
|
||||
Task.runInline {
|
||||
await withCheckedContinuation { continuation in
|
||||
continuation.resume()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tests.test("Continuations - with blocking") {
|
||||
var thread1 : pthread_t? = nil
|
||||
guard pthread_create(&thread1, nil, waitOnContinuation, nil) == 0 else {
|
||||
fatalError("pthread_create failed")
|
||||
}
|
||||
|
||||
var thread2 : pthread_t? = nil
|
||||
guard pthread_create(&thread2, nil, resumeContinuation, nil) == 0 else {
|
||||
fatalError("pthread_create failed")
|
||||
}
|
||||
|
||||
guard pthread_join(thread1!, nil) == 0 else {
|
||||
fatalError("pthread_join failed")
|
||||
}
|
||||
guard pthread_join(thread2!, nil) == 0 else {
|
||||
fatalError("pthread_join failed")
|
||||
}
|
||||
}
|
||||
|
||||
runAllTests()
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user