diff --git a/v2/spanner-custom-shard/src/main/java/com/custom/CustomTransformationWithCassandraForLiveIT.java b/v2/spanner-custom-shard/src/main/java/com/custom/CustomTransformationWithCassandraForLiveIT.java new file mode 100644 index 0000000000..1731166cce --- /dev/null +++ b/v2/spanner-custom-shard/src/main/java/com/custom/CustomTransformationWithCassandraForLiveIT.java @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.custom; + +import com.google.cloud.teleport.v2.spanner.exceptions.InvalidTransformationException; +import com.google.cloud.teleport.v2.spanner.utils.ISpannerMigrationTransformer; +import com.google.cloud.teleport.v2.spanner.utils.MigrationTransformationRequest; +import com.google.cloud.teleport.v2.spanner.utils.MigrationTransformationResponse; +import java.util.HashMap; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// TODO: Rename the class since its being used in both Live and Reverse replication tests and in +// both ITs and LTs +public class CustomTransformationWithCassandraForLiveIT implements ISpannerMigrationTransformer { + + private static final Logger LOG = LoggerFactory.getLogger(CustomShardIdFetcher.class); + + @Override + public void init(String parameters) { + LOG.info("init called with {}", parameters); + } + + @Override + public MigrationTransformationResponse toSpannerRow(MigrationTransformationRequest request) + throws InvalidTransformationException { + if (request.getTableName().equals("Customers")) { + Map row = new HashMap<>(request.getRequestRow()); + row.put("full_name", row.get("first_name") + " " + row.get("last_name")); + row.put("migration_shard_id", request.getShardId() + "_" + row.get("id")); + MigrationTransformationResponse response = new MigrationTransformationResponse(row, false); + return response; + } + return new MigrationTransformationResponse(null, false); + } + + @Override + public MigrationTransformationResponse toSourceRow(MigrationTransformationRequest request) + throws InvalidTransformationException { + if (request.getTableName().equals("customers")) { + Map requestRow = request.getRequestRow(); + Map row = new HashMap<>(); + row.put("full_name", requestRow.get("first_name") + " " + requestRow.get("last_name")); + MigrationTransformationResponse response = new MigrationTransformationResponse(row, false); + return response; + } + return new MigrationTransformationResponse(null, false); + } +} diff --git a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDb.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDb.java index a1effa9852..0919cf09c5 100644 --- a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDb.java +++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDb.java @@ -200,7 +200,7 @@ public interface Options extends PipelineOptions, StreamingOptions { optional = true, description = "Cloud Spanner shadow table prefix.", helpText = "The prefix used to name shadow tables. Default: `shadow_`.") - @Default.String("shadow_") + @Default.String("rev_shadow_") String getShadowTablePrefix(); void setShadowTablePrefix(String value); diff --git a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraDMLGenerator.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraDMLGenerator.java index 990acfc64d..641e47b18e 100644 --- a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraDMLGenerator.java +++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraDMLGenerator.java @@ -116,7 +116,8 @@ public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequ sourceTable, dmlGeneratorRequest.getNewValuesJson(), dmlGeneratorRequest.getKeyValuesJson(), - dmlGeneratorRequest.getSourceDbTimezoneOffset()); + dmlGeneratorRequest.getSourceDbTimezoneOffset(), + dmlGeneratorRequest.getCustomTransformationResponse()); if (pkColumnNameValues == null) { LOG.warn( "Failed to generate primary key values for table {}. Skipping the record.", @@ -166,7 +167,8 @@ private static DMLGeneratorResponse generateDMLResponse( sourceTable, dmlGeneratorRequest.getNewValuesJson(), dmlGeneratorRequest.getKeyValuesJson(), - dmlGeneratorRequest.getSourceDbTimezoneOffset()); + dmlGeneratorRequest.getSourceDbTimezoneOffset(), + dmlGeneratorRequest.getCustomTransformationResponse()); Map> allColumnNamesAndValues = ImmutableMap.>builder() .putAll(pkColumnNameValues) @@ -287,6 +289,7 @@ private static DMLGeneratorResponse getDeleteStatementCQL( * @param newValuesJson the JSON object containing new values for columns. * @param keyValuesJson the JSON object containing key values for columns. * @param sourceDbTimezoneOffset the timezone offset of the source database. + * @param customTransformationResponse the custom transformation * @return a map of column names to their corresponding prepared statement value objects. *

