Skip to content

Commit

Permalink
Add optional idle GC to WorkRequestHandler.
Browse files Browse the repository at this point in the history
Workers have been known to retain releasable memory when not active, but currently we only explicitly GC after a certain amount of CPU has been used. This adds a slower GC scheduled after a certain amount of no activity.

Enabled for JavaBuilder by default.

PiperOrigin-RevId: 501489388
Change-Id: I7d242d7a0af538559783568ef1350e8e7afbb456
  • Loading branch information
larsrc-google authored and copybara-github committed Jan 12, 2023
1 parent 2907a7e commit 57c5ee3
Showing 1 changed file with 94 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,14 @@
import java.lang.management.ManagementFactory;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -119,8 +123,18 @@ synchronized void addOutput(String s) {
final WorkerMessageProcessor messageProcessor;

private final BiConsumer<Integer, Thread> cancelCallback;

/**
* A scheduler that runs garbage collection after a certain amount of CPU time has passed. In our
* experience, explicit GC reclaims much more than implicit GC. This scheduler helps make sure
* very busy workers don't grow ridiculously large.
*/
private final CpuTimeBasedGcScheduler gcScheduler;
/**
* A scheduler that runs garbage collection after a certain amount of time without any activity.
* In our experience, explicit GC reclaims much more than implicit GC. This scheduler helps make
* sure workers don't hang on to excessive memory after they are done working.
*/
private final IdleGcScheduler idleGcScheduler;

/**
* If set, this worker will stop handling requests and shut itself down. This can happen if
Expand Down Expand Up @@ -190,7 +204,8 @@ private WorkRequestHandler(
stderr,
messageProcessor,
cpuUsageBeforeGc,
cancelCallback);
cancelCallback,
Duration.ZERO);
}

/**
Expand All @@ -207,12 +222,14 @@ private WorkRequestHandler(
PrintStream stderr,
WorkerMessageProcessor messageProcessor,
Duration cpuUsageBeforeGc,
BiConsumer<Integer, Thread> cancelCallback) {
BiConsumer<Integer, Thread> cancelCallback,
Duration idleTimeBeforeGc) {
this.callback = callback;
this.stderr = stderr;
this.messageProcessor = messageProcessor;
this.gcScheduler = new CpuTimeBasedGcScheduler(cpuUsageBeforeGc);
this.cancelCallback = cancelCallback;
this.idleGcScheduler = new IdleGcScheduler(idleTimeBeforeGc);
}

/** A wrapper class for the callback BiFunction */
Expand Down Expand Up @@ -247,6 +264,7 @@ public static class WorkRequestHandlerBuilder {
private final WorkerMessageProcessor messageProcessor;
private Duration cpuUsageBeforeGc = Duration.ZERO;
private BiConsumer<Integer, Thread> cancelCallback;
private Duration idleTimeBeforeGc = Duration.ZERO;

/**
* Creates a {@code WorkRequestHandlerBuilder}.
Expand Down Expand Up @@ -309,10 +327,17 @@ public WorkRequestHandlerBuilder setCancelCallback(BiConsumer<Integer, Thread> c
return this;
}

/** Sets the time without any work that should elapse before forcing a GC. */
@CanIgnoreReturnValue
public WorkRequestHandlerBuilder setIdleTimeBeforeGc(Duration idleTimeBeforeGc) {
this.idleTimeBeforeGc = idleTimeBeforeGc;
return this;
}

/** Returns a WorkRequestHandler instance with the values in this Builder. */
public WorkRequestHandler build() {
return new WorkRequestHandler(
callback, stderr, messageProcessor, cpuUsageBeforeGc, cancelCallback);
callback, stderr, messageProcessor, cpuUsageBeforeGc, cancelCallback, idleTimeBeforeGc);
}
}

Expand All @@ -335,6 +360,7 @@ public void processRequests() throws IOException {
try {
while (!shutdownWorker.get()) {
WorkRequest request = messageProcessor.readWorkRequest();
idleGcScheduler.markActivity(true);
if (request == null) {
break;
}
Expand All @@ -348,6 +374,7 @@ public void processRequests() throws IOException {
stderr.println("Error reading next WorkRequest: " + e);
e.printStackTrace(stderr);
} finally {
idleGcScheduler.stop();
// TODO(b/220878242): Give the outstanding requests a chance to send a "shutdown" response,
// but also try to kill stuck threads. For now, we just interrupt the remaining threads.
// We considered doing System.exit here, but that is hard to test and would deny the callers
Expand Down Expand Up @@ -399,6 +426,7 @@ void startResponseThread(WorkerIO workerIO, WorkRequest request) {
RequestInfo requestInfo = activeRequests.get(request.getRequestId());
if (requestInfo == null) {
// Already cancelled
idleGcScheduler.markActivity(!activeRequests.isEmpty());
return;
}
try {
Expand All @@ -412,6 +440,7 @@ void startResponseThread(WorkerIO workerIO, WorkRequest request) {
}
} finally {
activeRequests.remove(request.getRequestId());
idleGcScheduler.markActivity(!activeRequests.isEmpty());
}
},
threadName);
Expand All @@ -423,6 +452,7 @@ void startResponseThread(WorkerIO workerIO, WorkRequest request) {
stderr.println("Error thrown by worker thread, shutting down worker.");
e.printStackTrace(stderr);
currentThread.interrupt();
idleGcScheduler.stop();
}
});
RequestInfo previous = activeRequests.putIfAbsent(request.getRequestId(), new RequestInfo(t));
Expand Down Expand Up @@ -524,6 +554,66 @@ public void close() throws IOException {
messageProcessor.close();
}

/** Schedules GC when the worker has been idle for a while */
private static class IdleGcScheduler {
private Instant lastActivity = Instant.EPOCH;
private Instant lastGc = Instant.EPOCH;
/** Minimum duration from the end of activity until we perform an idle GC. */
private final Duration idleTimeBeforeGc;

private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
private ScheduledFuture<?> futureGc = null;

/**
* Creates a new scheduler.
*
* @param idleTimeBeforeGc The time from the last activity until attempting GC.
*/
public IdleGcScheduler(Duration idleTimeBeforeGc) {
this.idleTimeBeforeGc = idleTimeBeforeGc;
}

synchronized void start() {
if (!idleTimeBeforeGc.isZero()) {
futureGc =
executor.schedule(this::maybeDoGc, idleTimeBeforeGc.toMillis(), TimeUnit.MILLISECONDS);
}
}

/**
* Should be called whenever there is some sort of activity starting or ending. Better to call
* too often.
*/
synchronized void markActivity(boolean anythingActive) {
lastActivity = Instant.now();
if (futureGc != null) {
futureGc.cancel(false);
futureGc = null;
}
if (!anythingActive) {
start();
}
}

private void maybeDoGc() {
if (lastGc.isBefore(lastActivity)
&& lastActivity.isBefore(Instant.now().minus(idleTimeBeforeGc))) {
System.gc();
lastGc = Instant.now();
} else {
start();
}
}

synchronized void stop() {
if (futureGc != null) {
futureGc.cancel(false);
futureGc = null;
}
executor.shutdown();
}
}

/**
* Class that performs GC occasionally, based on how much CPU time has passed. This strikes a
* compromise between blindly doing GC after e.g. every request, which takes too much CPU, and not
Expand Down

0 comments on commit 57c5ee3

Please sign in to comment.