Skip to content

Commit

Permalink
[INLONG-11306][Agent] Modify the naming of variables in the redis sou…
Browse files Browse the repository at this point in the history
…rce (apache#11309)
  • Loading branch information
justinwwhuang authored Oct 9, 2024
1 parent 14463d1 commit c6a66e8
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,12 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_REDIS_READTIMEOUT = "task.redisTask.readTimeout";
public static final String TASK_REDIS_REPLID = "task.redisTask.replId";
public static final String TASK_REDIS_OFFSET = "task.redisTask.offset";
public static final String TASK_REDIS_DB_NUMBER = "task.redisTask.dbNumber";
public static final String TASK_REDIS_DB_NAME = "task.redisTask.dbName";
public static final String TASK_REDIS_COMMAND = "task.redisTask.command";
public static final String TASK_REDIS_KEYS = "task.redisTask.keys";
public static final String TASK_REDIS_FIELD_OR_MEMBER = "task.redisTask.fieldOrMember";
public static final String TASK_REDIS_IS_SUBSCRIBE = "task.redisTask.isSubscribe";
public static final String TASK_REDIS_SUBOPERATION = "task.redisTask.subOperation";
public static final String TASK_REDIS_SUBSCRIPTION_OPERATION = "task.redisTask.subscriptionOperation";
public static final String TASK_REDIS_SYNC_FREQ = "task.redisTask.syncFreq";

public static final String TASK_STATE = "task.state";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.inlong.agent.pojo;

import lombok.Data;

@Data
public class RedisTask {

Expand All @@ -29,13 +30,13 @@ public class RedisTask {
private String readTimeout;
private String queueSize;
private String replId;
private String dbNumber;
private String dbName;
private String command;
private String keys;
private String fieldOrMember;
private Boolean isSubscribe;
private String syncFreq;
private String subOperations;
private String subscriptionOperation;

@Data
public static class RedisTaskConfig {
Expand All @@ -48,12 +49,12 @@ public static class RedisTaskConfig {
private String timeout;
private String queueSize;
private String replId;
private String dbNumber;
private String dbName;
private String command;
private String keys;
private String fieldOrMember;
private Boolean isSubscribe;
private String syncFreq;
private String subOperations;
private String subscriptionOperation;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -277,12 +277,12 @@ private static RedisTask getRedisTask(DataConfig dataConfig) {
redisTask.setReadTimeout(config.getTimeout());
redisTask.setReplId(config.getReplId());
redisTask.setCommand(config.getCommand());
redisTask.setDbNumber(config.getDbNumber());
redisTask.setDbName(config.getDbName());
redisTask.setKeys(config.getKeys());
redisTask.setFieldOrMember(config.getFieldOrMember());
redisTask.setIsSubscribe(config.getIsSubscribe());
redisTask.setSyncFreq(config.getSyncFreq());
redisTask.setSubOperations(config.getSubOperations());
redisTask.setSubscriptionOperation(config.getSubscriptionOperation());

return redisTask;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,14 @@ public class RedisSource extends AbstractSource {
private String readTimeout;
private String replId;
private String snapShot;
private String dbNumber;
private String dbName;
private String redisCommand;

private String fieldOrMember;
private boolean destroyed;
private boolean isSubscribe;
private Set<String> keys;
private Set<String> subOperations;
private Set<String> subscribeOperations;
private Replicator redisReplicator;
private BlockingQueue<SourceData> redisQueue;
private ScheduledExecutorService executor;
Expand Down Expand Up @@ -136,7 +136,7 @@ protected void initSource(InstanceProfile profile) {
this.readTimeout = profile.get(TaskConstants.TASK_REDIS_READTIMEOUT, "");
this.replId = profile.get(TaskConstants.TASK_REDIS_REPLID, "");
this.snapShot = profile.get(TaskConstants.TASK_REDIS_OFFSET, "-1");
this.dbNumber = profile.get(TaskConstants.TASK_REDIS_DB_NUMBER, "0");
this.dbName = profile.get(TaskConstants.TASK_REDIS_DB_NAME);
this.keys = new ConcurrentSkipListSet<>(Arrays.asList(profile.get(TaskConstants.TASK_REDIS_KEYS).split(",")));
this.isSubscribe = profile.getBoolean(TaskConstants.TASK_REDIS_IS_SUBSCRIBE, false);
this.instanceId = profile.getInstanceId();
Expand All @@ -146,8 +146,8 @@ protected void initSource(InstanceProfile profile) {
try {
if (isSubscribe) {
// use subscribe mode
this.subOperations = new ConcurrentSkipListSet<>(
Arrays.asList(profile.get(TaskConstants.TASK_REDIS_SUBOPERATION).split(",")));
this.subscribeOperations = new ConcurrentSkipListSet<>(
Arrays.asList(profile.get(TaskConstants.TASK_REDIS_SUBSCRIPTION_OPERATION).split(",")));
this.executor = (ScheduledExecutorService) Executors.newSingleThreadExecutor();
this.redisReplicator = new RedisReplicator(uri);
initReplicator();
Expand Down Expand Up @@ -364,8 +364,8 @@ public boolean sourceExist() {
private String getRedisUri() {
StringBuffer sb = new StringBuffer("redis://");
sb.append(hostName).append(":").append(port);
if (!StringUtils.isEmpty(dbNumber)) {
sb.append("/").append(dbNumber);
if (!StringUtils.isEmpty(dbName)) {
sb.append("/").append(dbName);
}
sb.append("?");
if (!StringUtils.isEmpty(authPassword)) {
Expand Down Expand Up @@ -393,9 +393,9 @@ private String getRedisUri() {
}

private void initReplicator() {
if (!subOperations.isEmpty()) {
if (!subscribeOperations.isEmpty()) {
DefaultCommandParser replicatorCommandParser = new DefaultCommandParser();
for (String subOperation : subOperations) {
for (String subOperation : subscribeOperations) {
this.redisReplicator.addCommandParser(CommandName.name(subOperation), replicatorCommandParser);
}
this.redisReplicator.addEventListener((replicator, event) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ private void initProfile() {
profile.set(TaskConstants.TASK_REDIS_COMMAND, command);
profile.set(TaskConstants.TASK_REDIS_KEYS, keys);
profile.set(TaskConstants.TASK_AUDIT_VERSION, "0");
profile.set(TaskConstants.TASK_REDIS_SUBOPERATION, subOperation);
profile.set(TaskConstants.TASK_REDIS_SUBSCRIPTION_OPERATION, subOperation);
profile.setInstanceId(instanceId);
}

Expand Down

0 comments on commit c6a66e8

Please sign in to comment.