Skip to content

Commit

Permalink
Put the parsed WorkResponse objects into workerProcessResponse, rather than a reconstituted InputStream.
Browse files Browse the repository at this point in the history

Also adds some reporting when worker_verbose is set.

RELNOTES: n/a
PiperOrigin-RevId: 341826014
  • Loading branch information
larsrc-google authored and copybara-github committed Nov 11, 2020
1 parent d1ad61c commit 1c40d14
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ public Worker create(WorkerKey key) throws Exception {
Path workDir = getSandboxedWorkerPath(key, workerId);
worker = new SandboxedWorker(key, workerId, workDir, logFile);
} else if (key.getProxied()) {
WorkerMultiplexer workerMultiplexer = WorkerMultiplexerManager.getInstance(key, logFile);
WorkerMultiplexer workerMultiplexer =
WorkerMultiplexerManager.getInstance(
key, logFile, workerOptions.workerVerbose ? reporter : null);
worker =
new WorkerProxy(
key, workerId, key.getExecRoot(), workerMultiplexer.getLogFile(), workerMultiplexer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,5 +236,6 @@ public void afterCommand() {
if (this.workerFactory != null) {
this.workerFactory.setReporter(null);
}
WorkerMultiplexerManager.afterCommandCleanup();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,16 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.flogger.GoogleLogger;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.Reporter;
import com.google.devtools.build.lib.shell.Subprocess;
import com.google.devtools.build.lib.shell.SubprocessBuilder;
import com.google.devtools.build.lib.shell.SubprocessFactory;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -48,7 +47,7 @@ public class WorkerMultiplexer extends Thread {
* A map of {@code WorkResponse}s received from the worker process. They are stored in this map
* until the corresponding {@code WorkerProxy} picks them up.
*/
private final Map<Integer, InputStream> workerProcessResponse;
private final Map<Integer, WorkResponse> workerProcessResponse;
/** A semaphore to protect {@code workerProcessResponse} object. */
private final Semaphore semWorkerProcessResponse;
/**
Expand All @@ -63,8 +62,8 @@ public class WorkerMultiplexer extends Thread {
/** The worker process that this WorkerMultiplexer should be talking to. */
private Subprocess process;
/**
* Set to true if one of the worker processes returns an unparseable response. We then discard all
* the responses from other worker processes and abort.
* Set to true if one of the worker processes returns an unparseable response. We then abort the
* worker process.
*/
private boolean isUnparseable;
/** InputStream from the worker process. */
Expand All @@ -85,15 +84,34 @@ public class WorkerMultiplexer extends Thread {
/** For testing only, allow a way to fake subprocesses. */
private SubprocessFactory subprocessFactory;

/**
 * The active Reporter object, non-null if {@code --worker_verbose} is set. This must be cleared
 * at the end of a command execution.
 *
 * <p>Declared {@code volatile} because it is written through {@code setReporter()} and read in
 * {@code report()} without any shared lock, so a plain field would give the reading thread no
 * guarantee of ever observing an update (including the end-of-command clearing).
 */
private volatile Reporter reporter;

WorkerMultiplexer(Path logFile) {
this.logFile = logFile;
semWorkerProcessResponse = new Semaphore(1);
semResponseChecker = new Semaphore(1);
responseChecker = new HashMap<>();
workerProcessResponse = new HashMap<>();
isUnparseable = false;
isWorkerStreamClosed = false;
isInterrupted = false;
this.logFile = logFile;
}

/**
 * Installs the reporter used for verbose output, or removes it when given {@code null}.
 *
 * @param reporter the reporter that should receive info events, or {@code null} to turn
 *     reporting off
 */
void setReporter(Reporter reporter) {
  this.reporter = reporter;
}

/**
 * Emits {@code s} to the user as an info event when a reporter is installed; does nothing when
 * there is no reporter or when {@code s} is {@code null}.
 */
private void report(String s) {
  // Read the field exactly once: setReporter() may change it between the null check and the call.
  Reporter current = this.reporter;
  if (current == null || s == null) {
    return;
  }
  current.handle(Event.info(s));
}

/**
Expand All @@ -119,6 +137,7 @@ public synchronized void createProcess(WorkerKey workerKey, Path workDir) throws
processBuilder.setStderr(logFile.getPathFile());
processBuilder.setEnv(workerKey.getEnv());
this.process = processBuilder.start();
report(String.format("Created new multiplexer process for %s", workerKey.getMnemonic()));
}
if (!this.isAlive()) {
this.start();
Expand Down Expand Up @@ -179,7 +198,7 @@ public synchronized void putRequest(WorkRequest request) throws IOException {
* Waits on a semaphore for the {@code WorkResponse} returned from worker process. This method is
* called on the thread of a {@code WorkerProxy}.
*/
public InputStream getResponse(Integer requestId) throws IOException, InterruptedException {
public WorkResponse getResponse(Integer requestId) throws IOException, InterruptedException {
try {
semResponseChecker.acquire();
Semaphore waitForResponse = responseChecker.get(requestId);
Expand All @@ -205,7 +224,7 @@ public InputStream getResponse(Integer requestId) throws IOException, Interrupte
}

semWorkerProcessResponse.acquire();
InputStream response = workerProcessResponse.get(requestId);
WorkResponse response = workerProcessResponse.get(requestId);
semWorkerProcessResponse.release();
return response;
} finally {
Expand Down Expand Up @@ -247,23 +266,32 @@ private void waitResponse() throws InterruptedException, IOException {
// A null parsedResponse can only happen if the input stream is closed.
if (parsedResponse == null) {
isWorkerStreamClosed = true;
report("Multiplexer process has closed its output, aborting multiplexer");
releaseAllSemaphores();
return;
}

int requestId = parsedResponse.getRequestId();
ByteArrayOutputStream tempOs = new ByteArrayOutputStream();
parsedResponse.writeDelimitedTo(tempOs);

semWorkerProcessResponse.acquire();
workerProcessResponse.put(requestId, new ByteArrayInputStream(tempOs.toByteArray()));
workerProcessResponse.put(requestId, parsedResponse);
semWorkerProcessResponse.release();

// TODO(b/151767359): When allowing cancellation, remove responses that have no matching
// entry in responseChecker.
semResponseChecker.acquire();
responseChecker.get(requestId).release();
semResponseChecker.release();
Semaphore semaphore = responseChecker.get(requestId);
if (semaphore != null) {
semaphore.release();
semResponseChecker.release();
} else {
semResponseChecker.release();
logger.atWarning().log("Received response for unknown request %d.", requestId);
semWorkerProcessResponse.acquire();
// Prevent memory leak of useless responses.
workerProcessResponse.remove(requestId);
semWorkerProcessResponse.release();
}
}

/** The multiplexer thread that listens to the WorkResponse from worker process. */
Expand All @@ -274,6 +302,7 @@ public void run() {
waitResponse();
} catch (IOException e) {
isUnparseable = true;
report("Multiplexer process was interrupted during I/O, aborting multiplexer");
releaseAllSemaphores();
logger.atWarning().withCause(e).log(
"IOException was caught while waiting for worker response. "
Expand All @@ -293,13 +322,20 @@ public void run() {
private void releaseAllSemaphores() {
try {
semResponseChecker.acquire();
for (Integer requestId : responseChecker.keySet()) {
responseChecker.get(requestId).release();
for (Semaphore semaphore : responseChecker.values()) {
semaphore.release();
}
} catch (InterruptedException e) {
// Do nothing
} finally {
responseChecker.clear();
semResponseChecker.release();
} catch (InterruptedException e) {
// Do nothing - we only get interrupted during shutdown
}
try {
semWorkerProcessResponse.acquire();
workerProcessResponse.clear();
semWorkerProcessResponse.release();
} catch (InterruptedException e) {
// Do nothing - we only get interrupted during shutdown
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.devtools.build.lib.actions.UserExecException;
import com.google.devtools.build.lib.events.Reporter;
import com.google.devtools.build.lib.server.FailureDetails;
import com.google.devtools.build.lib.server.FailureDetails.FailureDetail;
import com.google.devtools.build.lib.server.FailureDetails.Worker.Code;
Expand Down Expand Up @@ -46,12 +47,13 @@ private WorkerMultiplexerManager() {}
* objects with the same {@code WorkerKey} talk to the same {@code WorkerMultiplexer}. Also,
* record how many {@code WorkerProxy} objects are talking to this {@code WorkerMultiplexer}.
*/
public static WorkerMultiplexer getInstance(WorkerKey key, Path logFile)
public static WorkerMultiplexer getInstance(WorkerKey key, Path logFile, Reporter reporter)
throws InterruptedException {
semMultiplexer.acquire();
multiplexerInstance.putIfAbsent(key, new InstanceInfo(logFile));
multiplexerInstance.get(key).increaseRefCount();
WorkerMultiplexer workerMultiplexer = multiplexerInstance.get(key).getWorkerMultiplexer();
workerMultiplexer.setReporter(reporter);
semMultiplexer.release();
return workerMultiplexer;
}
Expand All @@ -77,6 +79,19 @@ public static void removeInstance(WorkerKey key) throws InterruptedException, Us
}
}

/**
 * Is called when a build is done, to do per-build cleanup: clears the reporter on every
 * registered multiplexer.
 *
 * <p>If the calling thread is interrupted while waiting for {@code semMultiplexer}, no cleanup is
 * performed and the thread's interrupt status is restored so that callers can still observe it.
 */
static void afterCommandCleanup() {
  try {
    semMultiplexer.acquire();
  } catch (InterruptedException e) {
    // Interrupted during cleanup, not much we can do beyond preserving the interrupt status.
    Thread.currentThread().interrupt();
    return;
  }
  try {
    for (InstanceInfo info : multiplexerInstance.values()) {
      info.getWorkerMultiplexer().setReporter(null);
    }
  } finally {
    // Always hand the permit back, even if the loop throws, so later callers cannot block forever.
    semMultiplexer.release();
  }
}

@VisibleForTesting
static WorkerMultiplexer getMultiplexer(WorkerKey key) throws UserExecException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
import java.io.IOException;
import java.io.InputStream;
import java.util.Optional;
import java.util.Set;

Expand Down Expand Up @@ -78,48 +77,15 @@ synchronized void destroy() {

/** Send the WorkRequest to multiplexer. */
@Override
void putRequest(WorkRequest request) throws IOException {
try {
workerMultiplexer.resetResponseChecker(request.getRequestId());
workerMultiplexer.putRequest(request);
} catch (InterruptedException e) {
/**
* We can't throw InterruptedException to WorkerSpawnRunner because of the principle of
* override. InterruptedException will happen when Bazel is waiting for semaphore but user
* terminates the process, so we do nothing here.
*/
logger.atWarning().withCause(e).log(
"InterruptedException was caught while sending worker request. "
+ "It could because the multiplexer was interrupted.");
}
void putRequest(WorkRequest request) throws IOException, InterruptedException {
workerMultiplexer.resetResponseChecker(request.getRequestId());
workerMultiplexer.putRequest(request);
}

/** Wait for WorkResponse from multiplexer. */
@Override
WorkResponse getResponse(int requestId) throws IOException {
try {
InputStream inputStream = workerMultiplexer.getResponse(requestId);
if (inputStream == null) {
return null;
}
return WorkResponse.parseDelimitedFrom(inputStream);
} catch (IOException e) {
throw new IOException(
"IOException was caught while waiting for worker response. "
+ "It could because the worker returned unparseable response.");
} catch (InterruptedException e) {
/**
* We can't throw InterruptedException to WorkerSpawnRunner because of the principle of
* override. InterruptedException will happen when Bazel is waiting for semaphore but user
* terminates the process, so we do nothing here.
*/
logger.atWarning().withCause(e).log(
"InterruptedException was caught while waiting for work response. "
+ "It could because the multiplexer was interrupted.");
}
// response can be null when the worker has already closed stdout at this point and thus
// the InputStream is at EOF.
return null;
WorkResponse getResponse(int requestId) throws IOException, InterruptedException {
return workerMultiplexer.getResponse(requestId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public void instanceCreationRemovalTest() throws Exception {
false,
false,
WorkerProtocolFormat.PROTO);
WorkerMultiplexer wm1 = WorkerMultiplexerManager.getInstance(workerKey1, logFile);
WorkerMultiplexer wm1 =
WorkerMultiplexerManager.getInstance(workerKey1, logFile, /* reporter */ null);

assertThat(WorkerMultiplexerManager.getMultiplexer(workerKey1)).isEqualTo(wm1);
assertThat(WorkerMultiplexerManager.getRefCount(workerKey1)).isEqualTo(1);
Expand All @@ -78,14 +79,16 @@ public void instanceCreationRemovalTest() throws Exception {
false,
false,
WorkerProtocolFormat.PROTO);
WorkerMultiplexer wm2 = WorkerMultiplexerManager.getInstance(workerKey2, logFile);
WorkerMultiplexer wm2 =
WorkerMultiplexerManager.getInstance(workerKey2, logFile, /* reporter */ null);

assertThat(WorkerMultiplexerManager.getMultiplexer(workerKey2)).isEqualTo(wm2);
assertThat(WorkerMultiplexerManager.getRefCount(workerKey2)).isEqualTo(1);
assertThat(WorkerMultiplexerManager.getInstanceCount()).isEqualTo(2);

// Use the same WorkerProxy hash, it shouldn't instantiate a new WorkerMultiplexer.
WorkerMultiplexer wm2Annex = WorkerMultiplexerManager.getInstance(workerKey2, logFile);
WorkerMultiplexer wm2Annex =
WorkerMultiplexerManager.getInstance(workerKey2, logFile, /* reporter */ null);

assertThat(wm2).isEqualTo(wm2Annex);
assertThat(WorkerMultiplexerManager.getRefCount(workerKey2)).isEqualTo(2);
Expand Down
Loading

0 comments on commit 1c40d14

Please sign in to comment.