Skip to content

Commit

Permalink
[7.2.0] Fix certain deadlocks in repo fetching with worker threads (#…
Browse files Browse the repository at this point in the history
…22261)

RELNOTES: Fixed certain deadlocks in repo fetching with worker threads
(`--experimental_worker_for_repo_fetching=auto`).
  • Loading branch information
Wyverald authored May 6, 2024
1 parent b9fd0a3 commit f661878
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.build.lib.analysis.BlazeDirectories;
import com.google.devtools.build.lib.analysis.ConfiguredRuleClassProvider;
import com.google.devtools.build.lib.analysis.RuleDefinition;
Expand Down Expand Up @@ -63,6 +62,7 @@
import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.CheckDirectDepsMode;
import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.LockfileMode;
import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.RepositoryOverride;
import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.WorkerForRepoFetching;
import com.google.devtools.build.lib.bazel.repository.cache.RepositoryCache;
import com.google.devtools.build.lib.bazel.repository.downloader.DelegatingDownloader;
import com.google.devtools.build.lib.bazel.repository.downloader.DownloadManager;
Expand Down Expand Up @@ -120,8 +120,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -161,9 +159,6 @@ public class BazelRepositoryModule extends BlazeModule {
private List<String> allowedYankedVersions = ImmutableList.of();
private boolean disableNativeRepoRules;
private SingleExtensionEvalFunction singleExtensionEvalFunction;
private final ExecutorService repoFetchingWorkerThreadPool =
Executors.newFixedThreadPool(
100, new ThreadFactoryBuilder().setNameFormat("repo-fetching-worker-%d").build());

@Nullable private CredentialModule credentialModule;

Expand Down Expand Up @@ -313,36 +308,8 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {

RepositoryOptions repoOptions = env.getOptions().getOptions(RepositoryOptions.class);
if (repoOptions != null) {
switch (repoOptions.workerForRepoFetching) {
case OFF:
starlarkRepositoryFunction.setWorkerExecutorService(null);
break;
case PLATFORM:
starlarkRepositoryFunction.setWorkerExecutorService(repoFetchingWorkerThreadPool);
break;
case VIRTUAL:
case AUTO:
try {
// Since Google hasn't migrated to JDK 21 yet, we can't directly call
// Executors.newVirtualThreadPerTaskExecutor here. But a bit of reflection never hurt
// anyone... right? (OSS Bazel already ships with a bundled JDK 21)
starlarkRepositoryFunction.setWorkerExecutorService(
(ExecutorService)
Executors.class
.getDeclaredMethod("newVirtualThreadPerTaskExecutor")
.invoke(null));
} catch (ReflectiveOperationException e) {
if (repoOptions.workerForRepoFetching == RepositoryOptions.WorkerForRepoFetching.AUTO) {
starlarkRepositoryFunction.setWorkerExecutorService(null);
} else {
throw new AbruptExitException(
detailedExitCode(
"couldn't create virtual worker thread executor for repo fetching",
Code.BAD_DOWNLOADER_CONFIG),
e);
}
}
}
starlarkRepositoryFunction.setUseWorkers(
repoOptions.workerForRepoFetching != WorkerForRepoFetching.OFF);
downloadManager.setDisableDownload(repoOptions.disableDownload);
if (repoOptions.repositoryDownloaderRetries >= 0) {
downloadManager.setRetries(repoOptions.repositoryDownloaderRetries);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,8 @@ public Converter() {
effectTags = {OptionEffectTag.UNKNOWN},
help =
"The threading mode to use for repo fetching. If set to 'off', no worker thread is used,"
+ " and the repo fetching is subject to restarts. Otherwise, uses a platform thread"
+ " (i.e. OS thread) if set to 'platform' or a virtual thread if set to 'virtual'. If"
+ " set to 'auto', virtual threads are used if available (i.e. running on JDK 21+),"
+ " otherwise no worker thread is used.")
+ " and the repo fetching is subject to restarts. Otherwise, uses a virtual worker"
+ " thread.")
public WorkerForRepoFetching workerForRepoFetching;

@Option(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,52 +14,53 @@

package com.google.devtools.build.lib.bazel.repository.starlark;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.rules.repository.RepoRecordedInput;
import com.google.devtools.build.lib.rules.repository.RepositoryDirectoryValue;
import com.google.devtools.build.skyframe.SkyFunction;
import com.google.devtools.build.skyframe.SkyFunction.Environment.SkyKeyComputeState;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import javax.annotation.Nullable;

/**
* Captures state that persists across different invocations of {@link
* com.google.devtools.build.lib.rules.repository.RepositoryDelegatorFunction}, specifically {@link
* StarlarkRepositoryFunction}.
*
* <p>This class is used to hold on to a worker thread (in reality just a {@link Future} object)
* when fetching repos using a worker thread is enabled. The worker thread uses a {@link
* SkyFunction.Environment} object acquired from the host thread, and can signal the host thread to
* restart to get a fresh environment object.
* <p>This class is used to hold on to a worker thread when fetching repos using a worker thread is
* enabled. The worker thread uses a {@link SkyFunction.Environment} object acquired from the host
* thread, and can signal the host thread to restart to get a fresh environment object.
*/
class RepoFetchingSkyKeyComputeState implements SkyKeyComputeState {

/** A signal that the worker thread can send to the host Skyframe thread. */
enum Signal {
/**
* Indicates that the host thread should return {@code null}, causing a Skyframe restart. After
* sending this signal, the client will immediately block on {@code delegateEnvQueue}, waiting
* for the host thread to send a fresh {@link SkyFunction.Environment} over.
*/
RESTART,
/**
* Indicates that the worker thread has finished running, either yielding a result or an
* exception.
*/
DONE
}

/** The channel for the worker thread to send a signal to the host Skyframe thread. */
final BlockingQueue<Signal> signalQueue = new SynchronousQueue<>();
/**
* A semaphore with 0 or 1 permit. The worker can release a permit either when it's finished
* (successfully or otherwise), or to indicate that the host thread should return {@code null},
* causing a Skyframe restart. In the latter case, the worker will immediately block on {@code
* delegateEnvQueue}, waiting for the host thread to send a fresh {@link SkyFunction.Environment}
* over.
*/
// A Semaphore is useful here because, crucially, releasing a permit never blocks and thus cannot
// be interrupted.
final Semaphore signalSemaphore = new Semaphore(0);

/**
* The channel for the host Skyframe thread to send fresh {@link SkyFunction.Environment} objects
* back to the worker thread.
*/
final BlockingQueue<SkyFunction.Environment> delegateEnvQueue = new SynchronousQueue<>();
// We use an ArrayBlockingQueue of size 1 instead of a SynchronousQueue, so that if the worker
// gets interrupted before the host thread restarts, the host thread doesn't hang forever.
final BlockingQueue<SkyFunction.Environment> delegateEnvQueue = new ArrayBlockingQueue<>(1);

/**
* This future holds on to the worker thread in order to cancel it when necessary; it also serves
Expand All @@ -69,7 +70,14 @@ enum Signal {
// could happen on multiple threads. Canceling a future multiple times is safe, though, so we
// only need to worry about nullness. Using a mutex/synchronization is an alternative but it means
// we might block in `close()`, which is potentially bad (see its javadoc).
@Nullable volatile Future<RepositoryDirectoryValue.Builder> workerFuture = null;
@Nullable volatile ListenableFuture<RepositoryDirectoryValue.Builder> workerFuture = null;

/** The executor service that manages the worker thread. */
// We hold on to this alongside `workerFuture` because it offers a convenient mechanism to make
// sure the worker thread has shut down (with its blocking `close()` method).
ListeningExecutorService workerExecutorService;

private final String repoName;

/**
* This is where the recorded inputs & values for the whole invocation is collected.
Expand All @@ -79,17 +87,71 @@ enum Signal {
*/
final Map<RepoRecordedInput, String> recordedInputValues = new TreeMap<>();

RepoFetchingSkyKeyComputeState(String repoName) {
this.repoName = repoName;
reset();
}

// This may only be called from the host Skyframe thread, *and* only when no worker thread is
// running.
private void reset() {
workerExecutorService =
MoreExecutors.listeningDecorator(
Executors.newThreadPerTaskExecutor(
Thread.ofVirtual().name("starlark-repository-" + repoName).factory()));
signalSemaphore.drainPermits();
delegateEnvQueue.clear();
recordedInputValues.clear();
}

/**
* Releases a permit on the {@code signalSemaphore} and immediately expect a fresh Environment
* back. This may only be called from the worker thread.
*/
SkyFunction.Environment signalForFreshEnv() throws InterruptedException {
signalQueue.put(Signal.RESTART);
signalSemaphore.release();
return delegateEnvQueue.take();
}

/**
* Starts a worker thread running the given callable. This sets the {@code workerFuture} field,
* and makes sure to release a permit on the {@code signalSemaphore} when the worker finishes,
* successfully or otherwise. Returns the worker future. This may only be called from the host
* Skyframe thread.
*/
ListenableFuture<RepositoryDirectoryValue.Builder> startWorker(
Callable<RepositoryDirectoryValue.Builder> c) {
var workerFuture = workerExecutorService.submit(c);
this.workerFuture = workerFuture;
workerFuture.addListener(signalSemaphore::release, directExecutor());
return workerFuture;
}

// This may be called from any thread, including the host Skyframe thread and the
// high-memory-pressure listener thread.
@Override
public void close() {
var myWorkerFuture = workerFuture;
workerFuture = null;
if (myWorkerFuture != null) {
myWorkerFuture.cancel(true);
}
workerExecutorService.shutdownNow();
}

/**
* Closes the state object, and blocks until all pending async work is finished. The state object
* will reset to a clean slate after this method finishes. This may only be called from the host
* Skyframe thread.
*/
public void closeAndWaitForTermination() throws InterruptedException {
close();
workerExecutorService.close(); // This blocks
// We reset the state object back to its very initial state, since the host SkyFunction may be
// re-entered (for example b/330892334 and https://github.com/bazelbuild/bazel/issues/21238).
reset();
if (Thread.interrupted()) {
throw new InterruptedException();
}
}
}
Loading

0 comments on commit f661878

Please sign in to comment.