Skip to content

Commit

Permalink
Add Subprocess::DoWork(int64_t timeout_ms) method.
Browse files Browse the repository at this point in the history
Allow the DoWork() method to return after a given timeout
has expired. The new method returns a WorkResult value which
is an enum that can take three values to indicate process
completion, user interruption or that the timeout has
expired.
  • Loading branch information
digit-google committed Nov 11, 2024
1 parent bc31ad0 commit f797e92
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 41 deletions.
102 changes: 65 additions & 37 deletions src/subprocess-posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -248,14 +248,38 @@ Subprocess *SubprocessSet::Add(const string& command, bool use_console) {
return subprocess;
}

#ifdef USE_PPOLL
bool SubprocessSet::DoWork() {
vector<pollfd> fds;
WorkResult ret = DoWork(-1);
return ret == WorkResult::INTERRUPTION;
}

// An optional timespec struct value for pselect() or ppoll(). Usage:
// - Create instance, pass timeout in milliseconds.
// - Call ptr() tp get the pointer to pass to pselect() or ppoll().
struct TimeoutHelper {
// Constructor. A negative timeout_ms value means no timeout.
TimeoutHelper(int64_t timeout_ms) {
if (timeout_ms >= 0) {
ts_.tv_sec = static_cast<long>(timeout_ms / 1000);
ts_.tv_nsec = static_cast<long>((timeout_ms % 1000) * 1000000L);
ptr_ = &ts_;
}
}

const struct timespec* ptr() const { return ptr_; }

private:
struct timespec ts_ {};
const struct timespec* ptr_ = nullptr;
};

