From 7d06c3ee994853c6e4d58eb2a36a1fedcb5e5284 Mon Sep 17 00:00:00 2001 From: pawankashyapollion Date: Tue, 24 Dec 2024 14:27:35 +0530 Subject: [PATCH] Metadata config and pipeline options (#31) * Added Connection Helper * Added Formatter * Added File Reader * Refactored * Added Changes for Schema and Spanner Schema * Added Schema Changes to read Spanner Table in Schema object * Added Schema Changes * Added Changes for Schema * Added Source Writer Fn Changes * Added Source Factory Changes * Added Fixes for the Source factory and Cassandra Connection helper * Added Cassandra Schema Reader * Added Pipeline Process * Removed Unwanted Validation * Added Access validator * removed unwanted Return * Added Thread safe optimization in cassandra Connection helper * Applied spotless:apply * spotless:apply * Added Constructor for Test case * Added DUMMY Generator For UT * Fixed UT for Metadata config (#30) Co-authored-by: Narendra Rajput --------- Co-authored-by: Narendra Rajput --- v2/spanner-common/pom.xml | 5 + .../migrations/constants/Constants.java | 3 + .../metadata/CassandraSourceMetadata.java | 186 +++++++++++++++++ .../v2/spanner/migrations/schema/Schema.java | 12 +- .../schema/cassandra/SourceColumn.java | 70 +++++++ .../schema/cassandra/SourceSchema.java | 44 ++++ .../schema/cassandra/SourceTable.java | 53 +++++ .../migrations/shard/CassandraShard.java | 173 ++++++++++++++++ .../migrations/spanner/SpannerSchema.java | 72 +++++++ .../utils/CassandraConfigFileReader.java | 76 +++++++ .../v2/templates/SpannerToSourceDb.java | 71 +++++-- .../v2/templates/constants/Constants.java | 2 + .../connection/CassandraConnectionHelper.java | 192 ++++++++++++++++++ .../dbutils/dao/source/CassandraDao.java | 20 ++ .../processor/InputRecordProcessor.java | 3 +- .../processor/SourceProcessorFactory.java | 75 +++++-- .../templates/transforms/SourceWriterFn.java | 16 +- .../CassandraConnectionHelperTest.java | 183 +++++++++++++++++ .../processor/SourceProcessorFactoryTest.java | 36 ++++ 
19 files changed, 1261 insertions(+), 31 deletions(-) create mode 100644 v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/metadata/CassandraSourceMetadata.java create mode 100644 v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/schema/cassandra/SourceColumn.java create mode 100644 v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/schema/cassandra/SourceSchema.java create mode 100644 v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/schema/cassandra/SourceTable.java create mode 100644 v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/shard/CassandraShard.java create mode 100644 v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/utils/CassandraConfigFileReader.java create mode 100644 v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/connection/CassandraConnectionHelper.java create mode 100644 v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/connection/CassandraConnectionHelperTest.java diff --git a/v2/spanner-common/pom.xml b/v2/spanner-common/pom.xml index b46b606e13..78327e7067 100644 --- a/v2/spanner-common/pom.xml +++ b/v2/spanner-common/pom.xml @@ -44,6 +44,11 @@ 1.0-SNAPSHOT compile + + com.datastax.oss + java-driver-core + 4.17.0 + diff --git a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/constants/Constants.java b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/constants/Constants.java index 2c212a5789..afd7e534a6 100644 --- a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/constants/Constants.java +++ b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/constants/Constants.java @@ -21,6 +21,9 @@ public class Constants { /* The source type value for MySql databases */ public static final 
String MYSQL_SOURCE_TYPE = "mysql"; + /* The source type value for CASSANDRA databases */ + public static final String CASSANDRA_SOURCE_TYPE = "cassandra"; + /* The value for Oracle databases in the source type key */ public static final String ORACLE_SOURCE_TYPE = "oracle"; diff --git a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/metadata/CassandraSourceMetadata.java b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/metadata/CassandraSourceMetadata.java new file mode 100644 index 0000000000..9634e5b082 --- /dev/null +++ b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/metadata/CassandraSourceMetadata.java @@ -0,0 +1,186 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package com.google.cloud.teleport.v2.spanner.migrations.metadata; + +import autovalue.shaded.com.google.common.collect.ImmutableList; +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.google.cloud.teleport.v2.spanner.migrations.schema.*; +import com.google.cloud.teleport.v2.spanner.migrations.schema.cassandra.SourceColumn; +import com.google.cloud.teleport.v2.spanner.migrations.schema.cassandra.SourceSchema; +import com.google.cloud.teleport.v2.spanner.migrations.schema.cassandra.SourceTable; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class CassandraSourceMetadata { + + private static final String SCHEMA_NAME = "cassandra"; // Constant for schema name + + /** + * Generates a SourceSchema from a ResultSet. + * + * @param resultSet The ResultSet containing schema information. + * @return A SourceSchema instance. + */ + private static SourceSchema generateSourceSchema(ResultSet resultSet) { + Map> schema = new HashMap<>(); + + resultSet.forEach( + row -> { + String tableName = row.getString("table_name"); + String columnName = row.getString("column_name"); + String dataType = row.getString("type"); + String kind = row.getString("kind"); + + boolean isPrimaryKey = isPrimaryKey(kind); + SourceColumn sourceColumn = SourceColumn.create(columnName, kind, dataType, isPrimaryKey); + + schema.computeIfAbsent(tableName, k -> new HashMap<>()).put(columnName, sourceColumn); + }); + + Map tables = + schema.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> + SourceTable.create( + entry.getKey(), ImmutableList.copyOf(entry.getValue().values())))); + + return SourceSchema.create(Map.copyOf(tables)); + } + + /** + * Converts a ResultSet to a Schema object, updating the provided schema. + * + * @param schema The schema to update. + * @param resultSet The ResultSet containing schema information. 
+ */ + public static void generateSourceSchema(Schema schema, ResultSet resultSet) { + SourceSchema sourceSchema = generateSourceSchema(resultSet); + Map sourceTableMap = + convertSourceSchemaToMap(sourceSchema); + schema.setSrcSchema(sourceTableMap); + schema.setToSource(convertSourceToNameAndColsTable(sourceSchema.tables().values())); + } + + /** + * Converts a SourceSchema to a map of Spanner table names to SourceTable objects. + * + * @param sourceSchema The SourceSchema to convert. + * @return A map where the key is the table name and the value is the corresponding SourceTable. + */ + private static Map + convertSourceSchemaToMap(SourceSchema sourceSchema) { + return sourceSchema.tables().entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, entry -> convertSourceTableToSourceTable(entry.getValue()))); + } + + /** + * Converts a SourceTable to a SourceTable for Spanner. + * + * @param sourceTable The SourceTable to convert. + * @return A converted SourceTable object. + */ + private static com.google.cloud.teleport.v2.spanner.migrations.schema.SourceTable + convertSourceTableToSourceTable(SourceTable sourceTable) { + List columns = sourceTable.columns(); + + String[] colIds = columns.stream().map(SourceColumn::name).toArray(String[]::new); + + Map colDefs = + columns.stream() + .collect( + Collectors.toMap( + SourceColumn::name, + col -> + new SourceColumnDefinition( + col.name(), + new SourceColumnType(col.sourceType(), new Long[0], new Long[0])))); + + ColumnPK[] primaryKeys = + columns.stream() + .filter(SourceColumn::isPrimaryKey) + .map(col -> new ColumnPK(col.name(), getPrimaryKeyOrder(col))) + .toArray(ColumnPK[]::new); + + return new com.google.cloud.teleport.v2.spanner.migrations.schema.SourceTable( + sourceTable.name(), + SCHEMA_NAME, // Use constant schema name + colIds, + colDefs, + primaryKeys); + } + + /** + * Determines if a column is a primary key. + * + * @param kind The column kind. 
+ * @return true if the column is a primary key, false otherwise. + */ + private static boolean isPrimaryKey(String kind) { + return "partition_key".equals(kind) || "clustering".equals(kind); + } + + /** + * Gets the order of the primary key column. + * + * @param col The SourceColumn. + * @return The order of the primary key. + */ + private static int getPrimaryKeyOrder(SourceColumn col) { + switch (col.kind()) { + case "partition_key": + return 1; + case "clustering": + return 2; + default: + return 0; + } + } + + /** + * Converts a collection of SourceTables to a map of table names to NameAndCols. + * + * @param tables A collection of SourceTables. + * @return A map where the key is the table name and the value is a NameAndCols object. + */ + private static Map convertSourceToNameAndColsTable( + Collection tables) { + return tables.stream() + .collect( + Collectors.toMap( + SourceTable::name, CassandraSourceMetadata::convertSourceTableToNameAndCols)); + } + + /** + * Converts a SourceTable to a NameAndCols object. + * + * @param sourceTable The SourceTable to convert. + * @return A NameAndCols object representing the table and column names. 
+ */ + private static NameAndCols convertSourceTableToNameAndCols(SourceTable sourceTable) { + Map columnNames = + sourceTable.columns().stream() + .collect(Collectors.toMap(SourceColumn::name, SourceColumn::name)); + + return new NameAndCols(sourceTable.name(), columnNames); + } +} diff --git a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/schema/Schema.java b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/schema/Schema.java index 72da70d584..46e7dafe02 100644 --- a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/schema/Schema.java +++ b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/schema/Schema.java @@ -30,13 +30,13 @@ */ public class Schema implements Serializable { /** Maps the HarbourBridge table ID to the Spanner table details. */ - private final Map spSchema; + private Map spSchema; /** Maps the Spanner table ID to the synthetic PK. */ private final Map syntheticPKeys; /** Maps the HarbourBridge table ID to the Source table details. */ - private final Map srcSchema; + private Map srcSchema; // The columns below are not part of the session file. They are computed based on the fields // above. 
@@ -80,6 +80,10 @@ public Map getSpSchema() { return spSchema; } + public Map setSpSchema(Map spSchema) { + return this.spSchema = spSchema; + } + public Map getSyntheticPks() { return syntheticPKeys; } @@ -88,6 +92,10 @@ public Map getSrcSchema() { return srcSchema; } + public Map setSrcSchema(Map srcSchema) { + return this.srcSchema = srcSchema; + } + public Map getToSpanner() { return toSpanner; } diff --git a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/schema/cassandra/SourceColumn.java b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/schema/cassandra/SourceColumn.java new file mode 100644 index 0000000000..6a2c019cec --- /dev/null +++ b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/schema/cassandra/SourceColumn.java @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.spanner.migrations.schema.cassandra; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; + +/** + * Represents a source column schema with type, primary key information, and column kind. This class + * provides immutable representation of a source column with necessary metadata. + */ +@AutoValue +public abstract class SourceColumn implements Serializable { + + /** + * Creates a new instance of SourceColumn. 
+ * + * @param name the name of the column, should not be null or empty. + * @param kind the kind/type of the column (e.g., partition key, clustering key, regular). + * @param sourceType the source type of the column (e.g., String, Int). + * @param isPrimaryKey whether the column is a primary key. + * @return a new immutable SourceColumn instance. + */ + public static SourceColumn create( + String name, String kind, String sourceType, boolean isPrimaryKey) { + + return new AutoValue_SourceColumn(name, kind, sourceType, isPrimaryKey); + } + + /** + * Gets the name of the column. + * + * @return the name of the column. + */ + public abstract String name(); + + /** + * Gets the kind/type of the column. + * + * @return the kind of the column. + */ + public abstract String kind(); + + /** + * Gets the source type of the column. + * + * @return the source type of the column. + */ + public abstract String sourceType(); + + /** + * Checks if the column is a primary key. + * + * @return {@code true} if the column is a primary key, otherwise {@code false}. + */ + public abstract boolean isPrimaryKey(); +} diff --git a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/schema/cassandra/SourceSchema.java b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/schema/cassandra/SourceSchema.java new file mode 100644 index 0000000000..5314ed9d45 --- /dev/null +++ b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/schema/cassandra/SourceSchema.java @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.spanner.migrations.schema.cassandra; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.util.Map; + +/** Represents an immutable source schema. */ +@AutoValue +public abstract class SourceSchema implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * Factory method to create a new instance of SourceSchema. + * + * @param tables the map of source tables to initialize. + * @return a new immutable SourceSchema instance. + */ + public static SourceSchema create(Map tables) { + return new AutoValue_SourceSchema(Map.copyOf(tables)); + } + + /** + * Gets the source schema as an unmodifiable map. + * + * @return the unmodifiable source schema map. + */ + public abstract Map tables(); +} diff --git a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/schema/cassandra/SourceTable.java b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/schema/cassandra/SourceTable.java new file mode 100644 index 0000000000..bef500b084 --- /dev/null +++ b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/schema/cassandra/SourceTable.java @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.spanner.migrations.schema.cassandra; + +import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableList; +import java.io.Serializable; +import java.util.List; + +/** Represents a source table schema with its name and associated columns. */ +@AutoValue +public abstract class SourceTable implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * Creates a new instance of SourceTable. + * + * @param name the name of the source table. + * @param columns the list of columns in the source table. + * @return a new immutable SourceTable instance. + */ + public static SourceTable create(String name, List columns) { + return new AutoValue_SourceTable(name, ImmutableList.copyOf(columns)); + } + + /** + * Gets the name of the source table. + * + * @return the source table name. + */ + public abstract String name(); + + /** + * Gets the list of columns in the source table. + * + * @return an immutable list of SourceColumn objects. 
+ */ + public abstract ImmutableList columns(); +} diff --git a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/shard/CassandraShard.java b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/shard/CassandraShard.java new file mode 100644 index 0000000000..0ad6b09c7c --- /dev/null +++ b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/shard/CassandraShard.java @@ -0,0 +1,173 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package com.google.cloud.teleport.v2.spanner.migrations.shard; + +import java.util.Objects; + +public class CassandraShard extends Shard { + private String keyspace; + private String consistencyLevel = "LOCAL_QUORUM"; + private boolean sslOptions = false; + private String protocolVersion = "v5"; + private String dataCenter = "datacenter1"; + private int localPoolSize = 1024; + private int remotePoolSize = 256; + + public CassandraShard( + String logicalShardId, + String host, + String port, + String user, + String password, + String keyspace, + String consistencyLevel, + Boolean sslOptions, + String protocolVersion, + String dataCenter, + Integer localPoolSize, + Integer remotePoolSize) { + super(logicalShardId, host, port, user, password, null, null, null, null); + this.keyspace = keyspace; + this.consistencyLevel = consistencyLevel; + this.sslOptions = sslOptions; + this.protocolVersion = protocolVersion; + this.dataCenter = dataCenter; + this.localPoolSize = localPoolSize; + this.remotePoolSize = remotePoolSize; + } + + // Getters + public String getKeySpaceName() { + return keyspace; + } + + public String getConsistencyLevel() { + return consistencyLevel; + } + + public boolean getSSLOptions() { + return sslOptions; + } + + public String getProtocolVersion() { + return protocolVersion; + } + + public String getDataCenter() { + return dataCenter; + } + + public int getLocalPoolSize() { + return localPoolSize; + } + + public int getRemotePoolSize() { + return remotePoolSize; + } + + // Setters + public void setKeySpaceName(String keySpaceName) { + this.keyspace = keySpaceName; + } + + public void setConsistencyLevel(String consistencyLevel) { + this.consistencyLevel = consistencyLevel; + } + + public void setSslOptions(boolean sslOptions) { + this.sslOptions = sslOptions; + } + + public void setProtocolVersion(String protocolVersion) { + this.protocolVersion = protocolVersion; + } + + public void setDataCenter(String dataCenter) { + this.dataCenter = 
dataCenter; + } + + public void setLocalPoolSize(int localPoolSize) { + this.localPoolSize = localPoolSize; + } + + public void setRemotePoolSize(int remotePoolSize) { + this.remotePoolSize = remotePoolSize; + } + + public void validate() { + validateField(getHost(), "Host"); + validateField(getPort(), "Port"); + validateField(getUserName(), "Username"); + validateField(getPassword(), "Password"); + validateField(getKeySpaceName(), "Keyspace"); + } + + private void validateField(String value, String fieldName) { + if (value == null || value.isEmpty()) { + throw new IllegalArgumentException(fieldName + " is required"); + } + } + + @Override + public String toString() { + return String.format( + "CassandraShard{logicalShardId='%s', host='%s', port='%s', user='%s', keySpaceName='%s', datacenter='%s', consistencyLevel='%s', protocolVersion='%s'}", + getLogicalShardId(), + getHost(), + getPort(), + getUserName(), + getKeySpaceName(), + getDataCenter(), + getConsistencyLevel(), + getProtocolVersion()); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof CassandraShard)) return false; + CassandraShard that = (CassandraShard) o; + return sslOptions == that.sslOptions + && localPoolSize == that.localPoolSize + && remotePoolSize == that.remotePoolSize + && Objects.equals(getLogicalShardId(), that.getLogicalShardId()) + && Objects.equals(getHost(), that.getHost()) + && Objects.equals(getPort(), that.getPort()) + && Objects.equals(getUserName(), that.getUserName()) + && Objects.equals(getPassword(), that.getPassword()) + && Objects.equals(keyspace, that.keyspace) + && Objects.equals(dataCenter, that.dataCenter) + && Objects.equals(consistencyLevel, that.consistencyLevel) + && Objects.equals(protocolVersion, that.protocolVersion); + } + + @Override + public int hashCode() { + return Objects.hash( + getLogicalShardId(), + getHost(), + getPort(), + getUserName(), + getPassword(), + keyspace, + dataCenter, + consistencyLevel, + 
protocolVersion, + sslOptions, + localPoolSize, + remotePoolSize); + } +} diff --git a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/spanner/SpannerSchema.java b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/spanner/SpannerSchema.java index 0b2357b34f..3e15b14a37 100644 --- a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/spanner/SpannerSchema.java +++ b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/spanner/SpannerSchema.java @@ -20,8 +20,19 @@ import com.google.cloud.spanner.DatabaseAdminClient; import com.google.cloud.spanner.Dialect; import com.google.cloud.spanner.TimestampBound; +import com.google.cloud.teleport.v2.spanner.ddl.Column; import com.google.cloud.teleport.v2.spanner.ddl.Ddl; import com.google.cloud.teleport.v2.spanner.ddl.InformationSchemaScanner; +import com.google.cloud.teleport.v2.spanner.ddl.Table; +import com.google.cloud.teleport.v2.spanner.migrations.schema.ColumnPK; +import com.google.cloud.teleport.v2.spanner.migrations.schema.NameAndCols; +import com.google.cloud.teleport.v2.spanner.migrations.schema.SpannerColumnDefinition; +import com.google.cloud.teleport.v2.spanner.migrations.schema.SpannerColumnType; +import com.google.cloud.teleport.v2.spanner.migrations.schema.SpannerTable; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor; import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; @@ -43,4 +54,65 @@ public static Ddl getInformationSchemaAsDdl(SpannerConfig spannerConfig) { spannerAccessor.close(); return ddl; } + + public static Map convertDDLTableToSpannerTable(Collection tables) { + return tables.stream() + .collect( + Collectors.toMap( + Table::name, // Use the table name as the key + SpannerSchema::convertTableToSpannerTable // Convert Table to 
SpannerTable + )); + } + + public static Map convertDDLTableToSpannerNameAndColsTable( + Collection
tables) { + return tables.stream() + .collect( + Collectors.toMap( + Table::name, // Use the table name as the key + SpannerSchema + ::convertTableToSpannerTableNameAndCols // Convert Table to SpannerTable + )); + } + + private static NameAndCols convertTableToSpannerTableNameAndCols(Table table) { + return new NameAndCols( + table.name(), + table.columns().stream() + .collect( + Collectors.toMap( + Column::name, // Use column IDs as keys + Column::name))); + } + + private static SpannerTable convertTableToSpannerTable(Table table) { + String name = table.name(); // Table name + // Extract column IDs + String[] colIds = + table.columns().stream() + .map(Column::name) // Assuming Column name as ID + .toArray(String[]::new); + + // Build column definitions + Map colDefs = + table.columns().stream() + .collect( + Collectors.toMap( + Column::name, // Use column IDs as keys + column -> + new SpannerColumnDefinition( + column.name(), + new SpannerColumnType( + column.typeString(), // Type Code name (e.g., STRING, INT64) + false)))); + + // Extract primary keys + AtomicInteger orderCounter = new AtomicInteger(1); + ColumnPK[] primaryKeys = + table.primaryKeys().stream() + .map(pk -> new ColumnPK(pk.name(), orderCounter.getAndIncrement())) + .toArray(ColumnPK[]::new); + + return new SpannerTable(name, colIds, colDefs, primaryKeys, table.name()); + } } diff --git a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/utils/CassandraConfigFileReader.java b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/utils/CassandraConfigFileReader.java new file mode 100644 index 0000000000..0e05cc7a57 --- /dev/null +++ b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/utils/CassandraConfigFileReader.java @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.spanner.migrations.utils; + +import com.google.cloud.teleport.v2.spanner.migrations.shard.CassandraShard; +import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard; +import com.google.gson.FieldNamingPolicy; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import java.io.IOException; +import java.io.InputStream; +import java.nio.channels.Channels; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.commons.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class for reading and parsing a Cassandra configuration file from GCS into a + * CassandraShard object. + */ +public class CassandraConfigFileReader { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraConfigFileReader.class); + private static final Gson GSON = + new GsonBuilder().setFieldNamingPolicy(FieldNamingPolicy.IDENTITY).create(); + + /** + * Reads the Cassandra configuration file from the specified GCS path and converts it into a list + * of CassandraShard objects. + * + * @param cassandraConfigFilePath the GCS path of the Cassandra configuration file. + * @return a list containing the parsed CassandraShard. 
+ */ + public List getCassandraShard(String cassandraConfigFilePath) { + try (InputStream stream = getFileInputStream(cassandraConfigFilePath)) { + String configContent = IOUtils.toString(stream, StandardCharsets.UTF_8); + CassandraShard shard = GSON.fromJson(configContent, CassandraShard.class); + + LOG.info("Successfully read Cassandra config: {}", shard); + return Collections.singletonList(shard); + } catch (IOException e) { + String errorMessage = + "Failed to read Cassandra config file. Ensure it is ASCII or UTF-8 encoded and contains a well-formed JSON string."; + LOG.error(errorMessage, e); + throw new RuntimeException(errorMessage, e); + } + } + + /** + * Retrieves an InputStream for the specified GCS file path. + * + * @param filePath the GCS file path. + * @return an InputStream for the file. + * @throws IOException if the file cannot be accessed or opened. + */ + private InputStream getFileInputStream(String filePath) throws IOException { + return Channels.newInputStream(FileSystems.open(FileSystems.matchNewResource(filePath, false))); + } +} diff --git a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDb.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDb.java index d33ec69b4e..f17367394d 100644 --- a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDb.java +++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDb.java @@ -15,6 +15,9 @@ */ package com.google.cloud.teleport.v2.templates; +import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.CASSANDRA_SOURCE_TYPE; +import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.MYSQL_SOURCE_TYPE; + import com.google.cloud.Timestamp; import com.google.cloud.teleport.metadata.Template; import com.google.cloud.teleport.metadata.TemplateCategory; @@ -30,6 +33,7 @@ import 
com.google.cloud.teleport.v2.spanner.migrations.shard.Shard; import com.google.cloud.teleport.v2.spanner.migrations.spanner.SpannerSchema; import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation; +import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraConfigFileReader; import com.google.cloud.teleport.v2.spanner.migrations.utils.SecretManagerAccessorImpl; import com.google.cloud.teleport.v2.spanner.migrations.utils.SessionFileReader; import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardFileReader; @@ -415,6 +419,15 @@ public interface Options extends PipelineOptions, StreamingOptions { String getFilterEventsDirectoryName(); void setFilterEventsDirectoryName(String value); + + @TemplateParameter.GcsReadFile( + order = 10, + optional = false, + description = "Path to GCS file containing the the Cassandra Config details", + helpText = "Path to GCS file containing connection profile info for cassandra.") + String getCassandraConfigFilePath(); + + void setCassandraConfigFilePath(String value); } /** @@ -472,8 +485,12 @@ public static PipelineResult run(Options options) { + " incease the max shard connections"); } - // Read the session file - Schema schema = SessionFileReader.read(options.getSessionFilePath()); + // Read the session file for Mysql Only + Schema schema = + MYSQL_SOURCE_TYPE.equals(options.getSourceType()) + ? 
SessionFileReader.read( + options.getSessionFilePath()) // Read from session file for MYSQL source type + : new Schema(); // Prepare Spanner config SpannerConfig spannerConfig = @@ -511,17 +528,41 @@ public static PipelineResult run(Options options) { shadowTableCreator.createShadowTablesInSpanner(); Ddl ddl = SpannerSchema.getInformationSchemaAsDdl(spannerConfig); - ShardFileReader shardFileReader = new ShardFileReader(new SecretManagerAccessorImpl()); - List shards = shardFileReader.getOrderedShardDetails(options.getSourceShardsFilePath()); - String shardingMode = Constants.SHARDING_MODE_MULTI_SHARD; - if (shards.size() == 1) { - shardingMode = Constants.SHARDING_MODE_SINGLE_SHARD; - - Shard singleShard = shards.get(0); - if (singleShard.getLogicalShardId() == null) { - singleShard.setLogicalShardId(Constants.DEFAULT_SHARD_ID); - LOG.info( - "Logical shard id was not found, hence setting it to : " + Constants.DEFAULT_SHARD_ID); + if (options.getSourceType().equals(CASSANDRA_SOURCE_TYPE)) { + schema.setSpSchema(SpannerSchema.convertDDLTableToSpannerTable(ddl.allTables())); + schema.setToSpanner(SpannerSchema.convertDDLTableToSpannerNameAndColsTable(ddl.allTables())); + } + List shards = new ArrayList<>(); + String shardingMode = Constants.SHARDING_MODE_SINGLE_SHARD; + if ("mysql".equals(options.getSourceType())) { + ShardFileReader shardFileReader = new ShardFileReader(new SecretManagerAccessorImpl()); + shards = shardFileReader.getOrderedShardDetails(options.getSourceShardsFilePath()); + shardingMode = Constants.SHARDING_MODE_MULTI_SHARD; + if (shards.size() == 1) { + shardingMode = Constants.SHARDING_MODE_SINGLE_SHARD; + Shard singleMySqlShard = shards.get(0); + if (singleMySqlShard.getLogicalShardId() == null) { + singleMySqlShard.setLogicalShardId(Constants.DEFAULT_SHARD_ID); + LOG.info( + "Logical shard id was not found, hence setting it to : " + + Constants.DEFAULT_SHARD_ID); + } + } + } else { + CassandraConfigFileReader cassandraConfigFileReader = new 
CassandraConfigFileReader(); + shards = cassandraConfigFileReader.getCassandraShard(options.getCassandraConfigFilePath()); + LOG.info("Cassandra config is: {}", shards.get(0)); + if (shards.size() == 1) { + shardingMode = Constants.SHARDING_MODE_SINGLE_SHARD; + Shard singleCassandraShard = shards.get(0); + if (singleCassandraShard.getLogicalShardId() == null) { + singleCassandraShard.setLogicalShardId(Constants.DEFAULT_SHARD_ID); + LOG.info( + "Logical shard id was not found, hence setting it to : " + + Constants.DEFAULT_SHARD_ID); + } + } else { + throw new IllegalArgumentException("We have no options of shards in cassandra"); } } boolean isRegularMode = "regular".equals(options.getRunMode()); @@ -614,7 +655,9 @@ public static PipelineResult run(Options options) { options.getShardingCustomClassName(), options.getShardingCustomParameters(), options.getMaxShardConnections() - * shards.size()))) // currently assuming that all shards accept the same + * shards.size()))) // currently assuming that all mySqlShards + // accept the same// currently assuming + // that all shards accept the same // number of max connections .setCoder( KvCoder.of( diff --git a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/constants/Constants.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/constants/Constants.java index 1368a46fe3..476d199d46 100644 --- a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/constants/Constants.java +++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/constants/Constants.java @@ -76,6 +76,8 @@ public class Constants { public static final String SOURCE_MYSQL = "mysql"; + public static final String SOURCE_CASSANDRA = "cassandra"; + // Message written to the file for filtered records public static final String FILTERED_TAG_MESSAGE = "Filtered record from custom transformation in reverse replication"; diff --git 
a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/connection/CassandraConnectionHelper.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/connection/CassandraConnectionHelper.java new file mode 100644 index 0000000000..d7fe8eb556 --- /dev/null +++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/connection/CassandraConnectionHelper.java @@ -0,0 +1,192 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package com.google.cloud.teleport.v2.templates.dbutils.connection; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.CqlSessionBuilder; +import com.datastax.oss.driver.api.core.config.DefaultDriverOption; +import com.datastax.oss.driver.api.core.config.DriverConfigLoader; +import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder; +import com.google.cloud.teleport.v2.spanner.migrations.shard.CassandraShard; +import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard; +import com.google.cloud.teleport.v2.templates.exceptions.ConnectionException; +import com.google.cloud.teleport.v2.templates.models.ConnectionHelperRequest; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The {@code CassandraConnectionHelper} class provides methods to manage and maintain connections + * to a Cassandra database in a multi-shard environment. It implements the {@link IConnectionHelper} + * interface for {@link CqlSession}. + * + *

+ * <p>This class initializes and maintains a connection pool for multiple Cassandra shards and
+ * provides utilities to retrieve connections based on a unique key.
+ *
+ * <p>Typical usage:
+ *
+ * <pre>
+ *   CassandraConnectionHelper helper = new CassandraConnectionHelper();
+ *   helper.init(connectionHelperRequest);
+ *   CqlSession session = helper.getConnection(connectionKey);
+ * </pre>
+ */ +public class CassandraConnectionHelper implements IConnectionHelper { + + /** Logger for logging information and errors. */ + private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectionHelper.class); + + /** A thread-safe connection pool storing {@link CqlSession} instances mapped to unique keys. */ + private static Map connectionPoolMap = new ConcurrentHashMap<>(); + + /** + * Initializes the connection pool with connections for the provided Cassandra shards. + * + * @param connectionHelperRequest The request object containing shard details and connection + * settings. + * @throws IllegalArgumentException if any shard validation fails or invalid shard types are + * provided. + */ + @Override + public synchronized void init(ConnectionHelperRequest connectionHelperRequest) { + if (connectionPoolMap != null && !connectionPoolMap.isEmpty()) { + LOG.info("Connection pool is already initialized."); + return; + } + + LOG.info( + "Initializing Cassandra connection pool with size: {}", + connectionHelperRequest.getMaxConnections()); + + List shards = connectionHelperRequest.getShards(); + for (Shard shard : shards) { + if (!(shard instanceof CassandraShard)) { + LOG.error("Invalid shard type: {}", shard.getClass().getSimpleName()); + throw new IllegalArgumentException("Invalid shard object"); + } + + CassandraShard cassandraShard = (CassandraShard) shard; + try { + cassandraShard.validate(); + CqlSession session = createCqlSession(cassandraShard); + String connectionKey = generateConnectionKey(cassandraShard); + connectionPoolMap.put(connectionKey, session); + LOG.info("Connection initialized for key: {}", connectionKey); + } catch (Exception e) { + LOG.error("Failed to initialize connection for shard: {}", cassandraShard, e); + } + } + } + + /** + * Retrieves a {@link CqlSession} connection from the connection pool. + * + * @param connectionRequestKey The unique key identifying the connection in the pool. 
+ * @return The {@link CqlSession} instance associated with the given key. + * @throws ConnectionException If the connection pool is not initialized or no connection is found + * for the key. + */ + @Override + public CqlSession getConnection(String connectionRequestKey) throws ConnectionException { + if (connectionPoolMap == null || connectionPoolMap.isEmpty()) { + LOG.warn("Connection pool not initialized."); + throw new ConnectionException("Connection pool is not initialized."); + } + + CqlSession session = connectionPoolMap.get(connectionRequestKey); + if (session == null) { + LOG.warn("No connection found for key: {}", connectionRequestKey); + throw new ConnectionException( + "No connection available for the given key: " + connectionRequestKey); + } + + return session; + } + + /** + * Checks if the connection pool is initialized and contains connections. + * + * @return {@code true} if the connection pool is initialized and not empty; {@code false} + * otherwise. + */ + @Override + public boolean isConnectionPoolInitialized() { + return connectionPoolMap != null && !connectionPoolMap.isEmpty(); + } + + /** + * Creates a {@link CqlSession} for the given {@link CassandraShard}. + * + * @param cassandraShard The shard containing connection details. + * @return A {@link CqlSession} instance. + */ + private CqlSession createCqlSession(CassandraShard cassandraShard) { + CqlSessionBuilder builder = + CqlSession.builder() + .addContactPoint( + new InetSocketAddress( + cassandraShard.getHost(), Integer.parseInt(cassandraShard.getPort()))) + .withAuthCredentials(cassandraShard.getUserName(), cassandraShard.getPassword()) + .withKeyspace(cassandraShard.getKeySpaceName()); + + DriverConfigLoader configLoader = createConfigLoader(cassandraShard); + builder.withConfigLoader(configLoader); + + return builder.build(); + } + + /** + * Generates a unique connection key for the given {@link CassandraShard}. + * + * @param shard The shard containing connection details. 
+ * @return A string key uniquely identifying the connection. + */ + private String generateConnectionKey(CassandraShard shard) { + return String.format( + "%s:%s/%s/%s", + shard.getHost(), shard.getPort(), shard.getUserName(), shard.getKeySpaceName()); + } + + /** + * Creates a driver configuration loader for the given {@link CassandraShard}. + * + * @param cassandraShard The shard containing configuration details. + * @return A {@link DriverConfigLoader} instance. + */ + private DriverConfigLoader createConfigLoader(CassandraShard cassandraShard) { + ProgrammaticDriverConfigLoaderBuilder configLoaderBuilder = + DriverConfigLoader.programmaticBuilder(); + + configLoaderBuilder + .withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, cassandraShard.getLocalPoolSize()) + .withInt( + DefaultDriverOption.CONNECTION_POOL_REMOTE_SIZE, cassandraShard.getRemotePoolSize()); + + return configLoaderBuilder.build(); + } + + /** + * Sets the connection pool for testing purposes. + * + * @param inputMap A map containing pre-configured connections for testing. 
+   */
+  public void setConnectionPoolMap(Map inputMap) {
+    connectionPoolMap = inputMap;
+  }
+}
diff --git a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dao/source/CassandraDao.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dao/source/CassandraDao.java
index 5960a413c2..d9a1739ae2 100644
--- a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dao/source/CassandraDao.java
+++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dao/source/CassandraDao.java
@@ -18,6 +18,7 @@
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.BoundStatement;
 import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
 import com.google.cloud.teleport.v2.templates.dbutils.connection.IConnectionHelper;
 import com.google.cloud.teleport.v2.templates.exceptions.ConnectionException;
 import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
