From dbe464440011decfc6d1a11ff6af9cc7b5ee3b68 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Mon, 30 Jan 2017 12:25:27 -0800 Subject: [PATCH 1/4] BigQuery: fix an issue with option propagation and refactor to future-proof * We created a helper in BigQueryIO to create a JobConfigurationQuery capturing all options, but we had not yet propagated this cleanup into the Services abstraction or helper classes. Refactor BigQueryServices and BigQueryTableRowIterator to propagate the same configuration. Adds a new deprecated constructor to BigQueryTableRowIterator for backwards-compatibility. This fixes GoogleCloudPlatform/DataflowJavaSDK#539. --- .../cloud/dataflow/sdk/io/BigQueryIO.java | 10 ++- .../dataflow/sdk/util/BigQueryServices.java | 3 +- .../sdk/util/BigQueryServicesImpl.java | 17 ++--- .../sdk/util/BigQueryTableRowIterator.java | 67 ++++++++++--------- .../cloud/dataflow/sdk/io/BigQueryIOTest.java | 8 +-- 5 files changed, 52 insertions(+), 53 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java index ace18eff5e..3472a8afed 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java @@ -1075,7 +1075,7 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { public BoundedReader createReader(PipelineOptions options) throws IOException { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); return new BigQueryReader(this, bqServices.getReaderFromQuery( - bqOptions, query.get(), executingProject.get(), flattenResults, useLegacySql)); + bqOptions, createBasicQueryConfig(), executingProject.get())); } @Override @@ -1152,11 +1152,12 @@ private void executeQuery( .setProjectId(executingProject) .setJobId(jobId); + // When changing options here, consider whether to change the defaults from + // #createBasicQueryConfig instead. JobConfigurationQuery queryConfig = createBasicQueryConfig() .setAllowLargeResults(true) .setCreateDisposition("CREATE_IF_NEEDED") .setDestinationTable(destinationTable) - .setPriority("BATCH") .setWriteDisposition("WRITE_EMPTY"); jobService.startQueryJob(jobRef, queryConfig); @@ -1167,9 +1168,12 @@ private void executeQuery( } private JobConfigurationQuery createBasicQueryConfig() { + // Due to deprecated functionality, if this function is updated + // then the similar code in BigQueryTableRowIterator#fromQuery should be updated. return new JobConfigurationQuery() - .setQuery(query.get()) .setFlattenResults(flattenResults) + .setPriority("BATCH") + .setQuery(query.get()) .setUseLegacySql(useLegacySql); } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java index ec96009494..df247629f5 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java @@ -58,8 +58,7 @@ public interface BigQueryServices extends Serializable { * Returns a real, mock, or fake {@link BigQueryJsonReader} to query tables. */ BigQueryJsonReader getReaderFromQuery( - BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten, - @Nullable Boolean useLegacySql); + BigQueryOptions bqOptions, JobConfigurationQuery queryConfig, String projectId); /** * An interface for the Cloud BigQuery load service. 
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java index 1a37e01375..af2ed9e73b 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java @@ -39,14 +39,11 @@ import com.google.cloud.dataflow.sdk.options.BigQueryOptions; import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.common.annotations.VisibleForTesting; - import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.io.IOException; import java.util.NoSuchElementException; - import javax.annotation.Nullable; /** @@ -83,9 +80,8 @@ public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableRef @Override public BigQueryJsonReader getReaderFromQuery( - BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten, - @Nullable Boolean useLegacySql) { - return BigQueryJsonReaderImpl.fromQuery(bqOptions, query, projectId, flatten, useLegacySql); + BigQueryOptions bqOptions, JobConfigurationQuery queryConfig, String projectId) { + return BigQueryJsonReaderImpl.fromQuery(bqOptions, queryConfig, projectId); } @VisibleForTesting @@ -521,14 +517,11 @@ private BigQueryJsonReaderImpl(BigQueryTableRowIterator iterator) { private static BigQueryJsonReader fromQuery( BigQueryOptions bqOptions, - String query, - String projectId, - @Nullable Boolean flattenResults, - @Nullable Boolean useLegacySql) { + JobConfigurationQuery queryConfig, + String projectId) { return new BigQueryJsonReaderImpl( BigQueryTableRowIterator.fromQuery( - query, projectId, Transport.newBigQueryClient(bqOptions).build(), flattenResults, - useLegacySql)); + queryConfig, projectId, Transport.newBigQueryClient(bqOptions).build())); } private static BigQueryJsonReader fromTable( diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java index 8f4ff793dc..63bd025099 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java @@ -46,11 +46,9 @@ import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.Uninterruptibles; - import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.io.IOException; import java.util.Collection; import java.util.Collections; @@ -61,7 +59,6 @@ import java.util.Objects; import java.util.Random; import java.util.concurrent.TimeUnit; - import javax.annotation.Nullable; /** @@ -73,6 +70,7 @@ public class BigQueryTableRowIterator implements AutoCloseable { @Nullable private TableReference ref; @Nullable private final String projectId; @Nullable private TableSchema schema; + @Nullable private final JobConfigurationQuery queryConfig; private final Bigquery client; private String pageToken; private Iterator iteratorOverCurrentBatch; @@ -89,25 +87,18 @@ public class BigQueryTableRowIterator implements AutoCloseable { // following interval to check the status of query execution job private static final Duration QUERY_COMPLETION_POLL_TIME = Duration.standardSeconds(1); - private final String query; - // Whether to flatten query results. 
- private final boolean flattenResults; - // Whether to use the BigQuery legacy SQL dialect.. - private final boolean useLegacySql; // Temporary dataset used to store query results. private String temporaryDatasetId = null; // Temporary table used to store query results. private String temporaryTableId = null; private BigQueryTableRowIterator( - @Nullable TableReference ref, @Nullable String query, @Nullable String projectId, - Bigquery client, boolean flattenResults, boolean useLegacySql) { + @Nullable TableReference ref, @Nullable JobConfigurationQuery queryConfig, + @Nullable String projectId, Bigquery client) { this.ref = ref; - this.query = query; + this.queryConfig = queryConfig; this.projectId = projectId; this.client = checkNotNull(client, "client"); - this.flattenResults = flattenResults; - this.useLegacySql = useLegacySql; } /** @@ -116,7 +107,7 @@ private BigQueryTableRowIterator( public static BigQueryTableRowIterator fromTable(TableReference ref, Bigquery client) { checkNotNull(ref, "ref"); checkNotNull(client, "client"); - return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client, true, true); + return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client); } /** @@ -135,15 +126,31 @@ public static BigQueryTableRowIterator fromQuery( * Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the * specified query in the specified project. */ + @Deprecated public static BigQueryTableRowIterator fromQuery( String query, String projectId, Bigquery client, @Nullable Boolean flattenResults, @Nullable Boolean useLegacySql) { checkNotNull(query, "query"); checkNotNull(projectId, "projectId"); checkNotNull(client, "client"); - return new BigQueryTableRowIterator(null, query, projectId, client, - MoreObjects.firstNonNull(flattenResults, Boolean.TRUE), - MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE)); + JobConfigurationQuery queryConfig = new JobConfigurationQuery() + .setFlattenResults(MoreObjects.firstNonNull(flattenResults, Boolean.TRUE)) + .setPriority("BATCH") + .setQuery(query) + .setUseLegacySql(MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE)); + return new BigQueryTableRowIterator(null, queryConfig, projectId, client); + } + + /** + * Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the + * specified query in the specified project. + */ + public static BigQueryTableRowIterator fromQuery( + JobConfigurationQuery queryConfig, String projectId, Bigquery client) { + checkNotNull(queryConfig, "queryConfig"); + checkNotNull(projectId, "projectId"); + checkNotNull(client, "client"); + return new BigQueryTableRowIterator(null, queryConfig, projectId, client); } /** @@ -151,7 +158,7 @@ public static BigQueryTableRowIterator fromQuery( * @throws IOException on failure */ public void open() throws IOException, InterruptedException { - if (query != null) { + if (queryConfig != null) { ref = executeQueryAndWaitForCompletion(); } // Get table schema. 
@@ -401,15 +408,17 @@ private void deleteDataset(String datasetId) throws IOException, InterruptedExce */ private TableReference executeQueryAndWaitForCompletion() throws IOException, InterruptedException { + checkState(projectId != null, "Cannot dryRun a query in unknown (null) project"); + checkState(queryConfig != null, "Cannot dryRun a null query"); // Dry run query to get source table location Job dryRunJob = new Job() .setConfiguration(new JobConfiguration() - .setQuery(new JobConfigurationQuery() - .setQuery(query)) + .setQuery(queryConfig) .setDryRun(true)); JobStatistics jobStats = executeWithBackOff( client.jobs().insert(projectId, dryRunJob), - String.format("Error when trying to dry run query %s.", query)).getStatistics(); + String.format("Error when trying to dry run query %s.", + queryConfig.toPrettyString())).getStatistics(); // Let BigQuery to pick default location if the query does not read any tables. String location = null; @@ -428,14 +437,8 @@ private TableReference executeQueryAndWaitForCompletion() createDataset(temporaryDatasetId, location); Job job = new Job(); JobConfiguration config = new JobConfiguration(); - JobConfigurationQuery queryConfig = new JobConfigurationQuery(); config.setQuery(queryConfig); job.setConfiguration(config); - queryConfig.setQuery(query); - queryConfig.setAllowLargeResults(true); - queryConfig.setFlattenResults(flattenResults); - queryConfig.setUseLegacySql(useLegacySql); - TableReference destinationTable = new TableReference(); destinationTable.setProjectId(projectId); @@ -445,13 +448,15 @@ private TableReference executeQueryAndWaitForCompletion() Job queryJob = executeWithBackOff( client.jobs().insert(projectId, job), - String.format("Error when trying to execute the job for query %s.", query)); + String.format("Error when trying to execute the job for query %s.", + queryConfig.toPrettyString())); JobReference jobId = queryJob.getJobReference(); while (true) { Job pollJob = executeWithBackOff( client.jobs().get(projectId, jobId.getJobId()), - String.format("Error when trying to get status of the job for query %s.", query)); + String.format("Error when trying to get status of the job for query %s.", + queryConfig.toPrettyString())); JobStatus status = pollJob.getStatus(); if (status.getState().equals("DONE")) { // Job is DONE, but did not necessarily succeed. @@ -461,7 +466,9 @@ private TableReference executeQueryAndWaitForCompletion() } else { // There will be no temporary table to delete, so null out the reference. 
temporaryTableId = null; - throw new IOException("Executing query " + query + " failed: " + error.getMessage()); + throw new IOException( + String.format("Executing query %s failed: %s", + queryConfig.toPrettyString(), error.getMessage())); } } Uninterruptibles.sleepUninterruptibly( diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java index a5bddec315..61356e18d0 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java @@ -106,7 +106,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; - import org.hamcrest.CoreMatchers; import org.hamcrest.Matchers; import org.junit.Assert; @@ -122,7 +121,6 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; - import java.io.File; import java.io.FileFilter; import java.io.IOException; @@ -135,8 +133,6 @@ import java.util.NoSuchElementException; import java.util.Set; -import javax.annotation.Nullable; - /** * Tests for BigQueryIO. */ @@ -187,8 +183,7 @@ public BigQueryJsonReader getReaderFromTable( @Override public BigQueryJsonReader getReaderFromQuery( - BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten, - @Nullable Boolean useLegacySql) { + BigQueryOptions bqOptions, JobConfigurationQuery queryConfig, String projectId) { return new FakeBigQueryReader(jsonTableRowReturns); } @@ -1749,3 +1744,4 @@ public boolean accept(File pathname) { }}).length); } } + From 502f99f6dfb233b2681ea6be55b8f63d900c4e67 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Mon, 30 Jan 2017 14:20:15 -0800 Subject: [PATCH 2/4] fixups --- .../sdk/util/BigQueryTableRowIterator.java | 1 + .../util/BigQueryTableRowIteratorTest.java | 37 ++++++++++--------- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java index 63bd025099..a518032228 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java @@ -445,6 +445,7 @@ private TableReference executeQueryAndWaitForCompletion() destinationTable.setDatasetId(temporaryDatasetId); destinationTable.setTableId(temporaryTableId); queryConfig.setDestinationTable(destinationTable); + queryConfig.setAllowLargeResults(Boolean.TRUE); Job queryJob = executeWithBackOff( client.jobs().insert(projectId, job), diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java index d6ac5b36ba..94d858aebb 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java @@ -258,14 +258,18 @@ public void testReadFromQueryNoTables() throws IOException, InterruptedException // Mock job polling. 
JobStatus status = new JobStatus().setState("DONE"); - TableReference tableRef = - new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table"); - JobConfigurationQuery queryConfig = new JobConfigurationQuery().setDestinationTable(tableRef); + JobConfigurationQuery resultQueryConfig = + new JobConfigurationQuery().setDestinationTable( + new TableReference() + .setProjectId("project") + .setDatasetId("tempdataset") + .setTableId("temptable") + ); Job getJob = new Job() .setJobReference(new JobReference()) .setStatus(status) - .setConfiguration(new JobConfiguration().setQuery(queryConfig)); + .setConfiguration(new JobConfiguration().setQuery(resultQueryConfig)); when(mockJobsGet.execute()).thenReturn(getJob); // Mock table schema fetch. @@ -281,8 +285,9 @@ public void testReadFromQueryNoTables() throws IOException, InterruptedException String query = String.format( "SELECT \"Arthur\" as name, 42 as count, \"%s\" as photo", photoBytesEncoded); + JobConfigurationQuery queryConfig = new JobConfigurationQuery().setQuery(query); try (BigQueryTableRowIterator iterator = - BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null, null)) { + BigQueryTableRowIterator.fromQuery(queryConfig, "project", mockClient)) { iterator.open(); assertTrue(iterator.advance()); TableRow row = iterator.getCurrent(); @@ -317,7 +322,7 @@ public void testReadFromQueryNoTables() throws IOException, InterruptedException verify(mockTablesDelete).execute(); // Table data read. verify(mockClient).tabledata(); - verify(mockTabledata).list("project", "dataset", "table"); + verify(mockTabledata).list("project", "tempdataset", "temptable"); verify(mockTabledataList).execute(); } @@ -334,18 +339,16 @@ public void testQueryFailed() throws IOException { when(mockJobsInsert.execute()).thenThrow(exception, exception, exception, exception); String query = "NOT A QUERY"; + JobConfigurationQuery queryConfig = new JobConfigurationQuery().setQuery(query); try (BigQueryTableRowIterator iterator = - BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null, null)) { - - try { - iterator.open(); - fail(); - } catch (Exception expected) { - // Verify message explains cause and reports the query. - assertThat(expected.getMessage(), containsString("Error")); - assertThat(expected.getMessage(), containsString(query)); - assertThat(expected.getCause().getMessage(), containsString(errorReason)); - } + BigQueryTableRowIterator.fromQuery(queryConfig, "project", mockClient)) { + iterator.open(); + fail(); + } catch (Exception expected) { + // Verify message explains cause and reports the query. + assertThat(expected.getMessage(), containsString("Error")); + assertThat(expected.getMessage(), containsString(query)); + assertThat(expected.getCause().getMessage(), containsString(errorReason)); } // Job inserted to run the query, then polled once. 
From 5625ffbf5ab9cfee4debbc38afc6a1fe5c69c892 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Mon, 6 Feb 2017 14:48:43 -0800 Subject: [PATCH 3/4] fixups --- .../main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java index 3472a8afed..a6d9871287 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java @@ -1158,6 +1158,7 @@ private void executeQuery( .setAllowLargeResults(true) .setCreateDisposition("CREATE_IF_NEEDED") .setDestinationTable(destinationTable) + .setPriority("BATCH") .setWriteDisposition("WRITE_EMPTY"); jobService.startQueryJob(jobRef, queryConfig); @@ -1172,7 +1173,6 @@ private JobConfigurationQuery createBasicQueryConfig() { // then the similar code in BigQueryTableRowIterator#fromQuery should be updated. return new JobConfigurationQuery() .setFlattenResults(flattenResults) - .setPriority("BATCH") .setQuery(query.get()) .setUseLegacySql(useLegacySql); } From 9c59d78112a51451eb9e6ae64a966b0ee2677fb8 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Mon, 6 Feb 2017 14:56:50 -0800 Subject: [PATCH 4/4] fixups --- .../java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java | 2 +- .../google/cloud/dataflow/sdk/util/BigQueryServices.java | 2 +- .../cloud/dataflow/sdk/util/BigQueryServicesImpl.java | 8 +++----- .../cloud/dataflow/sdk/util/BigQueryTableRowIterator.java | 4 ++-- .../com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java | 2 +- 5 files changed, 8 insertions(+), 10 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java index a6d9871287..f844f49aed 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java @@ -1075,7 +1075,7 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { public BoundedReader createReader(PipelineOptions options) throws IOException { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); return new BigQueryReader(this, bqServices.getReaderFromQuery( - bqOptions, createBasicQueryConfig(), executingProject.get())); + bqOptions, executingProject.get(), createBasicQueryConfig())); } @Override diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java index df247629f5..43232f699a 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java @@ -58,7 +58,7 @@ public interface BigQueryServices extends Serializable { * Returns a real, mock, or fake {@link BigQueryJsonReader} to query tables. */ BigQueryJsonReader getReaderFromQuery( - BigQueryOptions bqOptions, JobConfigurationQuery queryConfig, String projectId); + BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig); /** * An interface for the Cloud BigQuery load service. 
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java index af2ed9e73b..84e718addd 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java @@ -80,8 +80,8 @@ public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableRef @Override public BigQueryJsonReader getReaderFromQuery( - BigQueryOptions bqOptions, JobConfigurationQuery queryConfig, String projectId) { - return BigQueryJsonReaderImpl.fromQuery(bqOptions, queryConfig, projectId); + BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) { + return BigQueryJsonReaderImpl.fromQuery(bqOptions, projectId, queryConfig); } @VisibleForTesting @@ -516,9 +516,7 @@ private BigQueryJsonReaderImpl(BigQueryTableRowIterator iterator) { } private static BigQueryJsonReader fromQuery( - BigQueryOptions bqOptions, - JobConfigurationQuery queryConfig, - String projectId) { + BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) { return new BigQueryJsonReaderImpl( BigQueryTableRowIterator.fromQuery( queryConfig, projectId, Transport.newBigQueryClient(bqOptions).build())); diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java index a518032228..5ab5c897ed 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java @@ -408,8 +408,8 @@ private void deleteDataset(String datasetId) throws IOException, InterruptedExce */ private TableReference executeQueryAndWaitForCompletion() throws IOException, InterruptedException { - checkState(projectId != null, "Cannot dryRun a query in unknown (null) project"); - checkState(queryConfig != null, "Cannot dryRun a null query"); + checkState(projectId != null, "Unable to execute a query without a configured project id"); + checkState(queryConfig != null, "Unable to execute a query without a configured query"); // Dry run query to get source table location Job dryRunJob = new Job() .setConfiguration(new JobConfiguration() diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java index 61356e18d0..7f9d2e95ce 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java @@ -183,7 +183,7 @@ public BigQueryJsonReader getReaderFromTable( @Override public BigQueryJsonReader getReaderFromQuery( - BigQueryOptions bqOptions, JobConfigurationQuery queryConfig, String projectId) { + BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) { return new FakeBigQueryReader(jsonTableRowReturns); }
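
For reference, a minimal sketch of how a caller might drive the refactored entry point after this series is applied. It assumes the Dataflow 1.x SDK and the BigQuery API client are on the classpath and that default credentials are available; the query string and project id below are illustrative only. The pieces it exercises (JobConfigurationQuery carrying all query options, BigQueryTableRowIterator.fromQuery(queryConfig, projectId, client), Transport.newBigQueryClient) come from the diffs above; everything else is example scaffolding.

    import com.google.api.services.bigquery.Bigquery;
    import com.google.api.services.bigquery.model.JobConfigurationQuery;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.cloud.dataflow.sdk.options.BigQueryOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.util.BigQueryTableRowIterator;
    import com.google.cloud.dataflow.sdk.util.Transport;

    public class QueryConfigExample {
      public static void main(String[] args) throws Exception {
        BigQueryOptions options = PipelineOptionsFactory.as(BigQueryOptions.class);
        Bigquery client = Transport.newBigQueryClient(options).build();

        // All query options now travel in a single JobConfigurationQuery,
        // mirroring BigQueryIO#createBasicQueryConfig from patch 1/4.
        JobConfigurationQuery queryConfig = new JobConfigurationQuery()
            .setQuery("SELECT word, word_count FROM [publicdata:samples.shakespeare]")
            .setFlattenResults(true)
            .setPriority("BATCH")
            .setUseLegacySql(true);

        // New, non-deprecated overload added by this series: the iterator
        // receives the full configuration instead of loose flatten/legacy flags.
        try (BigQueryTableRowIterator iterator =
            BigQueryTableRowIterator.fromQuery(queryConfig, "my-project", client)) {
          iterator.open();
          while (iterator.advance()) {
            TableRow row = iterator.getCurrent();
            System.out.println(row);
          }
        }
      }
    }

The old fromQuery(String, String, Bigquery, Boolean, Boolean) overload remains available but deprecated; it simply wraps its arguments in an equivalent JobConfigurationQuery, so existing callers keep working while new code passes the configuration object directly.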