From a48ce4bfcce72f0ddb7c641e4cc9e73ba9df5595 Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Tue, 30 Jan 2024 11:47:30 +0800 Subject: [PATCH] fixed schema change --- .../seatunnel/iceberg/data/RowConverter.java | 35 +++++++++++++------ .../sink/writer/IcebergRecordWriter.java | 22 +++--------- .../seatunnel/iceberg/utils/SchemaUtils.java | 8 ++--- 3 files changed, 31 insertions(+), 34 deletions(-) diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/RowConverter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/RowConverter.java index 3dc453f80309..8c699b344021 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/RowConverter.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/RowConverter.java @@ -139,7 +139,7 @@ private GenericRecord convertToStruct( String[] filedNames = fromType.getFieldNames(); for (int i = 0; i < filedNames.length; i++) { String recordField = filedNames[i]; - SeaTunnelDataType dataType = fromType.getFieldType(i); + Type afterType = SchemaUtils.toIcebergType(fromType.getFieldType(i)); Types.NestedField tableField = lookupStructField(recordField, schema, structFieldId); // add column if (Objects.isNull(tableField)) { @@ -147,19 +147,32 @@ private GenericRecord convertToStruct( // add the column if schema evolution is on String parentFieldName = structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId); - Type type = SchemaUtils.toIcebergType(dataType); - wrapper.addColumn(parentFieldName, recordField, type); + wrapper.addColumn(parentFieldName, recordField, afterType); } continue; } - result.setField( - tableField.name(), - convertValue( - row.getField(i), - fromType.getFieldType(i), - tableField.type(), - tableField.fieldId(), - wrapper)); + // update column type,; + boolean hasSchemaUpdates = false; + if (config.isTableSchemaEvolutionEnabled() && Objects.nonNull(wrapper)) { + // update the type if needed and schema evolution is on + Type.PrimitiveType evolveDataType = + SchemaUtils.needsDataTypeUpdate(tableField.type(), afterType); + if (Objects.nonNull(evolveDataType)) { + String fieldName = tableSchema.findColumnName(tableField.fieldId()); + wrapper.modifyColumn(fieldName, evolveDataType); + hasSchemaUpdates = true; + } + } + if (!hasSchemaUpdates) { + result.setField( + tableField.name(), + convertValue( + row.getField(i), + fromType.getFieldType(i), + tableField.type(), + tableField.fieldId(), + wrapper)); + } } return result; } diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java index ad3f3c483c6a..2be206ebb6e6 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java @@ -38,7 +38,6 @@ import org.apache.iceberg.Table; import org.apache.iceberg.data.Record; import org.apache.iceberg.io.TaskWriter; -import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import lombok.extern.slf4j.Slf4j; @@ -97,17 +96,17 @@ public void applySchemaChange(SeaTunnelRowType afterRowType, SchemaChangeEvent e AlterTableDropColumnEvent dropColumnEvent = (AlterTableDropColumnEvent) event; updates.deleteColumn(dropColumnEvent.getColumn()); } else if (event instanceof AlterTableAddColumnEvent) { - // Change during data consumption process + // Update column , during data consumption process + } else if (event instanceof AlterTableModifyColumnEvent) { + // Update type , during data consumption process } else if (event instanceof AlterTableChangeColumnEvent) { + // rename AlterTableChangeColumnEvent changeColumnEvent = (AlterTableChangeColumnEvent) event; changeColumn( schema, changeColumnEvent.getColumn(), changeColumnEvent.getOldColumn(), updates); - } else if (event instanceof AlterTableModifyColumnEvent) { - AlterTableModifyColumnEvent modifyColumnEvent = (AlterTableModifyColumnEvent) event; - modifyColumn(schema, modifyColumnEvent.getColumn(), updates); } if (!updates.empty()) { applySchemaUpdate(updates); @@ -123,19 +122,6 @@ private void changeColumn( } } - private static void modifyColumn( - Schema schema, Column changeColumn, SchemaChangeWrapper updates) { - Types.NestedField nestedField = schema.findField(changeColumn.getName()); - if (nestedField != null) { - Type type = SchemaUtils.toIcebergType(changeColumn.getDataType()); - if (nestedField.type() != type && type.isPrimitiveType()) { - updates.modifyColumn(changeColumn.getName(), (Type.PrimitiveType) type); - } else { - log.warn("Only PrimitiveType updates are supported, {}", type); - } - } - } - /** * apply schema update * diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java index f6503caa3858..fafac0eaae06 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java @@ -28,7 +28,6 @@ import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.api.table.type.SqlType; import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SinkConfig; import org.apache.seatunnel.connectors.seatunnel.iceberg.data.IcebergTypeMapper; import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.schema.SchemaAddColumn; @@ -68,14 +67,13 @@ public class SchemaUtils { private SchemaUtils() {} - public static Type.PrimitiveType needsDataTypeUpdate( - Type currentIcebergType, SeaTunnelDataType dataType) { + public static Type.PrimitiveType needsDataTypeUpdate(Type currentIcebergType, Type afterType) { if (currentIcebergType.typeId() == Type.TypeID.FLOAT - && dataType.getSqlType() == SqlType.DOUBLE) { + && afterType.typeId() == Type.TypeID.DOUBLE) { return Types.DoubleType.get(); } if (currentIcebergType.typeId() == Type.TypeID.INTEGER - && dataType.getSqlType() == SqlType.BIGINT) { + && afterType.typeId() == Type.TypeID.LONG) { return Types.LongType.get(); } return null;