diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java index 24b22bdf274f64..b95cea8d5aee56 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java @@ -29,6 +29,7 @@ import com.google.devtools.build.lib.vfs.PathFragment; import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest; import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse; +import com.google.errorprone.annotations.concurrent.LazyInit; import java.io.File; import java.io.IOException; import java.io.InterruptedIOException; @@ -82,7 +83,7 @@ public class WorkerMultiplexer { /** The implementation of the worker protocol (JSON or Proto). */ private WorkerProtocolImpl workerProtocol; /** InputStream from the worker process. */ - private RecordingInputStream recordingStream; + @LazyInit private RecordingInputStream recordingStream; /** True if this multiplexer was explicitly destroyed. */ private boolean wasDestroyed; /** @@ -304,7 +305,7 @@ public synchronized void putRequest(WorkRequest request) throws IOException { * called on the thread of a {@code WorkerProxy}, and so is subject to interrupts by dynamic * execution. */ - public WorkResponse getResponse(Integer requestId) throws InterruptedException { + public WorkResponse getResponse(Integer requestId) throws InterruptedException, IOException { try { if (!process.isAlive()) { // If the process has died, all we can do is return what may already have been returned. @@ -320,10 +321,13 @@ public WorkResponse getResponse(Integer requestId) throws InterruptedException { return workerProcessResponse.get(requestId); } - // Wait for the multiplexer to get our response and release this semaphore. The semaphore will - // throw {@code InterruptedException} when the multiplexer is terminated. + // Wait for the multiplexer to get our response and release this semaphore. If the multiplexer + // process dies, the semaphore gets released with no response available. waitForResponse.acquire(); + if (workerProcessResponse.get(requestId) == null && !process.isAlive()) { + throw new IOException("Worker process for " + workerKey.getMnemonic() + " has died"); + } return workerProcessResponse.get(requestId); } finally { responseChecker.remove(requestId); @@ -415,9 +419,11 @@ private boolean readResponse() { } String getRecordingStreamMessage() { - // Unlike SingleplexWorker, we don't want to read the remaining bytes, as those could contain - // many other responses. We just return what we actually read. - return recordingStream.getRecordedDataAsString(); + // Once we read junk, we can't trust the rest of the stream + synchronized (recordingStream) { + recordingStream.readRemaining(); + return recordingStream.getRecordedDataAsString(); + } } /** Returns true if this process has died for other reasons than a call to {@code #destroy()}. */ diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java index 92fb4cb59383c0..dc5b8ec07ded6b 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java @@ -56,6 +56,7 @@ void setReporter(EventHandler reporter) { workerMultiplexer.setReporter(reporter); } + @Override public void prepareExecution( SandboxInputs inputFiles, SandboxOutputs outputs, Set workerFiles) throws IOException { @@ -79,7 +80,7 @@ protected void putRequest(WorkRequest request) throws IOException { /** Wait for WorkResponse from multiplexer. */ @Override - WorkResponse getResponse(int requestId) throws InterruptedException { + WorkResponse getResponse(int requestId) throws InterruptedException, IOException { return workerMultiplexer.getResponse(requestId); } diff --git a/src/test/java/com/google/devtools/build/lib/worker/ExampleWorkerMultiplexer.java b/src/test/java/com/google/devtools/build/lib/worker/ExampleWorkerMultiplexer.java index 54d7c159d4d30a..9fcbec8e804967 100644 --- a/src/test/java/com/google/devtools/build/lib/worker/ExampleWorkerMultiplexer.java +++ b/src/test/java/com/google/devtools/build/lib/worker/ExampleWorkerMultiplexer.java @@ -15,6 +15,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; +import com.google.common.base.Ascii; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -105,10 +106,10 @@ private static void runPersistentWorker(ExampleWorkerMultiplexerOptions workerOp while (true) { try { WorkRequest request = WorkRequest.parseDelimitedFrom(System.in); - int requestId = request.getRequestId(); if (request == null) { break; } + int requestId = request.getRequestId(); inputs.clear(); for (Input input : request.getInputsList()) { @@ -273,7 +274,7 @@ private static void processRequest(OptionsParser parser, WorkRequest request) th String residueStr = Joiner.on(' ').join(residue); if (options.uppercase) { - residueStr = residueStr.toUpperCase(); + residueStr = Ascii.toUpperCase(residueStr); } outputs.add(residueStr); String prefix = options.ignoreSandbox ? "" : request.getSandboxDir(); @@ -285,7 +286,7 @@ private static void processRequest(OptionsParser parser, WorkRequest request) th List lines = Files.readAllLines(path); String content = Joiner.on("\n").join(lines); if (options.uppercase) { - content = content.toUpperCase(); + content = Ascii.toUpperCase(content); } outputs.add(content); } diff --git a/src/test/shell/integration/bazel_worker_multiplexer_test.sh b/src/test/shell/integration/bazel_worker_multiplexer_test.sh index 666a8508bd8171..dd9df4133b24f8 100755 --- a/src/test/shell/integration/bazel_worker_multiplexer_test.sh +++ b/src/test/shell/integration/bazel_worker_multiplexer_test.sh @@ -90,7 +90,11 @@ def _impl(ctx): executable=worker, progress_message="Working on %s" % ctx.label.name, mnemonic="Work", - execution_requirements={"supports-multiplex-workers": "1", "supports-multiplex-sandboxing": "1"}, + execution_requirements={ + "supports-multiplex-workers": "1", + "supports-multiplex-sandboxing": "1", + "requires-worker-protocol": "proto", + }, arguments=ctx.attr.worker_args + argfile_arguments, ) @@ -303,8 +307,7 @@ EOF # When a worker does not conform to the protocol and returns a response that is not a parseable # protobuf, it must be killed and a helpful error message should be printed. -# TODO(philwo) causes Bazel to crash with NullPointerException. -function DISABLED_test_build_fails_when_worker_returns_junk() { +function test_build_fails_when_worker_returns_junk() { prepare_example_worker cat >>BUILD <<'EOF' [work( @@ -325,7 +328,7 @@ EOF # Check that a helpful error message was printed. expect_log "Worker process returned an unparseable WorkResponse!" expect_log "Did you try to print something to stdout" - expect_log "I'm a poisoned worker and this is not a protobuf." + expect_log "I'm a poisone" # Hexdump } function test_input_digests() { @@ -419,7 +422,8 @@ work( ) EOF - sed -i.bak '/execution_requirements/d' work.bzl + # Delete entire execution_requirements block + sed -i.bak '/execution_requirements/,+4d' work.bzl rm -f work.bzl.bak bazel build --worker_quit_after_build :hello_world &> $TEST_log \ @@ -492,7 +496,7 @@ EOF bazel build :${func_name}_2 &> $TEST_log \ && fail "expected build to fail" || true - expect_log "Worker process did not return a WorkResponse:" + expect_log "Worker process returned an unparseable WorkResponse!" expect_log "^---8<---8<--- Start of log, file at /" expect_log "I'm a very poisoned worker and will just crash." expect_log "^---8<---8<--- End of log ---8<---8<---"