Skip to content

Commit

Permalink
Add (inactive) code to compress large GenQuery results in memory on t…
Browse files Browse the repository at this point in the history
…he fly

Dynamically switches to gzipping GenQuery results once they reach 1 MiB, trading CPU for memory.
Hard-coded to off for now while I sort out how to connect this to a flag.

PiperOrigin-RevId: 258568242
  • Loading branch information
michajlo authored and copybara-github committed Jul 17, 2019
1 parent 26bcd27 commit 6d63f8a
Show file tree
Hide file tree
Showing 3 changed files with 392 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import com.google.devtools.build.lib.analysis.Runfiles;
import com.google.devtools.build.lib.analysis.RunfilesProvider;
import com.google.devtools.build.lib.analysis.actions.AbstractFileWriteAction;
import com.google.devtools.build.lib.analysis.actions.ByteStringDeterministicWriter;
import com.google.devtools.build.lib.analysis.actions.AbstractFileWriteAction.DeterministicWriter;
import com.google.devtools.build.lib.analysis.config.CoreOptions;
import com.google.devtools.build.lib.cmdline.Label;
import com.google.devtools.build.lib.cmdline.PackageIdentifier;
Expand Down Expand Up @@ -73,6 +73,7 @@
import com.google.devtools.build.lib.query2.query.output.QueryOptions;
import com.google.devtools.build.lib.query2.query.output.QueryOptions.OrderOutput;
import com.google.devtools.build.lib.query2.query.output.QueryOutputUtils;
import com.google.devtools.build.lib.rules.genquery.GenQueryOutputStream.GenQueryResult;
import com.google.devtools.build.lib.runtime.KeepGoingOption;
import com.google.devtools.build.lib.skyframe.PackageValue;
import com.google.devtools.build.lib.skyframe.TargetPatternValue;
Expand All @@ -92,6 +93,7 @@
import com.google.devtools.common.options.OptionsParsingException;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.ClosedByInterruptException;
import java.util.Collection;
import java.util.HashSet;
Expand All @@ -117,6 +119,7 @@ public ConfiguredTarget create(RuleContext ruleContext)
// The query string
final String query = ruleContext.attributes().get("expression", Type.STRING);

@SuppressWarnings("unchecked")
OptionsParser optionsParser =
OptionsParser.builder()
.optionsClasses(QueryOptions.class, KeepGoingOption.class)
Expand Down Expand Up @@ -169,7 +172,7 @@ public ConfiguredTarget create(RuleContext ruleContext)
// force relative_locations to true so it has a deterministic output across machines.
queryOptions.relativeLocations = true;

ByteString result;
GenQueryResult result;
try (SilentCloseable c =
Profiler.instance().profile("GenQuery.executeQuery/" + ruleContext.getLabel())) {
result =
Expand All @@ -193,11 +196,14 @@ public ConfiguredTarget create(RuleContext ruleContext)
NestedSet<Artifact> filesToBuild = NestedSetBuilder.create(Order.STABLE_ORDER, outputArtifact);
return new RuleConfiguredTargetBuilder(ruleContext)
.setFilesToBuild(filesToBuild)
.add(RunfilesProvider.class, RunfilesProvider.simple(
new Runfiles.Builder(
ruleContext.getWorkspaceName(),
ruleContext.getConfiguration().legacyExternalRunfiles())
.addTransitiveArtifacts(filesToBuild).build()))
.addProvider(
RunfilesProvider.class,
RunfilesProvider.simple(
new Runfiles.Builder(
ruleContext.getWorkspaceName(),
ruleContext.getConfiguration().legacyExternalRunfiles())
.addTransitiveArtifacts(filesToBuild)
.build()))
.build();
}

Expand Down Expand Up @@ -273,7 +279,7 @@ private ExtendedEventHandler getEventHandler(RuleContext ruleContext) {
}

