diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
index ace18eff5e..f844f49aed 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
@@ -1075,7 +1075,7 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
     public BoundedReader createReader(PipelineOptions options) throws IOException {
       BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
       return new BigQueryReader(this, bqServices.getReaderFromQuery(
-          bqOptions, query.get(), executingProject.get(), flattenResults, useLegacySql));
+          bqOptions, executingProject.get(), createBasicQueryConfig()));
     }
 
     @Override
@@ -1152,6 +1152,8 @@ private void executeQuery(
           .setProjectId(executingProject)
           .setJobId(jobId);
 
+      // When changing options here, consider whether to change the defaults in
+      // #createBasicQueryConfig instead.
       JobConfigurationQuery queryConfig = createBasicQueryConfig()
           .setAllowLargeResults(true)
           .setCreateDisposition("CREATE_IF_NEEDED")
@@ -1167,9 +1169,11 @@
     }
 
     private JobConfigurationQuery createBasicQueryConfig() {
+      // NOTE: if this method is updated, the similar code in the deprecated
+      // BigQueryTableRowIterator#fromQuery overload should be updated as well.
       return new JobConfigurationQuery()
-          .setQuery(query.get())
           .setFlattenResults(flattenResults)
+          .setQuery(query.get())
           .setUseLegacySql(useLegacySql);
     }
 
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java
index ec96009494..43232f699a 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java
@@ -58,8 +58,7 @@ public interface BigQueryServices extends Serializable {
    * Returns a real, mock, or fake {@link BigQueryJsonReader} to query tables.
    */
   BigQueryJsonReader getReaderFromQuery(
-      BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten,
-      @Nullable Boolean useLegacySql);
+      BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig);
 
   /**
    * An interface for the Cloud BigQuery load service.
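Note: the BigQueryServices change above collapses the loose (query, flatten, useLegacySql) parameters into a single JobConfigurationQuery. A minimal sketch of a caller of the new signature, mirroring the options set by createBasicQueryConfig(); the helper class, its name, and the option values are illustrative only, not part of this patch:

import com.google.api.services.bigquery.model.JobConfigurationQuery;
import com.google.cloud.dataflow.sdk.options.BigQueryOptions;
import com.google.cloud.dataflow.sdk.util.BigQueryServices;
import com.google.cloud.dataflow.sdk.util.BigQueryServices.BigQueryJsonReader;

// Hypothetical helper (not part of the patch): the query options now travel
// inside one JobConfigurationQuery instead of separate method arguments.
class QueryReaderSketch {
  static BigQueryJsonReader openReader(
      BigQueryServices bqServices, BigQueryOptions bqOptions, String projectId, String query) {
    JobConfigurationQuery queryConfig = new JobConfigurationQuery()
        .setFlattenResults(true)  // illustrative defaults, analogous to createBasicQueryConfig()
        .setQuery(query)
        .setUseLegacySql(true);
    return bqServices.getReaderFromQuery(bqOptions, projectId, queryConfig);
  }
}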
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java
index 1a37e01375..84e718addd 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java
@@ -39,14 +39,11 @@
 import com.google.cloud.dataflow.sdk.options.BigQueryOptions;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.common.annotations.VisibleForTesting;
-
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.util.NoSuchElementException;
-
 import javax.annotation.Nullable;
 
 /**
@@ -83,9 +80,8 @@ public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableRef
 
   @Override
   public BigQueryJsonReader getReaderFromQuery(
-      BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten,
-      @Nullable Boolean useLegacySql) {
-    return BigQueryJsonReaderImpl.fromQuery(bqOptions, query, projectId, flatten, useLegacySql);
+      BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) {
+    return BigQueryJsonReaderImpl.fromQuery(bqOptions, projectId, queryConfig);
   }
 
   @VisibleForTesting
@@ -520,15 +516,10 @@ private BigQueryJsonReaderImpl(BigQueryTableRowIterator iterator) {
     }
 
     private static BigQueryJsonReader fromQuery(
-        BigQueryOptions bqOptions,
-        String query,
-        String projectId,
-        @Nullable Boolean flattenResults,
-        @Nullable Boolean useLegacySql) {
+        BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) {
       return new BigQueryJsonReaderImpl(
           BigQueryTableRowIterator.fromQuery(
-              query, projectId, Transport.newBigQueryClient(bqOptions).build(), flattenResults,
-              useLegacySql));
+              queryConfig, projectId, Transport.newBigQueryClient(bqOptions).build()));
     }
 
     private static BigQueryJsonReader fromTable(
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java
index 8f4ff793dc..5ab5c897ed 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java
@@ -46,11 +46,9 @@
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.Uninterruptibles;
-
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
@@ -61,7 +59,6 @@
 import java.util.Objects;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
-
 import javax.annotation.Nullable;
 
 /**
@@ -73,6 +70,7 @@ public class BigQueryTableRowIterator implements AutoCloseable {
   @Nullable private TableReference ref;
   @Nullable private final String projectId;
   @Nullable private TableSchema schema;
+  @Nullable private final JobConfigurationQuery queryConfig;
   private final Bigquery client;
   private String pageToken;
   private Iterator iteratorOverCurrentBatch;
@@ -89,25 +87,18 @@ public class BigQueryTableRowIterator implements AutoCloseable {
   // following interval to check the status of query execution job
   private static final Duration QUERY_COMPLETION_POLL_TIME = Duration.standardSeconds(1);
 
-  private final String query;
-  // Whether to flatten query results.
-  private final boolean flattenResults;
-  // Whether to use the BigQuery legacy SQL dialect..
-  private final boolean useLegacySql;
   // Temporary dataset used to store query results.
   private String temporaryDatasetId = null;
   // Temporary table used to store query results.
   private String temporaryTableId = null;
 
   private BigQueryTableRowIterator(
-      @Nullable TableReference ref, @Nullable String query, @Nullable String projectId,
-      Bigquery client, boolean flattenResults, boolean useLegacySql) {
+      @Nullable TableReference ref, @Nullable JobConfigurationQuery queryConfig,
+      @Nullable String projectId, Bigquery client) {
     this.ref = ref;
-    this.query = query;
+    this.queryConfig = queryConfig;
     this.projectId = projectId;
     this.client = checkNotNull(client, "client");
-    this.flattenResults = flattenResults;
-    this.useLegacySql = useLegacySql;
   }
 
   /**
@@ -116,7 +107,7 @@ private BigQueryTableRowIterator(
   public static BigQueryTableRowIterator fromTable(TableReference ref, Bigquery client) {
     checkNotNull(ref, "ref");
     checkNotNull(client, "client");
-    return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client, true, true);
+    return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client);
   }
 
   /**
@@ -135,15 +126,31 @@ public static BigQueryTableRowIterator fromQuery(
    * Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the
    * specified query in the specified project.
    */
+  @Deprecated
   public static BigQueryTableRowIterator fromQuery(
       String query, String projectId, Bigquery client, @Nullable Boolean flattenResults,
       @Nullable Boolean useLegacySql) {
     checkNotNull(query, "query");
     checkNotNull(projectId, "projectId");
     checkNotNull(client, "client");
-    return new BigQueryTableRowIterator(null, query, projectId, client,
-        MoreObjects.firstNonNull(flattenResults, Boolean.TRUE),
-        MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE));
+    JobConfigurationQuery queryConfig = new JobConfigurationQuery()
+        .setFlattenResults(MoreObjects.firstNonNull(flattenResults, Boolean.TRUE))
+        .setPriority("BATCH")
+        .setQuery(query)
+        .setUseLegacySql(MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE));
+    return new BigQueryTableRowIterator(null, queryConfig, projectId, client);
+  }
+
+  /**
+   * Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the
+   * specified query in the specified project.
+   */
+  public static BigQueryTableRowIterator fromQuery(
+      JobConfigurationQuery queryConfig, String projectId, Bigquery client) {
+    checkNotNull(queryConfig, "queryConfig");
+    checkNotNull(projectId, "projectId");
+    checkNotNull(client, "client");
+    return new BigQueryTableRowIterator(null, queryConfig, projectId, client);
   }
 
   /**
@@ -151,7 +158,7 @@ public static BigQueryTableRowIterator fromQuery(
    * @throws IOException on failure
   */
  public void open() throws IOException, InterruptedException {
-    if (query != null) {
+    if (queryConfig != null) {
      ref = executeQueryAndWaitForCompletion();
    }
    // Get table schema.
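Note: the hunks above deprecate the String-based fromQuery overload in favor of one that accepts the JobConfigurationQuery directly; the deprecated overload now just builds the equivalent config (including the BATCH priority) and delegates. A migration sketch for callers; the helper class and option values are illustrative, not part of this patch:

import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
import com.google.cloud.dataflow.sdk.util.BigQueryTableRowIterator;

// Hypothetical migration example (not part of the patch).
class FromQueryMigrationSketch {
  static BigQueryTableRowIterator create(Bigquery client, String projectId, String query) {
    // Before (now deprecated):
    //   BigQueryTableRowIterator.fromQuery(query, projectId, client, null, null);

    // After: the same settings expressed on a JobConfigurationQuery, matching
    // what the deprecated overload now builds internally.
    JobConfigurationQuery queryConfig = new JobConfigurationQuery()
        .setFlattenResults(true)
        .setPriority("BATCH")
        .setQuery(query)
        .setUseLegacySql(true);
    return BigQueryTableRowIterator.fromQuery(queryConfig, projectId, client);
  }
}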
@@ -401,15 +408,17 @@ private void deleteDataset(String datasetId) throws IOException, InterruptedExce
    */
   private TableReference executeQueryAndWaitForCompletion()
       throws IOException, InterruptedException {
+    checkState(projectId != null, "Unable to execute a query without a configured project id");
+    checkState(queryConfig != null, "Unable to execute a query without a configured query");
     // Dry run query to get source table location
     Job dryRunJob = new Job()
         .setConfiguration(new JobConfiguration()
-            .setQuery(new JobConfigurationQuery()
-                .setQuery(query))
+            .setQuery(queryConfig)
             .setDryRun(true));
     JobStatistics jobStats = executeWithBackOff(
         client.jobs().insert(projectId, dryRunJob),
-        String.format("Error when trying to dry run query %s.", query)).getStatistics();
+        String.format("Error when trying to dry run query %s.",
+            queryConfig.toPrettyString())).getStatistics();
 
     // Let BigQuery to pick default location if the query does not read any tables.
     String location = null;
@@ -428,30 +437,27 @@ private TableReference executeQueryAndWaitForCompletion()
     createDataset(temporaryDatasetId, location);
     Job job = new Job();
     JobConfiguration config = new JobConfiguration();
-    JobConfigurationQuery queryConfig = new JobConfigurationQuery();
     config.setQuery(queryConfig);
     job.setConfiguration(config);
 
-    queryConfig.setQuery(query);
-    queryConfig.setAllowLargeResults(true);
-    queryConfig.setFlattenResults(flattenResults);
-    queryConfig.setUseLegacySql(useLegacySql);
-
     TableReference destinationTable = new TableReference();
     destinationTable.setProjectId(projectId);
     destinationTable.setDatasetId(temporaryDatasetId);
     destinationTable.setTableId(temporaryTableId);
     queryConfig.setDestinationTable(destinationTable);
+    queryConfig.setAllowLargeResults(Boolean.TRUE);
 
     Job queryJob = executeWithBackOff(
         client.jobs().insert(projectId, job),
-        String.format("Error when trying to execute the job for query %s.", query));
+        String.format("Error when trying to execute the job for query %s.",
+            queryConfig.toPrettyString()));
     JobReference jobId = queryJob.getJobReference();
 
     while (true) {
       Job pollJob = executeWithBackOff(
           client.jobs().get(projectId, jobId.getJobId()),
-          String.format("Error when trying to get status of the job for query %s.", query));
+          String.format("Error when trying to get status of the job for query %s.",
+              queryConfig.toPrettyString()));
       JobStatus status = pollJob.getStatus();
       if (status.getState().equals("DONE")) {
         // Job is DONE, but did not necessarily succeed.
@@ -461,7 +467,9 @@ private TableReference executeQueryAndWaitForCompletion()
         } else {
           // There will be no temporary table to delete, so null out the reference.
           temporaryTableId = null;
-          throw new IOException("Executing query " + query + " failed: " + error.getMessage());
+          throw new IOException(
+              String.format("Executing query %s failed: %s",
+                  queryConfig.toPrettyString(), error.getMessage()));
         }
       }
       Uninterruptibles.sleepUninterruptibly(
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java
index a5bddec315..7f9d2e95ce 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java
@@ -106,7 +106,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-
 import org.hamcrest.CoreMatchers;
 import org.hamcrest.Matchers;
 import org.junit.Assert;
@@ -122,7 +121,6 @@
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
-
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
@@ -135,8 +133,6 @@
 import java.util.NoSuchElementException;
 import java.util.Set;
 
-import javax.annotation.Nullable;
-
 /**
  * Tests for BigQueryIO.
  */
@@ -187,8 +183,7 @@ public BigQueryJsonReader getReaderFromTable(
 
     @Override
     public BigQueryJsonReader getReaderFromQuery(
-        BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten,
-        @Nullable Boolean useLegacySql) {
+        BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) {
       return new FakeBigQueryReader(jsonTableRowReturns);
     }
 
@@ -1749,3 +1744,4 @@ public boolean accept(File pathname) {
         }}).length);
   }
 }
+
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java
index d6ac5b36ba..94d858aebb 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java
@@ -258,14 +258,18 @@ public void testReadFromQueryNoTables() throws IOException, InterruptedException
 
     // Mock job polling.
     JobStatus status = new JobStatus().setState("DONE");
 
-    TableReference tableRef =
-        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
-    JobConfigurationQuery queryConfig = new JobConfigurationQuery().setDestinationTable(tableRef);
+    JobConfigurationQuery resultQueryConfig =
+        new JobConfigurationQuery().setDestinationTable(
+            new TableReference()
+                .setProjectId("project")
+                .setDatasetId("tempdataset")
+                .setTableId("temptable")
+        );
     Job getJob = new Job()
         .setJobReference(new JobReference())
         .setStatus(status)
-        .setConfiguration(new JobConfiguration().setQuery(queryConfig));
+        .setConfiguration(new JobConfiguration().setQuery(resultQueryConfig));
     when(mockJobsGet.execute()).thenReturn(getJob);
 
     // Mock table schema fetch.
@@ -281,8 +285,9 @@ public void testReadFromQueryNoTables() throws IOException, InterruptedException
 
     String query = String.format(
         "SELECT \"Arthur\" as name, 42 as count, \"%s\" as photo", photoBytesEncoded);
+    JobConfigurationQuery queryConfig = new JobConfigurationQuery().setQuery(query);
     try (BigQueryTableRowIterator iterator =
-        BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null, null)) {
+        BigQueryTableRowIterator.fromQuery(queryConfig, "project", mockClient)) {
       iterator.open();
       assertTrue(iterator.advance());
       TableRow row = iterator.getCurrent();
@@ -317,7 +322,7 @@ public void testReadFromQueryNoTables() throws IOException, InterruptedException
     verify(mockTablesDelete).execute();
     // Table data read.
     verify(mockClient).tabledata();
-    verify(mockTabledata).list("project", "dataset", "table");
+    verify(mockTabledata).list("project", "tempdataset", "temptable");
     verify(mockTabledataList).execute();
   }
 
@@ -334,18 +339,16 @@ public void testQueryFailed() throws IOException {
     when(mockJobsInsert.execute()).thenThrow(exception, exception, exception, exception);
 
     String query = "NOT A QUERY";
+    JobConfigurationQuery queryConfig = new JobConfigurationQuery().setQuery(query);
     try (BigQueryTableRowIterator iterator =
-        BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null, null)) {
-
-      try {
-        iterator.open();
-        fail();
-      } catch (Exception expected) {
-        // Verify message explains cause and reports the query.
-        assertThat(expected.getMessage(), containsString("Error"));
-        assertThat(expected.getMessage(), containsString(query));
-        assertThat(expected.getCause().getMessage(), containsString(errorReason));
-      }
+        BigQueryTableRowIterator.fromQuery(queryConfig, "project", mockClient)) {
+      iterator.open();
+      fail();
+    } catch (Exception expected) {
+      // Verify message explains cause and reports the query.
+      assertThat(expected.getMessage(), containsString("Error"));
+      assertThat(expected.getMessage(), containsString(query));
+      assertThat(expected.getCause().getMessage(), containsString(errorReason));
     }
 
     // Job inserted to run the query, then polled once.
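Note: taken together, the test changes above show the intended end-to-end use of the new factory. A minimal read loop, sketched on the assumption that open(), advance(), getCurrent(), and close() keep the semantics exercised by these tests; the helper class is illustrative, not part of this patch:

import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.util.BigQueryTableRowIterator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical usage example (not part of the patch).
class ReadQueryResultsSketch {
  static List<TableRow> readAll(Bigquery client, String projectId, String query)
      throws IOException, InterruptedException {
    JobConfigurationQuery queryConfig = new JobConfigurationQuery().setQuery(query);
    List<TableRow> rows = new ArrayList<>();
    // try-with-resources mirrors the tests: close() tears down the temporary
    // dataset and table created by executeQueryAndWaitForCompletion().
    try (BigQueryTableRowIterator iterator =
        BigQueryTableRowIterator.fromQuery(queryConfig, projectId, client)) {
      iterator.open();
      while (iterator.advance()) {
        rows.add(iterator.getCurrent());
      }
    }
    return rows;
  }
}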