Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-7893][Manager] Support field description when parsing field by JSON #7894

Merged
merged 2 commits into from
Apr 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,19 @@ public class InlongConstants {
// Field types accepted for inlong stream fields.
public static final Set<String> STREAM_FIELD_TYPES =
Sets.newHashSet("string", "int", "long", "float", "double", "date", "timestamp");

/**
 * JSON property key holding the field name when batch parsing fields in JSON mode.
 * NOTE(review): "FILED" in these constant names is a typo for "FIELD"; renaming
 * would break existing references — confirm impact before fixing.
 */
public static final String BATCH_PARSING_FILED_JSON_NAME_PROP = "name";

/**
 * JSON property key holding the field type when batch parsing fields in JSON mode.
 */
public static final String BATCH_PARSING_FILED_JSON_TYPE_PROP = "type";

/**
 * JSON property key holding the field description (comment) when batch parsing
 * fields in JSON mode.
 */
public static final String BATCH_PARSING_FILED_JSON_COMMENT_PROP = "desc";

}
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,19 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.inlong.manager.common.consts.InlongConstants.LEFT_BRACKET;
import static org.apache.inlong.manager.common.consts.InlongConstants.PATTERN_NORMAL_CHARACTERS;
import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_CSV;
import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_COMMENT_PROP;
import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_NAME_PROP;
import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_TYPE_PROP;

/**
* Implementation of sink service interface
Expand All @@ -93,7 +97,7 @@
public class StreamSinkServiceImpl implements StreamSinkService {

private static final Logger LOGGER = LoggerFactory.getLogger(StreamSinkServiceImpl.class);
private static final String PARSE_FIELD_CSV_SPLITTER = "\t|\\s|,";
private static final Pattern PARSE_FIELD_CSV_SPLITTER = Pattern.compile("[\t\\s,]");
private static final int PARSE_FIELD_CSV_MAX_COLUMNS = 3;
private static final int PARSE_FIELD_CSV_MIN_COLUMNS = 2;

Expand Down Expand Up @@ -291,7 +295,8 @@ public List<SinkBriefInfo> listBrief(String groupId, String streamId) {
Preconditions.expectNotBlank(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY);

List<SinkBriefInfo> summaryList = sinkMapper.selectSummary(groupId, streamId);
LOGGER.debug("success to list sink summary by groupId=" + groupId + ", streamId=" + streamId);
LOGGER.debug("success to list sink summary by groupId={}, streamId={}", groupId, streamId);

return summaryList;
}

Expand Down Expand Up @@ -710,25 +715,21 @@ public List<SinkField> parseFields(ParseFieldRequest parseFieldRequest) {
String method = parseFieldRequest.getMethod();
String statement = parseFieldRequest.getStatement();

Map<String, String> fieldsMap;
if (STATEMENT_TYPE_JSON.equals(method)) {
fieldsMap = parseFieldsByJson(statement);
} else if (STATEMENT_TYPE_SQL.equals(method)) {
return parseFieldsBySql(statement);
} else {
return parseFieldsByCsv(statement);
switch (method) {
case STATEMENT_TYPE_JSON:
return parseFieldsByJson(statement);
case STATEMENT_TYPE_SQL:
return parseFieldsBySql(statement);
case STATEMENT_TYPE_CSV:
return parseFieldsByCsv(statement);
default:
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
String.format("Unsupported parse mode: %s", method));
}
return fieldsMap.entrySet().stream().map(entry -> {
SinkField field = new SinkField();
field.setFieldName(entry.getKey());
field.setFieldType(entry.getValue());
return field;
}).collect(Collectors.toList());

} catch (Exception e) {
LOGGER.error("parse sink fields error", e);
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
String.format("parse sink fields error : %s", e.getMessage()));
String.format("parse sink fields error: %s", e.getMessage()));
}
}

Expand All @@ -741,7 +742,7 @@ private List<SinkField> parseFieldsByCsv(String statement) {
continue;
}

String[] cols = line.split(PARSE_FIELD_CSV_SPLITTER, PARSE_FIELD_CSV_MAX_COLUMNS);
String[] cols = PARSE_FIELD_CSV_SPLITTER.split(line, PARSE_FIELD_CSV_MAX_COLUMNS);
if (cols.length < PARSE_FIELD_CSV_MIN_COLUMNS) {
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
"At least two fields are required, line number is " + (i + 1));
Expand Down Expand Up @@ -771,21 +772,28 @@ private List<SinkField> parseFieldsBySql(String sql) throws JSQLParserException
CCJSqlParserManager pm = new CCJSqlParserManager();
Statement statement = pm.parse(new StringReader(sql));
List<SinkField> fields = new ArrayList<>();
if (statement instanceof CreateTable) {
CreateTable createTable = (CreateTable) statement;
List<ColumnDefinition> columnDefinitions = createTable.getColumnDefinitions();
// get column definition
for (ColumnDefinition definition : columnDefinitions) {
// get field name
String columnName = definition.getColumnName();
ColDataType colDataType = definition.getColDataType();
String sqlDataType = colDataType.getDataType();
// get field type
String realDataType = StringUtils.substringBefore(sqlDataType, LEFT_BRACKET).toLowerCase();
// get field comment
List<String> columnSpecs = definition.getColumnSpecs();
if (!(statement instanceof CreateTable)) {
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
"The SQL statement must be a table creation statement");
}
CreateTable createTable = (CreateTable) statement;
List<ColumnDefinition> columnDefinitions = createTable.getColumnDefinitions();
// get column definition
for (ColumnDefinition definition : columnDefinitions) {
// get field name
String columnName = definition.getColumnName();
ColDataType colDataType = definition.getColDataType();
String sqlDataType = colDataType.getDataType();
SinkField sinkField = new SinkField();
sinkField.setFieldName(columnName);
// get field type
String realDataType = StringUtils.substringBefore(sqlDataType, LEFT_BRACKET).toLowerCase();
sinkField.setFieldType(realDataType);
// get field comment
List<String> columnSpecs = definition.getColumnSpecs();
if (CollectionUtils.isNotEmpty(columnSpecs)) {
int commentIndex = -1;
for (int csIndex = 0; columnSpecs != null && csIndex < columnSpecs.size(); csIndex++) {
for (int csIndex = 0; csIndex < columnSpecs.size(); csIndex++) {
String spec = columnSpecs.get(csIndex);
if (spec.toUpperCase().startsWith("COMMENT")) {
commentIndex = csIndex;
Expand All @@ -796,25 +804,26 @@ private List<SinkField> parseFieldsBySql(String sql) throws JSQLParserException
if (-1 != commentIndex && columnSpecs.size() > commentIndex + 1) {
comment = columnSpecs.get(commentIndex + 1).replaceAll("['\"]", "");
}

SinkField sinkField = new SinkField();
sinkField.setFieldName(columnName);
sinkField.setFieldType(realDataType);
sinkField.setFieldComment(comment);
fields.add(sinkField);
}
} else {
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
"The SQL statement must be a table creation statement");

fields.add(sinkField);
}
return fields;
}

private Map<String, String> parseFieldsByJson(String statement) throws JsonProcessingException {
// Use LinkedHashMap deserialization to keep the order of the fields
return objectMapper.readValue(statement,
new TypeReference<LinkedHashMap<String, String>>() {
});
/**
 * Parse sink fields from a JSON statement.
 * <p>
 * The statement is expected to be a JSON array of objects, each carrying the
 * name/type/desc properties; missing properties yield {@code null} on the
 * corresponding {@link SinkField} attribute (no validation is performed here).
 *
 * @param statement JSON array string describing the fields
 * @return sink fields in the same order as the JSON array
 * @throws JsonProcessingException if the statement is not valid JSON of the expected shape
 */
