Skip to content
This repository has been archived by the owner on Nov 11, 2022. It is now read-only.

Commit

Permalink
Merge pull request #540 from dhalperi/bigquery-direct-standard-sql
Browse files Browse the repository at this point in the history
BigQuery: fix an issue with option propagation and refactor to future-proof
  • Loading branch information
dhalperi authored Feb 16, 2017
2 parents 9b9ee0b + 9c59d78 commit fc5fee2
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1075,7 +1075,7 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
return new BigQueryReader(this, bqServices.getReaderFromQuery(
bqOptions, query.get(), executingProject.get(), flattenResults, useLegacySql));
bqOptions, executingProject.get(), createBasicQueryConfig()));
}

@Override
Expand Down Expand Up @@ -1152,6 +1152,8 @@ private void executeQuery(
.setProjectId(executingProject)
.setJobId(jobId);

// When changing options here, consider whether to change the defaults from
// #createBasicQueryConfig instead.
JobConfigurationQuery queryConfig = createBasicQueryConfig()
.setAllowLargeResults(true)
.setCreateDisposition("CREATE_IF_NEEDED")
Expand All @@ -1167,9 +1169,11 @@ private void executeQuery(
}

/**
 * Builds the base {@link JobConfigurationQuery} used when reading from a query.
 *
 * <p>Only the query text, the flatten-results flag, and the SQL dialect flag are set here.
 * Callers such as {@code executeQuery} add further job-specific options (for example the
 * create disposition) on top of the returned object.
 *
 * @return a new configuration populated from {@code query}, {@code flattenResults}, and
 *     {@code useLegacySql}
 */
private JobConfigurationQuery createBasicQueryConfig() {
  // Due to deprecated functionality, if this function is updated
  // then the similar code in BigQueryTableRowIterator#fromQuery should be updated.
  return new JobConfigurationQuery()
      .setFlattenResults(flattenResults)
      .setQuery(query.get())
      .setUseLegacySql(useLegacySql);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ public interface BigQueryServices extends Serializable {
* Returns a real, mock, or fake {@link BigQueryJsonReader} to query tables.
*/
BigQueryJsonReader getReaderFromQuery(
BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten,
@Nullable Boolean useLegacySql);
BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig);

/**
* An interface for the Cloud BigQuery load service.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,11 @@
import com.google.cloud.dataflow.sdk.options.BigQueryOptions;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.common.annotations.VisibleForTesting;

import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.NoSuchElementException;

import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -83,9 +80,8 @@ public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableRef

@Override
public BigQueryJsonReader getReaderFromQuery(
BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten,
@Nullable Boolean useLegacySql) {
return BigQueryJsonReaderImpl.fromQuery(bqOptions, query, projectId, flatten, useLegacySql);
BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) {
return BigQueryJsonReaderImpl.fromQuery(bqOptions, projectId, queryConfig);
}

@VisibleForTesting
Expand Down Expand Up @@ -520,15 +516,10 @@ private BigQueryJsonReaderImpl(BigQueryTableRowIterator iterator) {
}

private static BigQueryJsonReader fromQuery(
BigQueryOptions bqOptions,
String query,
String projectId,
@Nullable Boolean flattenResults,
@Nullable Boolean useLegacySql) {
BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) {
return new BigQueryJsonReaderImpl(
BigQueryTableRowIterator.fromQuery(
query, projectId, Transport.newBigQueryClient(bqOptions).build(), flattenResults,
useLegacySql));
queryConfig, projectId, Transport.newBigQueryClient(bqOptions).build()));
}

private static BigQueryJsonReader fromTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,9 @@
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Uninterruptibles;

import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -61,7 +59,6 @@
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;

/**
Expand All @@ -73,6 +70,7 @@ public class BigQueryTableRowIterator implements AutoCloseable {
@Nullable private TableReference ref;
@Nullable private final String projectId;
@Nullable private TableSchema schema;
@Nullable private final JobConfigurationQuery queryConfig;
private final Bigquery client;
private String pageToken;
private Iterator<TableRow> iteratorOverCurrentBatch;
Expand All @@ -89,25 +87,18 @@ public class BigQueryTableRowIterator implements AutoCloseable {
// following interval to check the status of query execution job
private static final Duration QUERY_COMPLETION_POLL_TIME = Duration.standardSeconds(1);

private final String query;
// Whether to flatten query results.
private final boolean flattenResults;
// Whether to use the BigQuery legacy SQL dialect.
private final boolean useLegacySql;
// Temporary dataset used to store query results.
private String temporaryDatasetId = null;
// Temporary table used to store query results.
private String temporaryTableId = null;

private BigQueryTableRowIterator(
@Nullable TableReference ref, @Nullable String query, @Nullable String projectId,
Bigquery client, boolean flattenResults, boolean useLegacySql) {
@Nullable TableReference ref, @Nullable JobConfigurationQuery queryConfig,
@Nullable String projectId, Bigquery client) {
this.ref = ref;
this.query = query;
this.queryConfig = queryConfig;
this.projectId = projectId;
this.client = checkNotNull(client, "client");
this.flattenResults = flattenResults;
this.useLegacySql = useLegacySql;
}

/**
Expand All @@ -116,7 +107,7 @@ private BigQueryTableRowIterator(
public static BigQueryTableRowIterator fromTable(TableReference ref, Bigquery client) {
checkNotNull(ref, "ref");
checkNotNull(client, "client");
return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client, true, true);
return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client);
}

/**
Expand All @@ -135,23 +126,39 @@ public static BigQueryTableRowIterator fromQuery(
* Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the
* specified query in the specified project.
*/
@Deprecated
public static BigQueryTableRowIterator fromQuery(
String query, String projectId, Bigquery client, @Nullable Boolean flattenResults,
@Nullable Boolean useLegacySql) {
checkNotNull(query, "query");
checkNotNull(projectId, "projectId");
checkNotNull(client, "client");
return new BigQueryTableRowIterator(null, query, projectId, client,
MoreObjects.firstNonNull(flattenResults, Boolean.TRUE),
MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE));
JobConfigurationQuery queryConfig = new JobConfigurationQuery()
.setFlattenResults(MoreObjects.firstNonNull(flattenResults, Boolean.TRUE))
.setPriority("BATCH")
.setQuery(query)
.setUseLegacySql(MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE));
return new BigQueryTableRowIterator(null, queryConfig, projectId, client);
}

