diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java index 5474f884233832..9c8e9e4c918ab0 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java @@ -13,8 +13,6 @@ // limitations under the License. package com.google.devtools.build.lib.remote; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable; import static com.google.devtools.build.lib.remote.util.RxFutures.toSingle; @@ -26,8 +24,11 @@ import build.bazel.remote.execution.v2.Directory; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.devtools.build.lib.profiler.Profiler; +import com.google.devtools.build.lib.profiler.SilentCloseable; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.merkletree.MerkleTree; @@ -38,14 +39,15 @@ import com.google.protobuf.Message; import io.reactivex.rxjava3.core.Completable; import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.Single; import io.reactivex.rxjava3.subjects.AsyncSubject; import java.io.IOException; -import java.util.HashSet; +import java.util.ArrayList; +import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.annotation.concurrent.GuardedBy; +import java.util.stream.Collectors; /** A {@link RemoteCache} with additional functionality needed for remote execution. */ public class RemoteExecutionCache extends RemoteCache { @@ -85,13 +87,14 @@ public void ensureInputsPresent( return; } - MissingDigestFinder missingDigestFinder = new MissingDigestFinder(context, allDigests.size()); + Single> newDigests = findNewDigests(allDigests, force); + + Single> missingNewDigests = + newDigests.flatMap(digests -> findMissingNewDigests(context, digests)); + Flowable uploads = - Flowable.fromIterable(allDigests) - .flatMapSingle( - digest -> - uploadBlobIfMissing( - context, merkleTree, additionalInputs, force, missingDigestFinder, digest)); + missingNewDigests.flatMapPublisher( + digests -> uploadNewDigests(context, merkleTree, additionalInputs, digests)); try { mergeBulkTransfer(uploads).blockingAwait(); @@ -105,36 +108,6 @@ public void ensureInputsPresent( } } - private Single uploadBlobIfMissing( - RemoteActionExecutionContext context, - MerkleTree merkleTree, - Map additionalInputs, - boolean force, - MissingDigestFinder missingDigestFinder, - Digest digest) { - Completable upload = - casUploadCache.execute( - digest, - Completable.defer( - () -> - // Only reach here if the digest is missing and is not being uploaded. - missingDigestFinder - .registerAndCount(digest) - .flatMapCompletable( - missingDigests -> { - if (missingDigests.contains(digest)) { - return toCompletable( - () -> uploadBlob(context, digest, merkleTree, additionalInputs), - directExecutor()); - } else { - return Completable.complete(); - } - })), - /* onIgnored= */ missingDigestFinder::count, - force); - return toTransferResult(upload); - } - private ListenableFuture uploadBlob( RemoteActionExecutionContext context, Digest digest, @@ -165,92 +138,98 @@ private ListenableFuture uploadBlob( digest))); } - /** - * A missing digest finder that initiates the request when the internal counter reaches an - * expected count. - */ - class MissingDigestFinder { - private final int expectedCount; - - private final AsyncSubject> digestsSubject; - private final Single> resultSingle; - - @GuardedBy("this") - private final Set digests; - - @GuardedBy("this") - private int currentCount = 0; - - MissingDigestFinder(RemoteActionExecutionContext context, int expectedCount) { - checkArgument(expectedCount > 0, "expectedCount should be greater than 0"); - this.expectedCount = expectedCount; - this.digestsSubject = AsyncSubject.create(); - this.digests = new HashSet<>(); + static class NewDigest { + Digest digest; + AsyncSubject continueSubject; + Completable completion; + } - AtomicBoolean findMissingDigestsCalled = new AtomicBoolean(false); - this.resultSingle = - Single.fromObservable( - digestsSubject - .flatMapSingle( - digests -> { - boolean wasCalled = findMissingDigestsCalled.getAndSet(true); - // Make sure we don't have re-subscription caused by refCount() below. - checkState(!wasCalled, "FindMissingDigests is called more than once"); - return toSingle( - () -> findMissingDigests(context, digests), directExecutor()); - }) - // Use replay here because we could have a race condition that downstream hasn't - // been added to the subscription list (to receive the upstream result) while - // upstream is completed. - .replay(1) - .refCount()); - } + private Single> findNewDigests(Iterable allDigests, boolean force) { + return Single.using( + () -> Profiler.instance().profile("collect digest"), + ignored -> + Flowable.fromIterable(allDigests) + .flatMapMaybe(digest -> findNewDigest(digest, force)) + .collect(Collectors.toList()), + SilentCloseable::close); + } - /** - * Register the {@code digest} and increase the counter. - * - *

