Skip to content

Commit

Permalink
[update][plugin][redisreader] Refactor connection item from list to…
Browse files Browse the repository at this point in the history
… string
  • Loading branch information
wgzhao committed Oct 9, 2024
1 parent e9cdef5 commit 4ea03b3
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 76 deletions.
10 changes: 4 additions & 6 deletions docs/assets/jobs/redisreader.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
"reader": {
"name": "redisreader",
"parameter": {
"connection": [
{
"uri": "tcp://127.0.0.1:7003",
"auth": "password"
}
],
"connection": {
"uri": "tcp://127.0.0.1:7003",
"auth": "password"
},
"include": [
"^user"
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,8 @@ public class RedisKey
public static final String MODE = "mode";
public static final String AUTH = "auth";
public static final String MASTER_NAME = "masterName";
public static final String INCLUDE = "include";
public static final String EXCLUDE = "exclude";
public static final String DB = "db";
public static final String KEY_THRESHOLD_LENGTH = "keyThresholdLength";
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,21 +96,18 @@ public void init()

private void validateParam()
{
List<Object> connections = conf.getList("connection");
for (Object connection : connections) {
Configuration conConf = Configuration.from(connection.toString());
String uri = conConf.getString(RedisKey.URI);
if (uri == null || uri.isEmpty()) {
throw AddaxException.asAddaxException(REQUIRED_VALUE, "uri is null or empty");
}
if (!(uri.startsWith("tcp") || uri.startsWith("file") || uri.startsWith("http") || uri.startsWith("https"))) {
throw AddaxException.asAddaxException(ILLEGAL_VALUE, "uri is not start with tcp, file, http or https");
}
String mode = conConf.getString(RedisKey.MODE, "standalone");
if ("sentinel".equalsIgnoreCase(mode)) {
// required other items
conConf.getNecessaryValue(RedisKey.MASTER_NAME, REQUIRED_VALUE);
}
Configuration conConf = conf.getConfiguration(RedisKey.CONNECTION);
String uri = conConf.getString(RedisKey.URI);
if (uri == null || uri.isEmpty()) {
throw AddaxException.asAddaxException(REQUIRED_VALUE, "uri is null or empty");
}
if (!(uri.startsWith("tcp") || uri.startsWith("file") || uri.startsWith("http") || uri.startsWith("https"))) {
throw AddaxException.asAddaxException(ILLEGAL_VALUE, "uri is not start with tcp, file, http or https");
}
String mode = conConf.getString(RedisKey.MODE, "standalone");
if ("sentinel".equalsIgnoreCase(mode)) {
// required other items
conConf.getNecessaryValue(RedisKey.MASTER_NAME, REQUIRED_VALUE);
}
}

Expand Down Expand Up @@ -160,66 +157,63 @@ public static class Task
/**
* value达到64m阀值,将记录该key
*/
private int keyThresholdLength = 64 * 1024 * 1024;
private int keyThresholdLength;

@Override
public void startRead(RecordSender recordSender)
{
Configuration pluginJobConf = getPluginJobConf();
List<Object> connections = pluginJobConf.getList("connection");
Configuration connection = pluginJobConf.getConfiguration(RedisKey.CONNECTION);
try {
for (Object obj : connections) {
Configuration connection = Configuration.from(obj.toString());
String uri = connection.getString(RedisKey.URI);
String mode = connection.getString(RedisKey.MODE, "standalone");
String masterName = connection.getString(RedisKey.MASTER_NAME, null);
File file = new File(UUID.randomUUID() + ".rdb");
if (uri.startsWith("http") || uri.startsWith("https")) {
Request.get(uri).execute().saveContent(file);
}
else if (uri.startsWith("tcp")) {
this.dump(uriToHosts(uri), mode, connection.getString(RedisKey.AUTH), masterName, file);
String uri = connection.getString(RedisKey.URI);
String mode = connection.getString(RedisKey.MODE, "standalone");
String masterName = connection.getString(RedisKey.MASTER_NAME, null);
File file = new File(UUID.randomUUID() + ".rdb");
if (uri.startsWith("http") || uri.startsWith("https")) {
Request.get(uri).execute().saveContent(file);
}
else if (uri.startsWith("tcp")) {
this.dump(uriToHosts(uri), mode, connection.getString(RedisKey.AUTH), masterName, file);
}
else {
file = new File(uri);
}

LOG.info("loading {} ", file.getAbsolutePath());
RedisReplicator r = new RedisReplicator(file, FileType.RDB, com.moilioncircle.redis.replicator.Configuration.defaultSetting());
r.addEventListener((replicator, event) -> {
if (event instanceof KeyStringValueString) {
KeyStringValueString dkv = (KeyStringValueString) event;
long dbNumber = dkv.getDb().getDbNumber();
int rdbType = dkv.getValueRdbType();
byte[] key = dkv.getKey();
byte[] value = dkv.getValue();
long expire = dkv.getExpiredMs() == null ? 0 : dkv.getExpiredMs();

//记录较大的key
recordBigKey(dbNumber, rdbType, key, value);

//记录数据类型
collectType(rdbType);

if (Task.this.matchDB((int) dbNumber) && Task.this.matchKey(key)) {
Record record = recordSender.createRecord();
record.addColumn(new LongColumn(dbNumber));
record.addColumn(new LongColumn(rdbType));
record.addColumn(new LongColumn(expire));
record.addColumn(new BytesColumn(key));
record.addColumn(new BytesColumn(value));
recordSender.sendToWriter(record);
}
}
else {
file = new File(uri);
LOG.warn("The type is unsupported yet");
}

LOG.info("loading {} ", file.getAbsolutePath());
RedisReplicator r = new RedisReplicator(file, FileType.RDB, com.moilioncircle.redis.replicator.Configuration.defaultSetting());
r.addEventListener((replicator, event) -> {
if (event instanceof KeyStringValueString) {
KeyStringValueString dkv = (KeyStringValueString) event;
long dbNumber = dkv.getDb().getDbNumber();
int rdbType = dkv.getValueRdbType();
byte[] key = dkv.getKey();
byte[] value = dkv.getValue();
long expire = dkv.getExpiredMs() == null ? 0 : dkv.getExpiredMs();

//记录较大的key
recordBigKey(dbNumber, rdbType, key, value);

//记录数据类型
collectType(rdbType);

if (Task.this.matchDB((int) dbNumber) && Task.this.matchKey(key)) {
Record record = recordSender.createRecord();
record.addColumn(new LongColumn(dbNumber));
record.addColumn(new LongColumn(rdbType));
record.addColumn(new LongColumn(expire));
record.addColumn(new BytesColumn(key));
record.addColumn(new BytesColumn(value));
recordSender.sendToWriter(record);
}
}
else {
LOG.warn("The type is unsupported yet");
}
});
r.open();
r.close();
// delete temporary local file
Files.deleteIfExists(Paths.get(file.getAbsolutePath()));
} // end for
});
r.open();
r.close();
// delete temporary local file
Files.deleteIfExists(Paths.get(file.getAbsolutePath()));
}
catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
Expand All @@ -230,10 +224,10 @@ else if (uri.startsWith("tcp")) {
public void init()
{
Configuration pluginJobConf = this.getPluginJobConf();
List<Object> include = pluginJobConf.getList("include");
List<Object> exclude = pluginJobConf.getList("exclude");
List<Object> db = pluginJobConf.getList("db");
this.keyThresholdLength = pluginJobConf.getInt("keyThresholdLength", 64 * 1024 * 1024);
List<Object> include = pluginJobConf.getList(RedisKey.INCLUDE);
List<Object> exclude = pluginJobConf.getList(RedisKey.EXCLUDE);
List<Object> db = pluginJobConf.getList(RedisKey.DB);
this.keyThresholdLength = pluginJobConf.getInt(RedisKey.KEY_THRESHOLD_LENGTH, 64 * 1024 * 1024);
if (include != null) {
for (Object reg : include) {
Pattern pattern = Pattern.compile(reg.toString());
Expand Down

0 comments on commit 4ea03b3

Please sign in to comment.