Skip to content

Commit

Permalink
[INLONG-5117][SortStandalone] Support specify component type from rem…
Browse files Browse the repository at this point in the history
…ote config (apache#5134)
  • Loading branch information
vernedeng authored and bruceneenhl committed Aug 12, 2022
1 parent 87abf3f commit 273478d
Showing 1 changed file with 42 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
* generator for flume config
Expand All @@ -32,43 +33,48 @@ public class FlumeConfigGenerator {
public static final String KEY_SORT_CHANNEL_TYPE = "sortChannel.type";
public static final String KEY_SORT_SINK_TYPE = "sortSink.type";
public static final String KEY_SORT_SOURCE_TYPE = "sortSource.type";
public static final String KEY_SDK_START_TIME = "sortSdk.startTime";
public static final String KEY_SDK_STOP_TIME = "sortSdk.stopTime";

public static Map<String, String> generateFlumeConfiguration(SortTaskConfig taskConfig) {
Map<String, String> flumeConf = new HashMap<>();
String name = taskConfig.getName();
Map<String, String> sinkParams = taskConfig.getSinkParams();
// channels
appendChannels(flumeConf, name);
appendChannels(flumeConf, name, sinkParams);
// sinks
appendSinks(flumeConf, name, sinkParams);
// sources
appendSources(flumeConf, name);
appendSources(flumeConf, name, sinkParams);
return flumeConf;
}

/**
* appendChannels
* append channels config
*
* @param flumeConf
* @param flumeConf final config of flume
* @param name sort task name
* @param sinkParams sink params of this task
*/
private static void appendChannels(Map<String, String> flumeConf, String name) {
private static void appendChannels(Map<String, String> flumeConf, String name, Map<String, String> sinkParams) {
StringBuilder builder = new StringBuilder();
String channelName = name + "Channel";
flumeConf.put(name + ".channels", channelName);
String prefix = builder.append(name).append(".channels.").append(channelName).append(".").toString();
builder.setLength(0);
String channelType = builder.append(prefix).append("type").toString();
String channelClass = CommonPropertiesHolder.getString(KEY_SORT_CHANNEL_TYPE);
String channelClass = sinkParams.getOrDefault(KEY_SORT_CHANNEL_TYPE,
CommonPropertiesHolder.getString(KEY_SORT_CHANNEL_TYPE));
flumeConf.put(channelType, channelClass);
appendCommon(flumeConf, prefix, null, name);
}

/**
* appendCommon
* appendCommon config
*
* @param flumeConf
* @param prefix
* @param componentParams
* @param flumeConf final config of flume
* @param prefix prefix of common properties
* @param componentParams common properties
*/
private static void appendCommon(
Map<String, String> flumeConf,
Expand All @@ -95,9 +101,11 @@ private static void appendCommon(
}

/**
* appendSinks
* append sink config
*
* @param flumeConf
* @param flumeConf final config of flume
* @param name sort task name
* @param sinkParams sink params of this task
*/
private static void appendSinks(Map<String, String> flumeConf, String name, Map<String, String> sinkParams) {
// sinks
Expand All @@ -108,7 +116,8 @@ private static void appendSinks(Map<String, String> flumeConf, String name, Map<
// type
builder.setLength(0);
String sinkType = builder.append(prefix).append("type").toString();
String sinkClass = CommonPropertiesHolder.getString(KEY_SORT_SINK_TYPE);
String sinkClass = sinkParams.getOrDefault(KEY_SORT_SINK_TYPE,
CommonPropertiesHolder.getString(KEY_SORT_SINK_TYPE));
flumeConf.put(sinkType, sinkClass);
// channel
builder.setLength(0);
Expand All @@ -120,11 +129,16 @@ private static void appendSinks(Map<String, String> flumeConf, String name, Map<
}

/**
* appendSources
* append source config
*
* @param flumeConf
* @param flumeConf final config of flume
* @param name sort task name
* @param sinkParams sink params of this task
*/
private static void appendSources(Map<String, String> flumeConf, String name) {
private static void appendSources(
Map<String, String> flumeConf,
String name, Map<String,
String> sinkParams) {
// sources
String sourceName = name + "Source";
flumeConf.put(name + ".sources", sourceName);
Expand All @@ -133,7 +147,8 @@ private static void appendSources(Map<String, String> flumeConf, String name) {
// type
builder.setLength(0);
String sourceType = builder.append(prefix).append("type").toString();
String sourceClass = CommonPropertiesHolder.getString(KEY_SORT_SOURCE_TYPE);
String sourceClass = sinkParams.getOrDefault(KEY_SORT_SOURCE_TYPE,
CommonPropertiesHolder.getString(KEY_SORT_SOURCE_TYPE));
flumeConf.put(sourceType, sourceClass);
// channel
builder.setLength(0);
Expand All @@ -144,7 +159,16 @@ private static void appendSources(Map<String, String> flumeConf, String name) {
builder.setLength(0);
String selectorTypeKey = builder.append(prefix).append("selector.type").toString();
flumeConf.put(selectorTypeKey, "org.apache.flume.channel.ReplicatingChannelSelector");
//
// valid msg time interval
builder.setLength(0);
String startTimeKey = builder.append(prefix).append(KEY_SDK_START_TIME).toString();
Optional.ofNullable(sinkParams.get(KEY_SDK_START_TIME))
.map(startTime -> flumeConf.put(startTimeKey, startTime));
builder.setLength(0);
String stopTimeKey = builder.append(prefix).append(KEY_SDK_STOP_TIME).toString();
Optional.ofNullable(sinkParams.get(KEY_SDK_STOP_TIME))
.map(stopTime -> flumeConf.put(stopTimeKey, stopTime));

appendCommon(flumeConf, prefix, null, name);
}
}

0 comments on commit 273478d

Please sign in to comment.