@Nullable
private ByteString executeQuery(
private GenQueryResult executeQuery(
RuleContext ruleContext, QueryOptions queryOptions, Collection<Label> scope, String query)
throws InterruptedException {
SkyFunction.Environment env = ruleContext.getAnalysisEnvironment().getSkyframeEnv();
Expand All @@ -300,7 +306,7 @@ private ByteString executeQuery(

@SuppressWarnings("unchecked")
@Nullable
private ByteString doQuery(
private GenQueryResult doQuery(
QueryOptions queryOptions,
PreloadedMapPackageProvider packageProvider,
Predicate<Label> labelFilter,
Expand Down Expand Up @@ -372,37 +378,39 @@ private ByteString doQuery(
throw new RuntimeException(e);
}

ByteString.Output outputStream = ByteString.newOutput();
// TODO(b/137379942): Enable compression.
GenQueryOutputStream outputStream = new GenQueryOutputStream(/*compressionEnabled=*/ false);
try {
QueryOutputUtils
.output(queryOptions, queryResult, targets.getResult(), formatter, outputStream,
queryOptions.aspectDeps.createResolver(packageProvider, getEventHandler(ruleContext)));
outputStream.close();
} catch (ClosedByInterruptException e) {
throw new InterruptedException(e.getMessage());
} catch (IOException e) {
throw new RuntimeException(e);
}

return outputStream.toByteString();
return outputStream.getResult();
}

@Immutable // assuming no other reference to result
private static final class QueryResultAction extends AbstractFileWriteAction {
private final ByteString result;
private final GenQueryResult result;

private QueryResultAction(ActionOwner owner, Artifact output, ByteString result) {
private QueryResultAction(ActionOwner owner, Artifact output, GenQueryResult result) {
super(owner, ImmutableList.<Artifact>of(), output, /*makeExecutable=*/false);
this.result = result;
}

@Override
public DeterministicWriter newDeterministicWriter(ActionExecutionContext ctx) {
return new ByteStringDeterministicWriter(result);
return new GenQueryResultWriter(result);
}

@Override
protected void computeKey(ActionKeyContext actionKeyContext, Fingerprint fp) {
fp.addBytes(result);
result.fingerprint(fp);
}
}

Expand Down Expand Up @@ -575,4 +583,22 @@ public BrokenQueryScopeException(String message) {
super(message);
}
}

/**
 * {@link DeterministicWriter} adapter that serves the contents of a {@link GenQueryResult}, either
 * streamed to an {@link OutputStream} or materialized as a {@link ByteString}.
 */
private static class GenQueryResultWriter implements DeterministicWriter {
  /** The query output this writer delegates every operation to. */
  private final GenQueryResult result;

  GenQueryResultWriter(GenQueryResult result) {
    this.result = result;
  }

  /** Streams the query output to {@code out} by delegating to {@link GenQueryResult#writeTo}. */
  @Override
  public void writeOutputFile(OutputStream out) throws IOException {
    result.writeTo(out);
  }

  /** Returns the query output by delegating to {@link GenQueryResult#getBytes}. */
  @Override
  public ByteString getBytes() throws IOException {
    return result.getBytes();
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
// Copyright 2019 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.devtools.build.lib.rules.genquery;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
import com.google.devtools.build.lib.util.Fingerprint;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/**
 * {@link OutputStream} implementation optimized for {@link GenQuery} by (optionally) compressing
 * query results on the fly. Produces {@link GenQueryResult}s which are preferred for storing the
 * output of {@link GenQuery}'s underlying queries.
 *
 * <p>Bytes are first buffered uncompressed. When compression is enabled and a write would bring the
 * total to {@link #COMPRESSION_THRESHOLD} bytes or more, the bytes buffered so far are re-encoded
 * through a {@link GZIPOutputStream} and every later write is gzipped as well. The stream must be
 * closed before {@link #getResult()} is called.
 */
class GenQueryOutputStream extends OutputStream {

  /**
   * When compression is enabled, the threshold (in bytes, 1 MiB) at which the stream will switch
   * to compressing output. The value of this constant is arbitrary but effective.
   */
  private static final int COMPRESSION_THRESHOLD = 1 << 20;

  /**
   * Encapsulates the output of a {@link GenQuery}'s query. CPU and memory overhead of individual
   * methods depends on the underlying content and settings.
   */
  interface GenQueryResult {
    /** Returns the query output as a {@link ByteString}. */
    ByteString getBytes() throws IOException;

    /**
     * Adds the query output to the supplied {@link Fingerprint}. Equivalent to {@code
     * fingerprint.addBytes(genQueryResult.getBytes())}, but potentially more efficient.
     */
    void fingerprint(Fingerprint fingerprint);

    /**
     * Returns the size of the output. This must be a constant-time operation for all
     * implementations.
     */
    int size();

    /**
     * Writes the query output to the provided {@link OutputStream}. Equivalent to {@code
     * genQueryResult.getBytes().writeTo(out)}, but potentially more efficient.
     */
    void writeTo(OutputStream out) throws IOException;
  }

  /** Whether this stream is allowed to switch to compressed output at all. */
  private final boolean compressionEnabled;

  /** Number of uncompressed bytes accepted so far. */
  private int bytesWritten = 0;

  /** Whether the stream has already switched to gzipping its output. */
  private boolean compressed = false;

  /** Set once {@link #close()} has completed; {@link #getResult()} requires it. */
  private boolean closed = false;

  /** In-memory sink holding the raw bytes, or the gzipped bytes once compression has started. */
  private ByteString.Output bytesOut = ByteString.newOutput();

  /** Stream that writes go to: {@link #bytesOut} itself, or a gzip stream wrapping it. */
  private OutputStream out = bytesOut;

  GenQueryOutputStream(boolean compressionEnabled) {
    this.compressionEnabled = compressionEnabled;
  }

  @Override
  public void write(int b) throws IOException {
    maybeStartCompression(1);
    out.write(b);
    bytesWritten += 1;
  }

  @Override
  public void write(byte[] bytes) throws IOException {
    write(bytes, 0, bytes.length);
  }

  @Override
  public void write(byte[] bytes, int off, int len) throws IOException {
    // Decide about compression before writing so the incoming bytes are already compressed if this
    // write is the one that reaches the threshold.
    maybeStartCompression(len);
    out.write(bytes, off, len);
    bytesWritten += len;
  }

  @Override
  public void flush() throws IOException {
    out.flush();
  }

  @Override
  public void close() throws IOException {
    out.close();
    closed = true;
  }

  /**
   * Returns the accumulated output as a {@link GenQueryResult}: a {@link CompressedResult} if the
   * stream switched to compression, otherwise a {@link RegularResult}.
   *
   * @throws IllegalStateException if the stream has not been closed yet
   */
  GenQueryResult getResult() {
    Preconditions.checkState(closed, "Must be closed");
    return compressed
        ? new CompressedResult(bytesOut.toByteString(), bytesWritten)
        : new RegularResult(bytesOut.toByteString());
  }

  /**
   * Switches the stream to gzip output if compression is enabled, has not started yet, and writing
   * {@code additionalBytes} more bytes would reach {@link #COMPRESSION_THRESHOLD}. The bytes
   * buffered so far are re-written through the new gzip stream.
   */
  private void maybeStartCompression(int additionalBytes) throws IOException {
    if (!compressionEnabled || compressed) {
      return;
    }

    // Widen to long: with int arithmetic a very large write could overflow the sum to a negative
    // value and incorrectly stay below the threshold.
    if ((long) bytesWritten + additionalBytes < COMPRESSION_THRESHOLD) {
      return;
    }

    ByteString.Output compressedBytesOut = ByteString.newOutput();
    GZIPOutputStream gzipOut = new GZIPOutputStream(compressedBytesOut);
    bytesOut.writeTo(gzipOut);
    bytesOut = compressedBytesOut;
    out = gzipOut;
    compressed = true;
  }

  /** {@link GenQueryResult} backed directly by the uncompressed bytes. */
  @VisibleForTesting
  static class RegularResult implements GenQueryResult {
    private final ByteString data;

    RegularResult(ByteString data) {
      this.data = data;
    }

    @Override
    public ByteString getBytes() {
      return data;
    }

    @Override
    public int size() {
      return data.size();
    }

    @Override
    public void fingerprint(Fingerprint fingerprint) {
      fingerprint.addBytes(data);
    }

    @Override
    public void writeTo(OutputStream out) throws IOException {
      data.writeTo(out);
    }
  }

  /**
   * {@link GenQueryResult} backed by gzipped bytes. Every method other than {@link #size()}
   * decompresses the data on each call.
   */
  @VisibleForTesting
  static class CompressedResult implements GenQueryResult {
    /** Size of the scratch buffer used when streaming decompressed bytes into a fingerprint. */
    private static final int FINGERPRINT_CHUNK_SIZE = 4096;

    private final ByteString compressedData;

    /** Uncompressed size in bytes, as supplied by the creator of this result. */
    private final int size;

    CompressedResult(ByteString compressedData, int size) {
      this.compressedData = compressedData;
      this.size = size;
    }

    @Override
    public ByteString getBytes() throws IOException {
      // Pre-size the buffer with the known uncompressed size.
      ByteString.Output out = ByteString.newOutput(size);
      try (GZIPInputStream gzipIn = new GZIPInputStream(compressedData.newInput())) {
        ByteStreams.copy(gzipIn, out);
      }
      return out.toByteString();
    }

    @Override
    public int size() {
      return size;
    }

    @Override
    public void writeTo(OutputStream out) throws IOException {
      try (GZIPInputStream gzipIn = new GZIPInputStream(compressedData.newInput())) {
        ByteStreams.copy(gzipIn, out);
      }
    }

    @Override
    public void fingerprint(Fingerprint fingerprint) {
      try (GZIPInputStream gzipIn = new GZIPInputStream(compressedData.newInput())) {
        byte[] chunk = new byte[FINGERPRINT_CHUNK_SIZE];
        int bytesRead;
        // InputStream#read signals end of stream with -1; test for that sentinel explicitly.
        while ((bytesRead = gzipIn.read(chunk)) != -1) {
          fingerprint.addBytes(chunk, 0, bytesRead);
        }
      } catch (IOException e) {
        // Unexpected, everything should be in memory!
        throw new IllegalStateException("Unexpected IOException", e);
      }
    }
  }
}
Loading

0 comments on commit 6d63f8a

Please sign in to comment.