Skip to content

Commit

Permalink
Output diagnostics when a multiplexer process returns junk.
Browse files Browse the repository at this point in the history
With this implementation, the junk output is reported once for every outstanding request, which is a bit verbose. However, all of those requests will fail anyway, so adding complexity to deduplicate the output seems excessive.

PiperOrigin-RevId: 523982547
Change-Id: If145ecd18c99235cfc9a8d46ad37850fcf371c64
  • Loading branch information
larsrc-google authored and copybara-github committed Apr 13, 2023
1 parent f93b2f3 commit 6b55a66
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
import com.google.errorprone.annotations.concurrent.LazyInit;
import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
Expand Down Expand Up @@ -82,7 +83,7 @@ public class WorkerMultiplexer {
/** The implementation of the worker protocol (JSON or Proto). */
private WorkerProtocolImpl workerProtocol;
/** InputStream from the worker process. */
private RecordingInputStream recordingStream;
@LazyInit private RecordingInputStream recordingStream;
/** True if this multiplexer was explicitly destroyed. */
private boolean wasDestroyed;
/**
Expand Down Expand Up @@ -304,7 +305,7 @@ public synchronized void putRequest(WorkRequest request) throws IOException {
* called on the thread of a {@code WorkerProxy}, and so is subject to interrupts by dynamic
* execution.
*/
public WorkResponse getResponse(Integer requestId) throws InterruptedException {
public WorkResponse getResponse(Integer requestId) throws InterruptedException, IOException {
try {
if (!process.isAlive()) {
// If the process has died, all we can do is return what may already have been returned.
Expand All @@ -320,10 +321,13 @@ public WorkResponse getResponse(Integer requestId) throws InterruptedException {
return workerProcessResponse.get(requestId);
}

// Wait for the multiplexer to get our response and release this semaphore. The semaphore will
// throw {@code InterruptedException} when the multiplexer is terminated.
// Wait for the multiplexer to get our response and release this semaphore. If the multiplexer
// process dies, the semaphore gets released with no response available.
waitForResponse.acquire();

if (workerProcessResponse.get(requestId) == null && !process.isAlive()) {
throw new IOException("Worker process for " + workerKey.getMnemonic() + " has died");
}
return workerProcessResponse.get(requestId);
} finally {
responseChecker.remove(requestId);
Expand Down Expand Up @@ -415,9 +419,11 @@ private boolean readResponse() {
}

String getRecordingStreamMessage() {
// Unlike SingleplexWorker, we don't want to read the remaining bytes, as those could contain
// many other responses. We just return what we actually read.
return recordingStream.getRecordedDataAsString();
// Once we read junk, we can't trust the rest of the stream
synchronized (recordingStream) {
recordingStream.readRemaining();
return recordingStream.getRecordedDataAsString();
}
}

/** Returns true if this process has died for other reasons than a call to {@code #destroy()}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ void setReporter(EventHandler reporter) {
workerMultiplexer.setReporter(reporter);
}

@Override
public void prepareExecution(
SandboxInputs inputFiles, SandboxOutputs outputs, Set<PathFragment> workerFiles)
throws IOException {
Expand All @@ -79,7 +80,7 @@ protected void putRequest(WorkRequest request) throws IOException {

/** Wait for WorkResponse from multiplexer. */
@Override
WorkResponse getResponse(int requestId) throws InterruptedException {
WorkResponse getResponse(int requestId) throws InterruptedException, IOException {
return workerMultiplexer.getResponse(requestId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.common.base.Ascii;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -105,10 +106,10 @@ private static void runPersistentWorker(ExampleWorkerMultiplexerOptions workerOp
while (true) {
try {
WorkRequest request = WorkRequest.parseDelimitedFrom(System.in);
int requestId = request.getRequestId();
if (request == null) {
break;
}
int requestId = request.getRequestId();

inputs.clear();
for (Input input : request.getInputsList()) {
Expand Down Expand Up @@ -273,7 +274,7 @@ private static void processRequest(OptionsParser parser, WorkRequest request) th

String residueStr = Joiner.on(' ').join(residue);
if (options.uppercase) {
residueStr = residueStr.toUpperCase();
residueStr = Ascii.toUpperCase(residueStr);
}
outputs.add(residueStr);
String prefix = options.ignoreSandbox ? "" : request.getSandboxDir();
Expand All @@ -285,7 +286,7 @@ private static void processRequest(OptionsParser parser, WorkRequest request) th
List<String> lines = Files.readAllLines(path);
String content = Joiner.on("\n").join(lines);
if (options.uppercase) {
content = content.toUpperCase();
content = Ascii.toUpperCase(content);
}
outputs.add(content);
}
Expand Down
16 changes: 10 additions & 6 deletions src/test/shell/integration/bazel_worker_multiplexer_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ def _impl(ctx):
executable=worker,
progress_message="Working on %s" % ctx.label.name,
mnemonic="Work",
execution_requirements={"supports-multiplex-workers": "1", "supports-multiplex-sandboxing": "1"},
execution_requirements={
"supports-multiplex-workers": "1",
"supports-multiplex-sandboxing": "1",
"requires-worker-protocol": "proto",
},
arguments=ctx.attr.worker_args + argfile_arguments,
)
Expand Down Expand Up @@ -303,8 +307,7 @@ EOF

# When a worker does not conform to the protocol and returns a response that is not a parseable
# protobuf, it must be killed and a helpful error message should be printed.
# TODO(philwo) causes Bazel to crash with NullPointerException.
function DISABLED_test_build_fails_when_worker_returns_junk() {
function test_build_fails_when_worker_returns_junk() {
prepare_example_worker
cat >>BUILD <<'EOF'
[work(
Expand All @@ -325,7 +328,7 @@ EOF
# Check that a helpful error message was printed.
expect_log "Worker process returned an unparseable WorkResponse!"
expect_log "Did you try to print something to stdout"
expect_log "I'm a poisoned worker and this is not a protobuf."
expect_log "I'm a poisone" # Hexdump
}

function test_input_digests() {
Expand Down Expand Up @@ -419,7 +422,8 @@ work(
)
EOF

sed -i.bak '/execution_requirements/d' work.bzl
# Delete entire execution_requirements block
sed -i.bak '/execution_requirements/,+4d' work.bzl
rm -f work.bzl.bak

bazel build --worker_quit_after_build :hello_world &> $TEST_log \
Expand Down Expand Up @@ -492,7 +496,7 @@ EOF
bazel build :${func_name}_2 &> $TEST_log \
&& fail "expected build to fail" || true

expect_log "Worker process did not return a WorkResponse:"
expect_log "Worker process returned an unparseable WorkResponse!"
expect_log "^---8<---8<--- Start of log, file at /"
expect_log "I'm a very poisoned worker and will just crash."
expect_log "^---8<---8<--- End of log ---8<---8<---"
Expand Down

0 comments on commit 6b55a66

Please sign in to comment.