Skip to content

Commit

Permalink
Metadata config and pipeline options (#31)
Browse files Browse the repository at this point in the history
* Added Connection Helper

* Added Formatter

* Added File Reader

* Refactored

* Added Changes for Schema and Spanner Schema

* Added Schema Changes to read Spanner Table in Schema object

* Added Schema Changes

* Added Changes for Schema

* Added Source Writer Fn Changes

* Added Source Factory Changes

* Added Fixes for the Source factory and Cassandra Connection helper

* Added Cassandra Schema Reader

* Added Pipeline Process

* Removed Unwanted Validation

* Added Access validator

* removed unwanted Return

* Added Thread safe optimization in cassandra Connection helper

* Applied  spotless:apply

* spotless:apply

* Added Constructor for Test case

* Added DUMMY Generator For UT

* Fixed UT for Metadata config (#30)

Co-authored-by: Narendra Rajput <narendra.rajput@ollion.com>

---------

Co-authored-by: Narendra Rajput <narendra.rajput@ollion.com>
  • Loading branch information
pawankashyapollion and narendra3488 authored Dec 24, 2024
1 parent f618128 commit 7d06c3e
Show file tree
Hide file tree
Showing 19 changed files with 1,261 additions and 31 deletions.
5 changes: 5 additions & 0 deletions v2/spanner-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@
<version>1.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.17.0</version> <!-- Pinned driver version; update deliberately when upgrading -->
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ public class Constants {
/* The source type value for MySql databases */
public static final String MYSQL_SOURCE_TYPE = "mysql";

/* The source type value for Cassandra databases */
public static final String CASSANDRA_SOURCE_TYPE = "cassandra";

/* The value for Oracle databases in the source type key */
public static final String ORACLE_SOURCE_TYPE = "oracle";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.spanner.migrations.metadata;

import autovalue.shaded.com.google.common.collect.ImmutableList;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.google.cloud.teleport.v2.spanner.migrations.schema.*;
import com.google.cloud.teleport.v2.spanner.migrations.schema.cassandra.SourceColumn;
import com.google.cloud.teleport.v2.spanner.migrations.schema.cassandra.SourceSchema;
import com.google.cloud.teleport.v2.spanner.migrations.schema.cassandra.SourceTable;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Builds the source-side metadata of a {@link Schema} from Cassandra column rows.
 *
 * <p>Every row of the supplied {@link ResultSet} is read through the string columns {@code
 * table_name}, {@code column_name}, {@code type} and {@code kind} (presumably rows of Cassandra's
 * {@code system_schema.columns} table; confirm against the caller).
 */
public class CassandraSourceMetadata {

  // Schema name assigned to every converted source table.
  private static final String SCHEMA_NAME = "cassandra";

  /**
   * Generates a SourceSchema from a ResultSet.
   *
   * <p>Rows are grouped by {@code table_name}; each row becomes one {@link SourceColumn}. Columns
   * of a table keep the order in which their rows were returned.
   *
   * @param resultSet The ResultSet containing one row per column.
   * @return A SourceSchema instance holding one SourceTable per distinct table name.
   */
  private static SourceSchema generateSourceSchema(ResultSet resultSet) {
    Map<String, Map<String, SourceColumn>> columnsByTable = new HashMap<>();

    resultSet.forEach(
        row -> {
          String tableName = row.getString("table_name");
          String columnName = row.getString("column_name");
          String dataType = row.getString("type");
          String kind = row.getString("kind");

          SourceColumn sourceColumn =
              SourceColumn.create(columnName, kind, dataType, isPrimaryKey(kind));

          // LinkedHashMap keeps the columns in result-set order, so the column-id and primary-key
          // arrays derived from them later are deterministic instead of hash-ordered.
          columnsByTable
              .computeIfAbsent(tableName, k -> new LinkedHashMap<>())
              .put(columnName, sourceColumn);
        });

    Map<String, SourceTable> tables =
        columnsByTable.entrySet().stream()
            .collect(
                Collectors.toMap(
                    Map.Entry::getKey,
                    entry ->
                        SourceTable.create(
                            entry.getKey(), ImmutableList.copyOf(entry.getValue().values()))));

    // SourceSchema.create takes its own immutable copy of the map, so no extra copy is needed.
    return SourceSchema.create(tables);
  }

  /**
   * Reads the source schema from a ResultSet and stores it on the provided schema.
   *
   * <p>Both the source table map ({@code setSrcSchema}) and the source name lookup ({@code
   * setToSource}) of {@code schema} are replaced.
   *
   * @param schema The schema to update.
   * @param resultSet The ResultSet containing schema information.
   */
  public static void generateSourceSchema(Schema schema, ResultSet resultSet) {
    SourceSchema sourceSchema = generateSourceSchema(resultSet);
    Map<String, com.google.cloud.teleport.v2.spanner.migrations.schema.SourceTable> sourceTableMap =
        convertSourceSchemaToMap(sourceSchema);
    schema.setSrcSchema(sourceTableMap);
    schema.setToSource(convertSourceToNameAndColsTable(sourceSchema.tables().values()));
  }

  /**
   * Converts a SourceSchema to a map of source table names to migration SourceTable objects.
   *
   * @param sourceSchema The SourceSchema to convert.
   * @return A map where the key is the source table name and the value is the converted
   *     SourceTable.
   */
  private static Map<String, com.google.cloud.teleport.v2.spanner.migrations.schema.SourceTable>
      convertSourceSchemaToMap(SourceSchema sourceSchema) {
    return sourceSchema.tables().entrySet().stream()
        .collect(
            Collectors.toMap(
                Map.Entry::getKey, entry -> convertSourceTableToSourceTable(entry.getValue())));
  }

  /**
   * Converts a Cassandra SourceTable to the migration schema's SourceTable.
   *
   * <p>Column names double as column ids, and each column type is created with empty modifier and
   * array-bound arrays.
   *
   * @param sourceTable The Cassandra SourceTable to convert.
   * @return A converted SourceTable object using {@code "cassandra"} as its schema name.
   */
  private static com.google.cloud.teleport.v2.spanner.migrations.schema.SourceTable
      convertSourceTableToSourceTable(SourceTable sourceTable) {
    List<SourceColumn> columns = sourceTable.columns();

    String[] colIds = columns.stream().map(SourceColumn::name).toArray(String[]::new);

    Map<String, SourceColumnDefinition> colDefs =
        columns.stream()
            .collect(
                Collectors.toMap(
                    SourceColumn::name,
                    col ->
                        new SourceColumnDefinition(
                            col.name(),
                            new SourceColumnType(col.sourceType(), new Long[0], new Long[0]))));

    ColumnPK[] primaryKeys =
        columns.stream()
            .filter(SourceColumn::isPrimaryKey)
            .map(col -> new ColumnPK(col.name(), getPrimaryKeyOrder(col)))
            .toArray(ColumnPK[]::new);

    return new com.google.cloud.teleport.v2.spanner.migrations.schema.SourceTable(
        sourceTable.name(), SCHEMA_NAME, colIds, colDefs, primaryKeys);
  }

  /**
   * Determines if a column is a primary key.
   *
   * @param kind The column kind.
   * @return true if the kind is {@code "partition_key"} or {@code "clustering"}, false otherwise.
   */
  private static boolean isPrimaryKey(String kind) {
    return "partition_key".equals(kind) || "clustering".equals(kind);
  }

  /**
   * Gets the order of the primary key column.
   *
   * <p>NOTE(review): the order depends on the column kind only, so every partition key column of
   * a table gets order 1 and every clustering column gets order 2. Distinct positions inside a
   * composite key would require carrying a per-column position on SourceColumn; confirm whether
   * consumers of ColumnPK rely on unique orders.
   *
   * @param col The SourceColumn.
   * @return 1 for a partition key column, 2 for a clustering column, 0 for any other kind.
   */
  private static int getPrimaryKeyOrder(SourceColumn col) {
    switch (col.kind()) {
      case "partition_key":
        return 1;
      case "clustering":
        return 2;
      default:
        return 0;
    }
  }

  /**
   * Converts a collection of SourceTables to a map of table names to NameAndCols.
   *
   * @param tables A collection of SourceTables.
   * @return A map where the key is the table name and the value is a NameAndCols object.
   */
  private static Map<String, NameAndCols> convertSourceToNameAndColsTable(
      Collection<SourceTable> tables) {
    return tables.stream()
        .collect(
            Collectors.toMap(
                SourceTable::name, CassandraSourceMetadata::convertSourceTableToNameAndCols));
  }

  /**
   * Converts a SourceTable to a NameAndCols object.
   *
   * <p>Every column name is mapped to itself.
   *
   * @param sourceTable The SourceTable to convert.
   * @return A NameAndCols object representing the table and column names.
   */
  private static NameAndCols convertSourceTableToNameAndCols(SourceTable sourceTable) {
    Map<String, String> columnNames =
        sourceTable.columns().stream()
            .collect(Collectors.toMap(SourceColumn::name, SourceColumn::name));

    return new NameAndCols(sourceTable.name(), columnNames);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@
*/
public class Schema implements Serializable {
/** Maps the HarbourBridge table ID to the Spanner table details. */
private final Map<String, SpannerTable> spSchema;
private Map<String, SpannerTable> spSchema;

/** Maps the Spanner table ID to the synthetic PK. */
private final Map<String, SyntheticPKey> syntheticPKeys;

/** Maps the HarbourBridge table ID to the Source table details. */
private final Map<String, SourceTable> srcSchema;
private Map<String, SourceTable> srcSchema;

// The columns below are not part of the session file. They are computed based on the fields
// above.
Expand Down Expand Up @@ -80,6 +80,10 @@ public Map<String, SpannerTable> getSpSchema() {
return spSchema;
}

/**
 * Replaces the Spanner table map held by this schema.
 *
 * @param spSchema the new Spanner table map.
 * @return the map that was just assigned.
 */
public Map<String, SpannerTable> setSpSchema(Map<String, SpannerTable> spSchema) {
  this.spSchema = spSchema;
  return this.spSchema;
}

/**
 * Returns the synthetic primary keys of this schema.
 *
 * @return the map from Spanner table ID to its synthetic PK.
 */
public Map<String, SyntheticPKey> getSyntheticPks() {
  return this.syntheticPKeys;
}
Expand All @@ -88,6 +92,10 @@ public Map<String, SourceTable> getSrcSchema() {
return srcSchema;
}

/**
 * Replaces the source table map held by this schema.
 *
 * @param srcSchema the new source table map.
 * @return the map that was just assigned.
 */
public Map<String, SourceTable> setSrcSchema(Map<String, SourceTable> srcSchema) {
  this.srcSchema = srcSchema;
  return this.srcSchema;
}

/**
 * Returns the {@code toSpanner} name lookup held by this schema.
 *
 * @return the map of {@link NameAndCols} entries stored in {@code toSpanner}.
 */
public Map<String, NameAndCols> getToSpanner() {
  return this.toSpanner;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.spanner.migrations.schema.cassandra;

import com.google.auto.value.AutoValue;
import java.io.Serializable;

/**
 * Immutable description of a single source column: its name, its kind, its source data type, and
 * whether it is part of the primary key.
 *
 * <p>Instances are obtained through {@link #create(String, String, String, boolean)}; the concrete
 * implementation is generated by AutoValue.
 */
@AutoValue
public abstract class SourceColumn implements Serializable {

  // Declared explicitly, as SourceSchema does, so the serialized form does not depend on the
  // compiler-computed default; AutoValue copies this value to the generated subclass.
  private static final long serialVersionUID = 1L;

  /**
   * Creates a new instance of SourceColumn.
   *
   * @param name the name of the column, should not be null or empty.
   * @param kind the kind of the column (e.g., partition key, clustering key, regular).
   * @param sourceType the source type of the column (e.g., String, Int).
   * @param isPrimaryKey whether the column is a primary key.
   * @return a new immutable SourceColumn instance.
   */
  public static SourceColumn create(
      String name, String kind, String sourceType, boolean isPrimaryKey) {
    return new AutoValue_SourceColumn(name, kind, sourceType, isPrimaryKey);
  }

  /**
   * Gets the name of the column.
   *
   * @return the name of the column.
   */
  public abstract String name();

  /**
   * Gets the kind of the column.
   *
   * @return the kind of the column.
   */
  public abstract String kind();

  /**
   * Gets the source type of the column.
   *
   * @return the source type of the column.
   */
  public abstract String sourceType();

  /**
   * Checks if the column is a primary key.
   *
   * @return {@code true} if the column is a primary key, otherwise {@code false}.
   */
  public abstract boolean isPrimaryKey();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.spanner.migrations.schema.cassandra;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.Map;

/**
 * Immutable source schema: a map of {@link SourceTable} entries keyed by string.
 *
 * <p>The concrete implementation is generated by AutoValue.
 */
@AutoValue
public abstract class SourceSchema implements Serializable {

  private static final long serialVersionUID = 1L;

  /**
   * Builds a SourceSchema from the given tables.
   *
   * <p>The map is copied with {@link Map#copyOf(Map)}, so later changes to {@code tables} are not
   * reflected in the returned instance.
   *
   * @param tables the source tables to snapshot.
   * @return a new immutable SourceSchema instance.
   */
  public static SourceSchema create(Map<String, SourceTable> tables) {
    Map<String, SourceTable> snapshot = Map.copyOf(tables);
    return new AutoValue_SourceSchema(snapshot);
  }

  /**
   * Returns the tables of this schema.
   *
   * @return the unmodifiable source table map.
   */
  public abstract Map<String, SourceTable> tables();
}
Loading

0 comments on commit 7d06c3e

Please sign in to comment.