diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java index f83a559fc7..1579bb0730 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java @@ -168,4 +168,19 @@ public class InlongConstants { public static final Set STREAM_FIELD_TYPES = Sets.newHashSet("string", "int", "long", "float", "double", "date", "timestamp"); + /** + * The name prop when batch parsing fields in JSON mode + */ + public static final String BATCH_PARSING_FILED_JSON_NAME_PROP = "name"; + + /** + * The type prop when batch parsing fields in JSON mode + */ + public static final String BATCH_PARSING_FILED_JSON_TYPE_PROP = "type"; + + /** + * The comment prop when batch parsing fields in JSON mode + */ + public static final String BATCH_PARSING_FILED_JSON_COMMENT_PROP = "desc"; + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java index d5e15dfe02..f369d22140 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java @@ -76,15 +76,19 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.regex.Pattern; import java.util.stream.Collectors; import static org.apache.inlong.manager.common.consts.InlongConstants.LEFT_BRACKET; import static org.apache.inlong.manager.common.consts.InlongConstants.PATTERN_NORMAL_CHARACTERS; +import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_CSV; import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON; import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL; +import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_COMMENT_PROP; +import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_NAME_PROP; +import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_TYPE_PROP; /** * Implementation of sink service interface @@ -93,7 +97,7 @@ public class StreamSinkServiceImpl implements StreamSinkService { private static final Logger LOGGER = LoggerFactory.getLogger(StreamSinkServiceImpl.class); - private static final String PARSE_FIELD_CSV_SPLITTER = "\t|\\s|,"; + private static final Pattern PARSE_FIELD_CSV_SPLITTER = Pattern.compile("[\t\\s,]"); private static final int PARSE_FIELD_CSV_MAX_COLUMNS = 3; private static final int PARSE_FIELD_CSV_MIN_COLUMNS = 2; @@ -291,7 +295,8 @@ public List listBrief(String groupId, String streamId) { Preconditions.expectNotBlank(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY); List summaryList = sinkMapper.selectSummary(groupId, streamId); - LOGGER.debug("success to list sink summary by groupId=" + groupId + ", streamId=" + streamId); + LOGGER.debug("success to list sink summary by groupId={}, streamId={}", groupId, streamId); + return summaryList; } @@ -710,25 +715,21 @@ public List parseFields(ParseFieldRequest parseFieldRequest) { String method = parseFieldRequest.getMethod(); String statement = parseFieldRequest.getStatement(); - Map fieldsMap; - if (STATEMENT_TYPE_JSON.equals(method)) { - fieldsMap = parseFieldsByJson(statement); - } else if (STATEMENT_TYPE_SQL.equals(method)) { - return parseFieldsBySql(statement); - } else { - return parseFieldsByCsv(statement); + switch (method) { + case STATEMENT_TYPE_JSON: + return parseFieldsByJson(statement); + case STATEMENT_TYPE_SQL: + return parseFieldsBySql(statement); + case STATEMENT_TYPE_CSV: + return parseFieldsByCsv(statement); + default: + throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, + String.format("Unsupported parse mode: %s", method)); } - return fieldsMap.entrySet().stream().map(entry -> { - SinkField field = new SinkField(); - field.setFieldName(entry.getKey()); - field.setFieldType(entry.getValue()); - return field; - }).collect(Collectors.toList()); } catch (Exception e) { - LOGGER.error("parse sink fields error", e); throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, - String.format("parse sink fields error : %s", e.getMessage())); + String.format("parse sink fields error: %s", e.getMessage())); } } @@ -741,7 +742,7 @@ private List parseFieldsByCsv(String statement) { continue; } - String[] cols = line.split(PARSE_FIELD_CSV_SPLITTER, PARSE_FIELD_CSV_MAX_COLUMNS); + String[] cols = PARSE_FIELD_CSV_SPLITTER.split(line, PARSE_FIELD_CSV_MAX_COLUMNS); if (cols.length < PARSE_FIELD_CSV_MIN_COLUMNS) { throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "At least two fields are required, line number is " + (i + 1)); @@ -771,21 +772,28 @@ private List parseFieldsBySql(String sql) throws JSQLParserException CCJSqlParserManager pm = new CCJSqlParserManager(); Statement statement = pm.parse(new StringReader(sql)); List fields = new ArrayList<>(); - if (statement instanceof CreateTable) { - CreateTable createTable = (CreateTable) statement; - List columnDefinitions = createTable.getColumnDefinitions(); - // get column definition - for (ColumnDefinition definition : columnDefinitions) { - // get field name - String columnName = definition.getColumnName(); - ColDataType colDataType = definition.getColDataType(); - String sqlDataType = colDataType.getDataType(); - // get field type - String realDataType = StringUtils.substringBefore(sqlDataType, LEFT_BRACKET).toLowerCase(); - // get field comment - List columnSpecs = definition.getColumnSpecs(); + if (!(statement instanceof CreateTable)) { + throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, + "The SQL statement must be a table creation statement"); + } + CreateTable createTable = (CreateTable) statement; + List columnDefinitions = createTable.getColumnDefinitions(); + // get column definition + for (ColumnDefinition definition : columnDefinitions) { + // get field name + String columnName = definition.getColumnName(); + ColDataType colDataType = definition.getColDataType(); + String sqlDataType = colDataType.getDataType(); + SinkField sinkField = new SinkField(); + sinkField.setFieldName(columnName); + // get field type + String realDataType = StringUtils.substringBefore(sqlDataType, LEFT_BRACKET).toLowerCase(); + sinkField.setFieldType(realDataType); + // get field comment + List columnSpecs = definition.getColumnSpecs(); + if (CollectionUtils.isNotEmpty(columnSpecs)) { int commentIndex = -1; - for (int csIndex = 0; columnSpecs != null && csIndex < columnSpecs.size(); csIndex++) { + for (int csIndex = 0; csIndex < columnSpecs.size(); csIndex++) { String spec = columnSpecs.get(csIndex); if (spec.toUpperCase().startsWith("COMMENT")) { commentIndex = csIndex; @@ -796,25 +804,26 @@ private List parseFieldsBySql(String sql) throws JSQLParserException if (-1 != commentIndex && columnSpecs.size() > commentIndex + 1) { comment = columnSpecs.get(commentIndex + 1).replaceAll("['\"]", ""); } - - SinkField sinkField = new SinkField(); - sinkField.setFieldName(columnName); - sinkField.setFieldType(realDataType); sinkField.setFieldComment(comment); - fields.add(sinkField); } - } else { - throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, - "The SQL statement must be a table creation statement"); + + fields.add(sinkField); } return fields; } - private Map parseFieldsByJson(String statement) throws JsonProcessingException { - // Use LinkedHashMap deserialization to keep the order of the fields - return objectMapper.readValue(statement, - new TypeReference>() { - }); + private List parseFieldsByJson(String statement) throws JsonProcessingException { + return objectMapper.readValue(statement, new TypeReference>>() { + }).stream().map(line -> { + String name = line.get(BATCH_PARSING_FILED_JSON_NAME_PROP); + String type = line.get(BATCH_PARSING_FILED_JSON_TYPE_PROP); + String desc = line.get(BATCH_PARSING_FILED_JSON_COMMENT_PROP); + SinkField sinkField = new SinkField(); + sinkField.setFieldName(name); + sinkField.setFieldType(type); + sinkField.setFieldComment(desc); + return sinkField; + }).collect(Collectors.toList()); } private void checkSinkRequestParams(SinkRequest request) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java index 4937a7c6b0..97c7a19d01 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java @@ -81,16 +81,19 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; import static org.apache.inlong.manager.common.consts.InlongConstants.PATTERN_NORMAL_CHARACTERS; +import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_CSV; import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON; import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL; import static org.apache.inlong.manager.common.consts.InlongConstants.STREAM_FIELD_TYPES; +import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_COMMENT_PROP; +import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_NAME_PROP; +import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_TYPE_PROP; import static org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.packExtParams; import static org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.unpackExtParams; @@ -740,21 +743,17 @@ public List parseFields(ParseFieldRequest parseFieldRequest) { String method = parseFieldRequest.getMethod(); String statement = parseFieldRequest.getStatement(); - Map fieldsMap; - if (STATEMENT_TYPE_JSON.equals(method)) { - fieldsMap = parseFieldsByJson(statement); - } else if (STATEMENT_TYPE_SQL.equals(method)) { - return parseFieldsBySql(statement); - } else { - return parseFieldsByCsv(statement); + switch (method) { + case STATEMENT_TYPE_JSON: + return parseFieldsByJson(statement); + case STATEMENT_TYPE_SQL: + return parseFieldsBySql(statement); + case STATEMENT_TYPE_CSV: + return parseFieldsByCsv(statement); + default: + throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, + String.format("Unsupported parse field mode: %s", method)); } - return fieldsMap.entrySet().stream().map(entry -> { - StreamField field = new StreamField(); - field.setFieldName(entry.getKey()); - field.setFieldType(entry.getValue()); - return field; - }).collect(Collectors.toList()); - } catch (Exception e) { LOGGER.error("parse inlong stream fields error", e); throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, @@ -831,27 +830,35 @@ private List parseFieldsBySql(String sql) throws JSQLParserExceptio CCJSqlParserManager pm = new CCJSqlParserManager(); Statement statement = pm.parse(new StringReader(sql)); List fields = new ArrayList<>(); - if (statement instanceof CreateTable) { - CreateTable createTable = (CreateTable) statement; - List columnDefinitions = createTable.getColumnDefinitions(); - // get column definition - for (int i = 0; i < columnDefinitions.size(); i++) { - ColumnDefinition definition = columnDefinitions.get(i); - // get field name - String columnName = definition.getColumnName(); - ColDataType colDataType = definition.getColDataType(); - String sqlDataType = colDataType.getDataType(); - // convert SQL type to Java type - Class clazz = FieldInfoUtils.sqlTypeToJavaType(sqlDataType); - if (clazz == Object.class) { - throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, - "Unrecognized SQL field type, line: " + (i + 1) + ", type: " + sqlDataType); - } - String type = clazz.getSimpleName().toLowerCase(); - // get field comment - List columnSpecs = definition.getColumnSpecs(); + if (!(statement instanceof CreateTable)) { + throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, + "The SQL statement must be a table creation statement"); + } + CreateTable createTable = (CreateTable) statement; + List columnDefinitions = createTable.getColumnDefinitions(); + // get column definition + for (int i = 0; i < columnDefinitions.size(); i++) { + ColumnDefinition definition = columnDefinitions.get(i); + StreamField streamField = new StreamField(); + // get field name + String columnName = definition.getColumnName(); + streamField.setFieldName(columnName); + + ColDataType colDataType = definition.getColDataType(); + String sqlDataType = colDataType.getDataType(); + // convert SQL type to Java type + Class clazz = FieldInfoUtils.sqlTypeToJavaType(sqlDataType); + if (clazz == Object.class) { + throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, + "Unrecognized SQL field type, line: " + (i + 1) + ", type: " + sqlDataType); + } + String type = clazz.getSimpleName().toLowerCase(); + streamField.setFieldType(type); + // get field comment + List columnSpecs = definition.getColumnSpecs(); + if (CollectionUtils.isNotEmpty(columnSpecs)) { int commentIndex = -1; - for (int csIndex = 0; columnSpecs != null && csIndex < columnSpecs.size(); csIndex++) { + for (int csIndex = 0; csIndex < columnSpecs.size(); csIndex++) { String spec = columnSpecs.get(csIndex); if (spec.toUpperCase().startsWith("COMMENT")) { commentIndex = csIndex; @@ -862,25 +869,25 @@ private List parseFieldsBySql(String sql) throws JSQLParserExceptio if (-1 != commentIndex && columnSpecs.size() > commentIndex + 1) { comment = columnSpecs.get(commentIndex + 1).replaceAll("['\"]", ""); } - - StreamField streamField = new StreamField(); - streamField.setFieldName(columnName); - streamField.setFieldType(type); streamField.setFieldComment(comment); - fields.add(streamField); } - } else { - throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, - "The SQL statement must be a table creation statement"); + fields.add(streamField); } return fields; } - private Map parseFieldsByJson(String statement) throws JsonProcessingException { - // Use LinkedHashMap deserialization to keep the order of the fields - return objectMapper.readValue(statement, - new TypeReference>() { - }); + private List parseFieldsByJson(String statement) throws JsonProcessingException { + return objectMapper.readValue(statement, new TypeReference>>() { + }).stream().map(line -> { + String name = line.get(BATCH_PARSING_FILED_JSON_NAME_PROP); + String type = line.get(BATCH_PARSING_FILED_JSON_TYPE_PROP); + String desc = line.get(BATCH_PARSING_FILED_JSON_COMMENT_PROP); + StreamField streamField = new StreamField(); + streamField.setFieldName(name); + streamField.setFieldType(type); + streamField.setFieldComment(desc); + return streamField; + }).collect(Collectors.toList()); } /** diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java index 29cd7cb59d..502c2afdd0 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java @@ -39,12 +39,16 @@ public class InlongStreamTest extends ServiceBaseTest { @Test public void testParseStreamFieldsByJson() { - String streamFieldsJson = "{\"name0\":\"string\",\"name1\":\"string\"}"; + String streamFieldsJson = + "[{\"name\":\"name0\",\"type\":\"string\",\"desc\":\"desc0\"},{\"name\":\"name1\",\"type\":\"string\"}]"; List expectStreamFields = new ArrayList<>(); for (int i = 0; i < 2; i++) { StreamField field = new StreamField(); field.setFieldName("name" + i); field.setFieldType("string"); + if (i == 0) { + field.setFieldComment("desc0"); + } expectStreamFields.add(field); } StreamField[] expectResult = expectStreamFields.toArray(new StreamField[0]); @@ -57,12 +61,16 @@ public void testParseStreamFieldsByJson() { @Test public void testParseSinkFieldsByJson() { - String sinkFieldsJson = "{\"sinkFieldName0\":\"string\",\"sinkFieldName1\":\"string\"}"; + String sinkFieldsJson = + "[{\"name\":\"sinkFieldName0\",\"type\":\"string\",\"desc\":\"desc0 content\"},{\"name\":\"sinkFieldName1\",\"type\":\"string\"}]"; List expectSinkFields = new ArrayList<>(); for (int i = 0; i < 2; i++) { SinkField field = new SinkField(); field.setFieldName("sinkFieldName" + i); field.setFieldType("string"); + if (i == 0) { + field.setFieldComment("desc0 content"); + } expectSinkFields.add(field); } SinkField[] expectResult = expectSinkFields.toArray(new SinkField[0]); @@ -75,14 +83,14 @@ public void testParseSinkFieldsByJson() { @Test public void testParseStreamFieldsBySql() { - String streamFieldsSql = "CREATE TABLE my_table (name0 VARCHAR(50) comment 'desc0', name1 VARCHAR(50))"; + String streamFieldsSql = "CREATE TABLE my_table (name0 VARCHAR(50) comment 'desc0 content', name1 VARCHAR(50))"; List expectStreamFields = new ArrayList<>(); for (int i = 0; i < 2; i++) { StreamField field = new StreamField(); field.setFieldName("name" + i); field.setFieldType("string"); if (i == 0) { - field.setFieldComment("desc0"); + field.setFieldComment("desc0 content"); } expectStreamFields.add(field); } @@ -97,14 +105,14 @@ public void testParseStreamFieldsBySql() { @Test public void testParseSinkFieldsBySql() { String sinkFieldsSql = - "CREATE TABLE my_table (sinkFieldName0 VARCHAR(50) comment 'desc0', sinkFieldName1 VARCHAR(50))"; + "CREATE TABLE my_table (sinkFieldName0 VARCHAR(50) comment 'desc0 content', sinkFieldName1 VARCHAR(50))"; List expectSinkFields = new ArrayList<>(); for (int i = 0; i < 2; i++) { SinkField field = new SinkField(); field.setFieldName("sinkFieldName" + i); field.setFieldType("varchar"); if (i == 0) { - field.setFieldComment("desc0"); + field.setFieldComment("desc0 content"); } expectSinkFields.add(field); }