/**
 * Creates a {@code BigQueryTableRowIterator} over the rows produced by running the query
 * described by {@code queryConfig} in the given project.
 *
 * <p>This factory only validates its arguments and builds the iterator; the query itself is
 * run when {@link #open()} is called.
 *
 * @param queryConfig the query job configuration to execute; must not be null
 * @param projectId the project in which the query is executed; must not be null
 * @param client the BigQuery client used to issue requests; must not be null
 * @return an iterator that is not yet opened
 */
public static BigQueryTableRowIterator fromQuery(
    JobConfigurationQuery queryConfig, String projectId, Bigquery client) {
  // Validate in the same order as before so the first missing argument is the one reported.
  final JobConfigurationQuery checkedConfig = checkNotNull(queryConfig, "queryConfig");
  final String checkedProjectId = checkNotNull(projectId, "projectId");
  final Bigquery checkedClient = checkNotNull(client, "client");
  // No table reference: the table is resolved from the query when the iterator is opened.
  return new BigQueryTableRowIterator(null, checkedConfig, checkedProjectId, checkedClient);
}

/**
* Opens the table for read.
* @throws IOException on failure
*/
public void open() throws IOException, InterruptedException {
if (query != null) {
if (queryConfig != null) {
ref = executeQueryAndWaitForCompletion();
}
// Get table schema.
Expand Down Expand Up @@ -401,15 +408,17 @@ private void deleteDataset(String datasetId) throws IOException, InterruptedExce
*/
private TableReference executeQueryAndWaitForCompletion()
throws IOException, InterruptedException {
checkState(projectId != null, "Unable to execute a query without a configured project id");
checkState(queryConfig != null, "Unable to execute a query without a configured query");
// Dry run query to get source table location
Job dryRunJob = new Job()
.setConfiguration(new JobConfiguration()
.setQuery(new JobConfigurationQuery()
.setQuery(query))
.setQuery(queryConfig)
.setDryRun(true));
JobStatistics jobStats = executeWithBackOff(
client.jobs().insert(projectId, dryRunJob),
String.format("Error when trying to dry run query %s.", query)).getStatistics();
String.format("Error when trying to dry run query %s.",
queryConfig.toPrettyString())).getStatistics();

// Let BigQuery pick the default location if the query does not read any tables.
String location = null;
Expand All @@ -428,30 +437,27 @@ private TableReference executeQueryAndWaitForCompletion()
createDataset(temporaryDatasetId, location);
Job job = new Job();
JobConfiguration config = new JobConfiguration();
JobConfigurationQuery queryConfig = new JobConfigurationQuery();
config.setQuery(queryConfig);
job.setConfiguration(config);
queryConfig.setQuery(query);
queryConfig.setAllowLargeResults(true);
queryConfig.setFlattenResults(flattenResults);
queryConfig.setUseLegacySql(useLegacySql);


TableReference destinationTable = new TableReference();
destinationTable.setProjectId(projectId);
destinationTable.setDatasetId(temporaryDatasetId);
destinationTable.setTableId(temporaryTableId);
queryConfig.setDestinationTable(destinationTable);
queryConfig.setAllowLargeResults(Boolean.TRUE);

Job queryJob = executeWithBackOff(
client.jobs().insert(projectId, job),
String.format("Error when trying to execute the job for query %s.", query));
String.format("Error when trying to execute the job for query %s.",
queryConfig.toPrettyString()));
JobReference jobId = queryJob.getJobReference();

while (true) {
Job pollJob = executeWithBackOff(
client.jobs().get(projectId, jobId.getJobId()),
String.format("Error when trying to get status of the job for query %s.", query));
String.format("Error when trying to get status of the job for query %s.",
queryConfig.toPrettyString()));
JobStatus status = pollJob.getStatus();
if (status.getState().equals("DONE")) {
// Job is DONE, but did not necessarily succeed.
Expand All @@ -461,7 +467,9 @@ private TableReference executeQueryAndWaitForCompletion()
} else {
// There will be no temporary table to delete, so null out the reference.
temporaryTableId = null;
throw new IOException("Executing query " + query + " failed: " + error.getMessage());
throw new IOException(
String.format("Executing query %s failed: %s",
queryConfig.toPrettyString(), error.getMessage()));
}
}
Uninterruptibles.sleepUninterruptibly(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;

import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.Assert;
Expand All @@ -122,7 +121,6 @@
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
Expand All @@ -135,8 +133,6 @@
import java.util.NoSuchElementException;
import java.util.Set;

import javax.annotation.Nullable;

/**
* Tests for BigQueryIO.
*/
Expand Down Expand Up @@ -187,8 +183,7 @@ public BigQueryJsonReader getReaderFromTable(

@Override
public BigQueryJsonReader getReaderFromQuery(
BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten,
@Nullable Boolean useLegacySql) {
BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) {
return new FakeBigQueryReader(jsonTableRowReturns);
}

Expand Down Expand Up @@ -1749,3 +1744,4 @@ public boolean accept(File pathname) {
}}).length);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -258,14 +258,18 @@ public void testReadFromQueryNoTables() throws IOException, InterruptedException

