diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDbIT.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDbIT.java index bed139c110..64fc68f592 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDbIT.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDbIT.java @@ -42,6 +42,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import org.apache.beam.it.common.PipelineLauncher; import org.apache.beam.it.common.PipelineOperator; @@ -677,6 +678,96 @@ private void writeAllDataTypeRowsInSpanner() { .build(); spannerResourceManager.write(mutation); + + Mutation mutationAllNull = + Mutation.newInsertOrUpdateBuilder(ALL_DATA_TYPES_TABLE) + .set("varchar_column") + .to("SampleVarcharForNull") // Only this column has a value + .set("tinyint_column") + .to(Value.int64(null)) + .set("text_column") + .to(Value.string(null)) + .set("date_column") + .to(Value.date(null)) + .set("smallint_column") + .to(Value.int64(null)) + .set("mediumint_column") + .to(Value.int64(null)) + .set("int_column") + .to(Value.int64(null)) + .set("bigint_column") + .to(Value.int64(null)) + .set("float_column") + .to(Value.float64(null)) + .set("double_column") + .to(Value.float64(null)) + .set("decimal_column") + .to(Value.numeric(null)) + .set("datetime_column") + .to(Value.timestamp(null)) + .set("timestamp_column") + .to(Value.timestamp(null)) + .set("time_column") + .to(Value.string(null)) + .set("year_column") + .to(Value.string(null)) + .set("char_column") + .to(Value.string(null)) + .set("tinytext_column") + .to(Value.string(null)) + .set("mediumtext_column") + .to(Value.string(null)) + .set("longtext_column") + .to(Value.string(null)) + .set("enum_column") + .to(Value.string(null)) + .set("bool_column") + .to(Value.bool(null)) + .set("other_bool_column") + .to(Value.bool(null)) + .set("bytes_column") + .to(Value.bytes(null)) + .set("list_text_column") + .to(Value.json(null)) + .set("list_int_column") + .to(Value.json(null)) + .set("frozen_list_bigint_column") + .to(Value.json(null)) + .set("set_text_column") + .to(Value.json(null)) + .set("set_date_column") + .to(Value.json(null)) + .set("frozen_set_bool_column") + .to(Value.json(null)) + .set("map_text_to_int_column") + .to(Value.json(null)) + .set("map_date_to_text_column") + .to(Value.json(null)) + .set("frozen_map_int_to_bool_column") + .to(Value.json(null)) + .set("map_text_to_list_column") + .to(Value.json(null)) + .set("map_text_to_set_column") + .to(Value.json(null)) + .set("set_of_maps_column") + .to(Value.json(null)) + .set("list_of_sets_column") + .to(Value.json(null)) + .set("frozen_map_text_to_list_column") + .to(Value.json(null)) + .set("frozen_map_text_to_set_column") + .to(Value.json(null)) + .set("frozen_set_of_maps_column") + .to(Value.json(null)) + .set("frozen_list_of_sets_column") + .to(Value.json(null)) + .set("varint_column") + .to(Value.string(null)) + .set("inet_column") + .to(Value.string(null)) + .build(); + + spannerResourceManager.write(mutationAllNull); } /** @@ -729,8 +820,8 @@ private void assertAllDataTypeRowsInCassandraDB() PipelineOperator.Result result = pipelineOperator() .waitForCondition( - createConfig(jobInfo, Duration.ofMinutes(10)), - () -> getRowCount(ALL_DATA_TYPES_TABLE) == 1); + createConfig(jobInfo, Duration.ofMinutes(20)), + () -> getRowCount(ALL_DATA_TYPES_TABLE) == 2); assertThatResult(result).meetsConditions(); Iterable rows; try { @@ -739,140 +830,190 @@ private void assertAllDataTypeRowsInCassandraDB() throw new RuntimeException("Failed to read from Cassandra table: " + ALL_DATA_TYPES_TABLE, e); } - assertThat(rows).hasSize(1); + assertThat(rows).hasSize(2); + for (Row row : rows) { + LOG.info("Cassandra Row to Assert for All Data Types: {}", row.getFormattedContents()); + String varcharColumn = row.getString("varchar_column"); + if (Objects.equals(varcharColumn, "SampleVarchar")) { + assertAll( + () -> assertThat(row.getLong("bigint_column")).isEqualTo(9223372036854775807L), + () -> assertThat(row.getBoolean("bool_column")).isTrue(), + () -> assertThat(row.getString("char_column")).isEqualTo("CHAR_DATA"), + () -> + assertThat(row.getLocalDate("date_column")) + .isEqualTo(java.time.LocalDate.of(2025, 1, 27)), + () -> + assertThat(row.getInstant("datetime_column")) + .isEqualTo(java.time.Instant.parse("2025-01-27T10:30:00.000Z")), + () -> + assertThat(row.getBigDecimal("decimal_column")) + .isEqualTo(new BigDecimal("12345.6789")), + () -> assertThat(row.getDouble("double_column")).isEqualTo(2.718281828459045), + () -> assertThat(row.getFloat("float_column")).isEqualTo(3.14159f), - Row row = rows.iterator().next(); + // Collections (frozen, list, set, map) + () -> + assertThat(row.getList("frozen_list_bigint_column", Long.class)) + .isEqualTo(Arrays.asList(123456789012345L, 987654321012345L)), + () -> + assertThat(row.getSet("frozen_set_bool_column", Boolean.class)) + .isEqualTo(new HashSet<>(Arrays.asList(false, true))), + () -> + assertThat( + row.getMap("frozen_map_int_to_bool_column", Integer.class, Boolean.class)) + .isEqualTo(Map.of(1, true, 2, false)), + () -> + assertThat(row.getMap("frozen_map_text_to_list_column", String.class, List.class)) + .isEqualTo(Map.of("fruits", Arrays.asList("apple", "banana"))), + () -> + assertThat(row.getMap("frozen_map_text_to_set_column", String.class, Set.class)) + .isEqualTo( + Map.of("vegetables", new HashSet<>(Arrays.asList("carrot", "spinach")))), + () -> + assertThat(row.getSet("frozen_set_of_maps_column", Map.class)) + .isEqualTo( + new HashSet<>( + Arrays.asList( + Map.of("key1", 10, "key2", 20), Map.of("keyA", 5, "keyB", 10)))), - assertThat(rows).hasSize(1); - assertAll( - // Basic Data Types - () -> assertThat(row.getString("varchar_column")).isEqualTo("SampleVarchar"), - () -> assertThat(row.getLong("bigint_column")).isEqualTo(9223372036854775807L), - () -> assertThat(row.getBoolean("bool_column")).isTrue(), - () -> assertThat(row.getString("char_column")).isEqualTo("CHAR_DATA"), - () -> - assertThat(row.getLocalDate("date_column")) - .isEqualTo(java.time.LocalDate.of(2025, 1, 27)), - () -> - assertThat(row.getInstant("datetime_column")) - .isEqualTo(java.time.Instant.parse("2025-01-27T10:30:00.000Z")), - () -> - assertThat(row.getBigDecimal("decimal_column")).isEqualTo(new BigDecimal("12345.6789")), - () -> assertThat(row.getDouble("double_column")).isEqualTo(2.718281828459045), - () -> assertThat(row.getFloat("float_column")).isEqualTo(3.14159f), - - // Collections (frozen, list, set, map) - () -> - assertThat(row.getList("frozen_list_bigint_column", Long.class)) - .isEqualTo(Arrays.asList(123456789012345L, 987654321012345L)), - () -> - assertThat(row.getSet("frozen_set_bool_column", Boolean.class)) - .isEqualTo(new HashSet<>(Arrays.asList(false, true))), - () -> - assertThat(row.getMap("frozen_map_int_to_bool_column", Integer.class, Boolean.class)) - .isEqualTo(Map.of(1, true, 2, false)), - () -> - assertThat(row.getMap("frozen_map_text_to_list_column", String.class, List.class)) - .isEqualTo(Map.of("fruits", Arrays.asList("apple", "banana"))), - () -> - assertThat(row.getMap("frozen_map_text_to_set_column", String.class, Set.class)) - .isEqualTo(Map.of("vegetables", new HashSet<>(Arrays.asList("carrot", "spinach")))), - () -> - assertThat(row.getSet("frozen_set_of_maps_column", Map.class)) - .isEqualTo( - new HashSet<>( + // Lists and Sets + () -> + assertThat(row.getList("list_int_column", Integer.class)) + .isEqualTo(Arrays.asList(1, 2, 3, 4, 5)), + () -> + assertThat(row.getList("list_text_column", String.class)) + .isEqualTo(Arrays.asList("apple", "banana", "cherry")), + () -> + assertThat(row.getList("list_of_sets_column", Set.class)) + .isEqualTo( Arrays.asList( - Map.of("key1", 10, "key2", 20), Map.of("keyA", 5, "keyB", 10)))), - - // Lists and Sets - () -> - assertThat(row.getList("list_int_column", Integer.class)) - .isEqualTo(Arrays.asList(1, 2, 3, 4, 5)), - () -> - assertThat(row.getList("list_text_column", String.class)) - .isEqualTo(Arrays.asList("apple", "banana", "cherry")), - () -> - assertThat(row.getList("list_of_sets_column", Set.class)) - .isEqualTo( - Arrays.asList( - new HashSet<>(Arrays.asList("apple", "banana")), - new HashSet<>(Arrays.asList("carrot", "spinach")))), + new HashSet<>(Arrays.asList("apple", "banana")), + new HashSet<>(Arrays.asList("carrot", "spinach")))), - // Maps - () -> - assertThat( - row.getMap("map_date_to_text_column", java.time.LocalDate.class, String.class)) - .isEqualTo( - Map.of( - java.time.LocalDate.parse("2025-01-27"), "event1", - java.time.LocalDate.parse("2025-02-01"), "event2")), - () -> - assertThat(row.getMap("map_text_to_int_column", String.class, Integer.class)) - .isEqualTo(Map.of("key1", 10, "key2", 20)), - () -> - assertThat(row.getMap("map_text_to_list_column", String.class, List.class)) - .isEqualTo( - Map.of( - "color", - Arrays.asList("red", "green"), - "fruit", - Arrays.asList("apple", "banana"))), - () -> - assertThat(row.getMap("map_text_to_set_column", String.class, Set.class)) - .isEqualTo( - Map.of( - "fruit", - new HashSet<>(Arrays.asList("apple", "banana")), - "vegetables", - new HashSet<>(Arrays.asList("carrot", "spinach")))), + // Maps + () -> + assertThat( + row.getMap( + "map_date_to_text_column", java.time.LocalDate.class, String.class)) + .isEqualTo( + Map.of( + java.time.LocalDate.parse("2025-01-27"), "event1", + java.time.LocalDate.parse("2025-02-01"), "event2")), + () -> + assertThat(row.getMap("map_text_to_int_column", String.class, Integer.class)) + .isEqualTo(Map.of("key1", 10, "key2", 20)), + () -> + assertThat(row.getMap("map_text_to_list_column", String.class, List.class)) + .isEqualTo( + Map.of( + "color", + Arrays.asList("red", "green"), + "fruit", + Arrays.asList("apple", "banana"))), + () -> + assertThat(row.getMap("map_text_to_set_column", String.class, Set.class)) + .isEqualTo( + Map.of( + "fruit", + new HashSet<>(Arrays.asList("apple", "banana")), + "vegetables", + new HashSet<>(Arrays.asList("carrot", "spinach")))), - // Sets - () -> - assertThat(row.getSet("set_date_column", java.time.LocalDate.class)) - .isEqualTo( - new HashSet<>( - Arrays.asList( - java.time.LocalDate.parse("2025-01-27"), - java.time.LocalDate.parse("2025-02-01")))), - () -> - assertThat(row.getSet("set_text_column", String.class)) - .isEqualTo(new HashSet<>(Arrays.asList("apple", "orange", "banana"))), - () -> - assertThat(row.getSet("set_of_maps_column", Map.class)) - .isEqualTo( - new HashSet<>( - Arrays.asList( - Map.of("key1", 10, "key2", 20), Map.of("keyA", 5, "keyB", 10)))), + // Sets + () -> + assertThat(row.getSet("set_date_column", java.time.LocalDate.class)) + .isEqualTo( + new HashSet<>( + Arrays.asList( + java.time.LocalDate.parse("2025-01-27"), + java.time.LocalDate.parse("2025-02-01")))), + () -> + assertThat(row.getSet("set_text_column", String.class)) + .isEqualTo(new HashSet<>(Arrays.asList("apple", "orange", "banana"))), + () -> + assertThat(row.getSet("set_of_maps_column", Map.class)) + .isEqualTo( + new HashSet<>( + Arrays.asList( + Map.of("key1", 10, "key2", 20), Map.of("keyA", 5, "keyB", 10)))), - // Other Basic Types - () -> assertThat(row.getShort("smallint_column")).isEqualTo((short) 32767), - () -> assertThat(row.getInt("mediumint_column")).isEqualTo(8388607), - () -> assertThat(row.getInt("int_column")).isEqualTo(2147483647), - () -> assertThat(row.getString("enum_column")).isEqualTo("OptionA"), - () -> assertThat(row.getString("year_column")).isEqualTo("2025"), - () -> - assertThat(row.getString("longtext_column")) - .isEqualTo( - "Very long text data that exceeds the medium text column length for long text."), - () -> assertThat(row.getString("tinytext_column")).isEqualTo("Short text for tinytext."), - () -> - assertThat(row.getString("mediumtext_column")) - .isEqualTo("Longer text data for mediumtext column."), - () -> - assertThat(row.getString("text_column")) - .isEqualTo("This is some sample text data for the text column."), - () -> - assertThat(row.getLocalTime("time_column")) - .isEqualTo(java.time.LocalTime.parse("12:30:00.000000000")), - () -> - assertThat(row.getInstant("timestamp_column")) - .isEqualTo(java.time.Instant.parse("2025-01-27T10:30:00Z")), - () -> - assertThat(row.getBigInteger("varint_column")) - .isEqualTo(java.math.BigInteger.valueOf(123456789L)), - () -> - assertThat(row.getBytesUnsafe("bytes_column")) - .isEqualTo(ByteBuffer.wrap(ByteArray.copyFrom("Hello world").toByteArray()))); + // Other Basic Types + () -> assertThat(row.getShort("smallint_column")).isEqualTo((short) 32767), + () -> assertThat(row.getInt("mediumint_column")).isEqualTo(8388607), + () -> assertThat(row.getInt("int_column")).isEqualTo(2147483647), + () -> assertThat(row.getString("enum_column")).isEqualTo("OptionA"), + () -> assertThat(row.getString("year_column")).isEqualTo("2025"), + () -> + assertThat(row.getString("longtext_column")) + .isEqualTo( + "Very long text data that exceeds the medium text column length for long text."), + () -> + assertThat(row.getString("tinytext_column")).isEqualTo("Short text for tinytext."), + () -> + assertThat(row.getString("mediumtext_column")) + .isEqualTo("Longer text data for mediumtext column."), + () -> + assertThat(row.getString("text_column")) + .isEqualTo("This is some sample text data for the text column."), + () -> + assertThat(row.getLocalTime("time_column")) + .isEqualTo(java.time.LocalTime.parse("12:30:00.000000000")), + () -> + assertThat(row.getInstant("timestamp_column")) + .isEqualTo(java.time.Instant.parse("2025-01-27T10:30:00Z")), + () -> + assertThat(row.getBigInteger("varint_column")) + .isEqualTo(java.math.BigInteger.valueOf(123456789L)), + () -> + assertThat(row.getBytesUnsafe("bytes_column")) + .isEqualTo(ByteBuffer.wrap(ByteArray.copyFrom("Hello world").toByteArray()))); + } else if (Objects.equals(varcharColumn, "SampleVarcharForNull")) { + assertAll( + () -> assertThat(row.isNull("tinyint_column")).isTrue(), + () -> assertThat(row.isNull("text_column")).isTrue(), + () -> assertThat(row.isNull("date_column")).isTrue(), + () -> assertThat(row.isNull("smallint_column")).isTrue(), + () -> assertThat(row.isNull("mediumint_column")).isTrue(), + () -> assertThat(row.isNull("int_column")).isTrue(), + () -> assertThat(row.isNull("bigint_column")).isTrue(), + () -> assertThat(row.isNull("float_column")).isTrue(), + () -> assertThat(row.isNull("double_column")).isTrue(), + () -> assertThat(row.isNull("decimal_column")).isTrue(), + () -> assertThat(row.isNull("datetime_column")).isTrue(), + () -> assertThat(row.isNull("timestamp_column")).isTrue(), + () -> assertThat(row.isNull("time_column")).isTrue(), + () -> assertThat(row.isNull("year_column")).isTrue(), + () -> assertThat(row.isNull("char_column")).isTrue(), + () -> assertThat(row.isNull("tinytext_column")).isTrue(), + () -> assertThat(row.isNull("mediumtext_column")).isTrue(), + () -> assertThat(row.isNull("longtext_column")).isTrue(), + () -> assertThat(row.isNull("enum_column")).isTrue(), + () -> assertThat(row.isNull("bool_column")).isTrue(), + () -> assertThat(row.isNull("other_bool_column")).isTrue(), + () -> assertThat(row.isNull("bytes_column")).isTrue(), + () -> assertThat(row.isNull("list_text_column")).isTrue(), + () -> assertThat(row.isNull("list_int_column")).isTrue(), + () -> assertThat(row.isNull("frozen_list_bigint_column")).isTrue(), + () -> assertThat(row.isNull("set_text_column")).isTrue(), + () -> assertThat(row.isNull("set_date_column")).isTrue(), + () -> assertThat(row.isNull("frozen_set_bool_column")).isTrue(), + () -> assertThat(row.isNull("map_text_to_int_column")).isTrue(), + () -> assertThat(row.isNull("map_date_to_text_column")).isTrue(), + () -> assertThat(row.isNull("frozen_map_int_to_bool_column")).isTrue(), + () -> assertThat(row.isNull("map_text_to_list_column")).isTrue(), + () -> assertThat(row.isNull("map_text_to_set_column")).isTrue(), + () -> assertThat(row.isNull("set_of_maps_column")).isTrue(), + () -> assertThat(row.isNull("list_of_sets_column")).isTrue(), + () -> assertThat(row.isNull("frozen_map_text_to_list_column")).isTrue(), + () -> assertThat(row.isNull("frozen_map_text_to_set_column")).isTrue(), + () -> assertThat(row.isNull("frozen_set_of_maps_column")).isTrue(), + () -> assertThat(row.isNull("frozen_list_of_sets_column")).isTrue(), + () -> assertThat(row.isNull("varint_column")).isTrue(), + () -> assertThat(row.isNull("inet_column")).isTrue()); + } else { + throw new AssertionError("Unexpected row found: " + varcharColumn); + } + } } /** diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCustomTransformationIT.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCustomTransformationIT.java index 3d7da5a2bd..0ebeb3af34 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCustomTransformationIT.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCustomTransformationIT.java @@ -334,12 +334,18 @@ private void writeRowInSpanner() { spannerResourceManager.write(m); } - private void assertRowInMySQL() { + private void assertRowInMySQL() throws InterruptedException { PipelineOperator.Result result = pipelineOperator() .waitForCondition( createConfig(jobInfo, Duration.ofMinutes(15)), () -> jdbcResourceManager.getRowCount(TABLE) == 1); + /* + * Added to handle updates. + * TODO(khajanchi@), explore if this sleep be replaced with something more definite. + */ + Thread.sleep(Duration.ofMinutes(1L).toMillis()); + assertThatResult(result).meetsConditions(); result = @@ -347,6 +353,11 @@ private void assertRowInMySQL() { .waitForCondition( createConfig(jobInfo, Duration.ofMinutes(15)), () -> jdbcResourceManager.getRowCount(TABLE2) == 2); + /* + * Added to handle updates. + * TODO(khajanchi@), explore if this sleep be replaced with something more definite. + */ + Thread.sleep(Duration.ofMinutes(1L).toMillis()); assertThatResult(result).meetsConditions(); List> rows = jdbcResourceManager.readTable(TABLE); diff --git a/v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbCustomTransformationIT/mysql-schema.sql b/v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbCustomTransformationIT/mysql-schema.sql index 6e68b9af51..4e6af6f651 100644 --- a/v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbCustomTransformationIT/mysql-schema.sql +++ b/v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbCustomTransformationIT/mysql-schema.sql @@ -1,3 +1,6 @@ +DROP TABLE IF EXISTS Users1; +DROP TABLE IF EXISTS AllDatatypeTransformation; + CREATE TABLE Users1 ( id INT NOT NULL, first_name VARCHAR(25), diff --git a/v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbCustomTransformationIT/spanner-schema.sql b/v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbCustomTransformationIT/spanner-schema.sql index a5a1f125ff..684f128124 100644 --- a/v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbCustomTransformationIT/spanner-schema.sql +++ b/v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbCustomTransformationIT/spanner-schema.sql @@ -1,4 +1,7 @@ -CREATE TABLE IF NOT EXISTS Users1 ( +DROP TABLE IF EXISTS Users1; +DROP TABLE IF EXISTS AllDatatypeTransformation; + +CREATE TABLE Users1 ( id INT64 NOT NULL, name STRING(25), ) PRIMARY KEY(id);