Skip to content

Commit

Permalink
fixed schema change
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Jan 30, 2024
1 parent 10be48f commit a48ce4b
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,27 +139,40 @@ private GenericRecord convertToStruct(
String[] filedNames = fromType.getFieldNames();
for (int i = 0; i < filedNames.length; i++) {
String recordField = filedNames[i];
SeaTunnelDataType dataType = fromType.getFieldType(i);
Type afterType = SchemaUtils.toIcebergType(fromType.getFieldType(i));
Types.NestedField tableField = lookupStructField(recordField, schema, structFieldId);
// add column
if (Objects.isNull(tableField)) {
if (config.isTableSchemaEvolutionEnabled() && Objects.nonNull(wrapper)) {
// add the column if schema evolution is on
String parentFieldName =
structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId);
Type type = SchemaUtils.toIcebergType(dataType);
wrapper.addColumn(parentFieldName, recordField, type);
wrapper.addColumn(parentFieldName, recordField, afterType);
}
continue;
}
result.setField(
tableField.name(),
convertValue(
row.getField(i),
fromType.getFieldType(i),
tableField.type(),
tableField.fieldId(),
wrapper));
// update the column type
boolean hasSchemaUpdates = false;
if (config.isTableSchemaEvolutionEnabled() && Objects.nonNull(wrapper)) {
// update the type if needed and schema evolution is on
Type.PrimitiveType evolveDataType =
SchemaUtils.needsDataTypeUpdate(tableField.type(), afterType);
if (Objects.nonNull(evolveDataType)) {
String fieldName = tableSchema.findColumnName(tableField.fieldId());
wrapper.modifyColumn(fieldName, evolveDataType);
hasSchemaUpdates = true;
}
}
if (!hasSchemaUpdates) {
result.setField(
tableField.name(),
convertValue(
row.getField(i),
fromType.getFieldType(i),
tableField.type(),
tableField.fieldId(),
wrapper));
}
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -97,17 +96,17 @@ public void applySchemaChange(SeaTunnelRowType afterRowType, SchemaChangeEvent e
AlterTableDropColumnEvent dropColumnEvent = (AlterTableDropColumnEvent) event;
updates.deleteColumn(dropColumnEvent.getColumn());
} else if (event instanceof AlterTableAddColumnEvent) {
// Change during data consumption process
// Columns are added during the data consumption process, not here
} else if (event instanceof AlterTableModifyColumnEvent) {
// Column types are updated during the data consumption process, not here
} else if (event instanceof AlterTableChangeColumnEvent) {
// rename
AlterTableChangeColumnEvent changeColumnEvent = (AlterTableChangeColumnEvent) event;
changeColumn(
schema,
changeColumnEvent.getColumn(),
changeColumnEvent.getOldColumn(),
updates);
} else if (event instanceof AlterTableModifyColumnEvent) {
AlterTableModifyColumnEvent modifyColumnEvent = (AlterTableModifyColumnEvent) event;
modifyColumn(schema, modifyColumnEvent.getColumn(), updates);
}
if (!updates.empty()) {
applySchemaUpdate(updates);
Expand All @@ -123,19 +122,6 @@ private void changeColumn(
}
}

/**
 * Stages a type change for an existing column.
 *
 * <p>The column is looked up by name in {@code schema}; if it is not found, nothing is staged.
 * The requested SeaTunnel type is converted with {@link SchemaUtils#toIcebergType}. Only
 * primitive target types are staged; a non-primitive target is logged and ignored.
 *
 * @param schema schema used to look up the current definition of the column
 * @param changeColumn column carrying the name and the requested data type
 * @param updates wrapper that collects the pending schema modifications
 */
private static void modifyColumn(
        Schema schema, Column changeColumn, SchemaChangeWrapper updates) {
    Types.NestedField nestedField = schema.findField(changeColumn.getName());
    if (nestedField == null) {
        // Unknown column: there is no existing field whose type could be changed.
        return;
    }
    Type type = SchemaUtils.toIcebergType(changeColumn.getDataType());
    if (!type.isPrimitiveType()) {
        log.warn("Only PrimitiveType updates are supported, {}", type);
        return;
    }
    // Compare by value, not by reference: equal parameterized types may be distinct
    // instances, and an unchanged type should neither be staged nor logged as unsupported.
    if (!nestedField.type().equals(type)) {
        updates.modifyColumn(changeColumn.getName(), (Type.PrimitiveType) type);
    }
}

/**
* apply schema update
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SinkConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.data.IcebergTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.schema.SchemaAddColumn;
Expand Down Expand Up @@ -68,14 +67,13 @@ public class SchemaUtils {

private SchemaUtils() {}

public static Type.PrimitiveType needsDataTypeUpdate(
Type currentIcebergType, SeaTunnelDataType dataType) {
public static Type.PrimitiveType needsDataTypeUpdate(Type currentIcebergType, Type afterType) {
if (currentIcebergType.typeId() == Type.TypeID.FLOAT
&& dataType.getSqlType() == SqlType.DOUBLE) {
&& afterType.typeId() == Type.TypeID.DOUBLE) {
return Types.DoubleType.get();
}
if (currentIcebergType.typeId() == Type.TypeID.INTEGER
&& dataType.getSqlType() == SqlType.BIGINT) {
&& afterType.typeId() == Type.TypeID.LONG) {
return Types.LongType.get();
}
return null;
Expand Down

0 comments on commit a48ce4b

Please sign in to comment.