private List<SinkField> parseFieldsByJson(String statement) throws JsonProcessingException {
    List<Map<String, String>> fieldDefs =
            objectMapper.readValue(statement, new TypeReference<List<Map<String, String>>>() {
            });
    List<SinkField> fields = new ArrayList<>(fieldDefs.size());
    for (Map<String, String> fieldDef : fieldDefs) {
        SinkField field = new SinkField();
        field.setFieldName(fieldDef.get(BATCH_PARSING_FILED_JSON_NAME_PROP));
        field.setFieldType(fieldDef.get(BATCH_PARSING_FILED_JSON_TYPE_PROP));
        field.setFieldComment(fieldDef.get(BATCH_PARSING_FILED_JSON_COMMENT_PROP));
        fields.add(field);
    }
    return fields;
}

private void checkSinkRequestParams(SinkRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,19 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import static org.apache.inlong.manager.common.consts.InlongConstants.PATTERN_NORMAL_CHARACTERS;
import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_CSV;
import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
import static org.apache.inlong.manager.common.consts.InlongConstants.STREAM_FIELD_TYPES;
import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_COMMENT_PROP;
import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_NAME_PROP;
import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_TYPE_PROP;
import static org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.packExtParams;
import static org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.unpackExtParams;

Expand Down Expand Up @@ -740,21 +743,17 @@ public List<StreamField> parseFields(ParseFieldRequest parseFieldRequest) {
String method = parseFieldRequest.getMethod();
String statement = parseFieldRequest.getStatement();

Map<String, String> fieldsMap;
if (STATEMENT_TYPE_JSON.equals(method)) {
fieldsMap = parseFieldsByJson(statement);
} else if (STATEMENT_TYPE_SQL.equals(method)) {
return parseFieldsBySql(statement);
} else {
return parseFieldsByCsv(statement);
switch (method) {
case STATEMENT_TYPE_JSON:
return parseFieldsByJson(statement);
case STATEMENT_TYPE_SQL:
return parseFieldsBySql(statement);
case STATEMENT_TYPE_CSV:
return parseFieldsByCsv(statement);
default:
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
String.format("Unsupported parse field mode: %s", method));
}
return fieldsMap.entrySet().stream().map(entry -> {
StreamField field = new StreamField();
field.setFieldName(entry.getKey());
field.setFieldType(entry.getValue());
return field;
}).collect(Collectors.toList());

} catch (Exception e) {
LOGGER.error("parse inlong stream fields error", e);
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
Expand Down Expand Up @@ -831,27 +830,35 @@ private List<StreamField> parseFieldsBySql(String sql) throws JSQLParserExceptio
CCJSqlParserManager pm = new CCJSqlParserManager();
Statement statement = pm.parse(new StringReader(sql));
List<StreamField> fields = new ArrayList<>();
if (statement instanceof CreateTable) {
CreateTable createTable = (CreateTable) statement;
List<ColumnDefinition> columnDefinitions = createTable.getColumnDefinitions();
// get column definition
for (int i = 0; i < columnDefinitions.size(); i++) {
ColumnDefinition definition = columnDefinitions.get(i);
// get field name
String columnName = definition.getColumnName();
ColDataType colDataType = definition.getColDataType();
String sqlDataType = colDataType.getDataType();
// convert SQL type to Java type
Class<?> clazz = FieldInfoUtils.sqlTypeToJavaType(sqlDataType);
if (clazz == Object.class) {
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
"Unrecognized SQL field type, line: " + (i + 1) + ", type: " + sqlDataType);
}
String type = clazz.getSimpleName().toLowerCase();
// get field comment
List<String> columnSpecs = definition.getColumnSpecs();
if (!(statement instanceof CreateTable)) {
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
"The SQL statement must be a table creation statement");
}
CreateTable createTable = (CreateTable) statement;
List<ColumnDefinition> columnDefinitions = createTable.getColumnDefinitions();
// get column definition
for (int i = 0; i < columnDefinitions.size(); i++) {
ColumnDefinition definition = columnDefinitions.get(i);
StreamField streamField = new StreamField();
// get field name
String columnName = definition.getColumnName();
streamField.setFieldName(columnName);

ColDataType colDataType = definition.getColDataType();
String sqlDataType = colDataType.getDataType();
// convert SQL type to Java type
Class<?> clazz = FieldInfoUtils.sqlTypeToJavaType(sqlDataType);
if (clazz == Object.class) {
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
"Unrecognized SQL field type, line: " + (i + 1) + ", type: " + sqlDataType);
}
String type = clazz.getSimpleName().toLowerCase();
streamField.setFieldType(type);
// get field comment
List<String> columnSpecs = definition.getColumnSpecs();
if (CollectionUtils.isNotEmpty(columnSpecs)) {
int commentIndex = -1;
for (int csIndex = 0; columnSpecs != null && csIndex < columnSpecs.size(); csIndex++) {
for (int csIndex = 0; csIndex < columnSpecs.size(); csIndex++) {
String spec = columnSpecs.get(csIndex);
if (spec.toUpperCase().startsWith("COMMENT")) {
commentIndex = csIndex;
Expand All @@ -862,25 +869,25 @@ private List<StreamField> parseFieldsBySql(String sql) throws JSQLParserExceptio
if (-1 != commentIndex && columnSpecs.size() > commentIndex + 1) {
comment = columnSpecs.get(commentIndex + 1).replaceAll("['\"]", "");
}

StreamField streamField = new StreamField();
streamField.setFieldName(columnName);
streamField.setFieldType(type);
streamField.setFieldComment(comment);
fields.add(streamField);
}
} else {
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
"The SQL statement must be a table creation statement");
fields.add(streamField);
}
return fields;
}

private Map<String, String> parseFieldsByJson(String statement) throws JsonProcessingException {
// Use LinkedHashMap deserialization to keep the order of the fields
return objectMapper.readValue(statement,
new TypeReference<LinkedHashMap<String, String>>() {
});
/**
 * Parse inlong stream fields from a JSON statement.
 * <p>
 * The statement is expected to be a JSON array of objects, each carrying the
 * name/type/desc properties; missing properties yield {@code null} on the
 * corresponding {@link StreamField} attribute (no validation is performed here).
 *
 * @param statement JSON array string describing the fields
 * @return stream fields in the same order as the JSON array
 * @throws JsonProcessingException if the statement is not valid JSON of the expected shape
 */
private List<StreamField> parseFieldsByJson(String statement) throws JsonProcessingException {
    List<Map<String, String>> fieldDefs =
            objectMapper.readValue(statement, new TypeReference<List<Map<String, String>>>() {
            });
    List<StreamField> fields = new ArrayList<>(fieldDefs.size());
    for (Map<String, String> fieldDef : fieldDefs) {
        StreamField field = new StreamField();
        field.setFieldName(fieldDef.get(BATCH_PARSING_FILED_JSON_NAME_PROP));
        field.setFieldType(fieldDef.get(BATCH_PARSING_FILED_JSON_TYPE_PROP));
        field.setFieldComment(fieldDef.get(BATCH_PARSING_FILED_JSON_COMMENT_PROP));
        fields.add(field);
    }
    return fields;
}

/**
Expand Down
Loading