This method: 1. Iterates over the non-primary key column definitions in the source table * schema. 2. Maps each column in the source table schema to its corresponding column in the @@ -299,9 +302,14 @@ private static Map> getColumnValues( SourceTable sourceTable, JSONObject newValuesJson, JSONObject keyValuesJson, - String sourceDbTimezoneOffset) { + String sourceDbTimezoneOffset, + Map customTransformationResponse) { Map> response = new HashMap<>(); Set sourcePKs = sourceTable.getPrimaryKeySet(); + Set customTransformColumns = null; + if (customTransformationResponse != null) { + customTransformColumns = customTransformationResponse.keySet(); + } for (Map.Entry entry : sourceTable.getColDefs().entrySet()) { SourceColumnDefinition sourceColDef = entry.getValue(); @@ -317,7 +325,14 @@ private static Map> getColumnValues( } String spannerColumnName = spannerColDef.getName(); PreparedStatementValueObject columnValue; - if (keyValuesJson.has(spannerColumnName)) { + if (customTransformColumns != null + && customTransformColumns.contains(sourceColDef.getName())) { + String cassandraType = sourceColDef.getType().getName().toLowerCase(); + String columnName = spannerColDef.getName(); + columnValue = + PreparedStatementValueObject.create( + cassandraType, customTransformationResponse.get(columnName)); + } else if (keyValuesJson.has(spannerColumnName)) { columnValue = getMappedColumnValue( spannerColDef, sourceColDef, keyValuesJson, sourceDbTimezoneOffset); @@ -344,6 +359,7 @@ private static Map> getColumnValues( * @param newValuesJson the JSON object containing new values for columns. * @param keyValuesJson the JSON object containing key values for columns. * @param sourceDbTimezoneOffset the timezone offset of the source database. + * @param customTransformationResponse the user defined transformation. * @return a map of primary key column names to their corresponding prepared statement value * objects, or null if a required column is missing. *

This method: 1. Iterates over the primary key definitions in the source table schema. 2. @@ -357,10 +373,14 @@ private static Map> getPkColumnValues( SourceTable sourceTable, JSONObject newValuesJson, JSONObject keyValuesJson, - String sourceDbTimezoneOffset) { + String sourceDbTimezoneOffset, + Map customTransformationResponse) { Map> response = new HashMap<>(); ColumnPK[] sourcePKs = sourceTable.getPrimaryKeys(); - + Set customTransformColumns = null; + if (customTransformationResponse != null) { + customTransformColumns = customTransformationResponse.keySet(); + } for (ColumnPK currentSourcePK : sourcePKs) { String colId = currentSourcePK.getColId(); SourceColumnDefinition sourceColDef = sourceTable.getColDefs().get(colId); @@ -373,7 +393,14 @@ private static Map> getPkColumnValues( } String spannerColumnName = spannerColDef.getName(); PreparedStatementValueObject columnValue; - if (keyValuesJson.has(spannerColumnName)) { + if (customTransformColumns != null + && customTransformColumns.contains(sourceColDef.getName())) { + String cassandraType = sourceColDef.getType().getName().toLowerCase(); + String columnName = spannerColDef.getName(); + columnValue = + PreparedStatementValueObject.create( + cassandraType, customTransformationResponse.get(columnName)); + } else if (keyValuesJson.has(spannerColumnName)) { columnValue = getMappedColumnValue( spannerColDef, sourceColDef, keyValuesJson, sourceDbTimezoneOffset); diff --git a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraTypeHandler.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraTypeHandler.java index 544adc6728..e4be1ff3b6 100644 --- a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraTypeHandler.java +++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraTypeHandler.java @@ -172,8 +172,13 @@ private static ByteBuffer parseBlobType(Object colValue) { return ByteBuffer.wrap((byte[]) colValue); } else if (colValue instanceof ByteBuffer) { return (ByteBuffer) colValue; + } else { + String strVal = (String) colValue; + if (!strVal.matches("^[01]+$")) { + return ByteBuffer.wrap(java.util.Base64.getDecoder().decode(strVal)); + } } - return ByteBuffer.wrap(java.util.Base64.getDecoder().decode((String) colValue)); + throw new IllegalArgumentException("Invalid colValue: " + colValue); } /** @@ -322,18 +327,22 @@ private static Object handleSpannerColumnType( String spannerType, String columnName, JSONObject valuesJson) { try { if (spannerType.contains("string")) { - return valuesJson.optString(columnName); + String value = valuesJson.optString(columnName); + return value.isEmpty() ? null : value; } else if (spannerType.contains("bytes")) { if (valuesJson.isNull(columnName)) { return null; } String hexEncodedString = valuesJson.optString(columnName); + if (hexEncodedString.isEmpty()) { + return null; + } return safeHandle( () -> { try { - return safeHandle(() -> convertBinaryEncodedStringToByteArray(hexEncodedString)); + return safeHandle(() -> parseBlobType(hexEncodedString)); } catch (IllegalArgumentException e) { - return parseBlobType(hexEncodedString); + return convertBinaryEncodedStringToByteArray(hexEncodedString); } }); } else { diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDBCustomTransformationIT.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDBCustomTransformationIT.java new file mode 100644 index 0000000000..654e8eaac7 --- /dev/null +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDBCustomTransformationIT.java @@ -0,0 +1,255 @@ +/* + * Copyright (C) 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.templates; + +import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.CASSANDRA_SOURCE_TYPE; +import static com.google.common.truth.Truth.assertThat; +import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline; +import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult; + +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import com.google.cloud.spanner.Mutation; +import com.google.cloud.teleport.metadata.SkipDirectRunnerTest; +import com.google.cloud.teleport.metadata.TemplateIntegrationTest; +import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation; +import com.google.pubsub.v1.SubscriptionName; +import java.io.IOException; +import java.time.Duration; +import java.util.HashSet; +import org.apache.beam.it.common.PipelineLauncher; +import org.apache.beam.it.common.PipelineOperator; +import org.apache.beam.it.common.utils.ResourceManagerUtils; +import org.apache.beam.it.gcp.pubsub.PubsubResourceManager; +import org.apache.beam.it.gcp.spanner.SpannerResourceManager; +import org.apache.beam.it.gcp.storage.GcsResourceManager; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class}) +@TemplateIntegrationTest(SpannerToSourceDb.class) +@RunWith(JUnit4.class) +public class SpannerToCassandraSourceDBCustomTransformationIT extends SpannerToSourceDbITBase { + private static final Logger LOG = + LoggerFactory.getLogger(SpannerToCassandraSourceDBCustomTransformationIT.class); + private static final String SPANNER_DDL_RESOURCE = + "SpannerToCassandraSourceIT/spanner-transformation-schema.sql"; + private static final String CASSANDRA_SCHEMA_FILE_RESOURCE = + "SpannerToCassandraSourceIT/cassandra-transformation-schema.sql"; + private static final String CASSANDRA_CONFIG_FILE_RESOURCE = + "SpannerToCassandraSourceIT/cassandra-config-template.conf"; + + private static final String CUSTOMER_TABLE = "Customers"; + private static final HashSet testInstances = + new HashSet<>(); + private static PipelineLauncher.LaunchInfo jobInfo; + public static SpannerResourceManager spannerResourceManager; + private static SpannerResourceManager spannerMetadataResourceManager; + public static CassandraResourceManager cassandraResourceManager; + private static GcsResourceManager gcsResourceManager; + private static PubsubResourceManager pubsubResourceManager; + private SubscriptionName subscriptionName; + + /** + * Setup resource managers and Launch dataflow job once during the execution of this test class. + * + * @throws IOException + */ + @Before + public void setUp() throws IOException, InterruptedException { + skipBaseCleanup = true; + synchronized (SpannerToCassandraSourceDBCustomTransformationIT.class) { + testInstances.add(this); + if (jobInfo == null) { + spannerResourceManager = createSpannerDatabase(SPANNER_DDL_RESOURCE); + spannerMetadataResourceManager = createSpannerMetadataDatabase(); + + cassandraResourceManager = generateKeyspaceAndBuildCassandraResource(); + gcsResourceManager = + GcsResourceManager.builder(artifactBucketName, getClass().getSimpleName(), credentials) + .build(); + createAndUploadCassandraConfigToGcs( + gcsResourceManager, cassandraResourceManager, CASSANDRA_CONFIG_FILE_RESOURCE); + createCassandraSchema(cassandraResourceManager, CASSANDRA_SCHEMA_FILE_RESOURCE); + pubsubResourceManager = setUpPubSubResourceManager(); + CustomTransformation customTransformation = + CustomTransformation.builder( + "input/customShard.jar", + "com.custom.CustomTransformationWithCassandraForLiveIT") + .build(); + subscriptionName = + createPubsubResources( + getClass().getSimpleName(), + pubsubResourceManager, + getGcsPath("dlq", gcsResourceManager).replace("gs://" + artifactBucketName, "")); + createAndUploadJarToGcs(gcsResourceManager); + jobInfo = + launchDataflowJob( + gcsResourceManager, + spannerResourceManager, + spannerMetadataResourceManager, + subscriptionName.toString(), + null, + null, + null, + null, + customTransformation, + CASSANDRA_SOURCE_TYPE); + } + } + } + + /** + * Cleanup dataflow job and all the resources and resource managers. + * + * @throws IOException + */ + @AfterClass + public static void cleanUp() throws IOException { + for (SpannerToCassandraSourceDBCustomTransformationIT instance : testInstances) { + instance.tearDownBase(); + } + ResourceManagerUtils.cleanResources( + spannerResourceManager, + cassandraResourceManager, + spannerMetadataResourceManager, + gcsResourceManager, + pubsubResourceManager); + } + + /** + * Tests the data flow from Spanner to Cassandra. + * + *

This test ensures that a basic row is successfully written to Spanner and subsequently + * appears in Cassandra, validating end-to-end data consistency. + * + * @throws InterruptedException if the thread is interrupted during execution. + * @throws IOException if an I/O error occurs during the test execution. + */ + @Test + public void testCustomTransformationForCassandra() throws InterruptedException, IOException { + assertThatPipeline(jobInfo).isRunning(); + writeBasicRowInSpanner(); + assertBasicRowInCassandraDB(); + } + + /** + * Writes basic rows to multiple tables in Google Cloud Spanner. + * + *

This method performs the following operations: + * + *

The transaction uses a Spanner client with a specific transaction tag + * ("txBy=forwardMigration"). + */ + private void writeBasicRowInSpanner() { + Mutation m1 = + Mutation.newInsertOrUpdateBuilder(CUSTOMER_TABLE) + .set("id") + .to(1) + .set("first_name") + .to("Jone") + .set("last_name") + .to("Woe") + .build(); + spannerResourceManager.write(m1); + } + + /** + * Asserts that a basic row exists in the Cassandra database. + * + *

This method performs the following steps: + * + *

    + *
  • Waits for the condition that ensures one row exists in the Cassandra table {@code + * USER_TABLE}. + *
  • Retrieves and logs rows from the Cassandra table. + *
  • Checks if exactly one row is present in the table. + *
  • Verifies that the row contains expected values for columns: {@code id}, {@code + * full_name}, and {@code from}. + *
+ * + * @throws InterruptedException if the thread is interrupted while waiting for the row count + * condition. + * @throws RuntimeException if reading from the Cassandra table fails. + */ + private void assertBasicRowInCassandraDB() throws InterruptedException { + PipelineOperator.Result result = + pipelineOperator() + .waitForCondition( + createConfig(jobInfo, Duration.ofMinutes(10)), + () -> getRowCount(CUSTOMER_TABLE) == 1); + + /* + * Added to handle updates. + * TODO(khajanchi@), explore if this sleep be replaced with something more definite. + */ + Thread.sleep(Duration.ofMinutes(1L).toMillis()); + + assertThatResult(result).meetsConditions(); + + Iterable rows; + try { + LOG.info("Reading from Cassandra table: {}", CUSTOMER_TABLE); + rows = cassandraResourceManager.readTable(CUSTOMER_TABLE); + LOG.info("Cassandra Rows: {}", rows.toString()); + } catch (Exception e) { + throw new RuntimeException("Failed to read from Cassandra table: " + CUSTOMER_TABLE, e); + } + + /* + * Added to handle updates. + * TODO(khajanchi@), explore if this sleep be replaced with something more definite. + */ + Thread.sleep(Duration.ofMinutes(1L).toMillis()); + + assertThat(rows).hasSize(1); + + for (Row row : rows) { + LOG.info("Cassandra Row to Assert: {}", row.getFormattedContents()); + assertThat(row.getString("full_name")).isEqualTo("Jone Woe"); + assertThat(row.getString("first_name")).isEqualTo("Jone"); + assertThat(row.getString("last_name")).isEqualTo("Woe"); + assertThat(row.getInt("id")).isEqualTo(1); + } + } + + /** + * Retrieves the total row count of a specified table in Cassandra. + * + *

This method executes a `SELECT COUNT(*)` query on the given table and returns the number of + * rows present in it. + * + * @param tableName the name of the table whose row count is to be retrieved. + * @return the total number of rows in the specified table. + * @throws RuntimeException if the query does not return a result. + */ + private long getRowCount(String tableName) { + String query = String.format("SELECT COUNT(*) FROM %s", tableName); + ResultSet resultSet = cassandraResourceManager.executeStatement(query); + Row row = resultSet.one(); + if (row != null) { + return row.getLong(0); + } else { + throw new RuntimeException("Query did not return a result for table: " + tableName); + } + } +} diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDbIT.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDbIT.java index 496c10a560..f6bd755aa5 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDbIT.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDbIT.java @@ -27,8 +27,6 @@ import com.google.cloud.Timestamp; import com.google.cloud.spanner.KeySet; import com.google.cloud.spanner.Mutation; -import com.google.cloud.spanner.Options; -import com.google.cloud.spanner.TransactionRunner; import com.google.cloud.spanner.Value; import com.google.cloud.teleport.metadata.SkipDirectRunnerTest; import com.google.cloud.teleport.metadata.TemplateIntegrationTest; @@ -42,6 +40,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import org.apache.beam.it.common.PipelineLauncher; import org.apache.beam.it.common.PipelineOperator; @@ -49,8 +48,6 @@ import org.apache.beam.it.gcp.pubsub.PubsubResourceManager; import org.apache.beam.it.gcp.spanner.SpannerResourceManager; import org.apache.beam.it.gcp.storage.GcsResourceManager; -import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor; -import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; @@ -210,7 +207,8 @@ private void assertDeleteRowInCassandraDB() throws InterruptedException { PipelineOperator.Result result = pipelineOperator() .waitForCondition( - createConfig(jobInfo, Duration.ofMinutes(10)), () -> getRowCount(USER_TABLE) == 0); + createConfig(jobInfo, Duration.ofMinutes(10)), + () -> getRowCount(USER_TABLE_2) == 0); assertThatResult(result).meetsConditions(); } @@ -271,21 +269,6 @@ private long getRowCount(String tableName) { } } - /** - * Writes basic rows to multiple tables in Google Cloud Spanner. - * - *

This method performs the following operations: - * - *

    - *
  • Inserts or updates a row in the "users" table with an ID of 1. - *
  • Inserts or updates a row in the "users2" table with an ID of 2. - *
  • Executes a transactionally buffered insert/update operation in the "users" table with an - * ID of 3, using a transaction tag for tracking. - *
- * - * The transaction uses a Spanner client with a specific transaction tag - * ("txBy=forwardMigration"). - */ private void writeBasicRowInSpanner() { Mutation m1 = Mutation.newInsertOrUpdateBuilder(USER_TABLE) @@ -306,53 +289,8 @@ private void writeBasicRowInSpanner() { .to("BB") .build(); spannerResourceManager.write(m2); - - // Write a single record to Spanner for the given logical shard - // Add the record with the transaction tag as txBy= - SpannerConfig spannerConfig = - SpannerConfig.create() - .withProjectId(PROJECT) - .withInstanceId(spannerResourceManager.getInstanceId()) - .withDatabaseId(spannerResourceManager.getDatabaseId()); - SpannerAccessor spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig); - spannerAccessor - .getDatabaseClient() - .readWriteTransaction( - Options.tag("txBy=forwardMigration"), - Options.priority(spannerConfig.getRpcPriority().get())) - .run( - (TransactionRunner.TransactionCallable) - transaction -> { - Mutation m3 = - Mutation.newInsertOrUpdateBuilder(USER_TABLE_2) - .set("id") - .to(3) - .set("full_name") - .to("GG") - .build(); - transaction.buffer(m3); - return null; - }); } - /** - * Asserts that a basic row exists in the Cassandra database. - * - *

This method performs the following steps: - * - *

    - *
  • Waits for the condition that ensures one row exists in the Cassandra table {@code - * USER_TABLE}. - *
  • Retrieves and logs rows from the Cassandra table. - *
  • Checks if exactly one row is present in the table. - *
  • Verifies that the row contains expected values for columns: {@code id}, {@code - * full_name}, and {@code from}. - *
- * - * @throws InterruptedException if the thread is interrupted while waiting for the row count - * condition. - * @throws RuntimeException if reading from the Cassandra table fails. - */ private void assertBasicRowInCassandraDB() throws InterruptedException { PipelineOperator.Result result = pipelineOperator() @@ -486,6 +424,104 @@ private void writeAllDataTypeRowsInSpanner() { .build(); spannerResourceManager.write(mutation); + + Mutation mutationAllNull = + Mutation.newInsertOrUpdateBuilder(ALL_DATA_TYPES_TABLE) + .set("varchar_column") + .to("ForNull") // Only this column has a value + .set("tinyint_column") + .to(Value.int64(null)) + .set("text_column") + .to(Value.string(null)) + .set("date_column") + .to(Value.date(null)) + .set("smallint_column") + .to(Value.int64(null)) + .set("mediumint_column") + .to(Value.int64(null)) + .set("int_column") + .to(Value.int64(null)) + .set("bigint_column") + .to(Value.int64(null)) + .set("float_column") + .to(Value.float64(null)) + .set("double_column") + .to(Value.float64(null)) + .set("decimal_column") + .to(Value.numeric(null)) + .set("datetime_column") + .to(Value.timestamp(null)) + .set("timestamp_column") + .to(Value.timestamp(null)) + .set("time_column") + .to(Value.string(null)) + .set("year_column") + .to(Value.string(null)) + .set("char_column") + .to(Value.string(null)) + .set("tinytext_column") + .to(Value.string(null)) + .set("mediumtext_column") + .to(Value.string(null)) + .set("longtext_column") + .to(Value.string(null)) + .set("enum_column") + .to(Value.string(null)) + .set("bool_column") + .to(Value.bool(null)) + .set("other_bool_column") + .to(Value.bool(null)) + .set("bytes_column") + .to(Value.bytes(null)) + .set("list_text_column") + .to(Value.json(null)) + .set("list_int_column") + .to(Value.json(null)) + .set("frozen_list_bigint_column") + .to(Value.json(null)) + .set("set_text_column") + .to(Value.json(null)) + .set("set_date_column") + .to(Value.json(null)) + .set("frozen_set_bool_column") + .to(Value.json(null)) + .set("map_text_to_int_column") + .to(Value.json(null)) + .set("map_date_to_text_column") + .to(Value.json(null)) + .set("frozen_map_int_to_bool_column") + .to(Value.json(null)) + .set("map_text_to_list_column") + .to(Value.json(null)) + .set("map_text_to_set_column") + .to(Value.json(null)) + .set("set_of_maps_column") + .to(Value.json(null)) + .set("list_of_sets_column") + .to(Value.json(null)) + .set("frozen_map_text_to_list_column") + .to(Value.json(null)) + .set("frozen_map_text_to_set_column") + .to(Value.json(null)) + .set("frozen_set_of_maps_column") + .to(Value.json(null)) + .set("frozen_list_of_sets_column") + .to(Value.json(null)) + .set("varint_column") + .to(Value.string(null)) + .set("inet_column") + .to(Value.string(null)) + .build(); + + spannerResourceManager.write(mutationAllNull); + + Mutation mutationForInsertOrUpdatePrimaryKey = + Mutation.newInsertOrUpdateBuilder(ALL_DATA_TYPES_TABLE) + .set("varchar_column") + .to("PKey") + .build(); + + spannerResourceManager.write(mutationForInsertOrUpdatePrimaryKey); } /** @@ -511,35 +547,13 @@ private void assertAll(Runnable... assertions) throws MultipleFailureException { } } - /** - * Validates that all data type rows inserted in Spanner have been correctly migrated and stored - * in Cassandra. - * - *

This method ensures that the data in the Cassandra table {@code ALL_DATA_TYPES_TABLE} - * matches the expected values after migration. It waits for the pipeline to process the data, - * reads the data from Cassandra, and asserts all column values. - * - *

Assertions: - * - *

    - *
  • Basic Data Types - Ensures correct values for varchar, bigint, bool, char, date, - * datetime, decimal, double, float. - *
  • Collections - Validates frozen lists, sets, and maps including nested structures. - *
  • Lists and Sets - Ensures list and set columns contain expected elements. - *
  • Maps - Validates various map column structures including text-to-int, date-to-text, and - * list/set mappings. - *
- * - * @throws InterruptedException if the thread is interrupted while waiting for pipeline execution. - * @throws MultipleFailureException if multiple assertion failures occur. - */ private void assertAllDataTypeRowsInCassandraDB() throws InterruptedException, MultipleFailureException { PipelineOperator.Result result = pipelineOperator() .waitForCondition( createConfig(jobInfo, Duration.ofMinutes(10)), - () -> getRowCount(ALL_DATA_TYPES_TABLE) == 1); + () -> getRowCount(ALL_DATA_TYPES_TABLE) == 3); assertThatResult(result).meetsConditions(); Iterable rows; try { @@ -548,158 +562,193 @@ private void assertAllDataTypeRowsInCassandraDB() throw new RuntimeException("Failed to read from Cassandra table: " + ALL_DATA_TYPES_TABLE, e); } - assertThat(rows).hasSize(1); - - Row row = rows.iterator().next(); + assertThat(rows).hasSize(3); + for (Row row : rows) { + LOG.info("Cassandra Row to Assert for All Data Types: {}", row.getFormattedContents()); + String varcharColumn = row.getString("varchar_column"); + if (Objects.equals(varcharColumn, "SampleVarchar")) { + assertAll( + () -> assertThat(row.getLong("bigint_column")).isEqualTo(9223372036854775807L), + () -> assertThat(row.getBoolean("bool_column")).isTrue(), + () -> assertThat(row.getString("char_column")).isEqualTo("CHAR_DATA"), + () -> + assertThat(row.getLocalDate("date_column")) + .isEqualTo(java.time.LocalDate.of(2025, 1, 27)), + () -> + assertThat(row.getInstant("datetime_column")) + .isEqualTo(java.time.Instant.parse("2025-01-27T10:30:00.000Z")), + () -> + assertThat(row.getBigDecimal("decimal_column")) + .isEqualTo(new BigDecimal("12345.6789")), + () -> assertThat(row.getDouble("double_column")).isEqualTo(2.718281828459045), + () -> assertThat(row.getFloat("float_column")).isEqualTo(3.14159f), - assertThat(rows).hasSize(1); - assertAll( - // Basic Data Types - () -> assertThat(row.getString("varchar_column")).isEqualTo("SampleVarchar"), - () -> assertThat(row.getLong("bigint_column")).isEqualTo(9223372036854775807L), - () -> assertThat(row.getBoolean("bool_column")).isTrue(), - () -> assertThat(row.getString("char_column")).isEqualTo("CHAR_DATA"), - () -> - assertThat(row.getLocalDate("date_column")) - .isEqualTo(java.time.LocalDate.of(2025, 1, 27)), - () -> - assertThat(row.getInstant("datetime_column")) - .isEqualTo(java.time.Instant.parse("2025-01-27T10:30:00.000Z")), - () -> - assertThat(row.getBigDecimal("decimal_column")).isEqualTo(new BigDecimal("12345.6789")), - () -> assertThat(row.getDouble("double_column")).isEqualTo(2.718281828459045), - () -> assertThat(row.getFloat("float_column")).isEqualTo(3.14159f), + // Collections (frozen, list, set, map) + () -> + assertThat(row.getList("frozen_list_bigint_column", Long.class)) + .isEqualTo(Arrays.asList(123456789012345L, 987654321012345L)), + () -> + assertThat(row.getSet("frozen_set_bool_column", Boolean.class)) + .isEqualTo(new HashSet<>(Arrays.asList(false, true))), + () -> + assertThat( + row.getMap("frozen_map_int_to_bool_column", Integer.class, Boolean.class)) + .isEqualTo(Map.of(1, true, 2, false)), + () -> + assertThat(row.getMap("frozen_map_text_to_list_column", String.class, List.class)) + .isEqualTo(Map.of("fruits", Arrays.asList("apple", "banana"))), + () -> + assertThat(row.getMap("frozen_map_text_to_set_column", String.class, Set.class)) + .isEqualTo( + Map.of("vegetables", new HashSet<>(Arrays.asList("carrot", "spinach")))), + () -> + assertThat(row.getSet("frozen_set_of_maps_column", Map.class)) + .isEqualTo( + new HashSet<>( + Arrays.asList( + Map.of("key1", 10, "key2", 20), Map.of("keyA", 5, "keyB", 10)))), - // Collections (frozen, list, set, map) - () -> - assertThat(row.getList("frozen_list_bigint_column", Long.class)) - .isEqualTo(Arrays.asList(123456789012345L, 987654321012345L)), - () -> - assertThat(row.getSet("frozen_set_bool_column", Boolean.class)) - .isEqualTo(new HashSet<>(Arrays.asList(false, true))), - () -> - assertThat(row.getMap("frozen_map_int_to_bool_column", Integer.class, Boolean.class)) - .isEqualTo(Map.of(1, true, 2, false)), - () -> - assertThat(row.getMap("frozen_map_text_to_list_column", String.class, List.class)) - .isEqualTo(Map.of("fruits", Arrays.asList("apple", "banana"))), - () -> - assertThat(row.getMap("frozen_map_text_to_set_column", String.class, Set.class)) - .isEqualTo(Map.of("vegetables", new HashSet<>(Arrays.asList("carrot", "spinach")))), - () -> - assertThat(row.getSet("frozen_set_of_maps_column", Map.class)) - .isEqualTo( - new HashSet<>( + // Lists and Sets + () -> + assertThat(row.getList("list_int_column", Integer.class)) + .isEqualTo(Arrays.asList(1, 2, 3, 4, 5)), + () -> + assertThat(row.getList("list_text_column", String.class)) + .isEqualTo(Arrays.asList("apple", "banana", "cherry")), + () -> + assertThat(row.getList("list_of_sets_column", Set.class)) + .isEqualTo( Arrays.asList( - Map.of("key1", 10, "key2", 20), Map.of("keyA", 5, "keyB", 10)))), - - // Lists and Sets - () -> - assertThat(row.getList("list_int_column", Integer.class)) - .isEqualTo(Arrays.asList(1, 2, 3, 4, 5)), - () -> - assertThat(row.getList("list_text_column", String.class)) - .isEqualTo(Arrays.asList("apple", "banana", "cherry")), - () -> - assertThat(row.getList("list_of_sets_column", Set.class)) - .isEqualTo( - Arrays.asList( - new HashSet<>(Arrays.asList("apple", "banana")), - new HashSet<>(Arrays.asList("carrot", "spinach")))), + new HashSet<>(Arrays.asList("apple", "banana")), + new HashSet<>(Arrays.asList("carrot", "spinach")))), - // Maps - () -> - assertThat( - row.getMap("map_date_to_text_column", java.time.LocalDate.class, String.class)) - .isEqualTo( - Map.of( - java.time.LocalDate.parse("2025-01-27"), "event1", - java.time.LocalDate.parse("2025-02-01"), "event2")), - () -> - assertThat(row.getMap("map_text_to_int_column", String.class, Integer.class)) - .isEqualTo(Map.of("key1", 10, "key2", 20)), - () -> - assertThat(row.getMap("map_text_to_list_column", String.class, List.class)) - .isEqualTo( - Map.of( - "color", - Arrays.asList("red", "green"), - "fruit", - Arrays.asList("apple", "banana"))), - () -> - assertThat(row.getMap("map_text_to_set_column", String.class, Set.class)) - .isEqualTo( - Map.of( - "fruit", - new HashSet<>(Arrays.asList("apple", "banana")), - "vegetables", - new HashSet<>(Arrays.asList("carrot", "spinach")))), + // Maps + () -> + assertThat( + row.getMap( + "map_date_to_text_column", java.time.LocalDate.class, String.class)) + .isEqualTo( + Map.of( + java.time.LocalDate.parse("2025-01-27"), "event1", + java.time.LocalDate.parse("2025-02-01"), "event2")), + () -> + assertThat(row.getMap("map_text_to_int_column", String.class, Integer.class)) + .isEqualTo(Map.of("key1", 10, "key2", 20)), + () -> + assertThat(row.getMap("map_text_to_list_column", String.class, List.class)) + .isEqualTo( + Map.of( + "color", + Arrays.asList("red", "green"), + "fruit", + Arrays.asList("apple", "banana"))), + () -> + assertThat(row.getMap("map_text_to_set_column", String.class, Set.class)) + .isEqualTo( + Map.of( + "fruit", + new HashSet<>(Arrays.asList("apple", "banana")), + "vegetables", + new HashSet<>(Arrays.asList("carrot", "spinach")))), - // Sets - () -> - assertThat(row.getSet("set_date_column", java.time.LocalDate.class)) - .isEqualTo( - new HashSet<>( - Arrays.asList( - java.time.LocalDate.parse("2025-01-27"), - java.time.LocalDate.parse("2025-02-01")))), - () -> - assertThat(row.getSet("set_text_column", String.class)) - .isEqualTo(new HashSet<>(Arrays.asList("apple", "orange", "banana"))), - () -> - assertThat(row.getSet("set_of_maps_column", Map.class)) - .isEqualTo( - new HashSet<>( - Arrays.asList( - Map.of("key1", 10, "key2", 20), Map.of("keyA", 5, "keyB", 10)))), + // Sets + () -> + assertThat(row.getSet("set_date_column", java.time.LocalDate.class)) + .isEqualTo( + new HashSet<>( + Arrays.asList( + java.time.LocalDate.parse("2025-01-27"), + java.time.LocalDate.parse("2025-02-01")))), + () -> + assertThat(row.getSet("set_text_column", String.class)) + .isEqualTo(new HashSet<>(Arrays.asList("apple", "orange", "banana"))), + () -> + assertThat(row.getSet("set_of_maps_column", Map.class)) + .isEqualTo( + new HashSet<>( + Arrays.asList( + Map.of("key1", 10, "key2", 20), Map.of("keyA", 5, "keyB", 10)))), - // Other Basic Types - () -> assertThat(row.getShort("smallint_column")).isEqualTo((short) 32767), - () -> assertThat(row.getInt("mediumint_column")).isEqualTo(8388607), - () -> assertThat(row.getInt("int_column")).isEqualTo(2147483647), - () -> assertThat(row.getString("enum_column")).isEqualTo("OptionA"), - () -> assertThat(row.getString("year_column")).isEqualTo("2025"), - () -> - assertThat(row.getString("longtext_column")) - .isEqualTo( - "Very long text data that exceeds the medium text column length for long text."), - () -> assertThat(row.getString("tinytext_column")).isEqualTo("Short text for tinytext."), - () -> - assertThat(row.getString("mediumtext_column")) - .isEqualTo("Longer text data for mediumtext column."), - () -> - assertThat(row.getString("text_column")) - .isEqualTo("This is some sample text data for the text column."), - () -> - assertThat(row.getLocalTime("time_column")) - .isEqualTo(java.time.LocalTime.parse("12:30:00.000000000")), - () -> - assertThat(row.getInstant("timestamp_column")) - .isEqualTo(java.time.Instant.parse("2025-01-27T10:30:00Z")), - () -> - assertThat(row.getBigInteger("varint_column")) - .isEqualTo(java.math.BigInteger.valueOf(123456789L)), - () -> - assertThat(row.getBytesUnsafe("bytes_column")) - .isEqualTo(ByteBuffer.wrap(ByteArray.copyFrom("Hello world").toByteArray()))); + // Other Basic Types + () -> assertThat(row.getShort("smallint_column")).isEqualTo((short) 32767), + () -> assertThat(row.getInt("mediumint_column")).isEqualTo(8388607), + () -> assertThat(row.getInt("int_column")).isEqualTo(2147483647), + () -> assertThat(row.getString("enum_column")).isEqualTo("OptionA"), + () -> assertThat(row.getString("year_column")).isEqualTo("2025"), + () -> + assertThat(row.getString("longtext_column")) + .isEqualTo( + "Very long text data that exceeds the medium text column length for long text."), + () -> + assertThat(row.getString("tinytext_column")).isEqualTo("Short text for tinytext."), + () -> + assertThat(row.getString("mediumtext_column")) + .isEqualTo("Longer text data for mediumtext column."), + () -> + assertThat(row.getString("text_column")) + .isEqualTo("This is some sample text data for the text column."), + () -> + assertThat(row.getLocalTime("time_column")) + .isEqualTo(java.time.LocalTime.parse("12:30:00.000000000")), + () -> + assertThat(row.getInstant("timestamp_column")) + .isEqualTo(java.time.Instant.parse("2025-01-27T10:30:00Z")), + () -> + assertThat(row.getBigInteger("varint_column")) + .isEqualTo(java.math.BigInteger.valueOf(123456789L)), + () -> + assertThat(row.getBytesUnsafe("bytes_column")) + .isEqualTo(ByteBuffer.wrap(ByteArray.copyFrom("Hello world").toByteArray()))); + } else if (Objects.equals(varcharColumn, "PKey") + || Objects.equals(varcharColumn, "ForNull")) { + assertAll( + () -> assertThat(row.isNull("tinyint_column")).isTrue(), + () -> assertThat(row.isNull("text_column")).isTrue(), + () -> assertThat(row.isNull("date_column")).isTrue(), + () -> assertThat(row.isNull("smallint_column")).isTrue(), + () -> assertThat(row.isNull("mediumint_column")).isTrue(), + () -> assertThat(row.isNull("int_column")).isTrue(), + () -> assertThat(row.isNull("bigint_column")).isTrue(), + () -> assertThat(row.isNull("float_column")).isTrue(), + () -> assertThat(row.isNull("double_column")).isTrue(), + () -> assertThat(row.isNull("decimal_column")).isTrue(), + () -> assertThat(row.isNull("datetime_column")).isTrue(), + () -> assertThat(row.isNull("timestamp_column")).isTrue(), + () -> assertThat(row.isNull("time_column")).isTrue(), + () -> assertThat(row.isNull("year_column")).isTrue(), + () -> assertThat(row.isNull("char_column")).isTrue(), + () -> assertThat(row.isNull("tinytext_column")).isTrue(), + () -> assertThat(row.isNull("mediumtext_column")).isTrue(), + () -> assertThat(row.isNull("longtext_column")).isTrue(), + () -> assertThat(row.isNull("enum_column")).isTrue(), + () -> assertThat(row.isNull("bool_column")).isTrue(), + () -> assertThat(row.isNull("other_bool_column")).isTrue(), + () -> assertThat(row.isNull("bytes_column")).isTrue(), + () -> assertThat(row.isNull("list_text_column")).isTrue(), + () -> assertThat(row.isNull("list_int_column")).isTrue(), + () -> assertThat(row.isNull("frozen_list_bigint_column")).isTrue(), + () -> assertThat(row.isNull("set_text_column")).isTrue(), + () -> assertThat(row.isNull("set_date_column")).isTrue(), + () -> assertThat(row.isNull("frozen_set_bool_column")).isTrue(), + () -> assertThat(row.isNull("map_text_to_int_column")).isTrue(), + () -> assertThat(row.isNull("map_date_to_text_column")).isTrue(), + () -> assertThat(row.isNull("frozen_map_int_to_bool_column")).isTrue(), + () -> assertThat(row.isNull("map_text_to_list_column")).isTrue(), + () -> assertThat(row.isNull("map_text_to_set_column")).isTrue(), + () -> assertThat(row.isNull("set_of_maps_column")).isTrue(), + () -> assertThat(row.isNull("list_of_sets_column")).isTrue(), + () -> assertThat(row.isNull("frozen_map_text_to_list_column")).isTrue(), + () -> assertThat(row.isNull("frozen_map_text_to_set_column")).isTrue(), + () -> assertThat(row.isNull("frozen_set_of_maps_column")).isTrue(), + () -> assertThat(row.isNull("frozen_list_of_sets_column")).isTrue(), + () -> assertThat(row.isNull("varint_column")).isTrue(), + () -> assertThat(row.isNull("inet_column")).isTrue()); + } else { + throw new AssertionError("Unexpected row found: " + varcharColumn); + } + } } - /** - * Inserts multiple rows into the Spanner table {@code ALL_DATA_TYPES_CUSTOM_CONVERSION_TABLE}, - * ensuring that all values are stored as strings, regardless of their original data type. - * - *

This method writes sample data to the Spanner table, converting all numerical, boolean, and - * date/time values to their string representations. This ensures compatibility for scenarios - * requiring string-based storage. - * - *

Columns and Data Mapping: - * - *

    - *
  • Basic Types: Strings, numbers (converted to strings), booleans. - *
  • Complex Types: JSON representations for lists, sets, and maps. - *
  • Temporal Types: Date, datetime, timestamp values stored as strings. - *
- */ private void writeAllRowsAsStringInSpanner() { Mutation m; m = @@ -967,28 +1016,6 @@ private void writeAllRowsAsStringInSpanner() { spannerResourceManager.write(m); } - /** - * Validates that string-based data stored in Spanner is correctly converted to its actual data - * types when retrieved from Cassandra. - * - *

This method ensures that values stored as strings in Spanner are properly transformed into - * their expected data types in Cassandra. It performs the following: - * - *

    - *
  • Waits for the migration process to complete. - *
  • Reads and verifies that two rows are present in Cassandra. - *
  • Checks specific column values to confirm correct data type conversion. - *
- * - *

Assertions Performed: - * - *

    - *
  • Verifies that {@code varchar_column} retains its expected string value. - *
  • Confirms that {@code tinyint_column} is correctly converted to a {@code byte}. - *
- * - * @throws MultipleFailureException if multiple assertions fail during validation. - */ private void assertStringToActualRowsInCassandraDB() throws MultipleFailureException { PipelineOperator.Result result = pipelineOperator() diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraDMLGeneratorTest.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraDMLGeneratorTest.java index 092fe0e00d..e524790440 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraDMLGeneratorTest.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraDMLGeneratorTest.java @@ -35,6 +35,7 @@ import com.google.cloud.teleport.v2.templates.models.PreparedStatementGeneratedResponse; import com.google.cloud.teleport.v2.templates.models.PreparedStatementValueObject; import java.nio.ByteBuffer; +import java.time.Instant; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -144,6 +145,44 @@ public void tableAndAllColumnNameTypesForNullValueMatch() { assertTrue(values.get(1).value() instanceof CassandraTypeHandler.NullClass); } + @Test + public void tableAndAllColumnNameTypesForCustomTransformation() { + Schema schema = SessionFileReader.read("src/test/resources/cassandraSession.json"); + String tableName = "Singers"; + String newValuesString = "{\"Bday\":\"1995-12-12\",\"LastName\":\"ll\"}"; + JSONObject newValuesJson = new JSONObject(newValuesString); + String keyValueString = "{\"SingerId\":\"999\"}"; + JSONObject keyValuesJson = new JSONObject(keyValueString); + String modType = "INSERT"; + Map customTransformation = new HashMap<>(); + customTransformation.put("SingerId", "1000"); + customTransformation.put("LastName", "kk ll"); + CassandraDMLGenerator cassandraDMLGenerator = new CassandraDMLGenerator(); + DMLGeneratorResponse dmlGeneratorResponse = + cassandraDMLGenerator.getDMLStatement( + new DMLGeneratorRequest.Builder( + modType, tableName, newValuesJson, keyValuesJson, "+00:00") + .setSchema(schema) + .setCommitTimestamp(Timestamp.now()) + .setCustomTransformationResponse(customTransformation) + .build()); + String sql = dmlGeneratorResponse.getDmlStatement(); + + assertTrue(sql.contains("LastName")); + assertEquals(4, ((PreparedStatementGeneratedResponse) dmlGeneratorResponse).getValues().size()); + List> values = + ((PreparedStatementGeneratedResponse) dmlGeneratorResponse).getValues(); + assertEquals( + 1000, + CassandraTypeHandler.castToExpectedType(values.get(0).dataType(), values.get(0).value())); + assertEquals( + "kk ll", + CassandraTypeHandler.castToExpectedType(values.get(2).dataType(), values.get(2).value())); + assertEquals( + Instant.parse("1995-12-12T00:00:00Z"), + CassandraTypeHandler.castToExpectedType(values.get(1).dataType(), values.get(1).value())); + } + @Test public void tableNameMatchSourceColumnNotPresentInSpanner() { Schema schema = SessionFileReader.read("src/test/resources/cassandraSession.json"); diff --git a/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/cassandra-transformation-schema.sql b/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/cassandra-transformation-schema.sql new file mode 100644 index 0000000000..149f2b0329 --- /dev/null +++ b/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/cassandra-transformation-schema.sql @@ -0,0 +1,7 @@ +DROP TABLE IF EXISTS customers; +CREATE TABLE customers ( + id int PRIMARY KEY, + full_name text, + first_name text, + last_name text +); \ No newline at end of file diff --git a/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/spanner-schema.sql b/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/spanner-schema.sql index 0b6ae0b7a6..c2f365384a 100644 --- a/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/spanner-schema.sql +++ b/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/spanner-schema.sql @@ -1,14 +1,20 @@ +DROP TABLE IF EXISTS users; + CREATE TABLE IF NOT EXISTS users ( id INT64 NOT NULL, full_name STRING(25), `from` STRING(25) ) PRIMARY KEY(id); +DROP TABLE IF EXISTS users2; + CREATE TABLE IF NOT EXISTS users2 ( id INT64 NOT NULL, full_name STRING(25), ) PRIMARY KEY(id); +DROP TABLE IF EXISTS alldatatypetransformation; + CREATE TABLE IF NOT EXISTS alldatatypetransformation ( varchar_column STRING(20) NOT NULL, tinyint_column STRING(MAX), @@ -52,6 +58,8 @@ CREATE TABLE IF NOT EXISTS alldatatypetransformation ( varint_column STRING(MAX) ) PRIMARY KEY(varchar_column); +DROP TABLE IF EXISTS alldatatypecolumns; + CREATE TABLE IF NOT EXISTS alldatatypecolumns ( varchar_column STRING(20) NOT NULL, tinyint_column INT64, diff --git a/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/spanner-transformation-schema.sql b/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/spanner-transformation-schema.sql new file mode 100644 index 0000000000..9e2db19e8f --- /dev/null +++ b/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/spanner-transformation-schema.sql @@ -0,0 +1,13 @@ +DROP TABLE IF EXISTS customers; +CREATE TABLE IF NOT EXISTS customers ( + id INT64 NOT NULL, + full_name STRING(125), + first_name STRING(25), + last_name STRING(25) +) PRIMARY KEY(id); + +CREATE CHANGE STREAM allstream + FOR ALL OPTIONS ( + value_capture_type = 'NEW_ROW', + retention_period = '7d' +);