Cassandra Bulk - List and Set - Avro Mapping. (#2165)
* Adding List and Map tables in UT

* Support for non-nested Cassandra List and Set
VardhanThigle authored Feb 6, 2025
1 parent 79c9fe9 commit 2adaab6
Showing 16 changed files with 591 additions and 45 deletions.
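
In effect, every scalar Cassandra type registered by CassandraMappings now also gets LIST<...> and SET<...> mappings whose Avro representation is an array of the scalar's unified type. A minimal sketch of the field-schema shape expected for a LIST<BIGINT> column, built with Avro's SchemaBuilder and mirroring the null-union wrapping visible later in this diff (field naming and element-level nullability are assumptions, not asserted by the change):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class ListOfBigintSchemaSketch {
  public static void main(String[] args) {
    // Assumed shape only: a LIST<BIGINT> column maps to an Avro array of long,
    // wrapped in a ["null", ...] union the same way scalar columns are wrapped.
    Schema longArray = SchemaBuilder.builder().array().items().longType();
    Schema nullableLongArray =
        SchemaBuilder.builder().unionOf().nullType().and().type(longArray).endUnion();
    System.out.println(nullableLongArray);
  }
}
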
@@ -17,11 +17,13 @@

import com.google.auto.value.AutoValue;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.CassandraFieldMapper;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.CassandraRowValueArrayMapper;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.CassandraRowValueExtractor;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.CassandraRowValueMapper;
import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.UnifiedTypeMapping;
import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.unified.UnifiedMappingProvider;
import com.google.common.collect.ImmutableMap;
import com.google.common.reflect.TypeToken;

/** Represents unified type mappings, value extractors, and value mappings for Cassandra. */
@AutoValue
@@ -40,20 +42,72 @@ public abstract static class Builder {

abstract ImmutableMap.Builder<String, CassandraFieldMapper<?>> fieldMappingBuilder();

/**
 * Maintains mappings for a given type, both as a primitive and as an element of collections.
 *
 * @param cassandraType - name of the Cassandra type, as discovered by schema discovery.
 * @param type - unified mapping type.
 * @param rowValueExtractor - {@link CassandraRowValueExtractor} to extract the value from a
 *     {@link com.datastax.driver.core.Row Cassandra Row}.
 * @param rowValueMapper - {@link CassandraRowValueMapper} to map the value to a {@link
 *     com.google.cloud.teleport.v2.source.reader.io.row.SourceRow}.
 * @param typeClass - class of the extracted value, generally the return type of the
 *     rowValueExtractor.
 * @return Builder
 */
public <T> Builder put(
String cassandraType,
UnifiedMappingProvider.Type type,
CassandraRowValueExtractor<T> rowValueExtractor,
CassandraRowValueMapper<T> rowValueMapper) {
CassandraRowValueMapper<T> rowValueMapper,
Class<T> typeClass) {
this.typeMappingBuilder()
.put(cassandraType.toUpperCase(), UnifiedMappingProvider.getMapping(type));
this.fieldMappingBuilder()
.put(
cassandraType.toUpperCase(),
CassandraFieldMapper.create(rowValueExtractor, rowValueMapper));
if (!type.equals(UnifiedMappingProvider.Type.UNSUPPORTED)) {
putList(cassandraType, type, rowValueExtractor, rowValueMapper, typeClass);
putSet(cassandraType, type, rowValueExtractor, rowValueMapper, typeClass);
}
return this;
}

private <T> void putList(
String cassandraType,
UnifiedMappingProvider.Type type,
CassandraRowValueExtractor<T> rowValueExtractor,
CassandraRowValueMapper<T> rowValueMapper,
Class<T> typeClass) {
String listType = "LIST<" + cassandraType.toUpperCase() + ">";
this.typeMappingBuilder().put(listType, UnifiedMappingProvider.getArrayMapping(type));
TypeToken<T> typeToken = TypeToken.of(typeClass);
this.fieldMappingBuilder()
.put(
listType,
CassandraFieldMapper.create(
(row, name) -> row.getList(name, typeToken),
CassandraRowValueArrayMapper.create(rowValueMapper)));
}

private <T> void putSet(
String cassandraType,
UnifiedMappingProvider.Type type,
CassandraRowValueExtractor<T> rowValueExtractor,
CassandraRowValueMapper<T> rowValueMapper,
Class<T> typeClass) {
String setType = "SET<" + cassandraType.toUpperCase() + ">";
TypeToken<T> typeToken = TypeToken.of(typeClass);
this.typeMappingBuilder().put(setType, UnifiedMappingProvider.getArrayMapping(type));
this.fieldMappingBuilder()
.put(
setType,
CassandraFieldMapper.create(
(row, name) -> row.getSet(name, typeToken),
CassandraRowValueArrayMapper.create(rowValueMapper)));
}

public abstract CassandraMappings build();
}
}
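
For reference, a minimal sketch of the lookup keys a single Builder#put call registers after this change, per the putList/putSet helpers above (the type name is only an example; UNSUPPORTED types skip the collection keys, as put shows):

public class PutKeySketch {
  public static void main(String[] args) {
    // One put("bigint", ...) call registers the scalar key plus two collection keys.
    String cassandraType = "bigint";
    String primitiveKey = cassandraType.toUpperCase(); // "BIGINT"
    String listKey = "LIST<" + primitiveKey + ">";     // "LIST<BIGINT>"
    String setKey = "SET<" + primitiveKey + ">";       // "SET<BIGINT>"
    System.out.println(primitiveKey + ", " + listKey + ", " + setKey);
  }
}
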
@@ -26,8 +26,12 @@
import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.unified.CustomSchema.IntervalNano;
import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.unified.UnifiedMappingProvider;
import com.google.common.collect.ImmutableMap;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.UUID;
import org.apache.avro.LogicalTypes;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.commons.codec.binary.Hex;
@@ -40,7 +44,11 @@ public class CassandraMappingsProvider {
private static final CassandraRowValueMapper toString = (value, schema) -> value.toString();

/** Pass the value as an integer to Avro. */
private static final CassandraRowValueMapper<Number> toInt = (value, schema) -> value.intValue();
private static final CassandraRowValueMapper<Byte> byteToInt =
(value, schema) -> value.intValue();

private static final CassandraRowValueMapper<Short> shortToInt =
(value, schema) -> value.intValue();

/** Map {@link ByteBuffer} to a hex-encoded String. */
private static final CassandraRowValueMapper<ByteBuffer> ByteBufferToHexString =
@@ -84,38 +92,121 @@ public class CassandraMappingsProvider {

private static final CassandraMappings CASSANDRA_MAPPINGS =
CassandraMappings.builder()
.put("ASCII", UnifiedMappingProvider.Type.STRING, Row::getString, valuePassThrough)
.put("BIGINT", UnifiedMappingProvider.Type.LONG, Row::getLong, valuePassThrough)
.put("BLOB", UnifiedMappingProvider.Type.STRING, Row::getBytes, ByteBufferToHexString)
.put("BOOLEAN", UnifiedMappingProvider.Type.BOOLEAN, Row::getBool, valuePassThrough)
.put("COUNTER", UnifiedMappingProvider.Type.LONG, Row::getLong, valuePassThrough)
.put("DATE", UnifiedMappingProvider.Type.DATE, Row::getDate, localDateToAvroLogicalDate)
.put(
"ASCII",
UnifiedMappingProvider.Type.STRING,
Row::getString,
valuePassThrough,
String.class)
.put(
"BIGINT",
UnifiedMappingProvider.Type.LONG,
Row::getLong,
valuePassThrough,
Long.class)
.put(
"BLOB",
UnifiedMappingProvider.Type.STRING,
Row::getBytes,
ByteBufferToHexString,
ByteBuffer.class)
.put(
"BOOLEAN",
UnifiedMappingProvider.Type.BOOLEAN,
Row::getBool,
valuePassThrough,
Boolean.class)
.put(
"COUNTER",
UnifiedMappingProvider.Type.LONG,
Row::getLong,
valuePassThrough,
Long.class)
.put(
"DATE",
UnifiedMappingProvider.Type.DATE,
Row::getDate,
localDateToAvroLogicalDate,
LocalDate.class)
// The Cassandra decimal type does not fix precision and scale in the schema,
// which would be needed to map it to Avro's decimal logical type, so it maps to STRING.
.put("DECIMAL", UnifiedMappingProvider.Type.STRING, Row::getDecimal, toString)
.put("DOUBLE", UnifiedMappingProvider.Type.DOUBLE, Row::getDouble, valuePassThrough)
.put("DURATION", UnifiedMappingProvider.Type.INTERVAL_NANO, getDuration, durationToAvro)
.put("FLOAT", UnifiedMappingProvider.Type.FLOAT, Row::getFloat, valuePassThrough)
.put("INET", UnifiedMappingProvider.Type.STRING, Row::getInet, toString)
.put("INT", UnifiedMappingProvider.Type.INTEGER, Row::getInt, valuePassThrough)
.put("SMALLINT", UnifiedMappingProvider.Type.INTEGER, Row::getShort, toInt)
.put("TEXT", UnifiedMappingProvider.Type.STRING, Row::getString, valuePassThrough)
.put(
"DECIMAL",
UnifiedMappingProvider.Type.STRING,
Row::getDecimal,
toString,
BigDecimal.class)
.put(
"DOUBLE",
UnifiedMappingProvider.Type.DOUBLE,
Row::getDouble,
valuePassThrough,
Double.class)
.put(
"DURATION",
UnifiedMappingProvider.Type.INTERVAL_NANO,
getDuration,
durationToAvro,
Duration.class)
.put(
"FLOAT",
UnifiedMappingProvider.Type.FLOAT,
Row::getFloat,
valuePassThrough,
Float.class)
.put(
"INET", UnifiedMappingProvider.Type.STRING, Row::getInet, toString, InetAddress.class)
.put(
"INT",
UnifiedMappingProvider.Type.INTEGER,
Row::getInt,
valuePassThrough,
Integer.class)
.put(
"SMALLINT",
UnifiedMappingProvider.Type.INTEGER,
Row::getShort,
shortToInt,
Short.class)
.put(
"TEXT",
UnifiedMappingProvider.Type.STRING,
Row::getString,
valuePassThrough,
String.class)
.put(
"TIME",
UnifiedMappingProvider.Type.INTERVAL_NANO,
Row::getTime,
cassandraTimeToIntervalNano)
.put("TIMESTAMP", UnifiedMappingProvider.Type.TIMESTAMP, Row::getTimestamp, dateToAvro)
.put("TIMEUUID", UnifiedMappingProvider.Type.STRING, Row::getUUID, toString)
.put("TINYINT", UnifiedMappingProvider.Type.INTEGER, Row::getByte, toInt)
.put("UUID", UnifiedMappingProvider.Type.STRING, Row::getUUID, toString)
.put("VARCHAR", UnifiedMappingProvider.Type.STRING, Row::getString, valuePassThrough)
.put("VARINT", UnifiedMappingProvider.Type.NUMBER, Row::getVarint, toString)
cassandraTimeToIntervalNano,
Long.class)
.put(
"TIMESTAMP",
UnifiedMappingProvider.Type.TIMESTAMP,
Row::getTimestamp,
dateToAvro,
Date.class)
.put("TIMEUUID", UnifiedMappingProvider.Type.STRING, Row::getUUID, toString, UUID.class)
.put("TINYINT", UnifiedMappingProvider.Type.INTEGER, Row::getByte, byteToInt, Byte.class)
.put("UUID", UnifiedMappingProvider.Type.STRING, Row::getUUID, toString, UUID.class)
.put(
"VARCHAR",
UnifiedMappingProvider.Type.STRING,
Row::getString,
valuePassThrough,
String.class)
.put(
"VARINT",
UnifiedMappingProvider.Type.NUMBER,
Row::getVarint,
toString,
BigInteger.class)
.put(
"UNSUPPORTED",
UnifiedMappingProvider.Type.UNSUPPORTED,
(row, name) -> null,
(value, schema) -> null)
(value, schema) -> null,
null)
.build();

private CassandraMappingsProvider() {}
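
The element mappers registered above are the same ones putList/putSet reuse for collection elements. A rough sketch of the widening and stringification they perform, assuming the lambdas behave like these plain Java conversions:

import java.math.BigDecimal;
import java.math.BigInteger;

public class ScalarMapperSketch {
  public static void main(String[] args) {
    // SMALLINT / TINYINT are widened to Avro int (see shortToInt / byteToInt above).
    int widenedSmallint = Short.valueOf((short) 12).intValue();
    int widenedTinyint = Byte.valueOf((byte) 3).intValue();

    // DECIMAL and VARINT are carried as strings because Cassandra does not fix
    // precision/scale in the schema, which Avro's decimal logical type would need.
    String decimalAsString = new BigDecimal("12.3456789").toString();
    String varintAsString = new BigInteger("123456789012345678901234567890").toString();

    System.out.println(widenedSmallint + " " + widenedTinyint);
    System.out.println(decimalAsString + " " + varintAsString);
  }
}
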
@@ -36,6 +36,9 @@ abstract class CassandraRowMapper implements Transformer<Row, SourceRow>, Serial
public static final ImmutableMap<String, CassandraFieldMapper<?>> MAPPINGS =
CassandraMappingsProvider.getFieldMapping();

/*
* TODO(vardhanvthigle): support nested collections.
*/
public static CassandraRowMapper create(
SourceSchemaReference sourceSchemaReference, SourceTableSchema sourceTableSchema) {
return new AutoValue_CassandraRowMapper(sourceSchemaReference, sourceTableSchema);
@@ -0,0 +1,48 @@
/*
* Copyright (C) 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper;

import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericRecordBuilder;
import org.checkerframework.checker.nullness.qual.NonNull;

@AutoValue
public abstract class CassandraRowValueArrayMapper<T>
implements CassandraRowValueMapper<Iterable<T>> {

public static <T> CassandraRowValueArrayMapper<T> create(
CassandraRowValueMapper<T> rowValueMapper) {
return new AutoValue_CassandraRowValueArrayMapper<>(rowValueMapper);
}

abstract CassandraRowValueMapper<T> rowValueMapper();

/**
 * Maps the extracted collection to an object accepted by {@link GenericRecordBuilder#set(Field,
 * Object)} as per the schema of the field.
 *
 * @param values extracted collection of values.
 * @param schema Avro schema of the field, assumed to be the array schema for this column.
 * @return mapped object.
 */
@Override
public Object map(@NonNull Iterable<T> values, Schema schema) {
  ImmutableList.Builder<Object> listBuilder = ImmutableList.builder();
  // Apply the element-level mapper to each value so that, for example, the elements
  // of a LIST<BLOB> are hex-encoded the same way as a scalar BLOB column.
  for (T value : values) {
    listBuilder.add(rowValueMapper().map(value, schema.getElementType()));
  }
  return listBuilder.build();
}
}
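
A hypothetical usage sketch of this mapper, assuming CassandraRowValueMapper is the functional interface implemented by the lambdas earlier in this diff; the element mapper, values, and schema below are invented for illustration:

import com.google.common.collect.ImmutableList;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

class CassandraRowValueArrayMapperUsageSketch {
  static Object mapShortList() {
    // Per-element mapper, mirroring the shortToInt lambda used for SMALLINT.
    CassandraRowValueMapper<Short> shortToInt = (value, schema) -> value.intValue();
    CassandraRowValueArrayMapper<Short> listMapper =
        CassandraRowValueArrayMapper.create(shortToInt);

    // Avro field schema for the column: an array of int.
    Schema intArray = SchemaBuilder.builder().array().items().intType();

    // Returns an ImmutableList of Integers, ready for GenericRecordBuilder#set.
    return listMapper.map(ImmutableList.of((short) 1, (short) 2), intArray);
  }
}
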
@@ -20,7 +20,7 @@
import java.io.Serializable;
import javax.annotation.Nullable;

public interface CassandraRowValueExtractor<T extends Object> extends Serializable {
public interface CassandraRowValueExtractor<T> extends Serializable {

/**
* Extract the requested field from the result set.
@@ -162,7 +162,17 @@ private ImmutableMap<String, SourceColumnType> getTableColumns(
for (ColumnMetadata columnMetadata : metadata.getTable(table).get().getColumns().values()) {
String name = columnMetadata.getName().toString();
SourceColumnType sourceColumnType =
new SourceColumnType(columnMetadata.getType().toString(), new Long[] {}, new Long[] {});
new SourceColumnType(
/*
 * Get the name of the type as represented in CQL, using the driver's `asCql` method.
 * Here we exclude the `frozen` keyword, since whether a type is frozen or not does not
 * matter to the read pipeline.
 */
columnMetadata
.getType()
.asCql(false /*includeFrozen*/, true /*prettyPrint*/)
.toUpperCase(),
new Long[] {},
new Long[] {});
tableSchemaBuilder.put(name, sourceColumnType);
}
return tableSchemaBuilder.build();
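
For reference, a small sketch of the type names this discovery path would record, assuming the DataStax Java driver 4.x DataTypes factory and DataType#asCql(includeFrozen, pretty):

import com.datastax.oss.driver.api.core.type.DataType;
import com.datastax.oss.driver.api.core.type.DataTypes;

public class CqlTypeNameSketch {
  public static void main(String[] args) {
    DataType listOfInt = DataTypes.listOf(DataTypes.INT);
    DataType frozenSetOfText = DataTypes.setOf(DataTypes.TEXT, true /* frozen */);

    // With includeFrozen = false the frozen keyword is dropped, so both print the
    // bare collection type, matching the keys registered by CassandraMappings.
    System.out.println(listOfInt.asCql(false, true).toUpperCase());       // LIST<INT>
    System.out.println(frozenSetOfText.asCql(false, true).toUpperCase()); // SET<TEXT>
  }
}
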
@@ -88,6 +88,9 @@ public Schema getSchema(SourceColumnType columnType) {
: SchemaBuilder.builder().unionOf().nullType().and().type(schema).endUnion();
}

/*
* TODO(vardhanvthigle): Handle Nested collections.
*/
private Schema getBasicSchema(SourceColumnType columnType) {
return mappers
.get(this.mapperType)
@@ -15,6 +15,7 @@
*/
package com.google.cloud.teleport.v2.source.reader.io.schema.typemapping;

import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.avro.Schema;

@@ -24,7 +25,7 @@
* com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.MysqlMappingProvider
* MysqlMappingProvider}.
*/
public interface UnifiedTypeMapping {
public interface UnifiedTypeMapping extends Serializable {

/**
* Convert the Source Schema.