Files
swift-mirror/lib/Basic/Unix/TaskQueue.inc
2018-04-21 12:19:18 -07:00

649 lines
20 KiB
C++

//===--- TaskQueue.inc - Unix-specific TaskQueue ----------------*- C++ -*-===//
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2014 - 2017 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//
#include "swift/Basic/Statistic.h"
#include "swift/Basic/TaskQueue.h"
#include "llvm/ADT/DenseMap.h"
#include "llvm/ADT/DenseSet.h"
#include "llvm/ADT/StringRef.h"
#include "llvm/Support/ErrorHandling.h"
#include <cerrno>
#include <string>
#if HAVE_POSIX_SPAWN
#include <spawn.h>
#endif
#if HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <poll.h>
#include <sys/types.h>
#include <sys/wait.h>
#if !defined(__APPLE__)
extern char **environ;
#else
extern "C" {
// _NSGetEnviron is from crt_externs.h which is missing in the iOS SDK.
extern char ***_NSGetEnviron(void);
}
#endif
namespace swift {
namespace sys {
/// A single subprocess managed by the TaskQueue: it records what to run
/// (executable, arguments, environment), the bookkeeping needed while the
/// child is running (pid and the read ends of its output pipes), and the
/// output collected from the child.
class Task {
/// The path to the executable which this Task will execute.
const char *ExecPath;
/// Any arguments which should be passed during execution.
ArrayRef<const char *> Args;
/// The environment which will be used during execution. If empty, uses
/// this process's environment.
ArrayRef<const char *> Env;
/// Context which should be associated with this task.
void *Context;
/// True if the errors of the Task should be stored in Errors instead of
/// Output.
bool SeparateErrors;
/// The pid of this Task when executing. Initialized to -1.
pid_t Pid;
/// The read end of a pipe for reading output from the child process.
/// Initialized to -1 until execute() creates the pipe.
int Pipe;
/// The read end of a pipe for reading errors from the child process, if
/// SeparateErrors is true. Initialized to -1.
int ErrorPipe;
/// The current state of the Task. The enumerators are ordered so that
/// states can be compared with '<' (execute() asserts State < Executing).
enum class TaskState { Preparing, Executing, Finished } State;
/// Once the Task has finished, this contains the buffered output of the Task.
std::string Output;
/// Once the Task has finished, if SeparateErrors is true, this contains the
/// errors from the Task.
std::string Errors;
/// Optional place to count I/O and subprocess events. May be null.
UnifiedStatsReporter *Stats;
public:
/// Creates a Task in the Preparing state; nothing is spawned until
/// execute() is called.
///
/// \p Env must either be empty (meaning "use this process's environment")
/// or be terminated by a null pointer; this is asserted.
Task(const char *ExecPath, ArrayRef<const char *> Args,
ArrayRef<const char *> Env, void *Context, bool SeparateErrors,
UnifiedStatsReporter *USR)
: ExecPath(ExecPath), Args(Args), Env(Env), Context(Context),
SeparateErrors(SeparateErrors), Pid(-1), Pipe(-1), ErrorPipe(-1),
State(TaskState::Preparing), Stats(USR) {
assert((Env.empty() || Env.back() == nullptr) &&
"Env must either be empty or null-terminated!");
}
// Simple accessors for the stored configuration and collected results.
const char *getExecPath() const { return ExecPath; }
ArrayRef<const char *> getArgs() const { return Args; }
StringRef getOutput() const { return Output; }
StringRef getErrors() const { return Errors; }
void *getContext() const { return Context; }
pid_t getPid() const { return Pid; }
int getPipe() const { return Pipe; }
int getErrorPipe() const { return ErrorPipe; }
/// \brief Begins execution of this Task.
/// \returns true on error, false on success
bool execute();
/// \brief Reads data from the pipes, if any is available.
///
/// If \p UntilEnd is true, reads until the end of the stream; otherwise reads
/// once (possibly with a retry on EINTR), and returns.
/// \returns true on error, false on success.
bool readFromPipes(bool UntilEnd);
/// \brief Performs any post-execution work for this Task, such as reading
/// piped output and closing the pipe.
void finishExecution();
};
} // end namespace sys
} // end namespace swift
/// Launches the subprocess described by this Task.
///
/// The child's stdout is redirected into a pipe whose read end is stored in
/// Pipe. If SeparateErrors is set, stderr goes to a second pipe (read end in
/// ErrorPipe); otherwise stderr is redirected to the same pipe as stdout.
/// The child is started with posix_spawn() when available, and with
/// fork()/execve() otherwise.
///
/// \returns true on error (a pipe could not be created or the child could
/// not be started), false on success.
bool Task::execute() {
  assert(State < TaskState::Executing && "This Task cannot be executed twice!");
  State = TaskState::Executing;

  // Construct argv.
  SmallVector<const char *, 128> Argv;
  Argv.push_back(ExecPath);
  Argv.append(Args.begin(), Args.end());
  Argv.push_back(nullptr); // argv is expected to be null-terminated.

  // Set up the pipe. pipe() can fail (for instance when the process is out
  // of file descriptors); in that case the array is left unspecified, so we
  // must not go on to use or close its contents.
  int FullPipe[2];
  if (pipe(FullPipe) != 0) {
    State = TaskState::Finished;
    return true;
  }
  Pipe = FullPipe[0];

  int FullErrorPipe[2];
  if (SeparateErrors) {
    if (pipe(FullErrorPipe) != 0) {
      // Don't leak the stdout pipe we already created.
      close(FullPipe[0]);
      close(FullPipe[1]);
      Pipe = -1;
      State = TaskState::Finished;
      return true;
    }
    ErrorPipe = FullErrorPipe[0];
  }

  // Get the environment to pass down to the subtask.
  const char *const *envp = Env.empty() ? nullptr : Env.data();
  if (!envp) {
#if __APPLE__
    envp = *_NSGetEnviron();
#else
    envp = environ;
#endif
  }

  const char **argvp = Argv.data();

#if HAVE_POSIX_SPAWN
  posix_spawn_file_actions_t FileActions;
  posix_spawn_file_actions_init(&FileActions);

  posix_spawn_file_actions_adddup2(&FileActions, FullPipe[1], STDOUT_FILENO);

  if (SeparateErrors) {
    posix_spawn_file_actions_adddup2(&FileActions, FullErrorPipe[1],
                                     STDERR_FILENO);
  } else {
    posix_spawn_file_actions_adddup2(&FileActions, STDOUT_FILENO,
                                     STDERR_FILENO);
  }

  // The child has no use for the read ends of the pipes.
  posix_spawn_file_actions_addclose(&FileActions, FullPipe[0]);
  if (SeparateErrors) {
    posix_spawn_file_actions_addclose(&FileActions, FullErrorPipe[0]);
  }

  // Spawn the subtask.
  int spawnErr =
      posix_spawn(&Pid, ExecPath, &FileActions, nullptr,
                  const_cast<char **>(argvp), const_cast<char **>(envp));

  posix_spawn_file_actions_destroy(&FileActions);

  // The parent only reads; close its copies of the write ends so that the
  // read ends see end-of-file once the child is done.
  close(FullPipe[1]);
  if (SeparateErrors) {
    close(FullErrorPipe[1]);
  }

  if (spawnErr != 0 || Pid == 0) {
    close(FullPipe[0]);
    if (SeparateErrors) {
      close(FullErrorPipe[0]);
    }
    State = TaskState::Finished;
    return true;
  }
#else
  Pid = fork();
  switch (Pid) {
  case -1: {
    // fork() failed: release the read ends and remember the failure by
    // setting Pid to 0, which is checked after the write ends are closed.
    close(FullPipe[0]);
    if (SeparateErrors) {
      close(FullErrorPipe[0]);
    }
    State = TaskState::Finished;
    Pid = 0;
    break;
  }
  case 0: {
    // Child process: Execute the program.
    dup2(FullPipe[1], STDOUT_FILENO);
    if (SeparateErrors) {
      dup2(FullErrorPipe[1], STDERR_FILENO);
    } else {
      dup2(STDOUT_FILENO, STDERR_FILENO);
    }
    close(FullPipe[0]);
    if (SeparateErrors) {
      close(FullErrorPipe[0]);
    }
    execve(ExecPath, const_cast<char **>(argvp), const_cast<char **>(envp));

    // If the execve() failed, we should exit. Follow Unix protocol and
    // return 127 if the executable was not found, and 126 otherwise.
    // Use _exit rather than exit so that atexit functions and static
    // object destructors cloned from the parent process aren't
    // redundantly run, and so that any data buffered in stdio buffers
    // cloned from the parent aren't redundantly written out.
    _exit(errno == ENOENT ? 127 : 126);
  }
  default:
    // Parent process: Break out of the switch to do our processing.
    break;
  }

  close(FullPipe[1]);
  if (SeparateErrors) {
    close(FullErrorPipe[1]);
  }

  if (Pid == 0)
    return true;
#endif

  return false;
}
/// Append data read from \p Pipe onto \p Output.
///
/// \p Pipe is expected to be a blocking descriptor that has unread data.
/// When \p UntilEnd is true, reading continues (and may block) until the
/// stream reports end-of-file; when it is false, the function returns after
/// the first successful read. A read interrupted by a signal (EINTR) is
/// retried in both modes. Every successful read is counted in \p Stats when
/// it is non-null.
/// \return true if read() failed for any reason other than EINTR.
static bool readFromAPipe(std::string &Output, int Pipe,
                          UnifiedStatsReporter *Stats, bool UntilEnd) {
  char Chunk[1024];
  for (;;) {
    const ssize_t Count = read(Pipe, Chunk, sizeof(Chunk));

    // Zero bytes means the stream has ended.
    if (Count == 0)
      return false;

    if (Count < 0) {
      // FIXME: Should there be a counter to break out of this loop if
      // there are more than some number of EINTRs?
      if (errno != EINTR)
        return true;
      // read() was interrupted, so try again.
      continue;
    }

    Output.append(Chunk, Count);
    if (Stats)
      Stats->getDriverCounters().NumDriverPipeReads++;

    if (!UntilEnd)
      return false;
  }
}
/// Reads from the output pipe and, when SeparateErrors is set, from the
/// error pipe as well. Both pipes are always attempted, even if the first
/// read reports an error.
/// \returns true if reading from either pipe failed.
bool Task::readFromPipes(bool UntilEnd) {
  const bool OutputFailed = readFromAPipe(Output, Pipe, Stats, UntilEnd);
  if (!SeparateErrors)
    return OutputFailed;

  const bool ErrorsFailed = readFromAPipe(Errors, ErrorPipe, Stats, UntilEnd);
  return OutputFailed || ErrorsFailed;
}
/// Performs the post-execution work for this Task: marks it Finished,
/// collects whatever output is still sitting in its pipe(s), and closes the
/// read ends.
///
/// The pipes are drained until end-of-file. Reading just once would keep at
/// most one buffer's worth of the remaining data and silently drop the rest
/// when the descriptors are closed below; this matters in particular for the
/// error pipe, which is not watched by the poll loop. Note that draining can
/// block for as long as some process still holds a write end open.
void Task::finishExecution() {
  assert(State == TaskState::Executing &&
         "This Task must be executing to finish execution!");

  State = TaskState::Finished;

  // Read the rest of the command's output, so we can use it later.
  readFromPipes(/*UntilEnd = */ true);

  close(Pipe);
  if (SeparateErrors) {
    close(ErrorPipe);
  }
}
/// Reports whether this TaskQueue implementation can buffer the output of
/// its tasks. The Unix implementation can.
bool TaskQueue::supportsBufferingOutput() { return true; }
/// Reports whether this TaskQueue implementation can run tasks in parallel.
/// The Unix implementation can.
bool TaskQueue::supportsParallelExecution() { return true; }
/// Returns the number of tasks that may run at the same time: the configured
/// NumberOfParallelTasks when it is positive, and 1 otherwise.
unsigned TaskQueue::getNumberOfParallelTasks() const {
  // TODO: add support for choosing a better default value for
  // MaxNumberOfParallelTasks if NumberOfParallelTasks is 0. (Optimally, this
  // should choose a value > 1 tailored to the current system.)
  if (NumberOfParallelTasks > 0)
    return NumberOfParallelTasks;
  return 1;
}
/// Creates a Task from the given description and appends it to the queue of
/// tasks waiting to be executed. The task shares this queue's Stats reporter.
void TaskQueue::addTask(const char *ExecPath, ArrayRef<const char *> Args,
                        ArrayRef<const char *> Env, void *Context,
                        bool SeparateErrors) {
  QueuedTasks.push(std::unique_ptr<Task>(
      new Task(ExecPath, Args, Env, Context, SeparateErrors, Stats)));
}
/// Concurrently execute the tasks in the TaskQueue, collecting the outputs from
/// each task. The typical task is a Swift frontend job.
///
/// The executor drains the queue it is given by reference: tasks are popped
/// from it as they are started, tracked by pid while they run, and watched
/// with poll() through the read end of their output pipe.
class TaskExecutor {
/// The set of tasks needed to be executed. Held by reference, so starting a
/// task removes it from the caller's queue.
std::queue<std::unique_ptr<Task>> &TasksToExecute;
public:
/// Unix system calls deal in process IDs (Pids), so a means is needed
/// to map a Pid back to a Task structure in memory.
using PidToTaskMap = llvm::DenseMap<pid_t, std::unique_ptr<Task>>;
private:
// Stores the current executing Tasks, organized by pid.
PidToTaskMap ExecutingTasks;
// Maintains the current fds we're checking with poll.
std::vector<struct pollfd> PollFds;
/// Limits the number of tasks to run in parallel
const unsigned MaxNumberOfParallelTasks;
/// Optional functions to call to mark significant events in a Task's life.
const TaskQueue::TaskBeganCallback BeganCallBack;
const TaskQueue::TaskFinishedCallback FinishedCallBack;
const TaskQueue::TaskSignalledCallback SignalledCallBack;
/// Optional place to count I/O and subprocess events.
UnifiedStatsReporter *Stats;
public:
/// \p NumberOfParallelTasks of 0 is treated as 1. The callbacks and
/// \p Stats are optional; they are checked for being set before use.
TaskExecutor(std::queue<std::unique_ptr<Task>> &TasksToExecute,
const unsigned NumberOfParallelTasks,
const TaskQueue::TaskBeganCallback BeganCallBack,
const TaskQueue::TaskFinishedCallback FinishedCallBack,
const TaskQueue::TaskSignalledCallback SignalledCallBack,
UnifiedStatsReporter *Stats)
: TasksToExecute(TasksToExecute),
MaxNumberOfParallelTasks(
NumberOfParallelTasks == 0 ? 1 : NumberOfParallelTasks),
BeganCallBack(BeganCallBack), FinishedCallBack(FinishedCallBack),
SignalledCallBack(SignalledCallBack), Stats(Stats) {}
/// Run the tasks in the queue, \return true on error.
bool executeTasks();
private:
/// True once nothing is running and nothing is left in the queue.
bool isFinishedExecutingTasks() const {
return ExecutingTasks.empty() && TasksToExecute.empty();
}
/// Start up tasks if we aren't
/// already at the parallel limit, and no earlier subtasks have failed.
/// \return the started tasks, or None for error.
Optional<std::vector<std::unique_ptr<Task>>> startExecutingTasks();
/// Take ownership of the next task to start. Start it, and \return a
/// unique pointer if no error.
std::unique_ptr<Task> startNextTask();
// TODO: factor the polling routines and data structures into a separate class
/// Enter the task and its outputs in this TaskExecutor's data structures so
/// it can be polled.
void recordTasksFdsForPolling(std::unique_ptr<Task> T);
/// Remove the given (already closed) fds from the set handed to poll().
void stopPolling(std::vector<int>);
/// Outcome of one poll() call: hardError aborts execution, softError
/// (poll() failed with EAGAIN or EINTR) makes the main loop try again, and
/// noError means the fds can be examined.
enum class PollResult { hardError, softError, noError };
/// Block in poll() on all recorded fds and classify the result.
PollResult tryPollingFds();
/// Service every polled fd that reported an event.
/// \return the fds whose tasks finished, or None on error
Optional<std::vector<int>> readFromReadyFdsReturningFinishedOnes();
/// What happened to a polled fd: hadError stops execution, finished means
/// its task completed and the fd must no longer be polled, alive means the
/// task is still running.
enum class FdStatus { hadError, finished, alive };
// TODO: split these next three into a contained class, for handling events?
/// Inspect the events poll() reported for \p fd, dispatch to handleFdEvent
/// when there is something to do, and reset the reported events.
FdStatus tryReadFromFd(struct pollfd &fd);
/// Read pending output for the task owning \p fd and, if the fd was hung up
/// or had an error, wait for the task and clean it up.
FdStatus handleFdEvent(struct pollfd &fd);
};
/// Helper that finishes off one Task: it waits for the subprocess to exit,
/// lets the Task collect its remaining output, and reports the result
/// through the finished / signalled callbacks.
class TaskCompleter {
/// The task being completed.
/// TODO: decide whether this should be const and/or uniquely owned
/// (original note: "CONST? UNIQ?").
Task &T;
/// The polled descriptor that triggered completion of the task.
const struct pollfd &fd;
/// Optional callbacks used to report how the task ended.
const TaskQueue::TaskFinishedCallback FinishedCallBack;
const TaskQueue::TaskSignalledCallback SignalledCallBack;
public:
/// Stores references to \p T and \p fd, so both must outlive this object.
TaskCompleter(Task &T, struct pollfd &fd,
const TaskQueue::TaskFinishedCallback FinishedCallBack,
const TaskQueue::TaskSignalledCallback SignalledCallBack)
: T(T), fd(fd), FinishedCallBack(FinishedCallBack),
SignalledCallBack(SignalledCallBack) {}
/// Wait for the task's process, finish the task, and dispatch on whether it
/// exited normally or was terminated by a signal.
/// \return true if had error
bool waitForExitThenCleanUp() const;
private:
/// Handle a task that exited normally with wait status \p Status.
/// \return true if had error
bool cleanupExitedTask(int Status) const;
/// Handle a task that was terminated by a signal, per wait status \p Status.
/// \return true if had error
bool cleanupSignalledTask(int Status) const;
/// Block until process \p pid exits then return the exit status.
/// \return None if the process could not be waited for.
static Optional<int> waitForPid(pid_t pid);
};
bool TaskExecutor::executeTasks() {
while (!isFinishedExecutingTasks()) {
Optional<std::vector<std::unique_ptr<Task>>> startedTasks =
startExecutingTasks();
if (!startedTasks)
return true;
for (std::unique_ptr<Task> &T : *startedTasks)
recordTasksFdsForPolling(std::move(T));
switch (tryPollingFds()) {
case PollResult::hardError:
return true;
case PollResult::softError:
continue;
case PollResult::noError:
break;
}
Optional<std::vector<int>> FinishedFds =
readFromReadyFdsReturningFinishedOnes();
if (!FinishedFds)
return true;
stopPolling(*FinishedFds);
}
return false;
}
/// Blocks in poll() (no timeout) on every recorded fd and classifies the
/// outcome: noError when poll() succeeded, softError when it failed with
/// EAGAIN or EINTR, hardError for any other failure.
TaskExecutor::PollResult TaskExecutor::tryPollingFds() {
  assert(!PollFds.empty() &&
         "We should only call poll() if we have fds to watch!");

  const int ReadyFdCount = poll(PollFds.data(), PollFds.size(), -1);
  if (Stats)
    Stats->getDriverCounters().NumDriverPipePolls++;

  if (ReadyFdCount != -1)
    return PollResult::noError;
  if (errno == EAGAIN || errno == EINTR)
    return PollResult::softError;
  return PollResult::hardError;
}
/// Starts queued tasks until either the queue is empty or the number of
/// running tasks reaches MaxNumberOfParallelTasks, invoking BeganCallBack
/// (when set) for every task that was started.
///
/// Tasks started here are only entered into ExecutingTasks by the caller
/// after this function returns, so the tasks started during this call must
/// be counted against the limit as well; otherwise a single call would
/// launch the entire queue regardless of the limit.
///
/// \return the tasks started by this call, or None if a task failed to
/// start.
Optional<std::vector<std::unique_ptr<Task>>>
TaskExecutor::startExecutingTasks() {
  std::vector<std::unique_ptr<Task>> startedTasks;
  while (!TasksToExecute.empty() &&
         ExecutingTasks.size() + startedTasks.size() <
             MaxNumberOfParallelTasks) {
    std::unique_ptr<Task> T = startNextTask();
    if (!T)
      return None;
    if (BeganCallBack)
      BeganCallBack(T->getPid(), T->getContext());
    startedTasks.push_back(std::move(T));
  }
  return startedTasks;
}
std::unique_ptr<Task> TaskExecutor::startNextTask() {
std::unique_ptr<Task> T(TasksToExecute.front().release());
TasksToExecute.pop();
return T->execute() ? nullptr : std::move(T);
}
/// Registers the task's output pipe with the set of fds handed to poll() and
/// takes ownership of the task, filing it under its pid.
void TaskExecutor::recordTasksFdsForPolling(std::unique_ptr<Task> T) {
  // We should also poll T->getErrorPipe(), but this introduces timing
  // issues with shutting down the task after reading getPipe().
  const struct pollfd Watched = {T->getPipe(), POLLIN | POLLPRI | POLLHUP, 0};
  PollFds.push_back(Watched);

  const int Pid = T->getPid();
  ExecutingTasks[Pid] = std::move(T);
}
/// Walks every polled fd, servicing the ones that reported events.
/// \return the fds whose tasks finished during this pass, or None as soon as
/// one of them reports an error.
Optional<std::vector<int>>
TaskExecutor::readFromReadyFdsReturningFinishedOnes() {
  std::vector<int> Retired;
  for (struct pollfd &Entry : PollFds) {
    const FdStatus Status = tryReadFromFd(Entry);
    if (Status == FdStatus::hadError)
      return None;
    if (Status == FdStatus::finished)
      Retired.push_back(Entry.fd);
    // FdStatus::alive: nothing to record, keep polling this fd.
  }
  return Retired;
}
/// Examines the events poll() reported for \p fd. Readable, hung-up and
/// error events are forwarded to handleFdEvent(); in every case the reported
/// events are cleared before returning.
TaskExecutor::FdStatus TaskExecutor::tryReadFromFd(struct pollfd &fd) {
  const int ActionableEvents = POLLIN | POLLPRI | POLLHUP | POLLERR;
  if ((fd.revents & ActionableEvents) != 0) {
    const FdStatus Result = handleFdEvent(fd);
    fd.revents = 0;
    return Result;
  }

  if ((fd.revents & POLLNVAL) != 0) {
    // We passed an invalid fd; this should never happen,
    // since we always mark fds as finished after calling
    // Task::finishExecution() (which closes the Task's fd).
    llvm_unreachable("Asked poll() to watch a closed fd");
  }

  fd.revents = 0;
  return FdStatus::alive;
}
/// Looks up the executing Task whose output pipe is the descriptor in \p fd.
/// A matching task is required to exist; this is asserted.
static Task &findTask(TaskExecutor::PidToTaskMap &ExecutingTasks,
                      struct pollfd &fd) {
  const int WantedFd = fd.fd;
  auto Match = std::find_if(
      ExecutingTasks.begin(), ExecutingTasks.end(),
      [WantedFd](TaskExecutor::PidToTaskMap::value_type &Entry) -> bool {
        return Entry.second->getPipe() == WantedFd;
      });
  assert(Match != ExecutingTasks.end() &&
         "All outstanding fds must be associated with an executing Task");
  return *Match->second;
}
/// Services the events reported for \p fd: reads pending output from the
/// owning task and, if the descriptor was hung up or reported an error,
/// waits for the task to exit, runs its completion handling and removes it
/// from the set of executing tasks.
TaskExecutor::FdStatus TaskExecutor::handleFdEvent(struct pollfd &fd) {
  Task &T = findTask(ExecutingTasks, fd);

  const bool HasData = (fd.revents & (POLLIN | POLLPRI)) != 0;
  const bool HungUpOrFailed = (fd.revents & (POLLHUP | POLLERR)) != 0;

  if (HasData) {
    // There's data available to read. Read _some_ of it here, but not
    // necessarily _all_, since the pipe is in blocking mode and we might
    // have other input pending (or soon -- before this subprocess is done
    // writing) from other subprocesses.
    //
    // FIXME: longer term, this should probably either be restructured to
    // use O_NONBLOCK, or at very least poll the stderr file descriptor as
    // well; the whole loop here is a bit of a mess.
    T.readFromPipes(/*UntilEnd = */ false);
  }

  if (!HungUpOrFailed)
    return FdStatus::alive;

  // This fd was "hung up" or had an error, so we need to wait for the
  // Task and then clean up.
  const TaskCompleter Completer(T, fd, FinishedCallBack, SignalledCallBack);
  const bool hadError = Completer.waitForExitThenCleanUp();

  // Erasing the map entry destroys the Task that T refers to.
  ExecutingTasks.erase(T.getPid());
  return hadError ? FdStatus::hadError : FdStatus::finished;
}
bool TaskCompleter::waitForExitThenCleanUp() const {
const Optional<int> StatusIfOK = waitForPid(T.getPid());
if (!StatusIfOK)
return true;
T.finishExecution();
int Status = *StatusIfOK;
return WIFEXITED(Status)
? cleanupExitedTask(Status)
: WIFSIGNALED(Status) ? cleanupSignalledTask(Status)
: false /* Can this case ever happen? */;
}
/// Blocks in waitpid() until the process \p pidToWaitFor has been reaped.
/// Failures other than ECHILD and EINVAL cause the wait to be retried.
/// \return the wait status, or None if waitpid() failed with ECHILD or
/// EINVAL.
Optional<int> TaskCompleter::waitForPid(const pid_t pidToWaitFor) {
  while (true) {
    int Status = 0;
    const pid_t Reaped = waitpid(pidToWaitFor, &Status, 0);

    if (Reaped == pidToWaitFor)
      return Status;

    if (Reaped > 0)
      llvm_unreachable(
          "We asked to wait for this Task, but we got another Pid!");
    if (Reaped == 0)
      llvm_unreachable("We do not pass WNOHANG, so we should always get a pid");

    // waitpid() failed; give up only for errors that retrying cannot fix.
    if (errno == ECHILD || errno == EINVAL)
      return None;
  }
}
/// Handles a task whose process exited normally.
/// \return true if execution should stop: either the finished callback
/// answered StopExecution, or there is no callback and the exit code was
/// nonzero.
bool TaskCompleter::cleanupExitedTask(int Status) const {
  const int ExitCode = WEXITSTATUS(Status);

  if (FinishedCallBack) {
    // If we have a TaskFinishedCallback, only have an error if the callback
    // returns StopExecution.
    const auto Response = FinishedCallBack(T.getPid(), ExitCode, T.getOutput(),
                                           T.getErrors(), T.getContext());
    return Response == TaskFinishedResponse::StopExecution;
  }

  // Since we don't have a TaskFinishedCallback, treat a subtask
  // which returned a nonzero exit code as having failed.
  return ExitCode != 0;
}
/// Handles a task whose process was terminated by a signal.
/// \return true if execution should stop: either the signalled callback
/// answered StopExecution, or there is no callback at all.
bool TaskCompleter::cleanupSignalledTask(int Status) const {
  // The process exited due to a signal.
  const int Signal = WTERMSIG(Status);
  StringRef ErrorMsg = strsignal(Signal);

  if (SignalledCallBack) {
    // If we have a TaskCrashedCallback, only return an error if the callback
    // returns StopExecution.
    const auto Response =
        SignalledCallBack(T.getPid(), ErrorMsg, T.getOutput(), T.getErrors(),
                          T.getContext(), Signal);
    return Response == TaskFinishedResponse::StopExecution;
  }

  // Since we don't have a TaskCrashedCallback, treat a crashing
  // subtask as having failed.
  return true;
}
/// Drops every descriptor in \p FinishedFds from PollFds so that poll() is
/// no longer asked to watch fds we have closed. Each descriptor is required
/// to be present; this is asserted.
void TaskExecutor::stopPolling(std::vector<int> FinishedFds) {
  for (const int Closed : FinishedFds) {
    auto Pos = std::find_if(
        PollFds.begin(), PollFds.end(),
        [Closed](struct pollfd &Entry) { return Entry.fd == Closed; });
    assert(Pos != PollFds.end() && "The finished fd must be in PollFds!");
    PollFds.erase(Pos);
  }
}
/// Runs every queued task through a TaskExecutor configured with this
/// queue's parallelism limit, the given callbacks and the Stats reporter.
/// \return true on error.
bool TaskQueue::execute(TaskBeganCallback BeganCallBack,
                        TaskFinishedCallback FinishedCallBack,
                        TaskSignalledCallback SignalledCallBack) {
  const unsigned ParallelLimit = getNumberOfParallelTasks();
  TaskExecutor Executor(QueuedTasks, ParallelLimit, BeganCallBack,
                        FinishedCallBack, SignalledCallBack, Stats);
  return Executor.executeTasks();
}