#ifdef USE_PPOLL
SubprocessSet::WorkResult SubprocessSet::DoWork(int64_t timeout_millis) {
std::vector<pollfd> fds;
nfds_t nfds = 0;

for (vector<Subprocess*>::iterator i = running_.begin();
i != running_.end(); ++i) {
int fd = (*i)->fd_;
for (const auto& proc : running_) {
int fd = proc->fd_;
if (fd < 0)
continue;
pollfd pfd = { fd, POLLIN | POLLPRI, 0 };
Expand All @@ -264,49 +288,50 @@ bool SubprocessSet::DoWork() {
}

interrupted_ = 0;
int ret = ppoll(&fds.front(), nfds, NULL, &old_mask_);
TimeoutHelper timeout(timeout_millis);
int ret = ppoll(&fds.front(), nfds, timeout.ptr(), &old_mask_);
if (ret == 0) {
return WorkResult::TIMEOUT;
}
if (ret == -1) {
if (errno != EINTR) {
perror("ninja: ppoll");
return false;
Fatal("ppoll", strerror(errno));
}
return IsInterrupted();
return IsInterrupted() ? WorkResult::INTERRUPTION : WorkResult::COMPLETION;
}

HandlePendingInterruption();
if (IsInterrupted())
return true;
return WorkResult::INTERRUPTION;

nfds_t cur_nfd = 0;
for (vector<Subprocess*>::iterator i = running_.begin();
i != running_.end(); ) {
int fd = (*i)->fd_;
for (auto it = running_.begin(); it != running_.end();) {
int fd = (*it)->fd_;
if (fd < 0)
continue;
assert(fd == fds[cur_nfd].fd);
if (fds[cur_nfd++].revents) {
(*i)->OnPipeReady();
if ((*i)->Done()) {
finished_.push(*i);
i = running_.erase(i);
(*it)->OnPipeReady();
if ((*it)->Done()) {
finished_.push(*it);
it = running_.erase(it);
continue;
}
}
++i;
++it;
}

return IsInterrupted();
return IsInterrupted() ? WorkResult::INTERRUPTION : WorkResult::COMPLETION;
}

#else // !defined(USE_PPOLL)
bool SubprocessSet::DoWork() {
SubprocessSet::WorkResult SubprocessSet::DoWork(int64_t timeout_millis) {
fd_set set;
int nfds = 0;
FD_ZERO(&set);

for (vector<Subprocess*>::iterator i = running_.begin();
i != running_.end(); ++i) {
int fd = (*i)->fd_;
for (const auto& proc : running_) {
int fd = proc->fd_;
if (fd >= 0) {
FD_SET(fd, &set);
if (nfds < fd+1)
Expand All @@ -315,34 +340,37 @@ bool SubprocessSet::DoWork() {
}

interrupted_ = 0;
int ret = pselect(nfds, &set, 0, 0, 0, &old_mask_);
TimeoutHelper timeout(timeout_millis);
int ret = pselect(nfds, &set, 0, 0, timeout.ptr(), &old_mask_);
if (ret == 0)
return WorkResult::TIMEOUT;

if (ret == -1) {
if (errno != EINTR) {
perror("ninja: pselect");
return false;
Fatal("pselect", strerror(errno));
}
return IsInterrupted();
return IsInterrupted() ? WorkResult::INTERRUPTION : WorkResult::COMPLETION;
}

HandlePendingInterruption();
if (IsInterrupted())
return true;
return WorkResult::INTERRUPTION;

for (vector<Subprocess*>::iterator i = running_.begin();
i != running_.end(); ) {
int fd = (*i)->fd_;
for (std::vector<Subprocess*>::iterator it = running_.begin();
it != running_.end();) {
int fd = (*it)->fd_;
if (fd >= 0 && FD_ISSET(fd, &set)) {
(*i)->OnPipeReady();
if ((*i)->Done()) {
finished_.push(*i);
i = running_.erase(i);
(*it)->OnPipeReady();
if ((*it)->Done()) {
finished_.push(*it);
it = running_.erase(it);
continue;
}
}
++i;
++it;
}

return IsInterrupted();
return IsInterrupted() ? WorkResult::INTERRUPTION : WorkResult::COMPLETION;
}
#endif // !defined(USE_PPOLL)

Expand Down
16 changes: 13 additions & 3 deletions src/subprocess-win32.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,19 +252,29 @@ Subprocess *SubprocessSet::Add(const string& command, bool use_console) {
}

bool SubprocessSet::DoWork() {
WorkResult ret = DoWork(-1);
return ret == WorkResult::INTERRUPTION;
}

SubprocessSet::WorkResult SubprocessSet::DoWork(int64_t timeout_millis) {
DWORD bytes_read;
Subprocess* subproc;
OVERLAPPED* overlapped;
DWORD timeout =
(timeout_millis < 0) ? INFINITE : static_cast<DWORD>(timeout_millis);

if (!GetQueuedCompletionStatus(ioport_, &bytes_read, (PULONG_PTR)&subproc,
&overlapped, INFINITE)) {
&overlapped, timeout)) {
if (!overlapped) {
return WorkResult::TIMEOUT;
}
if (GetLastError() != ERROR_BROKEN_PIPE)
Win32Fatal("GetQueuedCompletionStatus");
}

if (!subproc) // A NULL subproc indicates that we were interrupted and is
// delivered by NotifyInterrupted above.
return true;
return WorkResult::INTERRUPTION;

subproc->OnPipeReady();

Expand All @@ -277,7 +287,7 @@ bool SubprocessSet::DoWork() {
}
}

return false;
return WorkResult::COMPLETION;
}

Subprocess* SubprocessSet::NextFinished() {
Expand Down
38 changes: 37 additions & 1 deletion src/subprocess.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
#ifndef NINJA_SUBPROCESS_H_
#define NINJA_SUBPROCESS_H_

#include <stdint.h>

#include <queue>
#include <string>
#include <vector>
#include <queue>

#ifdef _WIN32
#include <windows.h>
Expand Down Expand Up @@ -83,9 +85,43 @@ struct SubprocessSet {
SubprocessSet();
~SubprocessSet();

// Value returned by DoWork() method,
// - COMPLETION means that a process has completed.
// - INTERRUPTION means that user interruption happened. On Posix this means
// a SIGINT, SIGHUP or SIGTERM signal. On Win32, this means Ctrl-C was
// pressed.
// - TIMEOUT means that the called timed out.
enum class WorkResult {
COMPLETION = 0,
INTERRUPTION = 1,
TIMEOUT = 3,
};

// Start a new subprocess running |command|. Set |use_console| to true
// if the process will inherit the current console handles (terminal
// input and outputs on Posix). If false, the subprocess' output
// will be buffered instead, and available after completion.
Subprocess* Add(const std::string& command, bool use_console = false);

// Equivalent to DoWork(-1), which returns true in case of interruption
// and false otherwise.
bool DoWork();

// Wait for at most |timeout_millis| milli-seconds for either a process
// completion or a user-initiated interruption. If |timeout_millis| is
// negative, waits indefinitely, and never return WorkStatus::TIMEOUT.
//
// IMPORTANT: On Posix, spurious wakeups are possible, and will return
// WorkResult::COMPLETION even though no process has really
// completed. The caller should call NextFinished() and compare the
// its result to nullptr to check for this rare condition.
WorkResult DoWork(int64_t timeout_millis);

// Return the next Subprocess after a WorkResult::COMPLETION result.
// The result can be nullptr on Posix in case of spurious wakeups.
// NOTE: This transfers ownership of the Subprocess instance to the caller.
Subprocess* NextFinished();

void Clear();

std::vector<Subprocess*> running_;
Expand Down
15 changes: 15 additions & 0 deletions src/subprocess_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,21 @@ TEST_F(SubprocessTest, InterruptParentWithSigHup) {
ASSERT_FALSE("We should have been interrupted");
}

TEST_F(SubprocessTest, Timeout) {
Subprocess* subproc = subprocs_.Add("sleep 1");
ASSERT_NE((Subprocess*)0, subproc);
size_t timeout_count = 0;
while (true) {
ASSERT_FALSE(subproc->Done());
SubprocessSet::WorkResult ret = subprocs_.DoWork(100);
if (ret == SubprocessSet::WorkResult::TIMEOUT)
++timeout_count;
else
break;
}
ASSERT_GT(timeout_count, 0);
}

TEST_F(SubprocessTest, Console) {
// Skip test if we don't have the console ourselves.
if (isatty(0) && isatty(1) && isatty(2)) {
Expand Down

0 comments on commit f797e92

Please sign in to comment.