Skip to content

Commit

Permalink
Remote: Fix performance regression in "upload missing inputs".
Browse files Browse the repository at this point in the history
Also add more tests.

Fixes bazelbuild#15872.
  • Loading branch information
coeuvre committed Jul 16, 2022
1 parent 8e03d82 commit 6c2b485
Show file tree
Hide file tree
Showing 4 changed files with 328 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable;
import static com.google.devtools.build.lib.remote.util.RxFutures.toSingle;
Expand All @@ -26,8 +24,11 @@
import build.bazel.remote.execution.v2.Directory;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
Expand All @@ -36,16 +37,21 @@
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult;
import com.google.protobuf.Message;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.annotations.Nullable;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.CompletableEmitter;
import io.reactivex.rxjava3.core.CompletableObserver;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subjects.AsyncSubject;
import java.io.IOException;
import java.util.HashSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.GuardedBy;
import java.util.stream.Collectors;

/** A {@link RemoteCache} with additional functionality needed for remote execution. */
public class RemoteExecutionCache extends RemoteCache {
Expand Down Expand Up @@ -85,13 +91,14 @@ public void ensureInputsPresent(
return;
}

MissingDigestFinder missingDigestFinder = new MissingDigestFinder(context, allDigests.size());
Single<Iterable<UploadTask>> uploadTasks = collectDigests(allDigests, force);

Single<List<UploadTask>> missingNewDigests =
uploadTasks.flatMap(tasks -> findMissingUploads(context, tasks));

Flowable<TransferResult> uploads =
Flowable.fromIterable(allDigests)
.flatMapSingle(
digest ->
uploadBlobIfMissing(
context, merkleTree, additionalInputs, force, missingDigestFinder, digest));
missingNewDigests.flatMapPublisher(
digests -> uploadNewDigests(context, merkleTree, additionalInputs, digests));

try {
mergeBulkTransfer(uploads).blockingAwait();
Expand All @@ -105,36 +112,6 @@ public void ensureInputsPresent(
}
}

private Single<TransferResult> uploadBlobIfMissing(
RemoteActionExecutionContext context,
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs,
boolean force,
MissingDigestFinder missingDigestFinder,
Digest digest) {
Completable upload =
casUploadCache.execute(
digest,
Completable.defer(
() ->
// Only reach here if the digest is missing and is not being uploaded.
missingDigestFinder
.registerAndCount(digest)
.flatMapCompletable(
missingDigests -> {
if (missingDigests.contains(digest)) {
return toCompletable(
() -> uploadBlob(context, digest, merkleTree, additionalInputs),
directExecutor());
} else {
return Completable.complete();
}
})),
/* onIgnored= */ missingDigestFinder::count,
force);
return toTransferResult(upload);
}

private ListenableFuture<Void> uploadBlob(
RemoteActionExecutionContext context,
Digest digest,
Expand Down Expand Up @@ -165,92 +142,152 @@ private ListenableFuture<Void> uploadBlob(
digest)));
}

