diff --git a/docs/assets/jobs/redisreader.json b/docs/assets/jobs/redisreader.json index 1e995d45b..4ad3c5ba6 100644 --- a/docs/assets/jobs/redisreader.json +++ b/docs/assets/jobs/redisreader.json @@ -4,12 +4,10 @@ "reader": { "name": "redisreader", "parameter": { - "connection": [ - { - "uri": "tcp://127.0.0.1:7003", - "auth": "password" - } - ], + "connection": { + "uri": "tcp://127.0.0.1:7003", + "auth": "password" + }, "include": [ "^user" ], diff --git a/plugin/reader/redisreader/src/main/java/com/wgzhao/addax/plugin/reader/redisreader/RedisKey.java b/plugin/reader/redisreader/src/main/java/com/wgzhao/addax/plugin/reader/redisreader/RedisKey.java index 15931dc4d..fde896580 100644 --- a/plugin/reader/redisreader/src/main/java/com/wgzhao/addax/plugin/reader/redisreader/RedisKey.java +++ b/plugin/reader/redisreader/src/main/java/com/wgzhao/addax/plugin/reader/redisreader/RedisKey.java @@ -27,4 +27,8 @@ public class RedisKey public static final String MODE = "mode"; public static final String AUTH = "auth"; public static final String MASTER_NAME = "masterName"; + public static final String INCLUDE = "include"; + public static final String EXCLUDE = "exclude"; + public static final String DB = "db"; + public static final String KEY_THRESHOLD_LENGTH = "keyThresholdLength"; } diff --git a/plugin/reader/redisreader/src/main/java/com/wgzhao/addax/plugin/reader/redisreader/RedisReader.java b/plugin/reader/redisreader/src/main/java/com/wgzhao/addax/plugin/reader/redisreader/RedisReader.java index fb48dbd31..870c7fe61 100644 --- a/plugin/reader/redisreader/src/main/java/com/wgzhao/addax/plugin/reader/redisreader/RedisReader.java +++ b/plugin/reader/redisreader/src/main/java/com/wgzhao/addax/plugin/reader/redisreader/RedisReader.java @@ -96,21 +96,18 @@ public void init() private void validateParam() { - List connections = conf.getList("connection"); - for (Object connection : connections) { - Configuration conConf = Configuration.from(connection.toString()); - String uri = conConf.getString(RedisKey.URI); - if (uri == null || uri.isEmpty()) { - throw AddaxException.asAddaxException(REQUIRED_VALUE, "uri is null or empty"); - } - if (!(uri.startsWith("tcp") || uri.startsWith("file") || uri.startsWith("http") || uri.startsWith("https"))) { - throw AddaxException.asAddaxException(ILLEGAL_VALUE, "uri is not start with tcp, file, http or https"); - } - String mode = conConf.getString(RedisKey.MODE, "standalone"); - if ("sentinel".equalsIgnoreCase(mode)) { - // required other items - conConf.getNecessaryValue(RedisKey.MASTER_NAME, REQUIRED_VALUE); - } + Configuration conConf = conf.getConfiguration(RedisKey.CONNECTION); + String uri = conConf.getString(RedisKey.URI); + if (uri == null || uri.isEmpty()) { + throw AddaxException.asAddaxException(REQUIRED_VALUE, "uri is null or empty"); + } + if (!(uri.startsWith("tcp") || uri.startsWith("file") || uri.startsWith("http") || uri.startsWith("https"))) { + throw AddaxException.asAddaxException(ILLEGAL_VALUE, "uri is not start with tcp, file, http or https"); + } + String mode = conConf.getString(RedisKey.MODE, "standalone"); + if ("sentinel".equalsIgnoreCase(mode)) { + // required other items + conConf.getNecessaryValue(RedisKey.MASTER_NAME, REQUIRED_VALUE); } } @@ -160,66 +157,63 @@ public static class Task /** * value达到64m阀值,将记录该key */ - private int keyThresholdLength = 64 * 1024 * 1024; + private int keyThresholdLength; @Override public void startRead(RecordSender recordSender) { Configuration pluginJobConf = getPluginJobConf(); - List connections = pluginJobConf.getList("connection"); + Configuration connection = pluginJobConf.getConfiguration(RedisKey.CONNECTION); try { - for (Object obj : connections) { - Configuration connection = Configuration.from(obj.toString()); - String uri = connection.getString(RedisKey.URI); - String mode = connection.getString(RedisKey.MODE, "standalone"); - String masterName = connection.getString(RedisKey.MASTER_NAME, null); - File file = new File(UUID.randomUUID() + ".rdb"); - if (uri.startsWith("http") || uri.startsWith("https")) { - Request.get(uri).execute().saveContent(file); - } - else if (uri.startsWith("tcp")) { - this.dump(uriToHosts(uri), mode, connection.getString(RedisKey.AUTH), masterName, file); + String uri = connection.getString(RedisKey.URI); + String mode = connection.getString(RedisKey.MODE, "standalone"); + String masterName = connection.getString(RedisKey.MASTER_NAME, null); + File file = new File(UUID.randomUUID() + ".rdb"); + if (uri.startsWith("http") || uri.startsWith("https")) { + Request.get(uri).execute().saveContent(file); + } + else if (uri.startsWith("tcp")) { + this.dump(uriToHosts(uri), mode, connection.getString(RedisKey.AUTH), masterName, file); + } + else { + file = new File(uri); + } + + LOG.info("loading {} ", file.getAbsolutePath()); + RedisReplicator r = new RedisReplicator(file, FileType.RDB, com.moilioncircle.redis.replicator.Configuration.defaultSetting()); + r.addEventListener((replicator, event) -> { + if (event instanceof KeyStringValueString) { + KeyStringValueString dkv = (KeyStringValueString) event; + long dbNumber = dkv.getDb().getDbNumber(); + int rdbType = dkv.getValueRdbType(); + byte[] key = dkv.getKey(); + byte[] value = dkv.getValue(); + long expire = dkv.getExpiredMs() == null ? 0 : dkv.getExpiredMs(); + + //记录较大的key + recordBigKey(dbNumber, rdbType, key, value); + + //记录数据类型 + collectType(rdbType); + + if (Task.this.matchDB((int) dbNumber) && Task.this.matchKey(key)) { + Record record = recordSender.createRecord(); + record.addColumn(new LongColumn(dbNumber)); + record.addColumn(new LongColumn(rdbType)); + record.addColumn(new LongColumn(expire)); + record.addColumn(new BytesColumn(key)); + record.addColumn(new BytesColumn(value)); + recordSender.sendToWriter(record); + } } else { - file = new File(uri); + LOG.warn("The type is unsupported yet"); } - - LOG.info("loading {} ", file.getAbsolutePath()); - RedisReplicator r = new RedisReplicator(file, FileType.RDB, com.moilioncircle.redis.replicator.Configuration.defaultSetting()); - r.addEventListener((replicator, event) -> { - if (event instanceof KeyStringValueString) { - KeyStringValueString dkv = (KeyStringValueString) event; - long dbNumber = dkv.getDb().getDbNumber(); - int rdbType = dkv.getValueRdbType(); - byte[] key = dkv.getKey(); - byte[] value = dkv.getValue(); - long expire = dkv.getExpiredMs() == null ? 0 : dkv.getExpiredMs(); - - //记录较大的key - recordBigKey(dbNumber, rdbType, key, value); - - //记录数据类型 - collectType(rdbType); - - if (Task.this.matchDB((int) dbNumber) && Task.this.matchKey(key)) { - Record record = recordSender.createRecord(); - record.addColumn(new LongColumn(dbNumber)); - record.addColumn(new LongColumn(rdbType)); - record.addColumn(new LongColumn(expire)); - record.addColumn(new BytesColumn(key)); - record.addColumn(new BytesColumn(value)); - recordSender.sendToWriter(record); - } - } - else { - LOG.warn("The type is unsupported yet"); - } - }); - r.open(); - r.close(); - // delete temporary local file - Files.deleteIfExists(Paths.get(file.getAbsolutePath())); - } // end for + }); + r.open(); + r.close(); + // delete temporary local file + Files.deleteIfExists(Paths.get(file.getAbsolutePath())); } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); @@ -230,10 +224,10 @@ else if (uri.startsWith("tcp")) { public void init() { Configuration pluginJobConf = this.getPluginJobConf(); - List include = pluginJobConf.getList("include"); - List exclude = pluginJobConf.getList("exclude"); - List db = pluginJobConf.getList("db"); - this.keyThresholdLength = pluginJobConf.getInt("keyThresholdLength", 64 * 1024 * 1024); + List include = pluginJobConf.getList(RedisKey.INCLUDE); + List exclude = pluginJobConf.getList(RedisKey.EXCLUDE); + List db = pluginJobConf.getList(RedisKey.DB); + this.keyThresholdLength = pluginJobConf.getInt(RedisKey.KEY_THRESHOLD_LENGTH, 64 * 1024 * 1024); if (include != null) { for (Object reg : include) { Pattern pattern = Pattern.compile(reg.toString());