@@ -56,4 +57,36 @@ public void write(DMLGeneratorResponse dmlGeneratorResponse) throws Exception {
       session.execute(boundStatement);
     }
   }
+
+  /**
+   * Reads the column metadata of every table in the given keyspace from
+   * {@code system_schema.columns}.
+   *
+   * <p>The session is obtained from the connection helper. {@code CassandraConnectionHelper}
+   * returns the shared instance it keeps per connection key, so the session is deliberately left
+   * open here: closing it would break later use of the same key and the returned result set.
+   *
+   * @param keyspace name of the keyspace whose columns are listed; must not be null or empty.
+   * @return the rows (table_name, column_name, type, kind) found for the keyspace.
+   * @throws IllegalArgumentException if {@code keyspace} is null or empty.
+   * @throws ConnectionException if no session is available for this DAO's connection key.
+   */
+  public ResultSet readMetadata(String keyspace) throws Exception {
+    if (keyspace == null || keyspace.isEmpty()) {
+      throw new IllegalArgumentException("Keyspace name cannot be null or empty.");
+    }
+
+    String query =
+        "SELECT table_name, column_name, type, kind FROM system_schema.columns WHERE keyspace_name = ?";
+
+    // Borrowed from the helper, not owned by this call: no try-with-resources.
+    CqlSession session = (CqlSession) connectionHelper.getConnection(this.cassandraUrl);
+    if (session == null) {
+      throw new ConnectionException("Failed to establish a connection.");
+    }
+
+    PreparedStatement preparedStatement = session.prepare(query);
+    BoundStatement boundStatement = preparedStatement.bind(keyspace);
+    return session.execute(boundStatement);
+  }
 }
