diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java index f1ac2a41999..2981d1982ba 100644 --- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java +++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.Optional; /** * generator for flume config @@ -32,43 +33,48 @@ public class FlumeConfigGenerator { public static final String KEY_SORT_CHANNEL_TYPE = "sortChannel.type"; public static final String KEY_SORT_SINK_TYPE = "sortSink.type"; public static final String KEY_SORT_SOURCE_TYPE = "sortSource.type"; + public static final String KEY_SDK_START_TIME = "sortSdk.startTime"; + public static final String KEY_SDK_STOP_TIME = "sortSdk.stopTime"; public static Map generateFlumeConfiguration(SortTaskConfig taskConfig) { Map flumeConf = new HashMap<>(); String name = taskConfig.getName(); Map sinkParams = taskConfig.getSinkParams(); // channels - appendChannels(flumeConf, name); + appendChannels(flumeConf, name, sinkParams); // sinks appendSinks(flumeConf, name, sinkParams); // sources - appendSources(flumeConf, name); + appendSources(flumeConf, name, sinkParams); return flumeConf; } /** - * appendChannels + * append channels config * - * @param flumeConf + * @param flumeConf final config of flume + * @param name sort task name + * @param sinkParams sink params of this task */ - private static void appendChannels(Map flumeConf, String name) { + private static void appendChannels(Map flumeConf, String name, Map sinkParams) { StringBuilder builder = new StringBuilder(); String channelName = name + "Channel"; flumeConf.put(name + ".channels", channelName); String prefix = builder.append(name).append(".channels.").append(channelName).append(".").toString(); builder.setLength(0); String channelType = builder.append(prefix).append("type").toString(); - String channelClass = CommonPropertiesHolder.getString(KEY_SORT_CHANNEL_TYPE); + String channelClass = sinkParams.getOrDefault(KEY_SORT_CHANNEL_TYPE, + CommonPropertiesHolder.getString(KEY_SORT_CHANNEL_TYPE)); flumeConf.put(channelType, channelClass); appendCommon(flumeConf, prefix, null, name); } /** - * appendCommon + * appendCommon config * - * @param flumeConf - * @param prefix - * @param componentParams + * @param flumeConf final config of flume + * @param prefix prefix of common properties + * @param componentParams common properties */ private static void appendCommon( Map flumeConf, @@ -95,9 +101,11 @@ private static void appendCommon( } /** - * appendSinks + * append sink config * - * @param flumeConf + * @param flumeConf final config of flume + * @param name sort task name + * @param sinkParams sink params of this task */ private static void appendSinks(Map flumeConf, String name, Map sinkParams) { // sinks @@ -108,7 +116,8 @@ private static void appendSinks(Map flumeConf, String name, Map< // type builder.setLength(0); String sinkType = builder.append(prefix).append("type").toString(); - String sinkClass = CommonPropertiesHolder.getString(KEY_SORT_SINK_TYPE); + String sinkClass = sinkParams.getOrDefault(KEY_SORT_SINK_TYPE, + CommonPropertiesHolder.getString(KEY_SORT_SINK_TYPE)); flumeConf.put(sinkType, sinkClass); // channel builder.setLength(0); @@ -120,11 +129,16 @@ private static void appendSinks(Map flumeConf, String name, Map< } /** - * appendSources + * append source config * - * @param flumeConf + * @param flumeConf final config of flume + * @param name sort task name + * @param sinkParams sink params of this task */ - private static void appendSources(Map flumeConf, String name) { + private static void appendSources( + Map flumeConf, + String name, Map sinkParams) { // sources String sourceName = name + "Source"; flumeConf.put(name + ".sources", sourceName); @@ -133,7 +147,8 @@ private static void appendSources(Map flumeConf, String name) { // type builder.setLength(0); String sourceType = builder.append(prefix).append("type").toString(); - String sourceClass = CommonPropertiesHolder.getString(KEY_SORT_SOURCE_TYPE); + String sourceClass = sinkParams.getOrDefault(KEY_SORT_SOURCE_TYPE, + CommonPropertiesHolder.getString(KEY_SORT_SOURCE_TYPE)); flumeConf.put(sourceType, sourceClass); // channel builder.setLength(0); @@ -144,7 +159,16 @@ private static void appendSources(Map flumeConf, String name) { builder.setLength(0); String selectorTypeKey = builder.append(prefix).append("selector.type").toString(); flumeConf.put(selectorTypeKey, "org.apache.flume.channel.ReplicatingChannelSelector"); - // + // valid msg time interval + builder.setLength(0); + String startTimeKey = builder.append(prefix).append(KEY_SDK_START_TIME).toString(); + Optional.ofNullable(sinkParams.get(KEY_SDK_START_TIME)) + .map(startTime -> flumeConf.put(startTimeKey, startTime)); + builder.setLength(0); + String stopTimeKey = builder.append(prefix).append(KEY_SDK_STOP_TIME).toString(); + Optional.ofNullable(sinkParams.get(KEY_SDK_STOP_TIME)) + .map(stopTime -> flumeConf.put(stopTimeKey, stopTime)); + appendCommon(flumeConf, prefix, null, name); } }