/**
* A missing digest finder that initiates the request when the internal counter reaches an
* expected count.
*/
class MissingDigestFinder {
private final int expectedCount;
static class UploadTask {
Digest digest;
// If continuation is not null, we own this task and need to upload the digest if missing and
// then use continuation to notify downstream.
// Otherwise, the task is owned by others, we only care about the completion.
@Nullable CompletableEmitter continuation;
Completable completion;
@Nullable Disposable disposable;
}

private final AsyncSubject<ImmutableSet<Digest>> digestsSubject;
private final Single<ImmutableSet<Digest>> resultSingle;
private Single<Iterable<UploadTask>> collectDigests(Iterable<Digest> allDigests, boolean force) {
return Single.using(
() -> Profiler.instance().profile("collect digests"),
ignored ->
Flowable.fromIterable(allDigests)
.flatMapMaybe(digest -> maybeCreateUploadTask(digest, force))
.collect(Collectors.toList()),
SilentCloseable::close);
}

@GuardedBy("this")
private final Set<Digest> digests;
private Maybe<UploadTask> maybeCreateUploadTask(Digest digest, boolean force) {
return Maybe.create(
emitter -> {
AsyncSubject<Void> completion = AsyncSubject.create();
UploadTask uploadTask = new UploadTask();
uploadTask.digest = digest;
uploadTask.completion =
Completable.fromObservable(
completion.doOnDispose(
() -> {
if (uploadTask.disposable != null) {
uploadTask.disposable.dispose();
}
}));
Completable upload =
casUploadCache.execute(
digest,
Completable.create(
continuation -> {
uploadTask.continuation = continuation;
emitter.onSuccess(uploadTask);
}),
() -> emitter.onSuccess(uploadTask),
emitter::onComplete,
force);
upload.subscribe(
new CompletableObserver() {
@Override
public void onSubscribe(@NonNull Disposable d) {
uploadTask.disposable = d;
}

@GuardedBy("this")
private int currentCount = 0;
@Override
public void onComplete() {
completion.onComplete();
}

MissingDigestFinder(RemoteActionExecutionContext context, int expectedCount) {
checkArgument(expectedCount > 0, "expectedCount should be greater than 0");
this.expectedCount = expectedCount;
this.digestsSubject = AsyncSubject.create();
this.digests = new HashSet<>();
@Override
public void onError(@NonNull Throwable e) {
completion.onError(e);
}
});
});
}

AtomicBoolean findMissingDigestsCalled = new AtomicBoolean(false);
this.resultSingle =
Single.fromObservable(
digestsSubject
.flatMapSingle(
digests -> {
boolean wasCalled = findMissingDigestsCalled.getAndSet(true);
// Make sure we don't have re-subscription caused by refCount() below.
checkState(!wasCalled, "FindMissingDigests is called more than once");
return toSingle(
() -> findMissingDigests(context, digests), directExecutor());
})
// Use replay here because we could have a race condition that downstream hasn't
// been added to the subscription list (to receive the upstream result) while
// upstream is completed.
.replay(1)
.refCount());
}
private Single<List<UploadTask>> findMissingUploads(
RemoteActionExecutionContext context, Iterable<UploadTask> newDigests) {
return Single.using(
() -> Profiler.instance().profile("findMissingDigests"),
ignored ->
toSingle(
() ->
findMissingDigests(
context,
Iterables.transform(
Iterables.filter(
newDigests, uploadTask -> uploadTask.continuation != null),
uploadTask -> uploadTask.digest)),
directExecutor())
.doOnDispose(
() -> {
for (UploadTask uploadTask : newDigests) {
if (uploadTask.disposable != null) {
uploadTask.disposable.dispose();
}
}
})
.map(
missingDigests -> {
List<UploadTask> result = new ArrayList<>();
for (UploadTask uploadTask : newDigests) {
if (missingDigests.contains(uploadTask.digest)) {
result.add(uploadTask);
} else {
if (uploadTask.continuation != null) {
uploadTask.continuation.onComplete();
}
}
}
return result;
}),
SilentCloseable::close);
}

/**
* Register the {@code digest} and increase the counter.
*
* <p>Returned Single cannot be subscribed more than once.
*
* @return Single that emits the result of the {@code FindMissingDigest} request.
*/
Single<ImmutableSet<Digest>> registerAndCount(Digest digest) {
AtomicBoolean subscribed = new AtomicBoolean(false);
// count() will potentially trigger the findMissingDigests call. Adding and counting before
// returning the Single could introduce a race that the result of findMissingDigests is
// available but the consumer doesn't get it because it hasn't subscribed the returned
// Single. In this case, it subscribes after upstream is completed resulting a re-run of
// findMissingDigests (due to refCount()).
//
// Calling count() inside doOnSubscribe to ensure the consumer already subscribed to the
// returned Single to avoid a re-execution of findMissingDigests.
return resultSingle.doOnSubscribe(
d -> {
boolean wasSubscribed = subscribed.getAndSet(true);
checkState(!wasSubscribed, "Single is subscribed more than once");
synchronized (this) {
digests.add(digest);
}
count();
});
}
private Flowable<TransferResult> uploadNewDigests(
RemoteActionExecutionContext context,
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs,
Iterable<UploadTask> newDigests) {
return Flowable.using(
() -> Profiler.instance().profile("upload"),
ignored ->
Flowable.fromIterable(newDigests)
.flatMapSingle(
digest -> uploadNewDigest(context, merkleTree, additionalInputs, digest)),
SilentCloseable::close);
}

/** Increase the counter. */
void count() {
ImmutableSet<Digest> digestsResult = null;
private Single<TransferResult> uploadNewDigest(
RemoteActionExecutionContext context,
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs,
UploadTask uploadTask) {
CompletableEmitter continuation = uploadTask.continuation;
if (continuation != null) {
toCompletable(
() -> uploadBlob(context, uploadTask.digest, merkleTree, additionalInputs),
directExecutor())
.subscribe(
new CompletableObserver() {
@Override
public void onSubscribe(@NonNull Disposable d) {
continuation.setDisposable(d);
}

synchronized (this) {
if (currentCount < expectedCount) {
currentCount++;
if (currentCount == expectedCount) {
digestsResult = ImmutableSet.copyOf(digests);
}
}
}
@Override
public void onComplete() {
continuation.onComplete();
}

if (digestsResult != null) {
digestsSubject.onNext(digestsResult);
digestsSubject.onComplete();
}
@Override
public void onError(@NonNull Throwable e) {
continuation.onError(e);
}
});
}
return toTransferResult(uploadTask.completion);
}
}
Loading

0 comments on commit 6c2b485

Please sign in to comment.