Returned Single cannot be subscribed more than once. - * - * @return Single that emits the result of the {@code FindMissingDigest} request. - */ - Single> registerAndCount(Digest digest) { - AtomicBoolean subscribed = new AtomicBoolean(false); - // count() will potentially trigger the findMissingDigests call. Adding and counting before - // returning the Single could introduce a race that the result of findMissingDigests is - // available but the consumer doesn't get it because it hasn't subscribed the returned - // Single. In this case, it subscribes after upstream is completed resulting a re-run of - // findMissingDigests (due to refCount()). - // - // Calling count() inside doOnSubscribe to ensure the consumer already subscribed to the - // returned Single to avoid a re-execution of findMissingDigests. - return resultSingle.doOnSubscribe( - d -> { - boolean wasSubscribed = subscribed.getAndSet(true); - checkState(!wasSubscribed, "Single is subscribed more than once"); - synchronized (this) { - digests.add(digest); - } - count(); - }); - } + @SuppressWarnings("CheckReturnValue") + private Maybe findNewDigest(Digest digest, boolean force) { + return Maybe.create( + emitter -> { + AsyncSubject afterSubject = AsyncSubject.create(); + + Completable upload = + casUploadCache.execute( + digest, + Completable.defer( + () -> { + NewDigest newDigest = new NewDigest(); + newDigest.digest = digest; + newDigest.continueSubject = AsyncSubject.create(); + newDigest.completion = Completable.fromObservable(afterSubject); + emitter.onSuccess(newDigest); + return Completable.fromObservable(newDigest.continueSubject); + }), + emitter::onComplete, + force); + + Observable.fromCompletable(upload).subscribeWith(afterSubject); + }); + } - /** Increase the counter. */ - void count() { - ImmutableSet digestsResult = null; + private Single> findMissingNewDigests( + RemoteActionExecutionContext context, Iterable newDigests) { + return Single.using( + () -> Profiler.instance().profile("findMissingDigests"), + ignored -> + toSingle( + () -> + findMissingDigests( + context, + Iterables.transform(newDigests, newDigest -> newDigest.digest)), + directExecutor()) + .map( + missingDigests -> { + List result = new ArrayList<>(); + for (NewDigest newDigest : newDigests) { + if (missingDigests.contains(newDigest.digest)) { + result.add(newDigest); + } else { + newDigest.continueSubject.onComplete(); + } + } + return result; + }), + SilentCloseable::close); + } - synchronized (this) { - if (currentCount < expectedCount) { - currentCount++; - if (currentCount == expectedCount) { - digestsResult = ImmutableSet.copyOf(digests); - } - } - } + private Flowable uploadNewDigests( + RemoteActionExecutionContext context, + MerkleTree merkleTree, + Map additionalInputs, + Iterable newDigests) { + return Flowable.using( + () -> Profiler.instance().profile("upload"), + ignored -> + Flowable.fromIterable(newDigests) + .flatMapSingle( + digest -> uploadNewDigest(context, merkleTree, additionalInputs, digest)), + SilentCloseable::close); + } - if (digestsResult != null) { - digestsSubject.onNext(digestsResult); - digestsSubject.onComplete(); - } - } + @SuppressWarnings("CheckReturnValue") + private Single uploadNewDigest( + RemoteActionExecutionContext context, + MerkleTree merkleTree, + Map additionalInputs, + NewDigest digest) { + Observable.fromCompletable( + toCompletable( + () -> uploadBlob(context, digest.digest, merkleTree, additionalInputs), + directExecutor())) + .subscribeWith(digest.continueSubject); + return toTransferResult(digest.completion); } }