Merge pull request #36572 from mikeash/async-task-dispatch-integration2

[Concurrency] Make Job objects work as Dispatch objects.
This commit is contained in:
Mike Ash
2021-03-31 09:42:58 -04:00
committed by GitHub
10 changed files with 141 additions and 26 deletions

View File

@@ -55,10 +55,15 @@
#include "../CompatibilityOverride/CompatibilityOverride.h"
#include "swift/Runtime/Concurrency.h"
#include "swift/Runtime/EnvironmentVariables.h"
#include "TaskPrivate.h"
#include <dispatch/dispatch.h>
#if !defined(_WIN32)
#include <dlfcn.h>
#endif
using namespace swift;
SWIFT_CC(swift)
@@ -180,18 +185,78 @@ void swift::donateThreadToGlobalExecutorUntil(bool (*condition)(void *),
#else
// Ensure that Job's layout is compatible with what Dispatch expects.
// Note: MinimalDispatchObjectHeader just has the fields we care about, it is
// not complete and should not be used for anything other than these asserts.
struct MinimalDispatchObjectHeader {
const void *VTable;
void *Opaque;
void *Linkage;
};
static_assert(
offsetof(Job, metadata) == offsetof(MinimalDispatchObjectHeader, VTable),
"Job Metadata field must match location of Dispatch VTable field.");
static_assert(offsetof(Job, SchedulerPrivate[Job::DispatchLinkageIndex]) ==
offsetof(MinimalDispatchObjectHeader, Linkage),
"Dispatch Linkage field must match Job "
"SchedulerPrivate[DispatchLinkageIndex].");
/// The function passed to dispatch_async_f to execute a job.
static void __swift_run_job(void *_job) {
Job *job = (Job*) _job;
swift_job_run(job, ExecutorRef::generic());
auto metadata =
reinterpret_cast<const DispatchClassMetadata *>(job->metadata);
metadata->VTableInvoke(job, nullptr, 0);
}
/// A specialized version of __swift_run_job to execute the job on the main
/// executor.
/// FIXME: only exists for the quick-and-dirty MainActor implementation.
static void __swift_run_job_main_executor(void *_job) {
Job *job = (Job*) _job;
swift_job_run(job, ExecutorRef::mainExecutor());
/// The type of a function pointer for enqueueing a Job object onto a dispatch
/// queue.
typedef void (*dispatchEnqueueFuncType)(dispatch_queue_t queue, void *obj,
dispatch_qos_class_t qos);
/// Initialize dispatchEnqueueFunc and then call through to the proper
/// implementation.
static void initializeDispatchEnqueueFunc(dispatch_queue_t queue, void *obj,
dispatch_qos_class_t qos);
/// A function pointer to the function used to enqueue a Job onto a dispatch
/// queue. Initially set to initializeDispatchEnqueueFunc, so that the first
/// call will initialize it. initializeDispatchEnqueueFunc sets it to point
/// either to dispatch_async_swift_job when it's available, otherwise to
/// dispatchEnqueueDispatchAsync.
static std::atomic<dispatchEnqueueFuncType> dispatchEnqueueFunc{
initializeDispatchEnqueueFunc};
/// A small adapter that dispatches a Job onto a queue using dispatch_async_f.
static void dispatchEnqueueDispatchAsync(dispatch_queue_t queue, void *obj,
dispatch_qos_class_t qos) {
dispatch_async_f(queue, obj, __swift_run_job);
}
static void initializeDispatchEnqueueFunc(dispatch_queue_t queue, void *obj,
dispatch_qos_class_t qos) {
dispatchEnqueueFuncType func = nullptr;
// Always fall back to plain dispatch_async_f on Windows for now.
#if !defined(_WIN32)
if (runtime::environment::concurrencyEnableJobDispatchIntegration())
func = reinterpret_cast<dispatchEnqueueFuncType>(
dlsym(RTLD_NEXT, "dispatch_async_swift_job"));
#endif
if (!func)
func = dispatchEnqueueDispatchAsync;
dispatchEnqueueFunc.store(func, std::memory_order_relaxed);
func(queue, obj, qos);
}
/// Enqueue a Job onto a dispatch queue using dispatchEnqueueFunc.
static void dispatchEnqueue(dispatch_queue_t queue, Job *job,
dispatch_qos_class_t qos, void *executorQueue) {
job->SchedulerPrivate[Job::DispatchQueueIndex] = executorQueue;
dispatchEnqueueFunc.load(std::memory_order_relaxed)(queue, job, qos);
}
static constexpr size_t globalQueueCacheCount =
@@ -259,14 +324,12 @@ static void swift_task_enqueueGlobalImpl(Job *job) {
// the priorities of work added to this queue using Dispatch's public
// API, but as discussed above, that is less important than avoiding
// performance problems.
dispatch_function_t dispatchFunction = &__swift_run_job;
void *dispatchContext = job;
JobPriority priority = job->getPriority();
auto queue = getGlobalQueue(priority);
dispatch_async_f(queue, dispatchContext, dispatchFunction);
dispatchEnqueue(queue, job, (dispatch_qos_class_t)priority,
DISPATCH_QUEUE_GLOBAL_EXECUTOR);
#endif
}
@@ -293,6 +356,9 @@ static void swift_task_enqueueGlobalWithDelayImpl(unsigned long long delay,
auto queue = getGlobalQueue(priority);
job->SchedulerPrivate[Job::DispatchQueueIndex] =
DISPATCH_QUEUE_GLOBAL_EXECUTOR;
dispatch_time_t when = dispatch_time(DISPATCH_TIME_NOW, delay);
dispatch_after_f(when, queue, dispatchContext, dispatchFunction);
#endif
@@ -317,13 +383,13 @@ static void swift_task_enqueueMainExecutorImpl(Job *job) {
insertIntoJobQueue(job);
#else
dispatch_function_t dispatchFunction = &__swift_run_job_main_executor;
void *dispatchContext = job;
JobPriority priority = job->getPriority();
// This is an inline function that compiles down to a pointer to a global.
auto mainQueue = dispatch_get_main_queue();
dispatch_async_f(mainQueue, dispatchContext, dispatchFunction);
dispatchEnqueue(mainQueue, job, (dispatch_qos_class_t)priority,
DISPATCH_QUEUE_MAIN_EXECUTOR);
#endif
}