[BatchMode] Add driver::PerformJobsState logic for BatchJobs.

Graydon Hoare
2018-01-16 20:14:35 -08:00
parent 3d9ce0afb9
commit 57fe9ee57a


@@ -27,6 +27,7 @@
#include "swift/Driver/ToolChain.h"
#include "llvm/ADT/DenseSet.h"
#include "llvm/ADT/MapVector.h"
#include "llvm/ADT/SetVector.h"
#include "llvm/ADT/StringExtras.h"
#include "llvm/ADT/TinyPtrVector.h"
#include "llvm/Option/Arg.h"
@@ -136,6 +137,17 @@ namespace driver {
/// don't need to run.
CommandSet ScheduledCommands;
/// A temporary buffer to hold commands that were scheduled but haven't been
/// added to the Task Queue yet, because we might try batching them together
/// first.
CommandSet PendingExecution;
/// Set of synthetic BatchJobs that serve to cluster subsets of jobs waiting
/// in PendingExecution. Also used to identify (then unpack) BatchJobs back
/// to their underlying non-Batch Jobs, when running a callback from
/// TaskQueue.
CommandSet BatchJobs;
/// All jobs which have finished execution, or which we have determined do
/// not need to run.
CommandSet FinishedCommands;
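Taken together, the sets trace a job's lifecycle: a scheduled job sits in both ScheduledCommands and PendingExecution, leaves PendingExecution once it is handed to the TaskQueue (possibly wrapped in a synthetic BatchJob recorded in BatchJobs), and lands in FinishedCommands when it completes or is skipped. A minimal sketch of that flow, with hypothetical helper names standing in for the real member functions:

    // Sketch only; the helper names below are illustrative, not from the patch.
    void onSchedule(const Job *Cmd) {
      ScheduledCommands.insert(Cmd);  // committed: it will finish or be skipped
      PendingExecution.insert(Cmd);   // waits for the next enqueue round
    }
    void onEnqueueRound() {
      // PendingExecution is drained as a whole: its jobs are added to the
      // TaskQueue, some wrapped in synthetic BatchJobs (recorded in the
      // BatchJobs set so callbacks can unpack them), then the set is cleared.
    }
    void onTaskExit(const Job *Cmd) {
      FinishedCommands.insert(Cmd);   // finished, or determined skippable
    }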
@@ -222,6 +234,17 @@ namespace driver {
return;
}
// Adding to scheduled means we've committed to its completion (not
// distinguished from skipping). We never remove it once inserted.
ScheduledCommands.insert(Cmd);
// Adding to pending means it should be in the next round of additions to
// the task queue (either batched or singularly); we remove Jobs from
// PendingExecution once we hand them over to the TaskQueue.
PendingExecution.insert(Cmd);
}
void addPendingJobToTaskQueue(const Job *Cmd) {
// FIXME: Failing here should not take down the whole process.
bool success = writeFilelistIfNecessary(Cmd, Comp.Diags);
assert(success && "failed to write filelist");
@@ -229,7 +252,6 @@ namespace driver {
assert(Cmd->getExtraEnvironment().empty() &&
"not implemented for compilations with multiple jobs");
ScheduledCommands.insert(Cmd);
if (Comp.ShowJobLifecycle)
llvm::outs() << "Added to TaskQueue: " << LogJob(Cmd) << "\n";
TQ->addTask(Cmd->getExecutable(), Cmd->getArguments(), llvm::None,
@@ -289,6 +311,8 @@ namespace driver {
parseable_output::emitBeganMessage(llvm::errs(), *BeganCmd, Pid);
}
/// Note that a .swiftdeps file failed to load, and take corrective actions:
/// disable incremental logic and schedule all existing deferred commands.
void
dependencyLoadFailed(StringRef DependenciesFile, bool Warn=true) {
if (Warn && Comp.ShowIncrementalBuildDecisions)
@@ -301,9 +325,10 @@ namespace driver {
DeferredCommands.clear();
}
/// Helper that reloads a job's .swiftdeps file after the job exits, and
/// re-runs transitive marking to ensure everything is properly invalidated
/// by any new dependency edges introduced by it.
/// Helper that attempts to reload a job's .swiftdeps file after the job
/// exits, and re-run transitive marking to ensure everything is properly
/// invalidated by any new dependency edges introduced by it. If reloading
/// fails, this can cause deferred jobs to be immediately scheduled.
template <unsigned N>
void reloadAndRemarkDeps(const Job *FinishedCmd,
int ReturnCode,
@@ -381,6 +406,28 @@ namespace driver {
}
}
/// Unpack a \c BatchJob that has finished into its constituent \c Job
/// members, and call \c taskFinished on each, propagating any \c
/// TaskFinishedResponse other than \c
/// TaskFinishedResponse::ContinueExecution from any of the constituent
/// calls.
TaskFinishedResponse
unpackAndFinishBatch(ProcessId Pid, int ReturnCode, StringRef Output,
StringRef Errors, const BatchJob *B) {
if (Comp.ShowJobLifecycle)
llvm::outs() << "Batch job finished: " << LogJob(B) << "\n";
auto res = TaskFinishedResponse::ContinueExecution;
for (const Job *J : B->getCombinedJobs()) {
if (Comp.ShowJobLifecycle)
llvm::outs() << " ==> Unpacked batch constituent finished: "
<< LogJob(J) << "\n";
auto r = taskFinished(Pid, ReturnCode, Output, Errors, (void *)J);
if (r != TaskFinishedResponse::ContinueExecution)
res = r;
}
return res;
}
/// Callback which will be called immediately after a task has finished
/// execution. Determines if execution should continue, and also schedules
/// any additional Jobs which we now know we need to run.
@@ -393,6 +440,11 @@ namespace driver {
DriverTimers[FinishedCmd]->stopTimer();
}
if (BatchJobs.count(FinishedCmd) != 0) {
return unpackAndFinishBatch(Pid, ReturnCode, Output, Errors,
static_cast<const BatchJob *>(FinishedCmd));
}
if (Comp.Level == OutputLevel::Parseable) {
// Parseable output was requested.
parseable_output::emitFinishedMessage(llvm::errs(), *FinishedCmd, Pid,
@@ -446,6 +498,30 @@ namespace driver {
return TaskFinishedResponse::ContinueExecution;
}
/// Unpack a \c BatchJob that has finished into its constituent \c Job
/// members, and call \c taskSignalled on each, propagating any \c
/// TaskFinishedResponse other than \c
/// TaskFinishedResponse::ContinueExecution from any of the constituent
/// calls.
TaskFinishedResponse
unpackAndSignalBatch(ProcessId Pid, StringRef ErrorMsg, StringRef Output,
StringRef Errors, const BatchJob *B,
Optional<int> Signal) {
if (Comp.ShowJobLifecycle)
llvm::outs() << "Batch job signalled: " << LogJob(B) << "\n";
auto res = TaskFinishedResponse::ContinueExecution;
for (const Job *J : B->getCombinedJobs()) {
if (Comp.ShowJobLifecycle)
llvm::outs() << " ==> Unpacked batch constituent signalled: "
<< LogJob(J) << "\n";
auto r = taskSignalled(Pid, ErrorMsg, Output, Errors,
(void *)J, Signal);
if (r != TaskFinishedResponse::ContinueExecution)
res = r;
}
return res;
}
TaskFinishedResponse
taskSignalled(ProcessId Pid, StringRef ErrorMsg, StringRef Output,
StringRef Errors, void *Context, Optional<int> Signal) {
@@ -455,6 +531,12 @@ namespace driver {
DriverTimers[SignalledCmd]->stopTimer();
}
if (BatchJobs.count(SignalledCmd) != 0) {
return unpackAndSignalBatch(Pid, ErrorMsg, Output, Errors,
static_cast<const BatchJob *>(SignalledCmd),
Signal);
}
if (Comp.Level == OutputLevel::Parseable) {
// Parseable output was requested.
parseable_output::emitSignalledMessage(llvm::errs(), *SignalledCmd,
@@ -596,6 +678,126 @@ namespace driver {
}
}
/// Insert all jobs in \p Cmds (of descriptive name \p Kind) into the \c
/// TaskQueue, and clear \p Cmds.
void transferJobsToTaskQueue(CommandSet &Cmds, StringRef Kind) {
for (const Job *Cmd : Cmds) {
if (Comp.ShowJobLifecycle)
llvm::outs() << "Adding " << Kind
<< " job to task queue: "
<< LogJob(Cmd) << "\n";
addPendingJobToTaskQueue(Cmd);
}
Cmds.clear();
}
/// Partition the jobs in \c PendingExecution into those that are \p
/// Batchable and those that are \p NonBatchable, clearing \p
/// PendingExecution.
void getPendingBatchableJobs(CommandSet &Batchable,
CommandSet &NonBatchable) {
for (const Job *Cmd : PendingExecution) {
if (Comp.getToolChain().jobIsBatchable(Comp, Cmd)) {
if (Comp.ShowJobLifecycle)
llvm::outs() << "Batchable: " << LogJob(Cmd) << "\n";
Batchable.insert(Cmd);
} else {
if (Comp.ShowJobLifecycle)
llvm::outs() << "Not batchable: " << LogJob(Cmd) << "\n";
NonBatchable.insert(Cmd);
}
}
PendingExecution.clear();
}
/// If \p CurrentBatch is nonempty, construct a new \c BatchJob from its
/// contents by calling \p ToolChain::constructBatchJob, then insert the
/// new \c BatchJob into \p Batches and clear \p CurrentBatch.
void
formBatchJobFromCurrentBatch(CommandSet &Batches,
llvm::SetVector<const Job *> &CurrentBatch) {
if (CurrentBatch.empty())
return;
if (Comp.ShowJobLifecycle)
llvm::outs() << "Forming batch job from "
<< CurrentBatch.size() << " constituents\n";
auto const &TC = Comp.getToolChain();
auto J = TC.constructBatchJob(CurrentBatch.getArrayRef(), Comp);
if (J)
Batches.insert(Comp.addJob(std::move(J)));
CurrentBatch.clear();
}
/// Return true iff \p CurrentBatch can be expanded with \p Cmd, meaning
/// that \p CurrentBatch is smaller than \p TargetBatchSize and \p Cmd
/// is batch-combinable with the equivalence class of \p CurrentBatch
/// (as represented by element 0 of \p CurrentBatch).
bool canExpandBatch(const Job *Cmd,
llvm::SetVector<const Job *> &CurrentBatch,
size_t TargetBatchSize) {
auto const &TC = Comp.getToolChain();
return (CurrentBatch.empty() ||
(TC.jobsAreBatchCombinable(Comp, Cmd, CurrentBatch[0]) &&
CurrentBatch.size() < TargetBatchSize));
}
/// If \p CurrentBatch can't be expanded with \p Cmd, form a new \c BatchJob
/// from \p CurrentBatch, add it to \p Batches, and reset \p CurrentBatch;
/// then in either case, insert \p Cmd into \p CurrentBatch.
void expandBatch(const Job *Cmd,
CommandSet &Batches,
llvm::SetVector<const Job *> &CurrentBatch,
size_t TargetBatchSize) {
if (!canExpandBatch(Cmd, CurrentBatch, TargetBatchSize)) {
formBatchJobFromCurrentBatch(Batches, CurrentBatch);
}
llvm::outs() << "Adding to batch: " << LogJob(Cmd) << "\n";
CurrentBatch.insert(Cmd);
}
/// Select jobs that are batch-combinable from \c PendingExecution, combine
/// them together into \p BatchJob instances (also inserted into \p
/// BatchJobs), and enqueue all \c PendingExecution jobs (whether batched or
/// not) into the \c TaskQueue for execution.
void formBatchJobsAndAddPendingJobsToTaskQueue() {
// If batch mode is not enabled, just transfer the set of pending jobs to
// the task queue, as-is.
if (!Comp.getBatchModeEnabled()) {
transferJobsToTaskQueue(PendingExecution, "standard");
return;
}
// Partition the pending jobs.
CommandSet Batchable, NonBatchable, Batches;
getPendingBatchableJobs(Batchable, NonBatchable);
size_t TargetBatchSize = Batchable.size() / Comp.NumberOfParallelCommands;
if (Comp.ShowJobLifecycle) {
llvm::outs() << "Found " << Batchable.size() << " batchable jobs\n";
llvm::outs() << "Aiming for batch size " << TargetBatchSize << '\n';
}
// Batch the batchable jobs.
llvm::SetVector<const Job *> CurrentBatch;
for (const Job *Cmd : Batchable) {
expandBatch(Cmd, Batches, CurrentBatch, TargetBatchSize);
}
// Form a residual incomplete batch if any jobs remain.
if (!CurrentBatch.empty()) {
formBatchJobFromCurrentBatch(Batches, CurrentBatch);
}
// Save batches so we can locate and decompose them on task-exit.
for (const Job *Cmd : Batches)
BatchJobs.insert(Cmd);
// Enqueue the resulting jobs, batched and non-batched alike.
transferJobsToTaskQueue(Batches, "batch");
transferJobsToTaskQueue(NonBatchable, "non-batch");
}
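To make the sizing arithmetic concrete: with, say, 10 batchable jobs and -j4 (NumberOfParallelCommands == 4), integer division yields TargetBatchSize == 2, so the greedy loop closes out a batch every two jobs and a residual batch picks up any remainder; with fewer batchable jobs than parallel slots, TargetBatchSize is 0 and every batchable job lands in a singleton batch. A standalone sketch of that arithmetic, assuming all jobs are batch-combinable (the counts are made up for illustration):

    #include <cstdio>
    #include <vector>

    int main() {
      size_t NumBatchable = 10, NumParallel = 4;             // hypothetical inputs
      size_t TargetBatchSize = NumBatchable / NumParallel;   // == 2
      std::vector<size_t> BatchSizes;
      size_t Current = 0;
      for (size_t I = 0; I < NumBatchable; ++I) {
        // Mirrors canExpandBatch: grow an empty batch, or one still under
        // the target size; otherwise close it out and start a new one.
        if (!(Current == 0 || Current < TargetBatchSize)) {
          BatchSizes.push_back(Current);
          Current = 0;
        }
        ++Current;
      }
      if (Current != 0)
        BatchSizes.push_back(Current);                       // residual batch
      for (size_t S : BatchSizes)
        std::printf("batch of %zu\n", S);                    // five batches of 2
      return 0;
    }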
void runTaskQueueToCompletion() {
do {
using namespace std::placeholders;
@@ -607,20 +809,39 @@ namespace driver {
std::bind(&PerformJobsState::taskSignalled, this,
_1, _2, _3, _4, _5, _6));
// Mark all remaining deferred commands as skipped.
// Returning from TaskQueue::execute should mean either an empty
// TaskQueue or a failed subprocess.
assert(!(Result == 0 && TQ->hasRemainingTasks()));
// Task-exit callbacks from TaskQueue::execute may have unblocked jobs,
// which means there might be PendingExecution jobs to enqueue here. If
// there are, we need to continue trying to make progress on the
// TaskQueue before we start marking deferred jobs as skipped, below.
if (!PendingExecution.empty() && Result == 0) {
formBatchJobsAndAddPendingJobsToTaskQueue();
continue;
}
// If we got here, all the queued and pending work we know about is
// done; mark anything still in deferred state as skipped.
for (const Job *Cmd : DeferredCommands) {
if (Comp.Level == OutputLevel::Parseable) {
// Provide output indicating this command was skipped if parseable output
// was requested.
// Provide output indicating this command was skipped if parseable
// output was requested.
parseable_output::emitSkippedMessage(llvm::errs(), *Cmd);
}
ScheduledCommands.insert(Cmd);
markFinished(Cmd, /*Skipped=*/true);
}
DeferredCommands.clear();
// ...which may allow us to go on and do later tasks.
// It's possible that by marking some jobs as skipped, we unblocked
// some jobs and thus have entries in PendingExecution again; push
// those through to the TaskQueue.
formBatchJobsAndAddPendingJobsToTaskQueue();
// If we added jobs to the TaskQueue, and we are not in an error state,
// we want to give the TaskQueue another run.
} while (Result == 0 && TQ->hasRemainingTasks());
}
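After this change, runTaskQueueToCompletion roughly follows the shape below. This is a condensed paraphrase, not the literal source: output and error handling are elided, and runQueuedTasks / markDeferredCommandsSkipped are placeholders for the TQ->execute call and the deferred-skipping loop shown above.

    do {
      // Run everything currently in the TaskQueue, firing the task
      // began/finished/signalled callbacks as subprocesses exit.
      Result = runQueuedTasks();        // stands in for TQ->execute(...)

      // Callbacks may have unblocked dependents; enqueue them and retry
      // before touching the deferred set.
      if (!PendingExecution.empty() && Result == 0) {
        formBatchJobsAndAddPendingJobsToTaskQueue();
        continue;
      }

      // Nothing left to run: mark remaining deferred jobs as skipped, which
      // can itself unblock jobs, so enqueue once more and loop around.
      markDeferredCommandsSkipped();    // stands in for the for-loop above
      formBatchJobsAndAddPendingJobsToTaskQueue();
    } while (Result == 0 && TQ->hasRemainingTasks());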
@@ -848,6 +1069,7 @@ int Compilation::performJobsImpl(bool &abnormalExit) {
State.scheduleInitialJobs();
State.scheduleAdditionalJobs();
State.formBatchJobsAndAddPendingJobsToTaskQueue();
State.runTaskQueueToCompletion();
State.checkUnfinishedJobs();