From d057a0540368b6275a86be3e71526e36f9337214 Mon Sep 17 00:00:00 2001 From: wgzhao Date: Mon, 23 Sep 2024 16:58:59 +0800 Subject: [PATCH] [feature][plugin][jsonreader] Add support for multiline json file (#1140) 1. Implement support for reading multi-line JSON files. 2. Introduce a `singleLine` option to toggle between reading single-line and multi-line JSON files, with the default set to single-line. --- docs/reader/jsonfilereader.md | 126 +++++++++++-- .../reader/jsonfilereader/JsonReader.java | 169 +++++++++++------- .../src/main/resources/plugin.json | 4 +- .../main/resources/plugin_job_template.json | 1 + 4 files changed, 222 insertions(+), 78 deletions(-) diff --git a/docs/reader/jsonfilereader.md b/docs/reader/jsonfilereader.md index 920ee9bf3..0b54ae0d0 100644 --- a/docs/reader/jsonfilereader.md +++ b/docs/reader/jsonfilereader.md @@ -13,17 +13,9 @@ JSON File Reader 提供了读取本地文件系统数据存储的能力。 其中 `/tmp/test*.json` 为同一个 json 文件的多个复制,内容如下: ```json -{ - "name": "zhangshan", - "id": 19890604, - "age": 12, - "score": { - "math": 92.5, - "english": 97.5, - "chinese": 95 - }, - "pubdate": "2020-09-05" -} +{"name": "zhangshan","id": 19890604,"age": 12,"score": {"math": 92.5,"english": 97.5,"chinese": 95},"pubdate": "2020-09-05"} +{"name": "lisi","id": 19890605,"age": 12,"score": {"math": 90.5,"english": 77.5,"chinese": 90},"pubdate": "2020-09-05"} +{"name": "wangwu","id": 19890606,"age": 12,"score": {"math": 89,"english": 100,"chinese": 92},"pubdate": "2020-09-05"} ``` ## 参数说明 @@ -35,14 +27,22 @@ JSON File Reader 提供了读取本地文件系统数据存储的能力。 | fieldDelimiter | 是 | string | `,` | 描述:读取的字段分隔符 | | compress | 否 | string | 无 | 文本压缩类型,默认不填写意味着没有压缩。支持压缩类型为zip、gzip、bzip2 | | encoding | 否 | string | utf-8 | 读取文件的编码配置 | +| singleLine | 否 | boolean | true | 每条数据是否为一行, 详见下文 | ### path -本地文件系统的路径信息,注意这里可以支持填写多个路径 +本地文件系统的路径信息,注意这里可以支持填写多个路径,比如: -- 当指定单个本地文件,JsonFileReader暂时只能使用单线程进行数据抽取。 -- 当指定多个本地文件,JsonFileReader支持使用多线程进行数据抽取。线程并发数通过通道数指定。 -- 当指定通配符,JsonFileReader尝试遍历出多个文件信息。例如: 指定`/*` 
代表读取/目录下所有的文件,指定`/bazhen/*` 代表读取bazhen目录下游所有的文件。 JsonFileReader目前只支持 `*` 作为文件通配符。 +```json +{ + "path": [ + "/var/ftp/test.json", // 读取 /var/ftp 目录下的 test.json 文件 + "/var/tmp/*.json", // 读取 /var/tmp 目录下所有 json 文件 + "/public/ftp", // 读取 /public/ftp 目录下所有文件, 如果 ftp 是文件的话,则直接读取 + "/public/a??.json" // 读取 /public 目录下所有 a 开头,后面跟两个字符,最后是 json 结尾的文件 + ] +} +``` 特别需要注意的是,如果Path指定的路径下没有符合匹配的文件抽取,Addax将报错。 @@ -52,6 +52,102 @@ JSON File Reader 提供了读取本地文件系统数据存储的能力。 对于用户指定Column信息,type必须填写,index/value 必须选择其一 +### singleLine + +使用 JSON 格式存储数据,业界有两种方式,一种是每行一个 JSON 对象,也就是 `Single Line JSON(aka. JSONL or JSON Lines)`; +另一种是整个文件是一个 JSON 数组,每个元素是一个 JSON 对象,也就是 `Multiline JSON`。 + +Addax 默认支持每行一个 JSON 对象的格式,即 `singleLine = true`, 在这种情况下,要注意的是: + +1. 每行 JSON 对象的末尾不能有逗号,否则会解析失败。 +2. 一个JSON 对象不能跨行,否则会解析失败。 + +如果数据是整个文件是一个 JSON 数组,每个元素是一个 JSON 对象,需要设置 `singleLine` 为 `false`。 +假设上述例子中的数据用下面的格式表示: + +```json +{ + "result": [ + { + "name": "zhangshan", + "id": 19890604, + "age": 12, + "score": { + "math": 92.5, + "english": 97.5, + "chinese": 95 + }, + "pubdate": "2020-09-05" + }, + { + "name": "lisi", + "id": 19890605, + "age": 12, + "score": { + "math": 90.5, + "english": 77.5, + "chinese": 90 + }, + "pubdate": "2020-09-05" + }, + { + "name": "wangwu", + "id": 19890606, + "age": 12, + "score": { + "math": 89, + "english": 100, + "chinese": 92 + }, + "pubdate": "2020-09-05" + } + ] +} +``` + +因为这种格式是合法的 JSON 格式,因此每个 JSON 对象可以跨行。相应的,这类数据读取时,其 `column` 配置应该如下填写: + +```json +{ + "singleLine": false, + "column": [ + { + "index": "$.result[*].id", + "type": "long" + }, + { + "index": "$.result[*].name", + "type": "string" + }, + { + "index": "$.result[*].age", + "type": "long" + }, + { + "index": "$.result[*].score.math", + "type": "double" + }, + { + "index": "$.result[*].score.english", + "type": "double" + }, + { + "index": "$.result[*].pubdate", + "type": "date" + }, + { + "type": "string", + "value": "constant string" + } + ] +} +``` + +更详细的使用说明请参考 [Jayway 
JsonPath](https://github.com/json-path/JsonPath) 的语法。 + +注意: 这种数据在一个 JSON 数组里时,程序只能采取将整个文件读取到内存中,然后解析的方式,因此不适合大文件的读取。 +对于大文件的读取,建议使用每行一个 JSON 对象的格式,也就是 `Single Line JSON` 的格式,这种格式可以采取逐行读取的方式,不会占用太多内存。 + ## 类型转换 | Addax 内部类型 | 本地文件 数据类型 | diff --git a/plugin/reader/jsonfilereader/src/main/java/com/wgzhao/addax/plugin/reader/jsonfilereader/JsonReader.java b/plugin/reader/jsonfilereader/src/main/java/com/wgzhao/addax/plugin/reader/jsonfilereader/JsonReader.java index 79d5daf89..42d12c0ee 100644 --- a/plugin/reader/jsonfilereader/src/main/java/com/wgzhao/addax/plugin/reader/jsonfilereader/JsonReader.java +++ b/plugin/reader/jsonfilereader/src/main/java/com/wgzhao/addax/plugin/reader/jsonfilereader/JsonReader.java @@ -21,6 +21,10 @@ import com.jayway.jsonpath.DocumentContext; import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.Option; +import com.jayway.jsonpath.ParseContext; +import com.jayway.jsonpath.ReadContext; +import com.jayway.jsonpath.TypeRef; import com.wgzhao.addax.common.base.Constant; import com.wgzhao.addax.common.base.Key; import com.wgzhao.addax.common.compress.ZipCycleInputStream; @@ -47,6 +51,7 @@ import java.io.BufferedInputStream; import java.io.BufferedReader; +import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -56,7 +61,11 @@ import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Optional; import static com.wgzhao.addax.common.spi.ErrorCode.CONFIG_ERROR; import static com.wgzhao.addax.common.spi.ErrorCode.ENCODING_ERROR; @@ -94,11 +103,6 @@ private void validateParameter() // Compatible with the old version, path is a string before String pathInString = this.originConfig.getNecessaryValue(Key.PATH, REQUIRED_VALUE); - if (StringUtils.isBlank(pathInString)) { - throw AddaxException.asAddaxException( - 
REQUIRED_VALUE, - "您需要指定待读取的源目录或文件"); - } if (!pathInString.startsWith("[") && !pathInString.endsWith("]")) { path = new ArrayList<>(); path.add(pathInString); @@ -108,7 +112,7 @@ private void validateParameter() if (null == path || path.isEmpty()) { throw AddaxException.asAddaxException( REQUIRED_VALUE, - "您需要指定待读取的源目录或文件"); + "The item `path` must be not empty"); } } @@ -125,13 +129,12 @@ private void validateParameter() catch (UnsupportedCharsetException uce) { throw AddaxException.asAddaxException( NOT_SUPPORT_TYPE, - String.format("不支持您配置的编码格式 : [%s]", encoding), uce); + "Not supported encoding type " + encoding, uce); } catch (Exception e) { throw AddaxException.asAddaxException( ENCODING_ERROR, - String.format("编码配置异常, 请联系我们: %s", e.getMessage()), - e); + "Encoding Error:", e); } } @@ -146,11 +149,13 @@ private void validateParameter() String columnValue = eachColumnConf.getString(Key.VALUE); if (null == columnIndex && null == columnValue) { - throw AddaxException.asAddaxException(CONFIG_ERROR, "由于您配置了type, 则至少需要配置 index 或 value"); + throw AddaxException.asAddaxException(CONFIG_ERROR, + "Either index or value is required for type configuration"); } if (null != columnIndex && null != columnValue) { - throw AddaxException.asAddaxException(CONFIG_ERROR, "您混合配置了index, value, 每一列同时仅能选择其中一种"); + throw AddaxException.asAddaxException(CONFIG_ERROR, + "Both index and value are set, only one is allowed"); } } } @@ -165,12 +170,6 @@ public void prepare() LOG.info("The number of files you will read: [{}]", this.sourceFiles.size()); } - @Override - public void post() - { - // - } - @Override public void destroy() { @@ -184,13 +183,11 @@ public List split(int adviceNumber) LOG.debug("begin to split..."); List readerSplitConfigs = new ArrayList<>(); - // warn:每个slice拖且仅拖一个文件, - // int splitNumber = adviceNumber - int splitNumber = this.sourceFiles.size(); + int splitNumber = Math.min(sourceFiles.size(), adviceNumber); if (0 == splitNumber) { throw 
AddaxException.asAddaxException( CONFIG_ERROR, - String.format("NOT find any file in your path: %s", originConfig.getString(Key.PATH))); + "none find path " + originConfig.getString(Key.PATH)); } List> splitSourceFiles = FileHelper.splitSourceFiles(sourceFiles, splitNumber); @@ -219,6 +216,9 @@ public static class Task private String compressType; private String encoding; + private ParseContext parse; + private boolean multiline; + @Override public void init() { @@ -227,13 +227,18 @@ public void init() this.columns = readerSliceConfig.getListConfiguration(Key.COLUMN); this.compressType = readerSliceConfig.getString(Key.COMPRESS, null); this.encoding = readerSliceConfig.getString(Key.ENCODING, "utf-8"); + this.multiline = readerSliceConfig.getBool("singleLine", true); + // return null for missing leafs. + com.jayway.jsonpath.Configuration jsonConf = com.jayway.jsonpath.Configuration.defaultConfiguration(); + jsonConf.addOptions(Option.DEFAULT_PATH_LEAF_TO_NULL); + this.parse = JsonPath.using(jsonConf); } //解析json,返回已经经过处理的行 private List parseFromJson(String json) { List splitLine = new ArrayList<>(); - DocumentContext document = JsonPath.parse(json); + DocumentContext document = parse.parse(json); String tempValue; for (Configuration eachColumnConf : columns) { String columnIndex = eachColumnConf.getString(Key.INDEX); @@ -245,12 +250,7 @@ private List parseFromJson(String json) tempValue = columnValue; } else { - try { - tempValue = document.read(columnIndex, columnType.getClass()); - } - catch (Exception ignore) { - tempValue = null; - } + tempValue = document.read(columnIndex, columnType.getClass()); } Column insertColumn = getColumn(columnType, tempValue, columnFormat); splitLine.add(insertColumn); @@ -309,9 +309,8 @@ private Column getColumn(String type, String columnValue, String columnFormat) } break; default: - String errorMessage = String.format("The type %s is unsupported", type); - LOG.error(errorMessage); - throw 
AddaxException.asAddaxException(NOT_SUPPORT_TYPE, errorMessage); + throw AddaxException.asAddaxException(NOT_SUPPORT_TYPE, + "The type" + type + " is unsupported"); } return columnGenerated; } @@ -326,18 +325,6 @@ private void transportOneRecord(RecordSender recordSender, List sourceLi recordSender.sendToWriter(record); } - @Override - public void prepare() - { - // - } - - @Override - public void post() - { - // - } - @Override public void destroy() { @@ -376,31 +363,91 @@ public void startRead(RecordSender recordSender) else { reader = new BufferedReader(new InputStreamReader(fileInputStream, encoding), Constant.DEFAULT_BUFFER_SIZE); } + } + catch (CompressorException | UnsupportedEncodingException e) { + throw AddaxException.asAddaxException(IO_ERROR, e); + } + if (multiline) { + multilineJsonParse(reader, recordSender); + } else { + singleJsonParse(reader, recordSender); + } + IOUtils.closeQuietly(reader, null); + } + LOG.debug("end reading source files..."); + } - // read the content - String jsonLine; + /** + * parse JSON Lines file + * each line is a json object + * + * @param reader {@link BufferedReader} + * @param recordSender {@link RecordSender} + */ + private void multilineJsonParse(BufferedReader reader, RecordSender recordSender) + { + // read the content + String jsonLine; + try { + jsonLine = reader.readLine(); + while (jsonLine != null) { + List sourceLine = parseFromJson(jsonLine); + transportOneRecord(recordSender, sourceLine); + recordSender.flush(); jsonLine = reader.readLine(); - while (jsonLine != null) { - List sourceLine = parseFromJson(jsonLine); - transportOneRecord(recordSender, sourceLine); - recordSender.flush(); - jsonLine = reader.readLine(); - } } - catch (CompressorException | UnsupportedEncodingException e) { - e.printStackTrace(); + } + catch (IOException e) { + throw AddaxException.asAddaxException(IO_ERROR, e); + } + } + + private void singleJsonParse(BufferedReader reader, RecordSender recordSender) + { + StringBuilder 
jsonBuffer = new StringBuilder(); + String line; + try { + while ((line = reader.readLine()) != null) { + jsonBuffer.append(line); } - catch (IOException e) { - // warn: 有可能本地无法读取文件 - String message = String.format("Failed to open file %s", fileName); - LOG.error(message); - throw AddaxException.asAddaxException(IO_ERROR, message); + } + catch (IOException e) { + throw AddaxException.asAddaxException(IO_ERROR, e); + } + + DocumentContext ctx = parse.parse(jsonBuffer.toString()); + List> jsonColumns = new ArrayList<>(); + List sourceLine = new ArrayList<>(); + int recordNum = -1; + List placeHolder = Collections.emptyList(); + for (Configuration col: columns) { + if (col.getString(Key.VALUE) == null) { + if (recordNum < 0) { + List jsonColumn = ctx.read(col.getString(Key.INDEX)); + recordNum = jsonColumn.size(); + jsonColumns.add(jsonColumn); + } else { + jsonColumns.add(ctx.read(col.getString(Key.INDEX))); + } + } else { + // the column use constant, mark it + jsonColumns.add(placeHolder); } - finally { - IOUtils.closeQuietly(reader, null); + } + for (int i =0 ;i < recordNum; i++) { + for (int j=0; j < columns.size(); j++) { + Configuration column = columns.get(j); + if (jsonColumns.get(j).isEmpty()) { + // use constant value + sourceLine.add(getColumn(column.getString(Key.TYPE), column.getString(Key.VALUE), column.getString(Key.FORMAT))); + } else { + sourceLine.add(getColumn(column.getString(Key.TYPE), String.valueOf(jsonColumns.get(j).get(i)), column.getString(Key.FORMAT))); + } } + transportOneRecord(recordSender, sourceLine); + recordSender.flush(); + sourceLine.clear(); } - LOG.debug("end reading source files..."); } } } diff --git a/plugin/reader/jsonfilereader/src/main/resources/plugin.json b/plugin/reader/jsonfilereader/src/main/resources/plugin.json index b1bd2f1d5..643b1ee29 100644 --- a/plugin/reader/jsonfilereader/src/main/resources/plugin.json +++ b/plugin/reader/jsonfilereader/src/main/resources/plugin.json @@ -1,6 +1,6 @@ { "name": "jsonfilereader", 
"class": "com.wgzhao.addax.plugin.reader.jsonfilereader.JsonReader", - "description": "useScene: test. mechanism: use addax framework to transport data from json file. warn: The more you know about the data, the less problems you encounter.", - "developer": "szunicom" + "description": "read json file, support JSON and JSON Lines format", + "developer": "szunicom, wgzhao" } diff --git a/plugin/reader/jsonfilereader/src/main/resources/plugin_job_template.json b/plugin/reader/jsonfilereader/src/main/resources/plugin_job_template.json index 781ce9caa..06b7864a2 100644 --- a/plugin/reader/jsonfilereader/src/main/resources/plugin_job_template.json +++ b/plugin/reader/jsonfilereader/src/main/resources/plugin_job_template.json @@ -4,6 +4,7 @@ "path": [ "/tmp/test*.json" ], + "singleLine": true, "column": [ { "index": "$.id",