From 94d05ebbde3acf4797e303161a1d06838ed99085 Mon Sep 17 00:00:00 2001 From: fuwenkai <834260992@qq.com> Date: Wed, 11 Oct 2023 16:55:20 +0800 Subject: [PATCH 1/5] [INLONG-9041][Manager] Support saving schema information when saving iceberg source --- .../manager/common/enums/FieldType.java | 1 + .../pojo/sort/util/FieldInfoUtils.java | 14 ++++- .../manager/pojo/source/SourceRequest.java | 3 + .../sink/iceberg/IcebergCatalogUtils.java | 1 + .../source/AbstractSourceOperator.java | 9 +++ .../service/source/StreamSourceOperator.java | 7 +++ .../source/iceberg/IcebergSourceOperator.java | 61 +++++++++++++++++++ 7 files changed, 95 insertions(+), 1 deletion(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java index 1bcab5b3586..79026e88f75 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java @@ -48,6 +48,7 @@ public enum FieldType { FLOAT64, DATETIME, TIMESTAMP, + TIMESTAMPTZ, LOCAL_ZONE_TIMESTAMP, ARRAY, MAP, diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java index 43d48a6e766..ce658c9167b 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java @@ -56,6 +56,7 @@ import org.apache.commons.lang3.StringUtils; import java.math.BigDecimal; +import java.sql.Timestamp; import java.util.List; import java.util.Objects; @@ -213,16 +214,27 @@ public static Class sqlTypeToJavaType(String type) { case DECIMAL: return BigDecimal.class; case VARCHAR: + case STRING: return String.class; case DATE: case TIME: - case TIMESTAMP: return java.util.Date.class; + case TIMESTAMP: + case TIMESTAMPTZ: + return Timestamp.class; default: return Object.class; } } + /** + * Convert SQL type names to Java type string. + */ + public static String sqlTypeToJavaTypeStr(String type) { + Class clazz = FieldInfoUtils.sqlTypeToJavaType(type); + return clazz == Object.class ? "string" : clazz.getSimpleName().toLowerCase(); + } + /** * Get the FieldFormat of Sort according to type string * diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java index a9e60cd5a24..19e3fecba6e 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java @@ -101,6 +101,9 @@ public class SourceRequest { @Length(min = 1, max = 163840, message = "length must be between 1 and 163840") private String snapshot; + @ApiModelProperty(value = "Whether to get schema after saving or updating. Default is false") + private Boolean enableGetSchema = false; + @ApiModelProperty("Version") @NotNull(groups = UpdateValidation.class, message = "version cannot be null") private Integer version; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergCatalogUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergCatalogUtils.java index 7c34ce703b3..3a6847fed08 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergCatalogUtils.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergCatalogUtils.java @@ -149,6 +149,7 @@ public static List getColumns(String metastoreUri, String dbN IcebergColumnInfo info = new IcebergColumnInfo(); info.setName(column.name()); info.setRequired(column.isRequired()); + info.setType(column.type().toString()); columnList.add(info); } return columnList; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java index eaf75c9971b..a448989d048 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java @@ -97,6 +97,9 @@ public Integer saveOpt(SourceRequest request, Integer groupStatus, String operat setTargetEntity(request, entity); sourceMapper.insert(entity); saveFieldOpt(entity, request.getFieldList()); + if (request.getEnableGetSchema()) { + getFieldInfo(request, operator); + } return entity.getId(); } @@ -321,4 +324,10 @@ protected String getSerializationType(StreamSource streamSource, String streamDa return DataTypeEnum.forType(streamDataType).getType(); } + + @Override + @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ) + public void getFieldInfo(SourceRequest request, String operator) { + LOGGER.info("not support get field info for source type ={}", request.getSourceType()); + } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java index 1addda40d69..878dcd8b90f 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java @@ -116,4 +116,11 @@ default Map> getSourcesMap(InlongGroupInfo groupInfo, */ void restartOpt(SourceRequest request, String operator); + /** + * Get the source field info. + * + * @param request request of source + */ + void getFieldInfo(SourceRequest request, String operator); + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java index 6e75373363a..682710243e7 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java @@ -17,23 +17,34 @@ package org.apache.inlong.manager.service.source.iceberg; +import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.consts.SourceType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity; import org.apache.inlong.manager.dao.entity.StreamSourceEntity; +import org.apache.inlong.manager.pojo.sink.iceberg.IcebergColumnInfo; +import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils; import org.apache.inlong.manager.pojo.source.SourceRequest; import org.apache.inlong.manager.pojo.source.StreamSource; import org.apache.inlong.manager.pojo.source.iceberg.IcebergSource; import org.apache.inlong.manager.pojo.source.iceberg.IcebergSourceDTO; import org.apache.inlong.manager.pojo.source.iceberg.IcebergSourceRequest; import org.apache.inlong.manager.pojo.stream.StreamField; +import org.apache.inlong.manager.service.resource.sink.iceberg.IcebergCatalogUtils; import org.apache.inlong.manager.service.source.AbstractSourceOperator; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Isolation; +import org.springframework.transaction.annotation.Transactional; +import java.util.ArrayList; import java.util.List; /** @@ -42,6 +53,8 @@ @Service public class IcebergSourceOperator extends AbstractSourceOperator { + private static final Logger LOGGER = LoggerFactory.getLogger(IcebergSourceOperator.class); + @Autowired private ObjectMapper objectMapper; @@ -83,4 +96,52 @@ public StreamSource getFromEntity(StreamSourceEntity entity) { source.setFieldList(sourceFields); return source; } + + @Override + @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ) + public void getFieldInfo(SourceRequest request, String operator) { + IcebergSourceRequest sourceRequest = (IcebergSourceRequest) request; + + LOGGER.info("get field for iceberg {}", sourceRequest); + String metastoreUri = sourceRequest.getUri(); + String dbName = sourceRequest.getDatabase(); + String tableName = sourceRequest.getTableName(); + boolean tableExists = IcebergCatalogUtils.tableExists(metastoreUri, dbName, tableName); + List streamFields = new ArrayList<>(); + if (tableExists) { + List existColumns = IcebergCatalogUtils.getColumns(metastoreUri, dbName, tableName); + for (IcebergColumnInfo columnInfo : existColumns) { + StreamField streamField = new StreamField(); + streamField.setFieldName(columnInfo.getName()); + streamField.setFieldType(FieldInfoUtils.sqlTypeToJavaTypeStr(columnInfo.getType())); + streamField.setFieldComment(columnInfo.getDesc()); + streamFields.add(streamField); + } + updateField(sourceRequest.getInlongGroupId(), sourceRequest.getInlongStreamId(), streamFields); + } + } + + public void updateField(String groupId, String streamId, List fieldList) { + LOGGER.debug("begin to update inlong stream field, groupId={}, streamId={}, field={}", groupId, streamId, + fieldList); + try { + streamFieldMapper.deleteAllByIdentifier(groupId, streamId); + if (CollectionUtils.isEmpty(fieldList)) { + return; + } + fieldList.forEach(streamField -> streamField.setId(null)); + List list = CommonBeanUtils.copyListProperties(fieldList, + InlongStreamFieldEntity::new); + for (InlongStreamFieldEntity entity : list) { + entity.setInlongGroupId(groupId); + entity.setInlongStreamId(streamId); + entity.setIsDeleted(InlongConstants.UN_DELETED); + } + streamFieldMapper.insertAll(list); + LOGGER.info("success to update inlong stream field for groupId={}", groupId); + } catch (Exception e) { + LOGGER.error("failed to update inlong stream field: ", e); + throw new BusinessException(ErrorCodeEnum.STREAM_FIELD_SAVE_FAILED); + } + } } From 197ac367cec19c3692349954f7b9c761f97a6691 Mon Sep 17 00:00:00 2001 From: fuwenkai <834260992@qq.com> Date: Wed, 11 Oct 2023 17:05:38 +0800 Subject: [PATCH 2/5] [INLONG-9041][Manager] Fix comment --- .../inlong/manager/service/source/StreamSourceOperator.java | 1 + 1 file changed, 1 insertion(+) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java index 878dcd8b90f..c5b4802ea02 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java @@ -120,6 +120,7 @@ default Map> getSourcesMap(InlongGroupInfo groupInfo, * Get the source field info. * * @param request request of source + * @param operator operator */ void getFieldInfo(SourceRequest request, String operator); From 2c49e0fc4ca1ac1d4e1d89d3950ebf269fe1831d Mon Sep 17 00:00:00 2001 From: fuwenkai <834260992@qq.com> Date: Wed, 11 Oct 2023 17:13:33 +0800 Subject: [PATCH 3/5] [INLONG-9041][Manager] Fix comment --- .../apache/inlong/manager/pojo/source/SourceRequest.java | 2 +- .../manager/service/source/AbstractSourceOperator.java | 8 ++++---- .../manager/service/source/StreamSourceOperator.java | 4 ++-- .../service/source/iceberg/IcebergSourceOperator.java | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java index 19e3fecba6e..f2d6f70b3ba 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java @@ -102,7 +102,7 @@ public class SourceRequest { private String snapshot; @ApiModelProperty(value = "Whether to get schema after saving or updating. Default is false") - private Boolean enableGetSchema = false; + private Boolean enableSyncSchema = false; @ApiModelProperty("Version") @NotNull(groups = UpdateValidation.class, message = "version cannot be null") diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java index a448989d048..bb54145a06b 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java @@ -97,8 +97,8 @@ public Integer saveOpt(SourceRequest request, Integer groupStatus, String operat setTargetEntity(request, entity); sourceMapper.insert(entity); saveFieldOpt(entity, request.getFieldList()); - if (request.getEnableGetSchema()) { - getFieldInfo(request, operator); + if (request.getEnableSyncSchema()) { + syncSourceFieldInfo(request, operator); } return entity.getId(); } @@ -327,7 +327,7 @@ protected String getSerializationType(StreamSource streamSource, String streamDa @Override @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ) - public void getFieldInfo(SourceRequest request, String operator) { - LOGGER.info("not support get field info for source type ={}", request.getSourceType()); + public void syncSourceFieldInfo(SourceRequest request, String operator) { + LOGGER.info("not support sync source field info for type ={}", request.getSourceType()); } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java index c5b4802ea02..be1768b5997 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java @@ -117,11 +117,11 @@ default Map> getSourcesMap(InlongGroupInfo groupInfo, void restartOpt(SourceRequest request, String operator); /** - * Get the source field info. + * Sync the source field info to stream fields. * * @param request request of source * @param operator operator */ - void getFieldInfo(SourceRequest request, String operator); + void syncSourceFieldInfo(SourceRequest request, String operator); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java index 682710243e7..47aed17f98f 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java @@ -99,7 +99,7 @@ public StreamSource getFromEntity(StreamSourceEntity entity) { @Override @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ) - public void getFieldInfo(SourceRequest request, String operator) { + public void syncSourceFieldInfo(SourceRequest request, String operator) { IcebergSourceRequest sourceRequest = (IcebergSourceRequest) request; LOGGER.info("get field for iceberg {}", sourceRequest); From bedd2f40070bf6367ee7d4c4029e027b04554841 Mon Sep 17 00:00:00 2001 From: fuwenkai <834260992@qq.com> Date: Wed, 11 Oct 2023 17:14:45 +0800 Subject: [PATCH 4/5] [INLONG-9041][Manager] Fix comment --- .../manager/service/source/iceberg/IcebergSourceOperator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java index 47aed17f98f..cb29ffd6ef3 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java @@ -102,7 +102,7 @@ public StreamSource getFromEntity(StreamSourceEntity entity) { public void syncSourceFieldInfo(SourceRequest request, String operator) { IcebergSourceRequest sourceRequest = (IcebergSourceRequest) request; - LOGGER.info("get field for iceberg {}", sourceRequest); + LOGGER.info("sync source field for iceberg {}", sourceRequest); String metastoreUri = sourceRequest.getUri(); String dbName = sourceRequest.getDatabase(); String tableName = sourceRequest.getTableName(); From 1bda1d348f7f4ab9af45ad8e63ce97473732fd72 Mon Sep 17 00:00:00 2001 From: fuwenkai <834260992@qq.com> Date: Wed, 11 Oct 2023 17:16:05 +0800 Subject: [PATCH 5/5] [INLONG-9041][Manager] Fix comment --- .../org/apache/inlong/manager/pojo/source/SourceRequest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java index f2d6f70b3ba..f535483805f 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java @@ -101,7 +101,7 @@ public class SourceRequest { @Length(min = 1, max = 163840, message = "length must be between 1 and 163840") private String snapshot; - @ApiModelProperty(value = "Whether to get schema after saving or updating. Default is false") + @ApiModelProperty(value = "Whether to sync schema from source after saving or updating. Default is false") private Boolean enableSyncSchema = false; @ApiModelProperty("Version")