diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java index 97c92f3616c..17b1ee6fa28 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java @@ -259,14 +259,16 @@ private TubeClientConfig initTubeConfig(String masterUrl) throws Exception { * @throws FlumeException if an RPC client connection could not be opened */ private void initCreateConnection() throws FlumeException { -// synchronized (tubeSessionLock) { + // check the TubeMQ address + if (masterHostAndPortLists == null || masterHostAndPortLists.isEmpty()) { + logger.warn("Failed to get TubeMQ Cluster, make sure register TubeMQ to manager successfully."); + return; + } // if already connected, just skip if (sessionFactories != null) { return; } sessionFactories = new HashMap<>(); - Preconditions.checkState(masterHostAndPortLists != null && !masterHostAndPortLists.isEmpty(), - "No tube service url specified"); for (String masterUrl : masterHostAndPortLists) { createConnection(masterUrl); } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java index dad0da6d47e..cef15ad0a7e 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java @@ -136,7 +136,7 @@ public void setConfigLogMetric(StreamConfigLogMetric streamConfigLogMetric) { } public void initCreateConnection(CreatePulsarClientCallBack callBack) { - if (pulsarUrl2token.isEmpty()) { + if (pulsarUrl2token == null || pulsarUrl2token.isEmpty()) { logger.warn("Failed to get Pulsar Cluster, make sure register pulsar to manager successfully."); return; } diff --git a/pom.xml b/pom.xml index 9f2983b6586..b99707eadcd 100644 --- a/pom.xml +++ b/pom.xml @@ -1007,12 +1007,6 @@ iceberg-flink-runtime-1.13 ${iceberg.flink.version} - - org.apache.iceberg - iceberg-flink-runtime-1.13 - ${iceberg.flink.version} - - org.apache.flink flink-table-planner-blink_${flink.scala.binary.version}