From 2e70a12a2f95e9c8eacdc6ce91afa6f4297e2df3 Mon Sep 17 00:00:00 2001 From: castor <58140421+castorqin@users.noreply.github.com> Date: Sat, 4 May 2024 14:25:39 +0800 Subject: [PATCH] [Manager] Fix sort standalone get kafka config error (#10106) Co-authored-by: castorqin --- .../node/kafka/KafkaDataNodeOperator.java | 14 ++++++++++++ .../service/sink/kafka/KafkaSinkOperator.java | 22 +++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java index ae91b2f394a..4fbc740d368 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java @@ -40,6 +40,8 @@ import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; +import java.util.Map; + /** * Kafka data node operator */ @@ -48,6 +50,9 @@ public class KafkaDataNodeOperator extends AbstractDataNodeOperator { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaDataNodeOperator.class); + private static final String bootstrapServers = "bootstrap.servers"; + private static final String clientId = "client.id"; + @Autowired private ObjectMapper objectMapper; @@ -79,6 +84,15 @@ public DataNodeInfo getFromEntity(DataNodeEntity entity) { return kafkaDataNodeInfo; } + @Override + public Map parse2SinkParams(DataNodeInfo info) { + Map params = super.parse2SinkParams(info); + KafkaDataNodeInfo kafkaDataNodeInfo = (KafkaDataNodeInfo) info; + params.put(bootstrapServers, kafkaDataNodeInfo.getBootstrapServers()); + params.put(clientId, kafkaDataNodeInfo.getClientId()); + return params; + } + @Override protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) { KafkaDataNodeRequest nodeRequest = (KafkaDataNodeRequest) request; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java index 4357556732f..d7fa197c9ff 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java @@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.dao.entity.StreamSinkEntity; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; import org.apache.inlong.manager.pojo.sink.SinkField; import org.apache.inlong.manager.pojo.sink.SinkRequest; import org.apache.inlong.manager.pojo.sink.StreamSink; @@ -30,6 +31,7 @@ import org.apache.inlong.manager.pojo.sink.kafka.KafkaSinkRequest; import org.apache.inlong.manager.service.sink.AbstractSinkOperator; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +39,7 @@ import org.springframework.stereotype.Service; import java.util.List; +import java.util.Map; /** * Kafka sink operator @@ -46,6 +49,8 @@ public class KafkaSinkOperator extends AbstractSinkOperator { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSinkOperator.class); + private static final String topic = "topic"; + @Autowired private ObjectMapper objectMapper; @@ -75,6 +80,23 @@ protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntit } } + @Override + public Map parse2IdParams(StreamSinkEntity streamSink, List fields, + DataNodeInfo dataNodeInfo) { + + Map params = super.parse2IdParams(streamSink, fields, dataNodeInfo); + + KafkaSinkDTO kafkaSinkDTO; + try { + kafkaSinkDTO = objectMapper.readValue(streamSink.getExtParams(), KafkaSinkDTO.class); + } catch (JsonProcessingException e) { + LOGGER.error("parse kafka sink dto error", e); + return params; + } + params.put(topic, kafkaSinkDTO.getTopicName()); + return params; + } + @Override public StreamSink getFromEntity(StreamSinkEntity entity) { KafkaSink sink = new KafkaSink();