diff --git a/README.md b/README.md index fccedd29a7..ebb2b9cedb 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ If you are using Maven with [BOM][libraries-bom], add this to your pom.xml file: com.google.cloud libraries-bom - 25.4.0 + 26.1.3 pom import diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml index f68820771c..937b1f693f 100644 --- a/samples/snippets/pom.xml +++ b/samples/snippets/pom.xml @@ -31,7 +31,7 @@ com.google.cloud libraries-bom - 25.4.0 + 26.1.3 pom import diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java index 3e74e84694..a95388b47f 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java @@ -23,10 +23,13 @@ import com.google.api.core.ApiFutures; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.QueryJobConfiguration; import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.TableResult; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.Exceptions; +import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError; import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException; import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; import com.google.cloud.bigquery.storage.v1.TableName; @@ -37,6 +40,7 @@ import io.grpc.Status; import io.grpc.Status.Code; import java.io.IOException; +import java.util.Map; import java.util.concurrent.Phaser; import javax.annotation.concurrent.GuardedBy; import org.json.JSONArray; @@ -69,7 +73,11 @@ public static void writeToDefaultStream(String projectId, String datasetName, St JSONArray jsonArr = new JSONArray(); for (int j = 0; j < 10; j++) { JSONObject record = new JSONObject(); - record.put("test_string", String.format("record %03d-%03d", i, j)); + StringBuilder sbSuffix = new StringBuilder(); + for (int k = 0; k < j; k++) { + sbSuffix.append(k); + } + record.put("test_string", String.format("record %03d-%03d %s", i, j, sbSuffix.toString())); jsonArr.put(record); } @@ -78,9 +86,31 @@ public static void writeToDefaultStream(String projectId, String datasetName, St // Final cleanup for the stream during worker teardown. writer.cleanup(); + verifyExpectedRowCount(parentTable, 12); System.out.println("Appended records successfully."); } + private static void verifyExpectedRowCount(TableName parentTable, int expectedRowCount) + throws InterruptedException { + String queryRowCount = + "SELECT COUNT(*) FROM `" + + parentTable.getProject() + + "." + + parentTable.getDataset() + + "." + + parentTable.getTable() + + "`"; + QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(queryRowCount).build(); + BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); + TableResult results = bigquery.query(queryConfig); + int countRowsActual = + Integer.parseInt(results.getValues().iterator().next().get("f0_").getStringValue()); + if (countRowsActual != expectedRowCount) { + throw new RuntimeException( + "Unexpected row count. Expected: " + expectedRowCount + ". Actual: " + countRowsActual); + } + } + private static class AppendContext { JSONArray data; @@ -170,7 +200,7 @@ public AppendCompleteCallback(DataWriter parent, AppendContext appendContext) { } public void onSuccess(AppendRowsResponse response) { - System.out.format("Append success\n"); + System.out.format("Append success%n"); done(); } @@ -182,16 +212,56 @@ public void onFailure(Throwable throwable) { if (appendContext.retryCount < MAX_RETRY_COUNT && RETRIABLE_ERROR_CODES.contains(status.getCode())) { appendContext.retryCount++; - try { - // Since default stream appends are not ordered, we can simply retry the appends. - // Retrying with exclusive streams requires more careful consideration. - this.parent.append(appendContext); - // Mark the existing attempt as done since it's being retried. + // Use a separate thread to avoid potentially blocking while we are in a callback. + new Thread( + () -> { + try { + // Since default stream appends are not ordered, we can simply retry the + // appends. + // Retrying with exclusive streams requires more careful consideration. + this.parent.append(appendContext); + } catch (Exception e) { + // Fall through to return error. + System.out.format("Failed to retry append: %s%n", e); + } + }) + .start(); + // Mark the existing attempt as done since it's being retried. + done(); + return; + } + + if (throwable instanceof AppendSerializtionError) { + AppendSerializtionError ase = (AppendSerializtionError) throwable; + Map rowIndexToErrorMessage = ase.getRowIndexToErrorMessage(); + if (rowIndexToErrorMessage.size() > 0) { + // Omit the faulty rows + JSONArray dataNew = new JSONArray(); + for (int i = 0; i < appendContext.data.length(); i++) { + if (!rowIndexToErrorMessage.containsKey(i)) { + dataNew.put(appendContext.data.get(i)); + } else { + // process faulty rows by placing them on a dead-letter-queue, for instance + } + } + + // Mark the existing attempt as done since we got a response for it done(); + + // Retry the remaining valid rows, but using a separate thread to + // avoid potentially blocking while we are in a callback. + if (dataNew.length() > 0) { + new Thread( + () -> { + try { + this.parent.append(new AppendContext(dataNew, 0)); + } catch (Exception e2) { + System.out.format("Failed to retry append with filtered rows: %s%n", e2); + } + }) + .start(); + } return; - } catch (Exception e) { - // Fall through to return error. - System.out.format("Failed to retry append: %s\n", e); } } @@ -202,7 +272,7 @@ public void onFailure(Throwable throwable) { (storageException != null) ? storageException : new RuntimeException(throwable); } } - System.out.format("Error: %s\n", throwable); + System.out.format("Error that arrived: %s%n", throwable); done(); } diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamIT.java index 871902e0ae..56740eb61a 100644 --- a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamIT.java +++ b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamIT.java @@ -24,7 +24,6 @@ import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.DatasetId; import com.google.cloud.bigquery.DatasetInfo; -import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.StandardTableDefinition; @@ -73,7 +72,11 @@ public void setUp() { // Create a new dataset and table for each test. datasetName = "WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); tableName = "DEFAULT_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); - Schema schema = Schema.of(Field.of("test_string", StandardSQLTypeName.STRING)); + Schema schema = + Schema.of( + com.google.cloud.bigquery.Field.newBuilder("test_string", StandardSQLTypeName.STRING) + .setMaxLength(20L) + .build()); bigquery.create(DatasetInfo.newBuilder(datasetName).build()); TableInfo tableInfo = TableInfo.newBuilder(TableId.of(datasetName, tableName), StandardTableDefinition.of(schema))