mirror of
https://github.com/apple/swift.git
synced 2025-12-21 12:14:44 +01:00
I have identified the following conceptual synchronization points at which task data and computation can cross thread boundaries. We need to model these in TSan to avoid false positives: Awaiting an async task (`AsyncTask::waitFuture`), which has two cases: 1) The task has completed (`AsyncTask::completeFuture`). Everything that happened during task execution "happened before" the point where we access its result. We synchronize on the *awaited* task. 2) The task is still executing: the current execution is suspended and the waiting task is put into the list of "waiters". Once the awaited task completes, the waiters will be scheduled. In this case, we synchronize on the *waiting* task. Note: there is a similar relationship for task groups which I still have to investigate. I will follow up with an additional patch and tests. Actor job execution (`swift::runJobInExecutorContext`): Job scheduling (`swift::swift_task_enqueue`) precedes/happens before job execution. Also all job executions (switching actors or suspend/resume) are serially ordered. Note: the happens-before edge for schedule->execute isn't strictly needed in most cases since scheduling calls through to libdispatch's `dispatch_async_f`, which we already intercept and model in TSan. However, I am trying to model Swift Task semantics to increase the chance that things continue to work in case the "task backend" is switched out. rdar://74256733
1453 lines
50 KiB
C++
1453 lines
50 KiB
C++
///===--- Actor.cpp - Standard actor implementation ------------------------===///
|
|
///
|
|
/// This source file is part of the Swift.org open source project
|
|
///
|
|
/// Copyright (c) 2014 - 2020 Apple Inc. and the Swift project authors
|
|
/// Licensed under Apache License v2.0 with Runtime Library Exception
|
|
///
|
|
/// See https://swift.org/LICENSE.txt for license information
|
|
/// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
|
|
///
|
|
///===----------------------------------------------------------------------===///
|
|
///
|
|
/// The default actor implementation for Swift actors, plus related
|
|
/// routines such as generic executor enqueuing and switching.
|
|
///
|
|
///===----------------------------------------------------------------------===///
|
|
|
|
#include "swift/Runtime/Concurrency.h"
|
|
|
|
#include "swift/Runtime/Atomic.h"
|
|
#include "swift/Runtime/Casting.h"
|
|
#include "swift/Runtime/Mutex.h"
|
|
#include "swift/Runtime/ThreadLocal.h"
|
|
#include "swift/ABI/Task.h"
|
|
#include "swift/ABI/Actor.h"
|
|
#include "llvm/ADT/PointerIntPair.h"
|
|
#include "TaskPrivate.h"
|
|
|
|
using namespace swift;
|
|
|
|
/// Should we yield the thread?
///
/// Always answers "no" for now; see the FIXME below.
static bool shouldYieldThread() {
  // FIXME: system scheduler integration
  return false;
}
|
|
|
|
/*****************************************************************************/
|
|
/******************************* TASK TRACKING ******************************/
|
|
/*****************************************************************************/
|
|
|
|
namespace {
|
|
|
|
/// An extremely silly class which exists to make pointer
/// default-initialization constexpr.
///
/// It is a thin wrapper around a raw T*: a default-constructed Pointer
/// holds null, and it converts implicitly to and from T*. No destructor
/// is defined, so the wrapper never frees the pointee.
template <class T> struct Pointer {
  /// The wrapped raw pointer.
  T *Value;

  /// Construct a null pointer.
  constexpr Pointer() : Value(nullptr) {}

  /// Wrap an existing raw pointer (intentionally implicit).
  constexpr Pointer(T *value) : Value(value) {}

  /// Unwrap to the raw pointer.
  constexpr operator T *() const { return Value; }

  /// Member access through the wrapped pointer.
  constexpr T *operator->() const { return Value; }
};
|
|
|
|
/// A class which encapsulates the information we track about
|
|
/// the current thread and active executor.
|
|
class ExecutorTrackingInfo {
|
|
/// A thread-local variable pointing to the active tracking
|
|
/// information about the current thread, if any.
|
|
///
|
|
/// TODO: this is obviously runtime-internal and therefore not
|
|
/// reasonable to make ABI. We might want to also provide a way
|
|
/// for generated code to efficiently query the identity of the
|
|
/// current executor, in order to do a cheap comparison to avoid
|
|
/// doing all the work to suspend the task when we're already on
|
|
/// the right executor. It would make sense for that to be a
|
|
/// separate thread-local variable (or whatever is most efficient
|
|
/// on the target platform).
|
|
static SWIFT_RUNTIME_DECLARE_THREAD_LOCAL(Pointer<ExecutorTrackingInfo>,
|
|
ActiveInfoInThread);
|
|
|
|
/// The active executor.
|
|
ExecutorRef ActiveExecutor = ExecutorRef::generic();
|
|
|
|
/// The tracking info that was active when this one was entered.
|
|
ExecutorTrackingInfo *SavedInfo;
|
|
|
|
public:
|
|
ExecutorTrackingInfo() = default;
|
|
|
|
ExecutorTrackingInfo(const ExecutorTrackingInfo &) = delete;
|
|
ExecutorTrackingInfo &operator=(const ExecutorTrackingInfo &) = delete;
|
|
|
|
/// Unconditionally initialize a fresh tracking state on the
|
|
/// current state, shadowing any previous tracking state.
|
|
/// leave() must be called beforet the object goes out of scope.
|
|
void enterAndShadow(ExecutorRef currentExecutor) {
|
|
ActiveExecutor = currentExecutor;
|
|
SavedInfo = ActiveInfoInThread.get();
|
|
ActiveInfoInThread.set(this);
|
|
}
|
|
|
|
/// Initialize a tracking state on the current thread if there
|
|
/// isn't one already, or else update the current tracking state.
|
|
///
|
|
/// Returns a pointer to the active tracking info. If this is the
|
|
/// same as the object on which this was called, leave() must be
|
|
/// called before the object goes out of scope.
|
|
ExecutorTrackingInfo *enterOrUpdate(ExecutorRef currentExecutor) {
|
|
if (auto activeInfo = ActiveInfoInThread.get()) {
|
|
activeInfo->ActiveExecutor = currentExecutor;
|
|
return activeInfo;
|
|
}
|
|
|
|
ActiveExecutor = currentExecutor;
|
|
SavedInfo = nullptr;
|
|
ActiveInfoInThread.set(this);
|
|
return this;
|
|
}
|
|
|
|
ExecutorRef getActiveExecutor() const {
|
|
return ActiveExecutor;
|
|
}
|
|
|
|
void leave() {
|
|
ActiveInfoInThread.set(SavedInfo);
|
|
}
|
|
};
|
|
|
|
class ActiveTask {
|
|
/// A thread-local variable pointing to the active tracking
|
|
/// information about the current thread, if any.
|
|
static SWIFT_RUNTIME_DECLARE_THREAD_LOCAL(Pointer<AsyncTask>, Value);
|
|
|
|
public:
|
|
static void set(AsyncTask *task) { Value.set(task); }
|
|
static AsyncTask *get() { return Value.get(); }
|
|
};
|
|
|
|
/// Define the thread-locals.
|
|
SWIFT_RUNTIME_DECLARE_THREAD_LOCAL(
|
|
Pointer<ExecutorTrackingInfo>,
|
|
ExecutorTrackingInfo::ActiveInfoInThread);
|
|
SWIFT_RUNTIME_DECLARE_THREAD_LOCAL(
|
|
Pointer<AsyncTask>,
|
|
ActiveTask::Value);
|
|
|
|
} // end anonymous namespace
|
|
|
|
/// Return the task recorded as active on the current thread, or null
/// if none has been set.
AsyncTask*
swift::swift_task_get_active() {
  return ActiveTask::get();
}
|
|
|
|
/// Run a job on the current thread in the context of the given executor.
///
/// A fresh tracking record naming the executor shadows whatever tracking
/// state the thread already had while the job runs; the previous state
/// is reinstated once the job returns.
void swift::swift_job_run(Job *job, ExecutorRef executor) {
  ExecutorTrackingInfo trackingInfo;
  trackingInfo.enterAndShadow(executor);

  runJobInExecutorContext(job, executor);

  trackingInfo.leave();
}
|
|
|
|
/// Run a job whose executor context has already been established.
///
/// The run is bracketed by a TSan acquire and release on the job pointer.
/// For an AsyncTask, the thread's active-task pointer is set for the
/// duration of the run and cleared afterwards; any other job is run
/// directly.
void swift::runJobInExecutorContext(Job *job, ExecutorRef executor) {
  _swift_tsan_acquire(job);

  if (auto task = dyn_cast<AsyncTask>(job)) {
    // Update the active task in the current thread.
    ActiveTask::set(task);

    // FIXME: update the task status to say that it's running
    // on the current thread.  If the task suspends itself to run
    // on an actor, it should update the task status appropriately;
    // we don't need to update it afterwards.

    task->runInFullyEstablishedContext(executor);

    // Clear the active task.
    ActiveTask::set(nullptr);
  } else {
    // There's no extra bookkeeping to do for simple jobs.
    job->runSimpleInFullyEstablishedContext(executor);
  }

  _swift_tsan_release(job);
}
|
|
|
|
/// Return the task recorded as active on the current thread, or null
/// if none has been set.
AsyncTask *swift::swift_task_getCurrent() {
  return ActiveTask::get();
}
|
|
|
|
/// Reset the current thread's active-task pointer to null.
void swift::_swift_task_clearCurrent() {
  ActiveTask::set(nullptr);
}
|
|
|
|
/*****************************************************************************/
|
|
/*********************** DEFAULT ACTOR IMPLEMENTATION ************************/
|
|
/*****************************************************************************/
|
|
|
|
namespace {
|
|
|
|
class DefaultActorImpl;
|
|
|
|
/// A job to process a default actor. Allocated inline in the actor.
|
|
class ProcessInlineJob : public Job {
|
|
public:
|
|
ProcessInlineJob(JobPriority priority)
|
|
: Job({JobKind::DefaultActorInline, priority}, &process) {}
|
|
|
|
SWIFT_CC(swiftasync)
|
|
static void process(Job *job, ExecutorRef executor);
|
|
|
|
static bool classof(const Job *job) {
|
|
return job->Flags.getKind() == JobKind::DefaultActorInline;
|
|
}
|
|
};
|
|
|
|
/// A job to process a default actor that's allocated separately from
|
|
/// the actor but doesn't need the override mechanics.
|
|
class ProcessOutOfLineJob : public Job {
|
|
DefaultActorImpl *Actor;
|
|
public:
|
|
ProcessOutOfLineJob(DefaultActorImpl *actor, JobPriority priority)
|
|
: Job({JobKind::DefaultActorSeparate, priority}, &process),
|
|
Actor(actor) {}
|
|
|
|
SWIFT_CC(swiftasync)
|
|
static void process(Job *job, ExecutorRef executor);
|
|
|
|
static bool classof(const Job *job) {
|
|
return job->Flags.getKind() == JobKind::DefaultActorSeparate;
|
|
}
|
|
};
|
|
|
|
/// A job to process a default actor with a new priority; allocated
|
|
/// separately from the actor.
|
|
class ProcessOverrideJob;
|
|
|
|
/// Information about the currently-running processing job.
|
|
struct RunningJobInfo {
|
|
enum KindType : uint8_t {
|
|
Inline, Override, Other
|
|
};
|
|
KindType Kind;
|
|
JobPriority Priority;
|
|
ProcessOverrideJob *OverrideJob;
|
|
|
|
bool wasInlineJob() const {
|
|
return Kind == Inline;
|
|
}
|
|
|
|
static RunningJobInfo forOther(JobPriority priority) {
|
|
return {Other, priority, nullptr};
|
|
}
|
|
static RunningJobInfo forInline(JobPriority priority) {
|
|
return {Inline, priority, nullptr};
|
|
}
|
|
static RunningJobInfo forOverride(ProcessOverrideJob *job);
|
|
|
|
void setAbandoned();
|
|
void setRunning();
|
|
bool waitForActivation();
|
|
};
|
|
|
|
class JobRef {
|
|
enum : uintptr_t {
|
|
NeedsPreprocessing = 0x1,
|
|
IsOverride = 0x2,
|
|
JobMask = ~uintptr_t(NeedsPreprocessing | IsOverride)
|
|
};
|
|
|
|
/// A Job* that may have one of the two bits above mangled into it.
|
|
uintptr_t Value;
|
|
|
|
JobRef(Job *job, unsigned flags)
|
|
: Value(reinterpret_cast<uintptr_t>(job) | flags) {}
|
|
public:
|
|
constexpr JobRef() : Value(0) {}
|
|
|
|
/// Return a reference to a job that's been properly preprocessed.
|
|
static JobRef getPreprocessed(Job *job) {
|
|
/// We allow null pointers here.
|
|
return { job, 0 };
|
|
}
|
|
|
|
/// Return a reference to a job that hasn't been preprocesssed yet.
|
|
static JobRef getUnpreprocessed(Job *job) {
|
|
assert(job && "passing a null job");
|
|
return { job, NeedsPreprocessing };
|
|
}
|
|
|
|
/// Return a reference to an override job, which needs special
|
|
/// treatment during preprocessing.
|
|
static JobRef getOverride(ProcessOverrideJob *job);
|
|
|
|
/// Is this a null reference?
|
|
operator bool() const { return Value != 0; }
|
|
|
|
/// Does this job need to be pre-processed before we can treat
|
|
/// the job queue as a proper queue?
|
|
bool needsPreprocessing() const {
|
|
return Value & NeedsPreprocessing;
|
|
}
|
|
|
|
/// Is this an unpreprocessed override job?
|
|
bool isOverride() const {
|
|
return Value & IsOverride;
|
|
}
|
|
|
|
/// Given that this is an override job, return it.
|
|
ProcessOverrideJob *getAsOverride() const {
|
|
assert(isOverride());
|
|
return reinterpret_cast<ProcessOverrideJob*>(Value & JobMask);
|
|
}
|
|
ProcessOverrideJob *getAsPreprocessedOverride() const;
|
|
|
|
Job *getAsJob() const {
|
|
assert(!isOverride());
|
|
return reinterpret_cast<Job*>(Value & JobMask);
|
|
}
|
|
Job *getAsPreprocessedJob() const {
|
|
assert(!isOverride() && !needsPreprocessing());
|
|
return reinterpret_cast<Job*>(Value);
|
|
}
|
|
|
|
bool operator==(JobRef other) const {
|
|
return Value == other.Value;
|
|
}
|
|
bool operator!=(JobRef other) const {
|
|
return Value != other.Value;
|
|
}
|
|
};
|
|
|
|
/// The default actor implementation.
|
|
///
|
|
/// Ownership of the actor is subtle. Jobs are assumed to keep the actor
|
|
/// alive as long as they're executing on it; this allows us to avoid
|
|
/// retaining and releasing whenever threads are scheduled to run a job.
|
|
/// While jobs are enqueued on the actor, there is a conceptual shared
|
|
/// ownership of the currently-enqueued jobs which is passed around
|
|
/// between threads and processing jobs and managed using extra retains
|
|
/// and releases of the actor. The basic invariant is as follows:
|
|
///
|
|
/// - Let R be 1 if there are jobs enqueued on the actor or if a job
|
|
/// is currently running on the actor; otherwise let R be 0.
|
|
/// - Let N be the number of active processing jobs for the actor.
|
|
/// - N >= R
|
|
/// - There are N - R extra retains of the actor.
|
|
///
|
|
/// We can think of this as there being one "owning" processing job
|
|
/// and K "extra" jobs. If there is a processing job that is actively
|
|
/// running the actor, it is always the owning job; otherwise, any of
|
|
/// the N jobs may win the race to become the owning job.
|
|
///
|
|
/// We then have the following ownership rules:
|
|
///
|
|
/// - When we enqueue the first job on an actor, then R becomes 1, and
|
|
/// we must create a processing job so that N >= R. We do not need to
|
|
/// retain the actor.
|
|
/// - When we create an extra job to process an actor (e.g. because of
|
|
/// priority overrides), N increases but R remains the same. We must
|
|
/// retain the actor.
|
|
/// - When we start running an actor, our job definitively becomes the
|
|
/// owning job, but neither N nor R changes. We do not need to retain
|
|
/// the actor.
|
|
/// - When we go to start running an actor and for whatever reason we
|
|
/// don't actually do so, we are eliminating an extra processing job,
|
|
/// and so N decreases but R remains the same. We must release the
|
|
/// actor.
|
|
/// - When we are running an actor and give it up, and there are no
|
|
/// remaining jobs on it, then R becomes 0 and N decreases by 1.
|
|
/// We do not need to release the actor.
|
|
/// - When we are running an actor and give it up, and there are jobs
|
|
/// remaining on it, then R remains 1 but N is decreasing by 1.
|
|
/// We must either release the actor or create a new processing job
|
|
/// for it to maintain the balance.
|
|
class DefaultActorImpl : public HeapObject {
|
|
enum class Status {
|
|
/// The actor is not currently scheduled. Completely redundant
|
|
/// with the job list being empty.
|
|
Idle,
|
|
|
|
/// There is currently a job scheduled to process the actor at the
|
|
/// stored max priority.
|
|
Scheduled,
|
|
|
|
/// There is currently a thread processing the actor at the stored
|
|
/// max priority.
|
|
Running
|
|
};
|
|
|
|
struct Flags : public FlagSet<size_t> {
|
|
enum : size_t {
|
|
Status_offset = 0,
|
|
Status_width = 2,
|
|
|
|
HasActiveInlineJob = 2,
|
|
|
|
MaxPriority = 8,
|
|
MaxPriority_width = JobFlags::Priority_width,
|
|
|
|
// FIXME: add a reference to the running thread ID so that we
|
|
// can boost priorities.
|
|
};
|
|
|
|
/// What is the current high-level status of this actor?
|
|
FLAGSET_DEFINE_FIELD_ACCESSORS(Status_offset, Status_width, Status,
|
|
getStatus, setStatus)
|
|
|
|
/// Is there currently an active processing job allocated inline
|
|
/// in the actor?
|
|
FLAGSET_DEFINE_FLAG_ACCESSORS(HasActiveInlineJob,
|
|
hasActiveInlineJob, setHasActiveInlineJob)
|
|
|
|
/// What is the maximum priority of jobs that are currently running
|
|
/// or enqueued on this actor?
|
|
///
|
|
/// Note that the above isn't quite correct: we don't actually
|
|
/// lower this after we finish processing higher-priority tasks.
|
|
/// (Doing so introduces some subtleties around kicking off
|
|
/// lower-priority processing jobs.)
|
|
FLAGSET_DEFINE_FIELD_ACCESSORS(MaxPriority, MaxPriority_width,
|
|
JobPriority,
|
|
getMaxPriority, setMaxPriority)
|
|
};
|
|
|
|
/// This is designed to fit into two words, which can generally be
|
|
/// done lock-free on all our supported platforms.
|
|
struct alignas(2 * sizeof(void*)) State {
|
|
JobRef FirstJob;
|
|
struct Flags Flags;
|
|
};
|
|
|
|
swift::atomic<State> CurrentState;
|
|
|
|
friend class ProcessInlineJob;
|
|
union {
|
|
ProcessInlineJob JobStorage;
|
|
};
|
|
|
|
public:
|
|
|
|
/// Properly construct an actor, except for the heap header.
|
|
void initialize() {
|
|
new (&CurrentState) std::atomic<State>(State{JobRef(), Flags()});
|
|
}
|
|
|
|
/// Properly destruct an actor, except for the heap header.
|
|
void destroy() {
|
|
assert(CurrentState.load(std::memory_order_relaxed).Flags.getStatus()
|
|
== Status::Idle && "actor not idle during destruction?");
|
|
}
|
|
|
|
/// Add a job to this actor.
|
|
void enqueue(Job *job);
|
|
|
|
/// Take over running this actor in the current thread, if possible.
|
|
bool tryAssumeThread(RunningJobInfo runner);
|
|
|
|
/// Give up running this actor in the current thread.
|
|
void giveUpThread(RunningJobInfo runner);
|
|
|
|
/// Claim the next job off the actor or give it up.
|
|
Job *claimNextJobOrGiveUp(bool actorIsOwned, RunningJobInfo runner);
|
|
|
|
private:
|
|
/// Schedule an inline processing job. This can generally only be
|
|
/// done if we know nobody else is trying to do it at the same time,
|
|
/// e.g. if this thread just sucessfully transitioned the actor from
|
|
/// Idle to Scheduled.
|
|
void scheduleNonOverrideProcessJob(JobPriority priority,
|
|
bool hasActiveInlineJob);
|
|
|
|
static DefaultActorImpl *fromInlineJob(Job *job) {
|
|
assert(isa<ProcessInlineJob>(job));
|
|
#pragma clang diagnostic push
|
|
#pragma clang diagnostic ignored "-Winvalid-offsetof"
|
|
return reinterpret_cast<DefaultActorImpl*>(
|
|
reinterpret_cast<char*>(job) - offsetof(DefaultActorImpl, JobStorage));
|
|
#pragma clang diagnostic pop
|
|
}
|
|
|
|
class OverrideJobCache {
|
|
ProcessOverrideJob *Job = nullptr;
|
|
bool IsNeeded = false;
|
|
#ifndef NDEBUG
|
|
bool WasCommitted = false;
|
|
#endif
|
|
public:
|
|
OverrideJobCache() = default;
|
|
OverrideJobCache(const OverrideJobCache &) = delete;
|
|
OverrideJobCache &operator=(const OverrideJobCache &) = delete;
|
|
~OverrideJobCache() {
|
|
assert(WasCommitted && "didn't commit override job!");
|
|
}
|
|
|
|
void addToState(DefaultActorImpl *actor, State &newState);
|
|
void setNotNeeded() { IsNeeded = false; }
|
|
void commit();
|
|
};
|
|
};
|
|
|
|
} /// end anonymous namespace
|
|
|
|
// asImpl/asAbstract reinterpret one type as the other, so the
// implementation must fit within the abstract DefaultActor layout.
static_assert(sizeof(DefaultActorImpl) <= sizeof(DefaultActor) &&
              alignof(DefaultActorImpl) <= alignof(DefaultActor),
              "DefaultActorImpl doesn't fit in DefaultActor");
