From 780e27df4df6c098b4732750ad5464b97c25744d Mon Sep 17 00:00:00 2001 From: larsrc Date: Tue, 10 Nov 2020 03:38:40 -0800 Subject: [PATCH] Prevent reads from this.process when this.process can be async updated. Only synchronized methods can write to this.process, and we don't want to synchronize too much. Also fixes the isAlive method in FakeSubprocess. RELNOTES: n/a PiperOrigin-RevId: 341587731 --- .../build/lib/worker/WorkerMultiplexer.java | 16 +++++++++++----- .../devtools/build/lib/worker/TestUtils.java | 2 +- .../build/lib/worker/WorkerMultiplexerTest.java | 2 +- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java index dadfdcb7f07869..00af3113f43c75 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java @@ -234,7 +234,13 @@ void resetResponseChecker(Integer requestId) throws InterruptedException { * {@code workerProcessResponse} and release the semaphore for the {@code WorkerProxy}. */ private void waitResponse() throws InterruptedException, IOException { - recordingStream = new RecordingInputStream(process.getInputStream()); + Subprocess p = this.process; + if (p == null || !p.isAlive()) { + // Avoid busy-wait for a new process. + Thread.sleep(1); + return; + } + recordingStream = new RecordingInputStream(p.getInputStream()); recordingStream.startRecording(4096); WorkResponse parsedResponse = WorkResponse.parseDelimitedFrom(recordingStream); @@ -303,14 +309,14 @@ String getRecordingStreamMessage() { /** Returns true if this process has died for other reasons than a call to {@code #destroy()}. */ boolean diedUnexpectedly() { - return process != null && !process.isAlive() && !isInterrupted; + Subprocess p = this.process; // Protects against this.process getting null. + return p != null && !p.isAlive() && !isInterrupted; } /** Returns the exit value of multiplexer's process, if it has exited. */ Optional getExitValue() { - return process != null && !process.isAlive() - ? Optional.of(process.exitValue()) - : Optional.empty(); + Subprocess p = this.process; // Protects against this.process getting null. + return p != null && !p.isAlive() ? Optional.of(p.exitValue()) : Optional.empty(); } /** For testing only, to verify that maps are cleared after responses are reaped. */ diff --git a/src/test/java/com/google/devtools/build/lib/worker/TestUtils.java b/src/test/java/com/google/devtools/build/lib/worker/TestUtils.java index b480e5df4b816c..aa4da66b10f827 100644 --- a/src/test/java/com/google/devtools/build/lib/worker/TestUtils.java +++ b/src/test/java/com/google/devtools/build/lib/worker/TestUtils.java @@ -162,7 +162,7 @@ public void close() { @Override public boolean isAlive() { - return wasDestroyed; + return !wasDestroyed; } } } diff --git a/src/test/java/com/google/devtools/build/lib/worker/WorkerMultiplexerTest.java b/src/test/java/com/google/devtools/build/lib/worker/WorkerMultiplexerTest.java index 69784c392b15fc..9075e2a458c106 100644 --- a/src/test/java/com/google/devtools/build/lib/worker/WorkerMultiplexerTest.java +++ b/src/test/java/com/google/devtools/build/lib/worker/WorkerMultiplexerTest.java @@ -190,7 +190,7 @@ public void testGetResponse_slowProxy() WorkResponse fakedResponse1 = WorkResponse.newBuilder().setRequestId(3).build(); WorkResponse fakedResponse2 = WorkResponse.newBuilder().setRequestId(42).build(); - // Responses can arrive out of order + // Responses can arrive out of order, and before the workerproxies are ready to get them. fakedResponse2.writeDelimitedTo(workerOutputStream); fakedResponse1.writeDelimitedTo(workerOutputStream); workerOutputStream.flush();