Enable BQ streaming inserts for particular docTypes
Addresses #588
jklukas committed May 8, 2019
1 parent faa8e99 commit 5537eff
Showing 6 changed files with 123 additions and 29 deletions.
95 changes: 70 additions & 25 deletions ingestion-beam/src/main/java/com/mozilla/telemetry/io/Write.java
@@ -25,9 +25,11 @@
import com.mozilla.telemetry.util.DynamicPathTemplate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
@@ -54,6 +56,7 @@
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Partition;
import org.apache.beam.sdk.transforms.Requirements;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -313,52 +316,79 @@ public static class BigQueryOutput extends Write {
private final Duration triggeringFrequency;
private final InputType inputType;
private final int numShards;
private final ValueProvider<List<String>> streamingDocTypes;

/** Public constructor. */
public BigQueryOutput(ValueProvider<String> tableSpecTemplate, BigQueryWriteMethod writeMethod,
Duration triggeringFrequency, InputType inputType, int numShards) {
Duration triggeringFrequency, InputType inputType, int numShards,
ValueProvider<List<String>> streamingDocTypes) {
this.tableSpecTemplate = tableSpecTemplate;
this.writeMethod = writeMethod;
this.triggeringFrequency = triggeringFrequency;
this.inputType = inputType;
this.numShards = numShards;
this.streamingDocTypes = NestedValueProvider.of(streamingDocTypes,
value -> Optional.ofNullable(value).orElse(Collections.emptyList()));
}

@Override
public WithErrors.Result<PDone> expand(PCollection<PubsubMessage> input) {
BigQueryIO.Write<KV<TableDestination, TableRow>> writeTransform = BigQueryIO //
final List<PCollection<PubsubMessage>> errorCollections = new ArrayList<>();

input = input //
.apply(LimitPayloadSize.toBytes(writeMethod.maxPayloadBytes)).errorsTo(errorCollections);

final BigQueryIO.Write<KV<TableDestination, TableRow>> baseWriteTransform = BigQueryIO //
.<KV<TableDestination, TableRow>>write() //
.withFormatFunction(KV::getValue) //
.to((ValueInSingleWindow<KV<TableDestination, TableRow>> vsw) -> vsw.getValue().getKey())
.withMethod(writeMethod.method)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER) //
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) //
.ignoreUnknownValues();

final Optional<PCollection<PubsubMessage>> streamingInput;
final Optional<PCollection<PubsubMessage>> fileLoadsInput;
if (writeMethod == BigQueryWriteMethod.streaming) {
writeTransform = writeTransform
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()) //
.skipInvalidRows() //
.withExtendedErrorInfo();
streamingInput = Optional.of(input);
fileLoadsInput = Optional.empty();
} else if (writeMethod == BigQueryWriteMethod.file_loads) {
streamingInput = Optional.empty();
fileLoadsInput = Optional.of(input);
} else {
if (inputType == InputType.pubsub) {
// When using the file_loads method of inserting to BigQuery, BigQueryIO requires
// triggering frequency if the input PCollection is unbounded (which is the case for
// pubsub), but forbids the option if the input PCollection is bounded.
writeTransform = writeTransform //
.withTriggeringFrequency(triggeringFrequency) //
.withNumFileShards(numShards);
}
// writeMethod is mixed.
final PCollectionList<PubsubMessage> partitioned = input //
.apply("PartitionStreamingVsFileLoads", Partition.of(2, //
(message, numPartitions) -> {
message = PubsubConstraints.ensureNonNull(message);
final String namespace = message.getAttribute("document_namespace");
final String docType = message.getAttribute("document_type");
final boolean shouldStream;
if (namespace == null || docType == null) {
shouldStream = false;
} else if ("telemetry".equals(namespace)) {
shouldStream = streamingDocTypes.get().contains(docType);
} else {
shouldStream = streamingDocTypes.get().contains(namespace + "/" + docType);
}
if (shouldStream && message
.getPayload().length < BigQueryWriteMethod.streaming.maxPayloadBytes) {
return 0;
} else {
return 1;
}
}));
streamingInput = Optional.of(partitioned.get(0));
fileLoadsInput = Optional.of(partitioned.get(1));
}

final List<PCollection<PubsubMessage>> errorCollections = new ArrayList<>();

WriteResult writeResult = input //
.apply(LimitPayloadSize.toBytes(writeMethod.maxPayloadBytes)).errorsTo(errorCollections)
.apply(PubsubMessageToTableRow.of(tableSpecTemplate)).errorsTo(errorCollections)
.apply(writeTransform);

if (writeMethod == BigQueryWriteMethod.streaming) {
streamingInput.ifPresent(messages -> {
WriteResult writeResult = messages //
.apply(PubsubMessageToTableRow.of(tableSpecTemplate)).errorsTo(errorCollections)
.apply(baseWriteTransform //
.withMethod(BigQueryWriteMethod.streaming.method)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()) //
.skipInvalidRows() //
.withExtendedErrorInfo());
errorCollections
.add(writeResult.getFailedInsertsWithErr().apply("Process failed inserts", MapElements
.into(TypeDescriptor.of(PubsubMessage.class)).via((BigQueryInsertError bqie) -> {
@@ -383,12 +413,27 @@ public WithErrors.Result<PDone> expand(PCollection<PubsubMessage> input) {
byte[] payload = row.toString().getBytes();
return new PubsubMessage(payload, attributes);
})));
}
});

fileLoadsInput.ifPresent(messages -> {
BigQueryIO.Write<KV<TableDestination, TableRow>> fileLoadsWrite = baseWriteTransform
.withMethod(BigQueryWriteMethod.file_loads.method);
if (inputType == InputType.pubsub) {
// When using the file_loads method of inserting to BigQuery, BigQueryIO requires
// triggering frequency if the input PCollection is unbounded (which is the case for
// pubsub), but forbids the option if the input PCollection is bounded.
fileLoadsWrite = fileLoadsWrite.withTriggeringFrequency(triggeringFrequency) //
.withNumFileShards(numShards);
}
messages //
.apply(PubsubMessageToTableRow.of(tableSpecTemplate)).errorsTo(errorCollections)
.apply(fileLoadsWrite);
});

PCollection<PubsubMessage> errorCollection = PCollectionList.of(errorCollections)
.apply("Flatten bigquery errors", Flatten.pCollections());

return WithErrors.Result.of(PDone.in(writeResult.getPipeline()), errorCollection);
return WithErrors.Result.of(PDone.in(input.getPipeline()), errorCollection);
}
}
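For readers skimming the diff: the mixed-mode branch above routes each message with a two-way Partition. Below is a minimal standalone sketch of that routing rule (the class and method names are hypothetical, and the additional payload-size check against BigQueryWriteMethod.streaming.maxPayloadBytes is omitted); matching messages go to partition 0 (streaming inserts) and everything else to partition 1 (file loads).

import java.util.Arrays;
import java.util.List;

// Hypothetical restatement of the Partition logic above; not part of the commit.
public class StreamingRoutingSketch {

  static boolean shouldStream(String namespace, String docType, List<String> streamingDocTypes) {
    if (namespace == null || docType == null) {
      // Messages missing either attribute always fall back to file loads.
      return false;
    } else if ("telemetry".equals(namespace)) {
      // Entries without a slash are interpreted as telemetry docTypes.
      return streamingDocTypes.contains(docType);
    } else {
      return streamingDocTypes.contains(namespace + "/" + docType);
    }
  }

  public static void main(String[] args) {
    List<String> streamingDocTypes = Arrays.asList("event", "my-namespace/my-test");
    System.out.println(shouldStream("telemetry", "event", streamingDocTypes));      // true  -> streaming
    System.out.println(shouldStream("my-namespace", "my-test", streamingDocTypes)); // true  -> streaming
    System.out.println(shouldStream("telemetry", "main", streamingDocTypes));       // false -> file loads
    System.out.println(shouldStream(null, "crash", streamingDocTypes));             // false -> file loads
  }
}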

@@ -8,11 +8,13 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;

/**
* Enum for the two types of inserts we can make to BigQuery, along with options that we need
* Enum describing the types of inserts we can make to BigQuery, along with options that we need
* to set differently for the two types; see https://cloud.google.com/bigquery/quotas
*/
public enum BigQueryWriteMethod {
streaming(Method.STREAMING_INSERTS, 1000 * 1000), file_loads(Method.FILE_LOADS, 10 * 1000 * 1000);
streaming(Method.STREAMING_INSERTS, 1000 * 1000), //
file_loads(Method.FILE_LOADS, 10 * 1000 * 1000), //
mixed(Method.DEFAULT, 10 * 1000 * 1000);

public final BigQueryIO.Write.Method method;
public final int maxPayloadBytes;
@@ -66,7 +66,7 @@ public Write write(SinkOptions.Parsed options) {
public Write write(SinkOptions.Parsed options) {
return new BigQueryOutput(options.getOutput(), options.getBqWriteMethod(),
options.getParsedBqTriggeringFrequency(), options.getInputType(),
options.getBqNumFileShards());
options.getBqNumFileShards(), options.getBqStreamingDocTypes());
}
};

@@ -78,6 +78,15 @@ public interface SinkOptions extends PipelineOptions {

void setBqNumFileShards(int value);

@Description("A comma-separated list of docTypes that should be published to BigQuery via the"
+ " streaming InsertAll endpoint rather than via file loads;"
+ " only relevant if --bqWriteMethod=mixed;"
+ " you may use a slash in each entry to separate namespace from type;"
+ " the telemetry namespace is assumed for entries that do not contain a slash")
ValueProvider<List<String>> getBqStreamingDocTypes();

void setBqStreamingDocTypes(ValueProvider<List<String>> value);

@Description("File format for --outputType=file|stdout; must be one of"
+ " json (each line contains payload[String] and attributeMap[String,String]) or"
+ " text (each line is payload)")
@@ -184,6 +184,44 @@ public void canWriteViaFileLoads() throws Exception {
assertThat(errorOutputLines, Matchers.hasSize(2));
}

@Test
public void canWriteWithMixedMethod() throws Exception {
String table = "my_test_table";
TableId tableId = TableId.of(dataset, table);

bigquery.create(DatasetInfo.newBuilder(dataset).build());
bigquery
.create(
TableInfo
.newBuilder(tableId,
StandardTableDefinition
.of(Schema.of(Field.of("clientId", LegacySQLTypeName.STRING),
Field.of("type", LegacySQLTypeName.STRING),
Field.of("submission_timestamp", LegacySQLTypeName.TIMESTAMP)))
.toBuilder().setTimePartitioning(submissionTimestampPartitioning).build())
.build());

String input = Resources
.getResource("testdata/bigquery-integration/input-with-attributes.ndjson").getPath();
String output = String.format("%s:%s.%s", projectId, dataset, "${document_type}_table");
String errorOutput = outputPath + "/error/out";

PipelineResult result = Sink.run(new String[] { "--inputFileFormat=json", "--inputType=file",
"--input=" + input, "--outputType=bigquery", "--output=" + output, "--bqWriteMethod=mixed",
"--bqStreamingDocTypes=my-namespace/my-test", "--errorOutputType=file",
"--tempLocation=gs://gcp-ingestion-static-test-bucket/temp/bq-loads",
"--errorOutputFileCompression=UNCOMPRESSED", "--errorOutput=" + errorOutput });

result.waitUntilFinish();

String tableSpec = String.format("%s.%s", dataset, table);
assertThat(stringValuesQueryWithRetries("SELECT clientId FROM " + tableSpec),
matchesInAnyOrder(ImmutableList.of("abc123")));

List<String> errorOutputLines = Lines.files(outputPath + "/error/out*.ndjson");
assertThat(errorOutputLines, Matchers.hasSize(2));
}

@Test
public void canRecoverFailedInsertsInStreamingMode() throws Exception {
String table = "table_with_required_col";
@@ -1,3 +1,3 @@
{"attributeMap":{"document_type":"nonexistent"},"payload":"eyJjbGllbnRJZCI6ImFiYzEyMyIsInR5cGUiOiJtYWluIiwibWV0YWRhdGEiOnsic29tZWludCI6Mywic3VibWlzc2lvbl90aW1lc3RhbXAiOiIyMDE5LTA3LTAxVDEyOjEzOjE0LjEyMzQ1NiJ9fQo="}
{"attributeMap":{"document_type":"my-test"},"payload":"eyJjbGllbnRJZCI6ImFiYzEyMyIsInR5cGUiOiJldmVudCJ9Cg=="}
{"attributeMap":{"document_namespace":"my-namespace", "document_type":"my-test"},"payload":"eyJjbGllbnRJZCI6ImFiYzEyMyIsInR5cGUiOiJldmVudCJ9Cg=="}
{"attributeMap":null,"payload":"eyJjbGllbnRJZCI6ImRlZjQ1NiIsInR5cGUiOiJtYWluIn0K"}