|
|
|
|
/// View an abstract DefaultActor as its implementation type.
static DefaultActorImpl *asImpl(DefaultActor *actor) {
  return reinterpret_cast<DefaultActorImpl*>(actor);
}
|
|
|
|
/// View a DefaultActorImpl as the abstract DefaultActor type.
static DefaultActor *asAbstract(DefaultActorImpl *actor) {
  return reinterpret_cast<DefaultActor*>(actor);
}
|
|
|
|
/*****************************************************************************/
|
|
/*********************** DEFAULT ACTOR IMPLEMENTATION ************************/
|
|
/*****************************************************************************/
|
|
|
|
/// Given that a job is enqueued normally on a default actor, get/set
|
|
/// the next job in the actor's queue.
|
|
///
|
|
/// Note that this must not be used on the override jobs that can appear
|
|
/// in the queue; those jobs are not actually in the actor's queue (they're
|
|
/// on the global execution queues). So the actor's actual queue flows
|
|
/// through the NextJob field on those objects rather than through
|
|
/// the SchedulerPrivate fields.
|
|
static JobRef getNextJobInQueue(Job *job) {
|
|
return *reinterpret_cast<JobRef*>(job->SchedulerPrivate);
|
|
}
|
|
/// Given that a job is enqueued normally on a default actor, set
/// the next job in the actor's queue.
///
/// The link is written to the start of the job's SchedulerPrivate
/// storage, so this must not be used on override jobs, whose
/// SchedulerPrivate is used for actually scheduling the job.
static void setNextJobInQueue(Job *job, JobRef next) {
  *reinterpret_cast<JobRef*>(job->SchedulerPrivate) = next;
}
|
|
|
|
/// Schedule a processing job that doesn't have to be an override job.
|
|
///
|
|
/// We can either do this with inline storage or heap-allocated.
|
|
/// To ues inline storage, we need to verify that the hasActiveInlineJob
|
|
/// flag is not set in the state and then successfully set it. The
|
|
/// argument reports that this has happened correctly.
|
|
///
|
|
/// We should only schedule a non-override processing job at all if
|
|
/// we're transferring ownership of the jobs in it; see the ownership
|
|
/// comment on DefaultActorImpl.
|
|
void DefaultActorImpl::scheduleNonOverrideProcessJob(JobPriority priority,
|
|
bool hasActiveInlineJob) {
|
|
Job *job;
|
|
if (hasActiveInlineJob) {
|
|
job = new ProcessOutOfLineJob(this, priority);
|
|
} else {
|
|
job = new (&JobStorage) ProcessInlineJob(priority);
|
|
}
|
|
swift_task_enqueueGlobal(job);
|
|
}
|
|
|
|
|
|
namespace {
|
|
|
|
/// A job to process a specific default actor at a higher priority than
|
|
/// it was previously running at.
|
|
///
|
|
/// When an override job is successfully registered with an actor
|
|
/// (not enqueued there), the thread processing the actor and the
|
|
/// thread processing the override job coordinate by each calling
|
|
/// one of a set of methods on the object.
|
|
class ProcessOverrideJob : public Job {
|
|
DefaultActorImpl *Actor;
|
|
|
|
ConditionVariable::Mutex Lock;
|
|
ConditionVariable Queue;
|
|
|
|
/// Has the actor made a decision about this job yet?
|
|
bool IsResolvedByActor = false;
|
|
|
|
/// Has the job made a decision about itself yet?
|
|
bool IsResolvedByJob = false;
|
|
|
|
/// Has this job been abandoned?
|
|
bool IsAbandoned = false;
|
|
|
|
public:
|
|
/// SchedulerPrivate in an override job is used for actually scheduling
|
|
/// the job, so the actor queue goes through this instead.
|
|
///
|
|
/// We also use this temporarily for the list of override jobs on
|
|
/// the actor that we need to wake up.
|
|
JobRef NextJob;
|
|
|
|
public:
|
|
ProcessOverrideJob(DefaultActorImpl *actor, JobPriority priority,
|
|
JobRef nextJob)
|
|
: Job({JobKind::DefaultActorOverride, priority}, &process),
|
|
Actor(actor), NextJob(nextJob) {}
|
|
|
|
DefaultActorImpl *getActor() const { return Actor; }
|
|
|
|
/// Called by the job to notify the actor that the job has chosen
|
|
/// to abandon its work. This is irrevocable: the job is not going
|
|
/// to have a thread behind it.
|
|
///
|
|
/// This may delete the job or cause it to be deleted on another thread.
|
|
void setAbandoned() {
|
|
bool shouldDelete = false;
|
|
Lock.withLock([&] {
|
|
assert(!IsResolvedByJob && "job already resolved itself");
|
|
IsResolvedByJob = true;
|
|
IsAbandoned = true;
|
|
shouldDelete = IsResolvedByJob && IsResolvedByActor;
|
|
});
|
|
if (shouldDelete) delete this;
|
|
}
|
|
|
|
/// Called by the job to notify the actor that the job has successfully
|
|
/// taken over the actor and is now running it.
|
|
///
|
|
/// This may delete the job object or cause it to be deleted on
|
|
/// another thread.
|
|
void setRunning() {
|
|
bool shouldDelete = false;
|
|
Lock.withLock([&] {
|
|
assert(!IsResolvedByJob && "job already resolved itself");
|
|
IsResolvedByJob = true;
|
|
shouldDelete = IsResolvedByJob && IsResolvedByActor;
|
|
});
|
|
if (shouldDelete) delete this;
|
|
}
|
|
|
|
/// Called by the job to wait for the actor to resolve what the job
|
|
/// should do.
|
|
bool waitForActivation() {
|
|
bool isActivated = false;
|
|
Lock.withLockOrWait(Queue, [&] {
|
|
assert(!IsResolvedByJob && "job already resolved itself");
|
|
if (IsResolvedByActor) {
|
|
isActivated = !IsAbandoned;
|
|
IsResolvedByJob = true;
|
|
return true;
|
|
}
|
|
return false;
|
|
});
|
|
delete this;
|
|
return isActivated;
|
|
}
|
|
|
|
/// Called by the actor to notify this job that the actor thinks it
|
|
/// should try to take over the actor. It's okay if that doesn't
|
|
/// succeed (as long as that's because some other job is going to
|
|
/// take over).
|
|
///
|
|
/// This may delete the job or cause it to be deleted on another
|
|
/// thread.
|
|
bool wakeAndActivate() {
|
|
bool shouldDelete = false;
|
|
bool mayHaveBeenActivated = false;
|
|
Lock.withLockThenNotifyAll(Queue, [&] {
|
|
assert(!IsResolvedByActor && "actor already resolved this sjob");
|
|
IsResolvedByActor = true;
|
|
mayHaveBeenActivated = IsResolvedByJob && !IsAbandoned;
|
|
shouldDelete = IsResolvedByJob && IsResolvedByActor;
|
|
});
|
|
if (shouldDelete) delete this;
|
|
return mayHaveBeenActivated;
|
|
}
|
|
|
|
/// Called by the actor to notify this job that the actor does not
|
|
/// think it should try to take over the actor. It's okay if the
|
|
/// job successfully takes over the actor anyway.
|
|
///
|
|
/// This may delete the job or cause it to be deleted on another
|
|
/// thread.
|
|
void wakeAndAbandon() {
|
|
bool shouldDelete = false;
|
|
Lock.withLockThenNotifyAll(Queue, [&] {
|
|
assert(!IsResolvedByActor && "actor already resolved this job");
|
|
IsResolvedByActor = true;
|
|
IsAbandoned = true;
|
|
shouldDelete = IsResolvedByJob && IsResolvedByActor;
|
|
});
|
|
if (shouldDelete) delete this;
|
|
}
|
|
|
|
SWIFT_CC(swiftasync)
|
|
static void process(Job *job, ExecutorRef _executor);
|
|
|
|
static bool classof(const Job *job) {
|
|
return job->Flags.getKind() == JobKind::DefaultActorOverride;
|
|
}
|
|
};
|
|
|
|
} /// end anonymous namespace
|
|
|
|
/// Return a reference to an override job, marked both as an override
/// and as still needing preprocessing.
JobRef JobRef::getOverride(ProcessOverrideJob *job) {
  return JobRef(job, NeedsPreprocessing | IsOverride);
}
|
|
/// Given a reference with no flag bits set that is either null or refers
/// to an override job, return that job (null stays null).
ProcessOverrideJob *JobRef::getAsPreprocessedOverride() const {
  return cast_or_null<ProcessOverrideJob>(getAsPreprocessedJob());
}
|
|
/// Describe an override processing job; the priority is taken from
/// the job itself.
RunningJobInfo RunningJobInfo::forOverride(ProcessOverrideJob *job) {
  return {Override, job->getPriority(), job};
}
|
|
|
|
/// Flag that the current processing job has been abandoned
|
|
/// and will not be running the actor.
|
|
void RunningJobInfo::setAbandoned() {
|
|
if (OverrideJob) {
|
|
OverrideJob->setAbandoned();
|
|
OverrideJob = nullptr;
|
|
}
|
|
}
|
|
|
|
/// Flag that the current processing job is now running the actor.
|
|
void RunningJobInfo::setRunning() {
|
|
if (OverrideJob) {
|
|
OverrideJob->setRunning();
|
|
OverrideJob = nullptr;
|
|
}
|
|
}
|
|
|
|
/// Try to wait for the current processing job to be activated,
|
|
/// if that's possible. It's okay to call this multiple times
|
|
/// (or to call setAbandoned/setRunning after it) as long as
|
|
/// it's all on a single value.
|
|
bool RunningJobInfo::waitForActivation() {
|
|
if (Kind == Override) {
|
|
// If we don't have an override job, it's because we've already
|
|
// waited for activation successfully.
|
|
if (!OverrideJob) return true;
|
|
|
|
bool result = OverrideJob->waitForActivation();
|
|
OverrideJob = nullptr;
|
|
return result;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
/// Wake all the overrides in the given list, activating the first
|
|
/// that exactly matches the target priority, if any.
|
|
static void wakeOverrides(ProcessOverrideJob *nextOverride,
|
|
Optional<JobPriority> targetPriority) {
|
|
bool hasAlreadyActivated = false;
|
|
while (nextOverride) {
|
|
// We have to advance to the next override before we call one of
|
|
// the wake methods because they can delete the job immediately
|
|
// (and even if they don't, we'd still be racing with deletion).
|
|
auto cur = nextOverride;
|
|
nextOverride = cur->NextJob.getAsPreprocessedOverride();
|
|
|
|
if (hasAlreadyActivated ||
|
|
!targetPriority ||
|
|
cur->getPriority() != *targetPriority)
|
|
cur->wakeAndAbandon();
|
|
else
|
|
hasAlreadyActivated = cur->wakeAndActivate();
|
|
}
|
|
}
|
|
|
|
/// Flag that an override job is needed and create it.
|
|
void DefaultActorImpl::OverrideJobCache::addToState(DefaultActorImpl *actor,
|
|
State &newState) {
|
|
IsNeeded = true;
|
|
auto newPriority = newState.Flags.getMaxPriority();
|
|
auto nextJob = newState.FirstJob;
|
|
if (Job) {
|
|
Job->Flags.setPriority(newPriority);
|
|
Job->NextJob = nextJob;
|
|
} else {
|
|
// Override jobs are always "extra" from the perspective of our
|
|
// ownership rules and so require a retain of the actor. We must
|
|
// do this before changing the actor state because other jobs may
|
|
// race to release the actor as soon as we change the actor state.
|
|
swift_retain(actor);
|
|
Job = new ProcessOverrideJob(actor, newPriority, nextJob);
|
|
}
|
|
newState.FirstJob = JobRef::getOverride(Job);
|
|
}
|
|
|
|
/// Schedule the override job if we created one and still need it.
|
|
/// If we created one but didn't end up needing it (which can happen
|
|
/// with a race to override), destroy it.
|
|
void DefaultActorImpl::OverrideJobCache::commit() {
|
|
#ifndef NDEBUG
|
|
assert(!WasCommitted && "committing override job multiple timee");
|
|
WasCommitted = true;
|
|
#endif
|
|
|
|
if (Job) {
|
|
if (IsNeeded) {
|
|
swift_task_enqueueGlobal(Job);
|
|
} else {
|
|
swift_release(Job->getActor());
|
|
delete Job;
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Preprocess the prefix of the actor's queue that hasn't already
|
|
/// been preprocessed:
|
|
///
|
|
/// - Split the jobs into registered overrides and actual jobs.
|
|
/// - Append the actual jobs to any already-preprocessed job list.
|
|
///
|
|
/// The returned job should become the new root of the job queue
|
|
/// (or may be immediately dequeued, in which its successor should).
|
|
/// All of the jobs in this list are guaranteed to be non-override jobs.
|
|
static Job *preprocessQueue(JobRef first,
|
|
JobRef previousFirst,
|
|
Job *previousFirstNewJob,
|
|
ProcessOverrideJob *&overridesToWake) {
|
|
assert(previousFirst || previousFirstNewJob == nullptr);
|
|
|
|
if (!first.needsPreprocessing())
|
|
return first.getAsPreprocessedJob();
|
|
|
|
Job *firstNewJob = nullptr;
|
|
|
|
while (first != previousFirst) {
|
|
// If we find something that doesn't need preprocessing, it must've
|
|
// been left by a previous queue-processing, which means that
|
|
// this must be our first attempt to preprocess in this processing.
|
|
// Just treat the queue from this point as a well-formed whole
|
|
// to which we need to add any new items we might've just found.
|
|
if (!first.needsPreprocessing()) {
|
|
assert(!previousFirst && !previousFirstNewJob);
|
|
previousFirstNewJob = first.getAsPreprocessedJob();
|
|
break;
|
|
}
|
|
|
|
// If the job is an override, add it to the list of override jobs
|
|
// that we need to wake up. Note that the list of override jobs
|
|
// flows through NextJob; we must not use getNextJobInQueue because
|
|
// that touches queue-private state, and the override job is
|
|
// not enqueued on the actor, merely registered with it.
|
|
if (first.isOverride()) {
|
|
auto overrideJob = first.getAsOverride();
|
|
first = overrideJob->NextJob;
|
|
overrideJob->NextJob = JobRef::getPreprocessed(overridesToWake);
|
|
overridesToWake = overrideJob;
|
|
continue;
|
|
}
|
|
|
|
// If the job isn't an override, add it to the front of the list of
|
|
// jobs we're building up. Note that this reverses the order of
|
|
// jobs; since enqueue() always adds jobs to the front, reversing
|
|
// the order effectively makes the actor queue FIFO, which is what
|
|
// we want.
|
|
// FIXME: but we should also sort by priority
|
|
auto job = first.getAsJob();
|
|
first = getNextJobInQueue(job);
|
|
setNextJobInQueue(job, JobRef::getPreprocessed(firstNewJob));
|
|
firstNewJob = job;
|
|
}
|
|
|
|
// If there are jobs already in the queue, put the new jobs at the end.
|
|
if (!firstNewJob) {
|
|
firstNewJob = previousFirstNewJob;
|
|
} else if (previousFirstNewJob) {
|
|
auto cur = previousFirstNewJob;
|
|
while (true) {
|
|
auto next = getNextJobInQueue(cur).getAsPreprocessedJob();
|
|
if (!next) {
|
|
setNextJobInQueue(cur, JobRef::getPreprocessed(firstNewJob));
|
|
break;
|
|
}
|
|
cur = next;
|
|
}
|
|
firstNewJob = previousFirstNewJob;
|
|
}
|
|
|
|
return firstNewJob;
|
|
}
|
|
|
|
/// Stop running this actor on the current thread.
///
/// The actor is expected to be in the Running state on entry.  Any jobs
/// found at the head of the queue are preprocessed, and the actor's state
/// is atomically republished as Scheduled (jobs remain) or Idle (no jobs
/// remain).  After the state update succeeds, collected override jobs are
/// woken; if jobs remain, the actor is either released (an override job
/// at the higher priority is expected to take over) or a new non-override
/// processing job is scheduled.
///
/// \param runner describes the job that has been running the actor; its
///   priority and whether it was the inline job drive the bookkeeping.
void DefaultActorImpl::giveUpThread(RunningJobInfo runner) {
  auto observed = CurrentState.load(std::memory_order_acquire);
  assert(observed.Flags.getStatus() == Status::Running);

  // Turn whatever is currently at the head of the queue into a
  // preprocessed list, collecting override jobs along the way.
  ProcessOverrideJob *pendingOverrides = nullptr;
  auto queueHead = preprocessQueue(observed.FirstJob, JobRef(), nullptr,
                                   pendingOverrides);

  for (;;) {
    State desired = observed;
    desired.FirstJob = JobRef::getPreprocessed(queueHead);
    desired.Flags.setStatus(queueHead ? Status::Scheduled : Status::Idle);

    // An inline runner stops being active once it gives up the thread.
    if (runner.wasInlineJob())
      desired.Flags.setHasActiveInlineJob(false);

    bool jobsRemain = static_cast<bool>(desired.FirstJob);
    bool overrideWillTakeOver =
      runner.Priority < observed.Flags.getMaxPriority();
    bool inlineJobActive = desired.Flags.hasActiveInlineJob();

    // We will schedule a new processing job below only when jobs remain
    // and no override is taking over; if that job is going to be the
    // inline job, claim it in the state we are about to publish.
    if (jobsRemain && !overrideWillTakeOver && !inlineJobActive)
      desired.Flags.setHasActiveInlineJob(true);

    // Remember the head we based this attempt on, so that a failed
    // exchange only has to preprocess the jobs added in front of it.
    auto lastSeenHead = observed.FirstJob;
    if (CurrentState.compare_exchange_weak(observed, desired,
               /*success*/ std::memory_order_release,
               /*failure*/ std::memory_order_acquire)) {
      // The priority of the work that is left.
      auto remainingPriority = desired.Flags.getMaxPriority();

      wakeOverrides(pendingOverrides, remainingPriority);

      // This was the actor's owning job; per the ownership rules (see
      // the comment on DefaultActorImpl), remaining jobs require us to
      // balance ownership and to make sure something will process them.
      if (jobsRemain) {
        if (overrideWillTakeOver) {
          // The override job becomes the owning job; just release.
          swift_release(this);
        } else {
          // Otherwise enqueue a job that will take over at the new
          // priority.
          scheduleNonOverrideProcessJob(remainingPriority, inlineJobActive);
        }
      }
      return;
    }

    // The exchange failed: fold in any newly observed queue items and
    // try again.
    queueHead = preprocessQueue(observed.FirstJob, lastSeenHead, queueHead,
                                pendingOverrides);
  }
}
|
|
|
|
/// Claim the next job on the actor or give it up forever.
|
|
///
|
|
/// The running thread doesn't need to already own the actor to do this.
|
|
/// It does need to be participating correctly in the ownership
|
|
/// scheme as a "processing job"; see the comment on DefaultActorImpl.
|
|
Job *DefaultActorImpl::claimNextJobOrGiveUp(bool actorIsOwned,
|
|
RunningJobInfo runner) {
|
|
auto oldState = CurrentState.load(std::memory_order_acquire);
|
|
|
|
// The status had better be Running unless we're trying to acquire
|
|
// our first job.
|
|
assert(oldState.Flags.getStatus() == Status::Running ||
|
|
!actorIsOwned);
|
|
|
|
// If we don't yet own the actor, we need to try to claim the actor
|
|
// first; we cannot safely access the queue memory yet because other
|
|
// threads may concurrently be trying to do this.
|
|
if (!actorIsOwned) {
|
|
while (true) {
|
|
// A helper function when the only change we need to try is to
|
|
// update for an inline runner.
|
|
auto tryUpdateForInlineRunner = [&]{
|
|
if (!runner.wasInlineJob()) return true;
|
|
|
|
auto newState = oldState;
|
|
newState.Flags.setHasActiveInlineJob(false);
|
|
return CurrentState.compare_exchange_weak(oldState, newState,
|
|
/*success*/ std::memory_order_relaxed,
|
|
/*failure*/ std::memory_order_acquire);
|
|
};
|
|
|
|
// If the actor is out of work, or its priority doesn't match our
|
|
// priority, don't try to take over the actor.
|
|
if (!oldState.FirstJob ||
|
|
oldState.Flags.getMaxPriority() != runner.Priority) {
|
|
|
|
// The only change we need here is inline-runner bookkeeping.
|
|
if (!tryUpdateForInlineRunner())
|
|
continue;
|
|
|
|
// We're eliminating a processing thread; balance ownership.
|
|
swift_release(this);
|
|
runner.setAbandoned();
|
|
return nullptr;
|
|
}
|
|
|
|
// If the actor is currently running, we'd need to wait for
|
|
// it to stop. We can do this if we're an override job;
|
|
// otherwise we need to exit.
|
|
if (oldState.Flags.getStatus() == Status::Running) {
|
|
if (!runner.waitForActivation()) {
|
|
// The only change we need here is inline-runner bookkeeping.
|
|
if (!tryUpdateForInlineRunner())
|
|
continue;
|
|
|
|
swift_release(this);
|
|
return nullptr;
|
|
}
|
|
|
|
// Fall through into the compare-exchange below, but anticipate
|
|
// that the actor is now Scheduled instead of Running.
|
|
oldState.Flags.setStatus(Status::Scheduled);
|
|
}
|
|
|
|
// Try to set the state as Running.
|
|
assert(oldState.Flags.getStatus() == Status::Scheduled);
|
|
auto newState = oldState;
|
|
newState.Flags.setStatus(Status::Running);
|
|
|
|
// Also do our inline-runner bookkeeping.
|
|
if (runner.wasInlineJob())
|
|
newState.Flags.setHasActiveInlineJob(false);
|
|
|
|
if (!CurrentState.compare_exchange_weak(oldState, newState,
|
|
/*success*/ std::memory_order_relaxed,
|
|
/*failure*/ std::memory_order_acquire))
|
|
continue;
|
|
|
|
// If that succeeded, we can proceed to the main body.
|
|
oldState = newState;
|
|
runner.setRunning();
|
|
break;
|
|
}
|
|
}
|
|
|
|
assert(oldState.Flags.getStatus() == Status::Running);
|
|
|
|
// We should have taken care of the inline-job bookkeeping now.
|
|
assert(!oldState.Flags.hasActiveInlineJob() || !runner.wasInlineJob());
|
|
|
|
// Okay, now it's safe to look at queue state.
|
|
// Preprocess any queue items at the front of the queue.
|
|
ProcessOverrideJob *overridesToWake = nullptr;
|
|
auto newFirstJob = preprocessQueue(oldState.FirstJob, JobRef(),
|
|
nullptr, overridesToWake);
|
|
|
|
Optional<JobPriority> remainingJobPriority;
|
|
while (true) {
|
|
State newState = oldState;
|
|
|
|
// If the priority we're currently running with is adqeuate for
|
|
// all the remaining jobs, try to dequeue something.
|
|
// FIXME: should this be an exact match in priority instead of
|
|
// potentially running jobs with too high a priority?
|
|
Job *jobToRun;
|
|
if (oldState.Flags.getMaxPriority() <= runner.Priority &&
|
|
newFirstJob) {
|
|
jobToRun = newFirstJob;
|
|
newState.FirstJob = getNextJobInQueue(newFirstJob);
|
|
newState.Flags.setStatus(Status::Running);
|
|
|
|
// Otherwise, we should give up the thread.
|
|
} else {
|
|
jobToRun = nullptr;
|
|
newState.FirstJob = JobRef::getPreprocessed(newFirstJob);
|
|
newState.Flags.setStatus(newFirstJob ? Status::Scheduled
|
|
: Status::Idle);
|
|
}
|
|
|
|
// Try to update the queue. The changes we've made to the queue
|
|
// structure need to be made visible even if we aren't dequeuing
|
|
// anything.
|
|
auto firstPreprocessed = oldState.FirstJob;
|
|
if (!CurrentState.compare_exchange_weak(oldState, newState,
|
|
/*success*/ std::memory_order_release,
|
|
/*failure*/ std::memory_order_acquire)) {
|
|
// Preprocess any new queue items, which will have been formed
|
|
// into a linked list leading to the last head we observed.
|
|
// (The fact that that job may not be the head anymore doesn't
|
|
// matter; we're looking for an exact match with that.)
|
|
newFirstJob = preprocessQueue(oldState.FirstJob,
|
|
firstPreprocessed,
|
|
newFirstJob,
|
|
overridesToWake);
|
|
|
|
// Loop to retry updating the state.
|
|
continue;
|
|
}
|
|
|
|
// We successfully updated the state.
|
|
|
|
// If we're giving up the thread with jobs remaining, we need
|
|
// to release the actor, and we should wake overrides with the
|
|
// right priority.
|
|
Optional<JobPriority> remainingJobPriority;
|
|
if (!jobToRun && newFirstJob) {
|
|
remainingJobPriority = newState.Flags.getMaxPriority();
|
|
}
|
|
|
|
// Wake the overrides.
|
|
wakeOverrides(overridesToWake, remainingJobPriority);
|
|
|
|
// Per the ownership rules (see the comment on DefaultActorImpl),
|
|
// release the actor if we're giving up the thread with jobs
|
|
// remaining. We intentionally do this after wakeOverrides to
|
|
// try to get the overrides running a little faster.
|
|
if (remainingJobPriority)
|
|
swift_release(this);
|
|
|
|
return jobToRun;
|
|
}
|
|
}
|
|
|
|
/// The primary function for processing an actor on a thread. Start
|
|
/// processing the given default actor as the active default actor on
|
|
/// the current thread, and keep processing whatever actor we're
|
|
/// running when code returns back to us until we're not processing
|
|
/// any actors anymore.
|
|
static void processDefaultActor(DefaultActorImpl *currentActor,
|
|
RunningJobInfo runner) {
|
|
// Register that we're processing a default actor in this frame.
|
|
ExecutorTrackingInfo trackingInfo;
|
|
auto activeTrackingInfo = trackingInfo.enterOrUpdate(
|
|
ExecutorRef::forDefaultActor(asAbstract(currentActor)));
|
|
|
|
bool threadIsRunningActor = false;
|
|
while (true) {
|
|
assert(currentActor);
|
|
|
|
// Immediately check if we've been asked to yield the thread.
|
|
if (shouldYieldThread())
|
|
break;
|
|
|
|
// Claim another job from the current actor.
|
|
auto job = currentActor->claimNextJobOrGiveUp(threadIsRunningActor,
|
|
runner);
|
|
|
|
// If we failed to claim a job, we have nothing to do.
|
|
if (!job) {
|
|
// We also gave up the actor as part of failing to claim it.
|
|
// Make sure we don't try to give up the actor again.
|
|
currentActor = nullptr;
|
|
break;
|
|
}
|
|
|
|
// Run the job.
|
|
auto executor = ExecutorRef::forDefaultActor(asAbstract(currentActor));
|
|
runJobInExecutorContext(job, executor);
|
|
|
|
// The current actor may have changed after the job.
|
|
// If it's become nil, or not a default actor, we have nothing to do.
|
|
auto currentExecutor = activeTrackingInfo->getActiveExecutor();
|
|
if (!currentExecutor.isDefaultActor())
|
|
break;
|
|
currentActor = asImpl(currentExecutor.getDefaultActor());
|
|
|
|
// Otherwise, we know that we're running the actor on this thread.
|
|
threadIsRunningActor = true;
|
|
}
|
|
|
|
if (activeTrackingInfo == &trackingInfo)
|
|
trackingInfo.leave();
|
|
|
|
// If we still have an active actor, we should give it up.
|
|
if (currentActor)
|
|
currentActor->giveUpThread(runner);
|
|
}
|
|
|
|
/// Entry point for the inline processing job: recover the owning actor
/// from the job and start processing it.
void ProcessInlineJob::process(Job *job, ExecutorRef _executor) {
  DefaultActorImpl *owner = DefaultActorImpl::fromInlineJob(job);

  // Capture the priority up front, before doing anything that might
  // invalidate the job.
  auto runner = RunningJobInfo::forInline(job->getPriority());

  // FIXME: force tail call
  return processDefaultActor(owner, runner);
}
|
|
|
|
/// Entry point for an out-of-line processing job: extract the actor and
/// priority, destroy the job object, and start processing the actor.
void ProcessOutOfLineJob::process(Job *job, ExecutorRef _executor) {
  auto outOfLineJob = cast<ProcessOutOfLineJob>(job);

  // Everything we need must be read out of the job before it is deleted
  // below.
  DefaultActorImpl *owner = outOfLineJob->Actor;
  auto runner = RunningJobInfo::forOther(job->getPriority());

  delete outOfLineJob;

  // FIXME: force tail call
  return processDefaultActor(owner, runner);
}
|
|
|
|
/// Entry point for an override processing job: start processing the
/// job's actor with a runner built from the override job itself.
void ProcessOverrideJob::process(Job *job, ExecutorRef _executor) {
  auto overrideJob = cast<ProcessOverrideJob>(job);

  // Pull the actor out of the job, and describe ourselves as an
  // override runner.
  auto owner = overrideJob->Actor;
  auto runner = RunningJobInfo::forOverride(overrideJob);

  // FIXME: force tail call
  return processDefaultActor(owner, runner);
}
|
|
|
|
/// Add a job to this actor's queue.
///
/// The job is pushed onto the front of the job list with a
/// compare-exchange loop.  If the actor was idle, it is marked Scheduled
/// and a processing job is scheduled for it; if the actor was busy and
/// the new job raises its maximum priority, an override job is
/// registered with the queue.
///
/// \param job the job to enqueue.
void DefaultActorImpl::enqueue(Job *job) {
  auto observed = CurrentState.load(std::memory_order_relaxed);

  OverrideJobCache overrideCache;

  for (;;) {
    auto desired = observed;

    // Link the job in at the front of the list; preprocessing reverses
    // the list later.
    setNextJobInQueue(job, observed.FirstJob);
    desired.FirstJob = JobRef::getUnpreprocessed(job);

    bool wasIdle = (observed.Flags.getStatus() == Status::Idle);

    // Update the priority: the priority of the job we're adding if the
    // actor was idle, or the max of old and new if not.  Only the running
    // thread can decrease the actor's priority once it's non-idle.
    // (But note that the job we enqueue can still observe a lowered
    // priority.)
    auto oldPriority = observed.Flags.getMaxPriority();
    auto incomingPriority = job->getPriority();
    auto newPriority =
      wasIdle ? incomingPriority : std::max(oldPriority, incomingPriority);
    desired.Flags.setMaxPriority(newPriority);

    // A busy actor whose priority just changed needs an override job;
    // create it (if necessary) and register it with the queue.
    bool needsOverride = !wasIdle && newPriority != oldPriority;
    if (needsOverride)
      overrideCache.addToState(this, desired);
    else
      overrideCache.setNotNeeded();

    // Sampled after the override bookkeeping, exactly where the flag is
    // consulted for scheduling below.
    bool hasActiveInlineJob = desired.Flags.hasActiveInlineJob();

    if (wasIdle) {
      // We may be able to use the inline job to process the actor; flag
      // that in the state we publish.
      if (!hasActiveInlineJob)
        desired.Flags.setHasActiveInlineJob(true);

      // Make sure the status is at least Scheduled.  The job itself is
      // scheduled below if the exchange succeeds.
      desired.Flags.setStatus(Status::Scheduled);
    }

    // Try the compare-exchange; on failure `observed` is refreshed and we
    // go around again.
    if (CurrentState.compare_exchange_weak(observed, desired,
               /*success*/ std::memory_order_release,
               /*failure*/ std::memory_order_relaxed)) {
      // Commit the override job if we created one.
      overrideCache.commit();

      // If the actor was idle, schedule it using the invasive job.
      if (wasIdle) {
        assert(!needsOverride);
        scheduleNonOverrideProcessJob(newPriority, hasActiveInlineJob);
      }
      return;
    }
  }
}
|
|
|
|
/// Try to take over an idle actor on the current thread.
///
/// \param runner supplies the priority recorded as the actor's maximum
///   priority on success.
/// \returns true if the actor was Idle and has been moved to Running;
///   false if the actor was observed in any other state.
bool DefaultActorImpl::tryAssumeThread(RunningJobInfo runner) {
  // The load must be an acquire in order to properly order accesses to
  // the actor's state for the new task.
  auto observed = CurrentState.load(std::memory_order_acquire);

  for (;;) {
    // Only an idle actor can be assumed.
    if (observed.Flags.getStatus() != Status::Idle)
      return false;

    assert(!observed.FirstJob);

    auto desired = observed;
    desired.Flags.setStatus(Status::Running);
    desired.Flags.setMaxPriority(runner.Priority);

    // On failure `observed` is refreshed and the status is re-checked.
    if (CurrentState.compare_exchange_weak(observed, desired,
               /*success*/ std::memory_order_relaxed,
               /*failure*/ std::memory_order_acquire))
      return true;
  }
}
|
|
|
|
/// Runtime entry point: forwards to the default actor implementation's
/// initialize().
void swift::swift_defaultActor_initialize(DefaultActor *_actor) {
  auto *impl = asImpl(_actor);
  impl->initialize();
}
|
|
|
|
/// Runtime entry point: forwards to the default actor implementation's
/// destroy().
void swift::swift_defaultActor_destroy(DefaultActor *_actor) {
  auto *impl = asImpl(_actor);
  impl->destroy();
}
|
|
|
|
/// Runtime entry point: enqueues the job on the given default actor.
void swift::swift_defaultActor_enqueue(Job *job, DefaultActor *_actor) {
  auto *impl = asImpl(_actor);
  impl->enqueue(job);
}
|
|
|
|
/// FIXME: only exists for the quick-and-dirty MainActor implementation.
|
|
namespace swift {
|
|
Metadata* MainActorMetadata = nullptr;
|
|
}
|
|
|
|
/// FIXME: only exists for the quick-and-dirty MainActor implementation.
|
|
void swift::swift_MainActor_register(HeapObject *actor) {
|
|
assert(actor);
|
|
MainActorMetadata = const_cast<Metadata*>(swift_getObjectType(actor));
|
|
assert(MainActorMetadata);
|
|
}
|
|
|
|
/*****************************************************************************/
|
|
/****************************** ACTOR SWITCHING ******************************/
|
|
/*****************************************************************************/
|
|
|
|
/// Can the current executor give up its thread?
|
|
static bool canGiveUpThreadForSwitch(ExecutorRef currentExecutor) {
|
|
// We can certainly "give up" a generic executor to try to run
|
|
// a task for an actor.
|
|
if (currentExecutor.isGeneric())
|
|
return true;
|
|
|
|
// If the current executor is a default actor, we know how to make
|
|
// it give up its thread.
|
|
if (currentExecutor.isDefaultActor())
|
|
return true;
|
|
|
|
return false;
|
|
}
|
|
|
|
/// Tell the current executor to give up its thread, given that it
|
|
/// returned true from canGiveUpThreadForSwitch().
|
|
///
|
|
/// Note that we don't update DefaultActorProcessingFrame here; we'll
|
|
/// do that in runOnAssumedThread.
|
|
static void giveUpThreadForSwitch(ExecutorRef currentExecutor,
|
|
RunningJobInfo runner) {
|
|
if (currentExecutor.isGeneric())
|
|
return;
|
|
|
|
asImpl(currentExecutor.getDefaultActor())->giveUpThread(runner);
|
|
}
|
|
|
|
/// Try to assume control of the current thread for the given executor
|
|
/// in order to run the given job.
|
|
///
|
|
/// This doesn't actually run the job yet.
|
|
///
|
|
/// Note that we don't update DefaultActorProcessingFrame here; we'll
|
|
/// do that in runOnAssumedThread.
|
|
static bool tryAssumeThreadForSwitch(ExecutorRef newExecutor,
|
|
RunningJobInfo runner) {
|
|
// If the new executor is generic, we don't need to do anything.
|
|
if (newExecutor.isGeneric()) {
|
|
return true;
|
|
}
|
|
|
|
// If the new executor is a default actor, ask it to assume the thread.
|
|
if (newExecutor.isDefaultActor()) {
|
|
return asImpl(newExecutor.getDefaultActor())->tryAssumeThread(runner);
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
/// Given that we've assumed control of an executor on this thread,
|
|
/// continue to run the given task on it.
|
|
SWIFT_CC(swiftasync)
|
|
static void runOnAssumedThread(AsyncTask *task, ExecutorRef executor,
|
|
RunningJobInfo runner) {
|
|
// Set that this actor is now the active default actor on this thread,
|
|
// and set up tracking info if there isn't any already.
|
|
ExecutorTrackingInfo trackingInfo;
|
|
auto activeTrackingInfo = trackingInfo.enterOrUpdate(executor);
|
|
|
|
// If one already existed, we should just tail-call the task; we don't
|
|
// want these frames to potentially accumulate linearly.
|
|
if (activeTrackingInfo != &trackingInfo) {
|
|
// FIXME: force tail call
|
|
return task->runInFullyEstablishedContext(executor);
|
|
}
|
|
|
|
// Otherwise, run the new task.
|
|
task->runInFullyEstablishedContext(executor);
|
|
|
|
// Leave the tracking frame, and give up the current actor if
|
|
// we have one.
|
|
//
|
|
// In principle, we could execute more tasks here, but that's probably
|
|
// not a reasonable thing to do in an assumed context rather than a
|
|
// dedicated actor-processing job.
|
|
executor = trackingInfo.getActiveExecutor();
|
|
trackingInfo.leave();
|
|
|
|
if (executor.isDefaultActor())
|
|
asImpl(executor.getDefaultActor())->giveUpThread(runner);
|
|
}
|
|
|
|
/// Continue running the task on a (possibly) different executor.
///
/// If no switch is required the task keeps running on the current
/// executor.  Otherwise we try to hand the current thread over to the new
/// executor and run the task there directly; failing that, the task is
/// enqueued on the new executor.
SWIFT_CC(swiftasync)
void swift::swift_task_switch(AsyncTask *task, ExecutorRef currentExecutor,
                              ExecutorRef newExecutor) {
  assert(task && "no task provided");

  // If the current executor is compatible with running the new executor,
  // just continue running.
  if (!currentExecutor.mustSwitchToRun(newExecutor)) {
    // FIXME: force tail call
    return task->runInFullyEstablishedContext(currentExecutor);
  }

  // Okay, we semantically need to switch.
  auto runner = RunningJobInfo::forOther(task->getPriority());

  // We can switch on this thread only if the current executor can give up
  // its thread, we haven't been asked to yield the thread, and the new
  // executor manages to take the thread over.  The checks are evaluated
  // in exactly this order and stop at the first failure.
  bool assumedThread = canGiveUpThreadForSwitch(currentExecutor) &&
                       !shouldYieldThread() &&
                       tryAssumeThreadForSwitch(newExecutor, runner);

  if (!assumedThread) {
    // Otherwise, just asynchronously enqueue the task on the given
    // executor.
    swift_task_enqueue(task, newExecutor);
    return;
  }

  giveUpThreadForSwitch(currentExecutor, runner);
  // FIXME: force tail call
  return runOnAssumedThread(task, newExecutor, runner);
}
|
|
|
|
/*****************************************************************************/
|
|
/************************* GENERIC ACTOR INTERFACES **************************/
|
|
/*****************************************************************************/
|
|
|
|
/// Enqueue a job on the given executor.
///
/// Dispatches to the global executor, the main executor, or a default
/// actor's queue depending on the kind of executor.
void swift::swift_task_enqueue(Job *job, ExecutorRef executor) {
  assert(job && "no job provided");

  // ThreadSanitizer release annotation on the job, issued before the job
  // is handed to any executor.
  _swift_tsan_release(job);

  if (executor.isGeneric()) {
    swift_task_enqueueGlobal(job);
  } else if (executor.isMainExecutor()) {
    // FIXME: only exists for the quick-and-dirty MainActor implementation.
    swift_task_enqueueMainExecutor(job);
  } else if (executor.isDefaultActor()) {
    asImpl(executor.getDefaultActor())->enqueue(job);
  } else {
    // Just assume it's actually a default actor that we haven't tagged
    // properly.
    // FIXME: call the general method.
    auto *untagged = reinterpret_cast<DefaultActor*>(executor.getRawValue());
    asImpl(untagged)->enqueue(job);
  }
}
|