// Mock job polling.
JobStatus status = new JobStatus().setState("DONE");
TableReference tableRef =
new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
JobConfigurationQuery queryConfig = new JobConfigurationQuery().setDestinationTable(tableRef);
JobConfigurationQuery resultQueryConfig =
new JobConfigurationQuery().setDestinationTable(
new TableReference()
.setProjectId("project")
.setDatasetId("tempdataset")
.setTableId("temptable")
);
Job getJob =
new Job()
.setJobReference(new JobReference())
.setStatus(status)
.setConfiguration(new JobConfiguration().setQuery(queryConfig));
.setConfiguration(new JobConfiguration().setQuery(resultQueryConfig));
when(mockJobsGet.execute()).thenReturn(getJob);

// Mock table schema fetch.
Expand All @@ -281,8 +285,9 @@ public void testReadFromQueryNoTables() throws IOException, InterruptedException
String query = String.format(
"SELECT \"Arthur\" as name, 42 as count, \"%s\" as photo",
photoBytesEncoded);
JobConfigurationQuery queryConfig = new JobConfigurationQuery().setQuery(query);
try (BigQueryTableRowIterator iterator =
BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null, null)) {
BigQueryTableRowIterator.fromQuery(queryConfig, "project", mockClient)) {
iterator.open();
assertTrue(iterator.advance());
TableRow row = iterator.getCurrent();
Expand Down Expand Up @@ -317,7 +322,7 @@ public void testReadFromQueryNoTables() throws IOException, InterruptedException
verify(mockTablesDelete).execute();
// Table data read.
verify(mockClient).tabledata();
verify(mockTabledata).list("project", "dataset", "table");
verify(mockTabledata).list("project", "tempdataset", "temptable");
verify(mockTabledataList).execute();
}

Expand All @@ -334,18 +339,16 @@ public void testQueryFailed() throws IOException {
when(mockJobsInsert.execute()).thenThrow(exception, exception, exception, exception);

String query = "NOT A QUERY";
JobConfigurationQuery queryConfig = new JobConfigurationQuery().setQuery(query);
try (BigQueryTableRowIterator iterator =
BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null, null)) {

try {
iterator.open();
fail();
} catch (Exception expected) {
// Verify message explains cause and reports the query.
assertThat(expected.getMessage(), containsString("Error"));
assertThat(expected.getMessage(), containsString(query));
assertThat(expected.getCause().getMessage(), containsString(errorReason));
}
BigQueryTableRowIterator.fromQuery(queryConfig, "project", mockClient)) {
iterator.open();
fail();
} catch (Exception expected) {
// Verify message explains cause and reports the query.
assertThat(expected.getMessage(), containsString("Error"));
assertThat(expected.getMessage(), containsString(query));
assertThat(expected.getCause().getMessage(), containsString(errorReason));
}

// Job inserted to run the query, then polled once.
Expand Down

0 comments on commit fc5fee2

Please sign in to comment.