diff --git
a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/processor/InputRecordProcessor.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/processor/InputRecordProcessor.java index 9bdfe2bcda..ba2d6b24b0 100644 --- a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/processor/InputRecordProcessor.java +++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/processor/InputRecordProcessor.java @@ -53,7 +53,8 @@ public static boolean processRecord( String shardId, String sourceDbTimezoneOffset, IDMLGenerator dmlGenerator, - ISpannerMigrationTransformer spannerToSourceTransformer) + ISpannerMigrationTransformer spannerToSourceTransformer, + String source) throws Exception { try { diff --git a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/processor/SourceProcessorFactory.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/processor/SourceProcessorFactory.java index bde4b10fe7..651a8427cc 100644 --- a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/processor/SourceProcessorFactory.java +++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/processor/SourceProcessorFactory.java @@ -15,16 +15,22 @@ */ package com.google.cloud.teleport.v2.templates.dbutils.processor; +import com.google.cloud.teleport.v2.spanner.migrations.shard.CassandraShard; import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard; import com.google.cloud.teleport.v2.templates.constants.Constants; +import com.google.cloud.teleport.v2.templates.dbutils.connection.CassandraConnectionHelper; import com.google.cloud.teleport.v2.templates.dbutils.connection.IConnectionHelper; import com.google.cloud.teleport.v2.templates.dbutils.connection.JdbcConnectionHelper; +import 
com.google.cloud.teleport.v2.templates.dbutils.dao.source.CassandraDao; import com.google.cloud.teleport.v2.templates.dbutils.dao.source.IDao; import com.google.cloud.teleport.v2.templates.dbutils.dao.source.JdbcDao; +// import com.google.cloud.teleport.v2.templates.dbutils.dml.CassandraDMLGenerator; import com.google.cloud.teleport.v2.templates.dbutils.dml.IDMLGenerator; import com.google.cloud.teleport.v2.templates.dbutils.dml.MySQLDMLGenerator; import com.google.cloud.teleport.v2.templates.exceptions.UnsupportedSourceException; import com.google.cloud.teleport.v2.templates.models.ConnectionHelperRequest; +import com.google.cloud.teleport.v2.templates.models.DMLGeneratorRequest; +import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -33,20 +39,53 @@ import java.util.function.Function; public class SourceProcessorFactory { - private static Map dmlGeneratorMap = - Map.of(Constants.SOURCE_MYSQL, new MySQLDMLGenerator()); + private static Map dmlGeneratorMap = new HashMap<>(); - private static Map connectionHelperMap = - Map.of(Constants.SOURCE_MYSQL, new JdbcConnectionHelper()); + private static Map connectionHelperMap = new HashMap<>(); - private static Map driverMap = - Map.of(Constants.SOURCE_MYSQL, "com.mysql.cj.jdbc.Driver"); - - private static Map> connectionUrl = + private static final Map driverMap = Map.of( Constants.SOURCE_MYSQL, - shard -> - "jdbc:mysql://" + shard.getHost() + ":" + shard.getPort() + "/" + shard.getDbName()); + "com.mysql.cj.jdbc.Driver", // MySQL JDBC Driver + Constants.SOURCE_CASSANDRA, + "com.datastax.oss.driver.api.core.CqlSession" // Cassandra Session Class + ); + + private static Map> connectionUrl = new HashMap<>(); + + static { + dmlGeneratorMap.put(Constants.SOURCE_MYSQL, new MySQLDMLGenerator()); + dmlGeneratorMap.put( + Constants.SOURCE_CASSANDRA, + new IDMLGenerator() { + // TODO It will get removed in DML PR added Now for Test case 
eg: new + // CassandraDMLGenerator() + @Override + public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequest) { + return new DMLGeneratorResponse(""); + } + }); + + connectionHelperMap.put(Constants.SOURCE_MYSQL, new JdbcConnectionHelper()); + connectionHelperMap.put(Constants.SOURCE_CASSANDRA, new CassandraConnectionHelper()); + + connectionUrl.put( + Constants.SOURCE_MYSQL, + shard -> + "jdbc:mysql://" + shard.getHost() + ":" + shard.getPort() + "/" + shard.getDbName()); + connectionUrl.put( + Constants.SOURCE_CASSANDRA, + shard -> { + CassandraShard cassandraShard = (CassandraShard) shard; + return cassandraShard.getHost() + + ":" + + cassandraShard.getPort() + + "/" + + cassandraShard.getUserName() + + "/" + + cassandraShard.getKeySpaceName(); + }); + } private static Map, Integer, ConnectionHelperRequest>> connectionHelperRequestFactory = @@ -58,7 +97,16 @@ public class SourceProcessorFactory { null, maxConnections, driverMap.get(Constants.SOURCE_MYSQL), - "SET SESSION net_read_timeout=1200" // to avoid timeouts at network layer + "SET SESSION net_read_timeout=1200" // To avoid timeouts at the network layer + ), + Constants.SOURCE_CASSANDRA, + (shards, maxConnections) -> + new ConnectionHelperRequest( + shards, + null, + maxConnections, + driverMap.get(Constants.SOURCE_CASSANDRA), + null // No specific initialization query for Cassandra )); // for unit testing purposes @@ -132,7 +180,10 @@ private static Map createSourceDaoMap(String source, List s Map sourceDaoMap = new HashMap<>(); for (Shard shard : shards) { String connectionUrl = urlGenerator.apply(shard); - IDao sqlDao = new JdbcDao(connectionUrl, shard.getUserName(), getConnectionHelper(source)); + IDao sqlDao = + source.equals(Constants.SOURCE_MYSQL) + ? 
new JdbcDao(connectionUrl, shard.getUserName(), getConnectionHelper(source)) + : new CassandraDao(connectionUrl, shard.getUserName(), getConnectionHelper(source)); sourceDaoMap.put(shard.getLogicalShardId(), sqlDao); } return sourceDaoMap; diff --git a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/transforms/SourceWriterFn.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/transforms/SourceWriterFn.java index 6b00511bf8..e521f70589 100644 --- a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/transforms/SourceWriterFn.java +++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/transforms/SourceWriterFn.java @@ -26,7 +26,9 @@ import com.google.cloud.teleport.v2.spanner.exceptions.InvalidTransformationException; import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventSpannerConvertor; import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException; +import com.google.cloud.teleport.v2.spanner.migrations.metadata.CassandraSourceMetadata; import com.google.cloud.teleport.v2.spanner.migrations.schema.Schema; +import com.google.cloud.teleport.v2.spanner.migrations.shard.CassandraShard; import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard; import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation; import com.google.cloud.teleport.v2.spanner.migrations.utils.CustomTransformationImplFetcher; @@ -34,6 +36,7 @@ import com.google.cloud.teleport.v2.templates.changestream.ChangeStreamErrorRecord; import com.google.cloud.teleport.v2.templates.changestream.TrimmedShardedDataChangeRecord; import com.google.cloud.teleport.v2.templates.constants.Constants; +import com.google.cloud.teleport.v2.templates.dbutils.dao.source.CassandraDao; import com.google.cloud.teleport.v2.templates.dbutils.dao.source.IDao; import 
com.google.cloud.teleport.v2.templates.dbutils.dao.spanner.SpannerDao; import com.google.cloud.teleport.v2.templates.dbutils.processor.InputRecordProcessor; @@ -47,6 +50,7 @@ import java.io.Serializable; import java.util.HashSet; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; @@ -202,7 +206,14 @@ public void processElement(ProcessContext c) { if (!isSourceAhead) { IDao sourceDao = sourceProcessor.getSourceDao(shardId); - + if (Objects.equals(this.source, Constants.SOURCE_CASSANDRA)) { + if (schema.getSrcSchema().isEmpty()) { + CassandraSourceMetadata.generateSourceSchema( + schema, + ((CassandraDao) sourceDao) + .readMetadata(((CassandraShard) shards.get(0)).getKeySpaceName())); + } + } boolean isEventFiltered = InputRecordProcessor.processRecord( spannerRec, @@ -211,7 +222,8 @@ public void processElement(ProcessContext c) { shardId, sourceDbTimezoneOffset, sourceProcessor.getDmlGenerator(), - spannerToSourceTransformer); + spannerToSourceTransformer, + this.source); if (isEventFiltered) { outputWithTag(c, Constants.FILTERED_TAG, Constants.FILTERED_TAG_MESSAGE, spannerRec); } diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/connection/CassandraConnectionHelperTest.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/connection/CassandraConnectionHelperTest.java new file mode 100644 index 0000000000..5a0ce23037 --- /dev/null +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/connection/CassandraConnectionHelperTest.java @@ -0,0 +1,183 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.templates.dbutils.connection; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.google.cloud.teleport.v2.spanner.migrations.shard.CassandraShard; +import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard; +import com.google.cloud.teleport.v2.templates.exceptions.ConnectionException; +import com.google.cloud.teleport.v2.templates.models.ConnectionHelperRequest; +import java.util.Arrays; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; + +class CassandraConnectionHelperTest { + + @Mock private CassandraShard cassandraShard; + @Mock private CassandraConnectionHelper connectionHelper; + + @BeforeEach + void setUp() { + connectionHelper = new CassandraConnectionHelper(); + cassandraShard = mock(CassandraShard.class); + } + + @Test + void testInit_ShouldInitializeConnectionPool() { + when(cassandraShard.getHost()).thenReturn("localhost"); + when(cassandraShard.getPort()).thenReturn("9042"); + when(cassandraShard.getUserName()).thenReturn("user"); + when(cassandraShard.getPassword()).thenReturn("password"); + 
when(cassandraShard.getKeySpaceName()).thenReturn("mykeyspace"); + + ConnectionHelperRequest request = mock(ConnectionHelperRequest.class); + when(request.getShards()).thenReturn(Arrays.asList(cassandraShard)); + when(request.getMaxConnections()).thenReturn(10); + connectionHelper.init(request); + assertTrue(connectionHelper.isConnectionPoolInitialized()); + } + + @Test + void testGetConnection_ShouldReturnValidSession() throws ConnectionException { + String connectionKey = "localhost:9042/user/mykeyspace"; + CqlSession mockSession = mock(CqlSession.class); + connectionHelper.setConnectionPoolMap(Map.of(connectionKey, mockSession)); + + CqlSession session = connectionHelper.getConnection(connectionKey); + + assertNotNull(session); + assertEquals(mockSession, session); + } + + @Test + void testGetConnection_ShouldThrowException_WhenConnectionNotFound() { + assertThrows( + ConnectionException.class, + () -> { + connectionHelper.getConnection("invalidKey"); + }); + } + + @Test + void testIsConnectionPoolInitialized_ShouldReturnTrue_WhenInitialized() { + ConnectionHelperRequest request = mock(ConnectionHelperRequest.class); + when(request.getShards()).thenReturn(Arrays.asList(mock(CassandraShard.class))); + when(request.getMaxConnections()).thenReturn(10); + + connectionHelper.init(request); + + assertTrue(connectionHelper.isConnectionPoolInitialized()); + } + + @Test + void testGetConnection_ShouldThrowConnectionException_WhenPoolNotInitialized() { + assertThrows( + ConnectionException.class, + () -> { + connectionHelper.getConnection("anyKey"); + }); + } + + @Test + void testInit_ShouldHandleException_WhenCqlSessionCreationFails() { + CassandraShard invalidShard = mock(CassandraShard.class); + when(invalidShard.getHost()).thenReturn("localhost"); + when(invalidShard.getPort()).thenReturn("9042"); + when(invalidShard.getUserName()).thenReturn("invalidUser"); + when(invalidShard.getPassword()).thenReturn("invalidPassword"); + 
when(invalidShard.getKeySpaceName()).thenReturn("mykeyspace"); + + ConnectionHelperRequest request = mock(ConnectionHelperRequest.class); + when(request.getShards()).thenReturn(Arrays.asList(invalidShard)); + when(request.getMaxConnections()).thenReturn(10); + + connectionHelper.init(request); + assertFalse(connectionHelper.isConnectionPoolInitialized()); + } + + @Test + void testSetConnectionPoolMap_ShouldOverrideConnectionPoolMap() throws ConnectionException { + CqlSession mockSession = mock(CqlSession.class); + connectionHelper.setConnectionPoolMap(Map.of("localhost:9042/user/mykeyspace", mockSession)); + + CqlSession session = connectionHelper.getConnection("localhost:9042/user/mykeyspace"); + assertNotNull(session); + assertEquals(mockSession, session); + } + + @Test + void testGetConnectionPoolNotFound() { + connectionHelper.setConnectionPoolMap(Map.of()); + + ConnectionException exception = + assertThrows( + ConnectionException.class, + () -> { + connectionHelper.getConnection("nonexistentKey"); + }); + + assertEquals("Connection pool is not initialized.", exception.getMessage()); + } + + @Test + void testGetConnectionWhenPoolNotInitialized() { + connectionHelper.setConnectionPoolMap(null); + ConnectionException exception = + assertThrows( + ConnectionException.class, + () -> { + connectionHelper.getConnection("localhost:9042/testuser/testKeyspace"); + }); + assertEquals("Connection pool is not initialized.", exception.getMessage()); + } + + @Test + void testGetConnectionWithValidKey() throws ConnectionException { + CqlSession mockSession = mock(CqlSession.class); + + String connectionKey = "localhost:9042/testuser/testKeyspace"; + connectionHelper.setConnectionPoolMap(Map.of(connectionKey, mockSession)); + + CqlSession session = connectionHelper.getConnection(connectionKey); + + assertEquals(mockSession, session, "The returned connection should match the mock session."); + } + + @Test + void 
testInit_ShouldThrowIllegalArgumentException_WhenInvalidShardTypeIsProvideds() { + Shard invalidShard = mock(Shard.class); + CassandraConnectionHelper connectionHelper = new CassandraConnectionHelper(); + ConnectionHelperRequest request = mock(ConnectionHelperRequest.class); + when(request.getShards()).thenReturn(java.util.Collections.singletonList(invalidShard)); + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> { + connectionHelper.init(request); + }); + assertEquals("Invalid shard object", exception.getMessage()); + } +} diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/processor/SourceProcessorFactoryTest.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/processor/SourceProcessorFactoryTest.java index 3b9c0e64bf..5746637892 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/processor/SourceProcessorFactoryTest.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/processor/SourceProcessorFactoryTest.java @@ -18,9 +18,12 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; +import com.google.cloud.teleport.v2.spanner.migrations.shard.CassandraShard; import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard; import com.google.cloud.teleport.v2.templates.constants.Constants; +import com.google.cloud.teleport.v2.templates.dbutils.connection.CassandraConnectionHelper; import com.google.cloud.teleport.v2.templates.dbutils.connection.JdbcConnectionHelper; +import com.google.cloud.teleport.v2.templates.dbutils.dao.source.CassandraDao; import com.google.cloud.teleport.v2.templates.dbutils.dao.source.JdbcDao; import com.google.cloud.teleport.v2.templates.dbutils.dml.MySQLDMLGenerator; import com.google.cloud.teleport.v2.templates.exceptions.UnsupportedSourceException; @@ -82,4 +85,37 @@ public void 
testCreateSourceProcessor_invalidSource() throws Exception { SourceProcessorFactory.createSourceProcessor("invalid_source", shards, maxConnections); } + + @Test + public void testCreateSourceProcessor_cassandra_validSource() throws Exception { + List shards = + Arrays.asList( + new CassandraShard( + "shard1", + "localhost", + "3306", + "myuser", + "mypassword", + "mydatabase", + "LOCAL_QUORUM", + false, + "v5", + "mynamespace", + 1024, + 1024)); + int maxConnections = 10; + CassandraConnectionHelper mockConnectionHelper = Mockito.mock(CassandraConnectionHelper.class); + doNothing().when(mockConnectionHelper).init(any()); + SourceProcessorFactory.setConnectionHelperMap( + Map.of(Constants.SOURCE_CASSANDRA, mockConnectionHelper)); + SourceProcessor processor = + SourceProcessorFactory.createSourceProcessor( + Constants.SOURCE_CASSANDRA, shards, maxConnections); + + Assert.assertNotNull(processor); + // ToDo this Particular line will get enable in DML PR + // Assert.assertTrue(processor.getDmlGenerator() instanceof CassandraDMLGenerator); + Assert.assertEquals(1, processor.getSourceDaoMap().size()); + Assert.assertTrue(processor.getSourceDaoMap().get("shard1") instanceof CassandraDao); + } }