This repository has been archived by the owner on Nov 11, 2022. It is now read-only.

BigQuery: fix an issue with option propagation and refactor to future-proof #540

Merged: 4 commits on Feb 16, 2017
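
In short, the PR routes query options through a single JobConfigurationQuery object instead of loose (query, flattenResults, useLegacySql) parameters, so options set upstream actually reach the query job. A minimal usage sketch of the new BigQueryTableRowIterator.fromQuery overload follows; the package names are assumed from the Dataflow 1.x SDK layout, and the class name, project id, query string, and client wiring are placeholders rather than anything taken from this PR.

import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.util.BigQueryTableRowIterator;

import java.io.IOException;

class QueryConfigSketch {
  // Every query option travels inside the JobConfigurationQuery, so an option
  // added later is set in one place and cannot be silently dropped on the way
  // to the query job.
  static void printQueryResults(Bigquery client, String projectId)
      throws IOException, InterruptedException {
    JobConfigurationQuery queryConfig = new JobConfigurationQuery()
        .setQuery("SELECT 17 AS answer")  // placeholder query
        .setFlattenResults(true)
        .setUseLegacySql(true);
    try (BigQueryTableRowIterator iterator =
        BigQueryTableRowIterator.fromQuery(queryConfig, projectId, client)) {
      iterator.open();
      while (iterator.advance()) {
        TableRow row = iterator.getCurrent();
        System.out.println(row);
      }
    }
  }
}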
Changes from 2 commits
@@ -1075,7 +1075,7 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
return new BigQueryReader(this, bqServices.getReaderFromQuery(
bqOptions, query.get(), executingProject.get(), flattenResults, useLegacySql));
bqOptions, createBasicQueryConfig(), executingProject.get()));
}

@Override
@@ -1152,11 +1152,12 @@ private void executeQuery(
.setProjectId(executingProject)
.setJobId(jobId);

// When changing options here, consider whether to change the defaults from
// #createBasicQueryConfig instead.
JobConfigurationQuery queryConfig = createBasicQueryConfig()
.setAllowLargeResults(true)
.setCreateDisposition("CREATE_IF_NEEDED")
.setDestinationTable(destinationTable)
.setPriority("BATCH")
.setWriteDisposition("WRITE_EMPTY");

jobService.startQueryJob(jobRef, queryConfig);
@@ -1167,9 +1168,12 @@ }
}

private JobConfigurationQuery createBasicQueryConfig() {
// Due to deprecated functionality, if this function is updated
// then the similar code in BigQueryTableRowIterator#fromQuery should be updated.
Contributor: Would it be better to have a public static createBasicQueryConfig(String query, boolean flattenResults, boolean useLegacySql) and use it in both places?
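
For illustration, the shared static helper suggested here might look roughly like the sketch below; it is not part of the PR and simply mirrors the setters already used in this diff.

// Hypothetical shared helper: both BigQueryIO#createBasicQueryConfig and
// BigQueryTableRowIterator#fromQuery could delegate to it, so the two option
// lists cannot drift apart.
static JobConfigurationQuery createBasicQueryConfig(
    String query, boolean flattenResults, boolean useLegacySql) {
  return new JobConfigurationQuery()
      .setFlattenResults(flattenResults)
      .setQuery(query)
      .setUseLegacySql(useLegacySql);
}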

return new JobConfigurationQuery()
.setQuery(query.get())
.setFlattenResults(flattenResults)
.setPriority("BATCH")
Contributor: revert BATCH

Contributor Author: done

.setQuery(query.get())
.setUseLegacySql(useLegacySql);
}

@@ -58,8 +58,7 @@ public interface BigQueryServices extends Serializable {
* Returns a real, mock, or fake {@link BigQueryJsonReader} to query tables.
*/
BigQueryJsonReader getReaderFromQuery(
BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten,
@Nullable Boolean useLegacySql);
BigQueryOptions bqOptions, JobConfigurationQuery queryConfig, String projectId);

/**
* An interface for the Cloud BigQuery load service.
@@ -39,14 +39,11 @@
import com.google.cloud.dataflow.sdk.options.BigQueryOptions;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.common.annotations.VisibleForTesting;

import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.NoSuchElementException;

import javax.annotation.Nullable;

/**
@@ -83,9 +80,8 @@ public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableRef

@Override
public BigQueryJsonReader getReaderFromQuery(
BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten,
@Nullable Boolean useLegacySql) {
return BigQueryJsonReaderImpl.fromQuery(bqOptions, query, projectId, flatten, useLegacySql);
BigQueryOptions bqOptions, JobConfigurationQuery queryConfig, String projectId) {
return BigQueryJsonReaderImpl.fromQuery(bqOptions, queryConfig, projectId);
}

@VisibleForTesting
@@ -521,14 +517,11 @@ private BigQueryJsonReaderImpl(BigQueryTableRowIterator iterator) {

private static BigQueryJsonReader fromQuery(
BigQueryOptions bqOptions,
String query,
String projectId,
@Nullable Boolean flattenResults,
@Nullable Boolean useLegacySql) {
JobConfigurationQuery queryConfig,
String projectId) {
return new BigQueryJsonReaderImpl(
BigQueryTableRowIterator.fromQuery(
query, projectId, Transport.newBigQueryClient(bqOptions).build(), flattenResults,
useLegacySql));
queryConfig, projectId, Transport.newBigQueryClient(bqOptions).build()));
}

private static BigQueryJsonReader fromTable(
@@ -46,11 +46,9 @@
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Uninterruptibles;

import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
@@ -61,7 +59,6 @@
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;

/**
@@ -73,6 +70,7 @@ public class BigQueryTableRowIterator implements AutoCloseable {
@Nullable private TableReference ref;
@Nullable private final String projectId;
@Nullable private TableSchema schema;
@Nullable private final JobConfigurationQuery queryConfig;
private final Bigquery client;
private String pageToken;
private Iterator<TableRow> iteratorOverCurrentBatch;
@@ -89,25 +87,18 @@ public class BigQueryTableRowIterator implements AutoCloseable {
// following interval to check the status of query execution job
private static final Duration QUERY_COMPLETION_POLL_TIME = Duration.standardSeconds(1);

private final String query;
// Whether to flatten query results.
private final boolean flattenResults;
// Whether to use the BigQuery legacy SQL dialect..
private final boolean useLegacySql;
// Temporary dataset used to store query results.
private String temporaryDatasetId = null;
// Temporary table used to store query results.
private String temporaryTableId = null;

private BigQueryTableRowIterator(
@Nullable TableReference ref, @Nullable String query, @Nullable String projectId,
Bigquery client, boolean flattenResults, boolean useLegacySql) {
@Nullable TableReference ref, @Nullable JobConfigurationQuery queryConfig,
@Nullable String projectId, Bigquery client) {
this.ref = ref;
this.query = query;
this.queryConfig = queryConfig;
this.projectId = projectId;
this.client = checkNotNull(client, "client");
this.flattenResults = flattenResults;
this.useLegacySql = useLegacySql;
}

/**
@@ -116,7 +107,7 @@ private BigQueryTableRowIterator(
public static BigQueryTableRowIterator fromTable(TableReference ref, Bigquery client) {
checkNotNull(ref, "ref");
checkNotNull(client, "client");
return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client, true, true);
return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client);
}

/**
@@ -135,23 +126,39 @@ public static BigQueryTableRowIterator fromQuery(
* Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the
* specified query in the specified project.
*/
@Deprecated
public static BigQueryTableRowIterator fromQuery(
String query, String projectId, Bigquery client, @Nullable Boolean flattenResults,
@Nullable Boolean useLegacySql) {
checkNotNull(query, "query");
checkNotNull(projectId, "projectId");
checkNotNull(client, "client");
return new BigQueryTableRowIterator(null, query, projectId, client,
MoreObjects.firstNonNull(flattenResults, Boolean.TRUE),
MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE));
JobConfigurationQuery queryConfig = new JobConfigurationQuery()
.setFlattenResults(MoreObjects.firstNonNull(flattenResults, Boolean.TRUE))
.setPriority("BATCH")
.setQuery(query)
.setUseLegacySql(MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE));
return new BigQueryTableRowIterator(null, queryConfig, projectId, client);
Contributor: nit: null /* ref */

Contributor Author: not refactoring in Dataflow 1.x; this is a minimal fix only.
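
Applied to the return statement above, the nit would read as in the sketch below; it is shown only to clarify the suggestion, since the author keeps the Dataflow 1.x change minimal and does not adopt it.

// The inline comment documents which parameter the bare null stands for.
return new BigQueryTableRowIterator(null /* ref */, queryConfig, projectId, client);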

}

/**
* Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the
* specified query in the specified project.
*/
public static BigQueryTableRowIterator fromQuery(
JobConfigurationQuery queryConfig, String projectId, Bigquery client) {
checkNotNull(queryConfig, "queryConfig");
checkNotNull(projectId, "projectId");
checkNotNull(client, "client");
return new BigQueryTableRowIterator(null, queryConfig, projectId, client);
Contributor: nit: null /* ref */

Contributor Author: not refactoring in Dataflow 1.x; this is a minimal fix only.

}

/**
* Opens the table for read.
* @throws IOException on failure
*/
public void open() throws IOException, InterruptedException {
if (query != null) {
if (queryConfig != null) {
ref = executeQueryAndWaitForCompletion();
}
// Get table schema.
@@ -401,15 +408,17 @@ private void deleteDataset(String datasetId) throws IOException, InterruptedExce
*/
private TableReference executeQueryAndWaitForCompletion()
throws IOException, InterruptedException {
checkState(projectId != null, "Cannot dryRun a query in unknown (null) project");
checkState(queryConfig != null, "Cannot dryRun a null query");
// Dry run query to get source table location
Job dryRunJob = new Job()
.setConfiguration(new JobConfiguration()
.setQuery(new JobConfigurationQuery()
.setQuery(query))
.setQuery(queryConfig)
.setDryRun(true));
JobStatistics jobStats = executeWithBackOff(
client.jobs().insert(projectId, dryRunJob),
String.format("Error when trying to dry run query %s.", query)).getStatistics();
String.format("Error when trying to dry run query %s.",
queryConfig.toPrettyString())).getStatistics();

// Let BigQuery to pick default location if the query does not read any tables.
String location = null;
@@ -428,30 +437,27 @@ private TableReference executeQueryAndWaitForCompletion()
createDataset(temporaryDatasetId, location);
Job job = new Job();
JobConfiguration config = new JobConfiguration();
JobConfigurationQuery queryConfig = new JobConfigurationQuery();
config.setQuery(queryConfig);
job.setConfiguration(config);
queryConfig.setQuery(query);
queryConfig.setAllowLargeResults(true);
queryConfig.setFlattenResults(flattenResults);
queryConfig.setUseLegacySql(useLegacySql);


TableReference destinationTable = new TableReference();
destinationTable.setProjectId(projectId);
destinationTable.setDatasetId(temporaryDatasetId);
destinationTable.setTableId(temporaryTableId);
queryConfig.setDestinationTable(destinationTable);
queryConfig.setAllowLargeResults(Boolean.TRUE);

Job queryJob = executeWithBackOff(
client.jobs().insert(projectId, job),
String.format("Error when trying to execute the job for query %s.", query));
String.format("Error when trying to execute the job for query %s.",
queryConfig.toPrettyString()));
JobReference jobId = queryJob.getJobReference();

while (true) {
Job pollJob = executeWithBackOff(
client.jobs().get(projectId, jobId.getJobId()),
String.format("Error when trying to get status of the job for query %s.", query));
String.format("Error when trying to get status of the job for query %s.",
queryConfig.toPrettyString()));
JobStatus status = pollJob.getStatus();
if (status.getState().equals("DONE")) {
// Job is DONE, but did not necessarily succeed.
@@ -461,7 +467,9 @@ } else {
} else {
// There will be no temporary table to delete, so null out the reference.
temporaryTableId = null;
throw new IOException("Executing query " + query + " failed: " + error.getMessage());
throw new IOException(
String.format("Executing query %s failed: %s",
queryConfig.toPrettyString(), error.getMessage()));
}
}
Uninterruptibles.sleepUninterruptibly(
@@ -106,7 +106,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;

import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.Assert;
@@ -122,7 +121,6 @@
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
@@ -135,8 +133,6 @@
import java.util.NoSuchElementException;
import java.util.Set;

import javax.annotation.Nullable;

/**
* Tests for BigQueryIO.
*/
@@ -187,8 +183,7 @@ public BigQueryJsonReader getReaderFromTable(

@Override
public BigQueryJsonReader getReaderFromQuery(
BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten,
@Nullable Boolean useLegacySql) {
BigQueryOptions bqOptions, JobConfigurationQuery queryConfig, String projectId) {
return new FakeBigQueryReader(jsonTableRowReturns);
}

@@ -1749,3 +1744,4 @@ public boolean accept(File pathname) {
}}).length);
}
}

@@ -258,14 +258,18 @@ public void testReadFromQueryNoTables() throws IOException, InterruptedException

// Mock job polling.
JobStatus status = new JobStatus().setState("DONE");
TableReference tableRef =
new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
JobConfigurationQuery queryConfig = new JobConfigurationQuery().setDestinationTable(tableRef);
JobConfigurationQuery resultQueryConfig =
new JobConfigurationQuery().setDestinationTable(
new TableReference()
.setProjectId("project")
.setDatasetId("tempdataset")
.setTableId("temptable")
);
Job getJob =
new Job()
.setJobReference(new JobReference())
.setStatus(status)
.setConfiguration(new JobConfiguration().setQuery(queryConfig));
.setConfiguration(new JobConfiguration().setQuery(resultQueryConfig));
when(mockJobsGet.execute()).thenReturn(getJob);

// Mock table schema fetch.
@@ -281,8 +285,9 @@ public void testReadFromQueryNoTables() throws IOException, InterruptedException
String query = String.format(
"SELECT \"Arthur\" as name, 42 as count, \"%s\" as photo",
photoBytesEncoded);
JobConfigurationQuery queryConfig = new JobConfigurationQuery().setQuery(query);
try (BigQueryTableRowIterator iterator =
BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null, null)) {
BigQueryTableRowIterator.fromQuery(queryConfig, "project", mockClient)) {
iterator.open();
assertTrue(iterator.advance());
TableRow row = iterator.getCurrent();
@@ -317,7 +322,7 @@ public void testReadFromQueryNoTables() throws IOException, InterruptedException
verify(mockTablesDelete).execute();
// Table data read.
verify(mockClient).tabledata();
verify(mockTabledata).list("project", "dataset", "table");
verify(mockTabledata).list("project", "tempdataset", "temptable");
verify(mockTabledataList).execute();
}

@@ -334,18 +339,16 @@ public void testQueryFailed() throws IOException {
when(mockJobsInsert.execute()).thenThrow(exception, exception, exception, exception);

String query = "NOT A QUERY";
JobConfigurationQuery queryConfig = new JobConfigurationQuery().setQuery(query);
try (BigQueryTableRowIterator iterator =
BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null, null)) {

try {
iterator.open();
fail();
} catch (Exception expected) {
// Verify message explains cause and reports the query.
assertThat(expected.getMessage(), containsString("Error"));
assertThat(expected.getMessage(), containsString(query));
assertThat(expected.getCause().getMessage(), containsString(errorReason));
}
BigQueryTableRowIterator.fromQuery(queryConfig, "project", mockClient)) {
iterator.open();
fail();
} catch (Exception expected) {
// Verify message explains cause and reports the query.
assertThat(expected.getMessage(), containsString("Error"));
assertThat(expected.getMessage(), containsString(query));
assertThat(expected.getCause().getMessage(), containsString(errorReason));
}

// Job inserted to run the query, then polled once.