From 63382146c619741feac7a536add56bd8904b2fba Mon Sep 17 00:00:00 2001 From: fuwenkai <834260992@qq.com> Date: Wed, 9 Oct 2024 11:18:46 +0800 Subject: [PATCH] [INLONG-11307][Manager] Fix the problem of unable to obtain extparams from multiple data sources --- .../service/source/AbstractSourceOperator.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java index 7c4c0c4882..5caf6d2473 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java @@ -74,6 +74,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.config.AutowireCapableBeanFactory; import org.springframework.transaction.annotation.Isolation; import org.springframework.transaction.annotation.Transactional; @@ -119,6 +120,9 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator { private InlongStreamEntityMapper streamMapper; @Autowired private ObjectMapper objectMapper; + @Autowired + private AutowireCapableBeanFactory autowireCapableBeanFactory; + private SourceOperatorFactory operatorFactory; /** * Getting the source type. @@ -523,7 +527,12 @@ private DataConfig getDataConfig(StreamSourceEntity entity, int op) { InlongGroupEntity groupEntity = groupMapper.selectByGroupIdWithoutTenant(groupId); InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId); - String extParams = getExtParams(entity); + if (operatorFactory == null) { + operatorFactory = new SourceOperatorFactory(); + autowireCapableBeanFactory.autowireBean(operatorFactory); + } + StreamSourceOperator sourceOperator = operatorFactory.getInstance(entity.getSourceType()); + String extParams = sourceOperator.getExtParams(entity); if (groupEntity != null && streamEntity != null) { dataConfig.setState( SourceStatus.NORMAL_STATUS_SET.contains(SourceStatus.forCode(entity.getStatus()))