From 708ffc3878eaf9877ea695234f9f1f2d18862e0a Mon Sep 17 00:00:00 2001 From: feat Date: Sat, 22 Apr 2023 12:54:41 +0800 Subject: [PATCH 1/2] [INLONG-7893][Manager] Support field description when parsing field by json --- .../common/consts/InlongConstants.java | 3 +++ .../service/sink/StreamSinkServiceImpl.java | 23 ++++++++++++++----- .../stream/InlongStreamServiceImpl.java | 23 ++++++++++++++----- .../service/stream/InlongStreamTest.java | 12 ++++++++-- 4 files changed, 47 insertions(+), 14 deletions(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java index f83a559fc7..500a755a71 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java @@ -167,5 +167,8 @@ public class InlongConstants { public static final Set STREAM_FIELD_TYPES = Sets.newHashSet("string", "int", "long", "float", "double", "date", "timestamp"); + public static final String STREAM_FILED_JSON_NAME_PROP = "name"; + public static final String STREAM_FILED_JSON_TYPE_PROP = "type"; + public static final String STREAM_FILED_JSON_COMMENT_PROP = "desc"; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java index d5e15dfe02..895bc2a254 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java @@ -85,6 +85,9 @@ import static org.apache.inlong.manager.common.consts.InlongConstants.PATTERN_NORMAL_CHARACTERS; import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON; import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL; +import static org.apache.inlong.manager.common.consts.InlongConstants.STREAM_FILED_JSON_COMMENT_PROP; +import static org.apache.inlong.manager.common.consts.InlongConstants.STREAM_FILED_JSON_NAME_PROP; +import static org.apache.inlong.manager.common.consts.InlongConstants.STREAM_FILED_JSON_TYPE_PROP; /** * Implementation of sink service interface @@ -712,7 +715,7 @@ public List parseFields(ParseFieldRequest parseFieldRequest) { Map fieldsMap; if (STATEMENT_TYPE_JSON.equals(method)) { - fieldsMap = parseFieldsByJson(statement); + return parseFieldsByJson(statement); } else if (STATEMENT_TYPE_SQL.equals(method)) { return parseFieldsBySql(statement); } else { @@ -810,11 +813,19 @@ private List parseFieldsBySql(String sql) throws JSQLParserException return fields; } - private Map parseFieldsByJson(String statement) throws JsonProcessingException { - // Use LinkedHashMap deserialization to keep the order of the fields - return objectMapper.readValue(statement, - new TypeReference>() { - }); + private List parseFieldsByJson(String statement) throws JsonProcessingException { + return objectMapper.readValue(statement, new TypeReference>>() { + }).stream().map(line -> { + String name = line.get(STREAM_FILED_JSON_NAME_PROP); + String type = line.get(STREAM_FILED_JSON_TYPE_PROP); + String desc = line.get(STREAM_FILED_JSON_COMMENT_PROP); + Map.Entry next = line.entrySet().iterator().next(); + SinkField streamField = new SinkField(); + streamField.setFieldName(name); + streamField.setFieldType(type); + streamField.setFieldComment(desc); + return streamField; + }).collect(Collectors.toList()); } private void checkSinkRequestParams(SinkRequest request) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java index 4937a7c6b0..207002771f 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java @@ -91,6 +91,9 @@ import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON; import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL; import static org.apache.inlong.manager.common.consts.InlongConstants.STREAM_FIELD_TYPES; +import static org.apache.inlong.manager.common.consts.InlongConstants.STREAM_FILED_JSON_COMMENT_PROP; +import static org.apache.inlong.manager.common.consts.InlongConstants.STREAM_FILED_JSON_NAME_PROP; +import static org.apache.inlong.manager.common.consts.InlongConstants.STREAM_FILED_JSON_TYPE_PROP; import static org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.packExtParams; import static org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.unpackExtParams; @@ -742,7 +745,7 @@ public List parseFields(ParseFieldRequest parseFieldRequest) { Map fieldsMap; if (STATEMENT_TYPE_JSON.equals(method)) { - fieldsMap = parseFieldsByJson(statement); + return parseFieldsByJson(statement); } else if (STATEMENT_TYPE_SQL.equals(method)) { return parseFieldsBySql(statement); } else { @@ -876,11 +879,19 @@ private List parseFieldsBySql(String sql) throws JSQLParserExceptio return fields; } - private Map parseFieldsByJson(String statement) throws JsonProcessingException { - // Use LinkedHashMap deserialization to keep the order of the fields - return objectMapper.readValue(statement, - new TypeReference>() { - }); + private List parseFieldsByJson(String statement) throws JsonProcessingException { + return objectMapper.readValue(statement, new TypeReference>>() { + }).stream().map(line -> { + String name = line.get(STREAM_FILED_JSON_NAME_PROP); + String type = line.get(STREAM_FILED_JSON_TYPE_PROP); + String desc = line.get(STREAM_FILED_JSON_COMMENT_PROP); + Map.Entry next = line.entrySet().iterator().next(); + StreamField streamField = new StreamField(); + streamField.setFieldName(name); + streamField.setFieldType(type); + streamField.setFieldComment(desc); + return streamField; + }).collect(Collectors.toList()); } /** diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java index 29cd7cb59d..b51eafe1b5 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java @@ -39,12 +39,16 @@ public class InlongStreamTest extends ServiceBaseTest { @Test public void testParseStreamFieldsByJson() { - String streamFieldsJson = "{\"name0\":\"string\",\"name1\":\"string\"}"; + String streamFieldsJson = + "[{\"name\":\"name0\",\"type\":\"string\",\"desc\":\"desc0\"},{\"name\":\"name1\",\"type\":\"string\"}]"; List expectStreamFields = new ArrayList<>(); for (int i = 0; i < 2; i++) { StreamField field = new StreamField(); field.setFieldName("name" + i); field.setFieldType("string"); + if (i == 0) { + field.setFieldComment("desc0"); + } expectStreamFields.add(field); } StreamField[] expectResult = expectStreamFields.toArray(new StreamField[0]); @@ -57,12 +61,16 @@ public void testParseStreamFieldsByJson() { @Test public void testParseSinkFieldsByJson() { - String sinkFieldsJson = "{\"sinkFieldName0\":\"string\",\"sinkFieldName1\":\"string\"}"; + String sinkFieldsJson = + "[{\"name\":\"sinkFieldName0\",\"type\":\"string\",\"desc\":\"desc0\"},{\"name\":\"sinkFieldName1\",\"type\":\"string\"}]"; List expectSinkFields = new ArrayList<>(); for (int i = 0; i < 2; i++) { SinkField field = new SinkField(); field.setFieldName("sinkFieldName" + i); field.setFieldType("string"); + if (i == 0) { + field.setFieldComment("desc0"); + } expectSinkFields.add(field); } SinkField[] expectResult = expectSinkFields.toArray(new SinkField[0]); From 519d13a9b1732b85c563d2a9630c9cf208329408 Mon Sep 17 00:00:00 2001 From: feat Date: Mon, 24 Apr 2023 10:28:45 +0800 Subject: [PATCH 2/2] [INLONG-7893][Manager] Support field description when parsing field by JSON --- .../common/consts/InlongConstants.java | 18 ++- .../service/sink/StreamSinkServiceImpl.java | 104 +++++++++--------- .../stream/InlongStreamServiceImpl.java | 96 ++++++++-------- .../service/stream/InlongStreamTest.java | 12 +- 4 files changed, 118 insertions(+), 112 deletions(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java index 500a755a71..1579bb0730 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java @@ -167,8 +167,20 @@ public class InlongConstants { public static final Set STREAM_FIELD_TYPES = Sets.newHashSet("string", "int", "long", "float", "double", "date", "timestamp"); - public static final String STREAM_FILED_JSON_NAME_PROP = "name"; - public static final String STREAM_FILED_JSON_TYPE_PROP = "type"; - public static final String STREAM_FILED_JSON_COMMENT_PROP = "desc"; + + /** + * The name prop when batch parsing fields in JSON mode + */ + public static final String BATCH_PARSING_FILED_JSON_NAME_PROP = "name"; + + /** + * The type prop when batch parsing fields in JSON mode + */ + public static final String BATCH_PARSING_FILED_JSON_TYPE_PROP = "type"; + + /** + * The comment prop when batch parsing fields in JSON mode + */ + public static final String BATCH_PARSING_FILED_JSON_COMMENT_PROP = "desc"; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java index 895bc2a254..f369d22140 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java @@ -76,18 +76,19 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.regex.Pattern; import java.util.stream.Collectors; import static org.apache.inlong.manager.common.consts.InlongConstants.LEFT_BRACKET; import static org.apache.inlong.manager.common.consts.InlongConstants.PATTERN_NORMAL_CHARACTERS; +import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_CSV; import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON; import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL; -import static org.apache.inlong.manager.common.consts.InlongConstants.STREAM_FILED_JSON_COMMENT_PROP; -import static org.apache.inlong.manager.common.consts.InlongConstants.STREAM_FILED_JSON_NAME_PROP; -import static org.apache.inlong.manager.common.consts.InlongConstants.STREAM_FILED_JSON_TYPE_PROP; +import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_COMMENT_PROP; +import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_NAME_PROP; +import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_TYPE_PROP; /** * Implementation of sink service interface @@ -96,7 +97,7 @@ public class StreamSinkServiceImpl implements StreamSinkService { private static final Logger LOGGER = LoggerFactory.getLogger(StreamSinkServiceImpl.class); - private static final String PARSE_FIELD_CSV_SPLITTER = "\t|\\s|,"; + private static final Pattern PARSE_FIELD_CSV_SPLITTER = Pattern.compile("[\t\\s,]"); private static final int PARSE_FIELD_CSV_MAX_COLUMNS = 3; private static final int PARSE_FIELD_CSV_MIN_COLUMNS = 2; @@ -294,7 +295,8 @@ public List listBrief(String groupId, String streamId) { Preconditions.expectNotBlank(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY); List summaryList = sinkMapper.selectSummary(groupId, streamId); - LOGGER.debug("success to list sink summary by groupId=" + groupId + ", streamId=" + streamId); + LOGGER.debug("success to list sink summary by groupId={}, streamId={}", groupId, streamId); + return summaryList; } @@ -713,25 +715,21 @@ public List parseFields(ParseFieldRequest parseFieldRequest) { String method = parseFieldRequest.getMethod(); String statement = parseFieldRequest.getStatement(); - Map fieldsMap; - if (STATEMENT_TYPE_JSON.equals(method)) { - return parseFieldsByJson(statement); - } else if (STATEMENT_TYPE_SQL.equals(method)) { - return parseFieldsBySql(statement); - } else { - return parseFieldsByCsv(statement); + switch (method) { + case STATEMENT_TYPE_JSON: + return parseFieldsByJson(statement); + case STATEMENT_TYPE_SQL: + return parseFieldsBySql(statement); + case STATEMENT_TYPE_CSV: + return parseFieldsByCsv(statement); + default: + throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, + String.format("Unsupported parse mode: %s", method)); } - return fieldsMap.entrySet().stream().map(entry -> { - SinkField field = new SinkField(); - field.setFieldName(entry.getKey()); - field.setFieldType(entry.getValue()); - return field; - }).collect(Collectors.toList()); } catch (Exception e) { - LOGGER.error("parse sink fields error", e); throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, - String.format("parse sink fields error : %s", e.getMessage())); + String.format("parse sink fields error: %s", e.getMessage())); } } @@ -744,7 +742,7 @@ private List parseFieldsByCsv(String statement) { continue; } - String[] cols = line.split(PARSE_FIELD_CSV_SPLITTER, PARSE_FIELD_CSV_MAX_COLUMNS); + String[] cols = PARSE_FIELD_CSV_SPLITTER.split(line, PARSE_FIELD_CSV_MAX_COLUMNS); if (cols.length < PARSE_FIELD_CSV_MIN_COLUMNS) { throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "At least two fields are required, line number is " + (i + 1)); @@ -774,21 +772,28 @@ private List parseFieldsBySql(String sql) throws JSQLParserException CCJSqlParserManager pm = new CCJSqlParserManager(); Statement statement = pm.parse(new StringReader(sql)); List fields = new ArrayList<>(); - if (statement instanceof CreateTable) { - CreateTable createTable = (CreateTable) statement; - List columnDefinitions = createTable.getColumnDefinitions(); - // get column definition - for (ColumnDefinition definition : columnDefinitions) { - // get field name - String columnName = definition.getColumnName(); - ColDataType colDataType = definition.getColDataType(); - String sqlDataType = colDataType.getDataType(); - // get field type - String realDataType = StringUtils.substringBefore(sqlDataType, LEFT_BRACKET).toLowerCase(); - // get field comment - List columnSpecs = definition.getColumnSpecs(); + if (!(statement instanceof CreateTable)) { + throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, + "The SQL statement must be a table creation statement"); + } + CreateTable createTable = (CreateTable) statement; + List columnDefinitions = createTable.getColumnDefinitions(); + // get column definition + for (ColumnDefinition definition : columnDefinitions) { + // get field name + String columnName = definition.getColumnName(); + ColDataType colDataType = definition.getColDataType(); + String sqlDataType = colDataType.getDataType(); + SinkField sinkField = new SinkField(); + sinkField.setFieldName(columnName); + // get field type + String realDataType = StringUtils.substringBefore(sqlDataType, LEFT_BRACKET).toLowerCase(); + sinkField.setFieldType(realDataType); + // get field comment + List columnSpecs = definition.getColumnSpecs(); + if (CollectionUtils.isNotEmpty(columnSpecs)) { int commentIndex = -1; - for (int csIndex = 0; columnSpecs != null && csIndex < columnSpecs.size(); csIndex++) { + for (int csIndex = 0; csIndex < columnSpecs.size(); csIndex++) { String spec = columnSpecs.get(csIndex); if (spec.toUpperCase().startsWith("COMMENT")) { commentIndex = csIndex; @@ -799,16 +804,10 @@ private List parseFieldsBySql(String sql) throws JSQLParserException if (-1 != commentIndex && columnSpecs.size() > commentIndex + 1) { comment = columnSpecs.get(commentIndex + 1).replaceAll("['\"]", ""); } - - SinkField sinkField = new SinkField(); - sinkField.setFieldName(columnName); - sinkField.setFieldType(realDataType); sinkField.setFieldComment(comment); - fields.add(sinkField); } - } else { - throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, - "The SQL statement must be a table creation statement"); + + fields.add(sinkField); } return fields; } @@ -816,15 +815,14 @@ private List parseFieldsBySql(String sql) throws JSQLParserException private List parseFieldsByJson(String statement) throws JsonProcessingException { return objectMapper.readValue(statement, new TypeReference>>() { }).stream().map(line -> { - String name = line.get(STREAM_FILED_JSON_NAME_PROP); - String type = line.get(STREAM_FILED_JSON_TYPE_PROP); - String desc = line.get(STREAM_FILED_JSON_COMMENT_PROP); - Map.Entry next = line.entrySet().iterator().next(); - SinkField streamField = new SinkField(); - streamField.setFieldName(name); - streamField.setFieldType(type); - streamField.setFieldComment(desc); - return streamField; + String name = line.get(BATCH_PARSING_FILED_JSON_NAME_PROP); + String type = line.get(BATCH_PARSING_FILED_JSON_TYPE_PROP); + String desc = line.get(BATCH_PARSING_FILED_JSON_COMMENT_PROP); + SinkField sinkField = new SinkField(); + sinkField.setFieldName(name); + sinkField.setFieldType(type); + sinkField.setFieldComment(desc); + return sinkField; }).collect(Collectors.toList()); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java index 207002771f..97c7a19d01 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java @@ -81,19 +81,19 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; import static org.apache.inlong.manager.common.consts.InlongConstants.PATTERN_NORMAL_CHARACTERS; +import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_CSV; import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON; import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL; import static org.apache.inlong.manager.common.consts.InlongConstants.STREAM_FIELD_TYPES; -import static org.apache.inlong.manager.common.consts.InlongConstants.STREAM_FILED_JSON_COMMENT_PROP; -import static org.apache.inlong.manager.common.consts.InlongConstants.STREAM_FILED_JSON_NAME_PROP; -import static org.apache.inlong.manager.common.consts.InlongConstants.STREAM_FILED_JSON_TYPE_PROP; +import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_COMMENT_PROP; +import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_NAME_PROP; +import static org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_TYPE_PROP; import static org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.packExtParams; import static org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.unpackExtParams; @@ -743,21 +743,17 @@ public List parseFields(ParseFieldRequest parseFieldRequest) { String method = parseFieldRequest.getMethod(); String statement = parseFieldRequest.getStatement(); - Map fieldsMap; - if (STATEMENT_TYPE_JSON.equals(method)) { - return parseFieldsByJson(statement); - } else if (STATEMENT_TYPE_SQL.equals(method)) { - return parseFieldsBySql(statement); - } else { - return parseFieldsByCsv(statement); + switch (method) { + case STATEMENT_TYPE_JSON: + return parseFieldsByJson(statement); + case STATEMENT_TYPE_SQL: + return parseFieldsBySql(statement); + case STATEMENT_TYPE_CSV: + return parseFieldsByCsv(statement); + default: + throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, + String.format("Unsupported parse field mode: %s", method)); } - return fieldsMap.entrySet().stream().map(entry -> { - StreamField field = new StreamField(); - field.setFieldName(entry.getKey()); - field.setFieldType(entry.getValue()); - return field; - }).collect(Collectors.toList()); - } catch (Exception e) { LOGGER.error("parse inlong stream fields error", e); throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, @@ -834,27 +830,35 @@ private List parseFieldsBySql(String sql) throws JSQLParserExceptio CCJSqlParserManager pm = new CCJSqlParserManager(); Statement statement = pm.parse(new StringReader(sql)); List fields = new ArrayList<>(); - if (statement instanceof CreateTable) { - CreateTable createTable = (CreateTable) statement; - List columnDefinitions = createTable.getColumnDefinitions(); - // get column definition - for (int i = 0; i < columnDefinitions.size(); i++) { - ColumnDefinition definition = columnDefinitions.get(i); - // get field name - String columnName = definition.getColumnName(); - ColDataType colDataType = definition.getColDataType(); - String sqlDataType = colDataType.getDataType(); - // convert SQL type to Java type - Class clazz = FieldInfoUtils.sqlTypeToJavaType(sqlDataType); - if (clazz == Object.class) { - throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, - "Unrecognized SQL field type, line: " + (i + 1) + ", type: " + sqlDataType); - } - String type = clazz.getSimpleName().toLowerCase(); - // get field comment - List columnSpecs = definition.getColumnSpecs(); + if (!(statement instanceof CreateTable)) { + throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, + "The SQL statement must be a table creation statement"); + } + CreateTable createTable = (CreateTable) statement; + List columnDefinitions = createTable.getColumnDefinitions(); + // get column definition + for (int i = 0; i < columnDefinitions.size(); i++) { + ColumnDefinition definition = columnDefinitions.get(i); + StreamField streamField = new StreamField(); + // get field name + String columnName = definition.getColumnName(); + streamField.setFieldName(columnName); + + ColDataType colDataType = definition.getColDataType(); + String sqlDataType = colDataType.getDataType(); + // convert SQL type to Java type + Class clazz = FieldInfoUtils.sqlTypeToJavaType(sqlDataType); + if (clazz == Object.class) { + throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, + "Unrecognized SQL field type, line: " + (i + 1) + ", type: " + sqlDataType); + } + String type = clazz.getSimpleName().toLowerCase(); + streamField.setFieldType(type); + // get field comment + List columnSpecs = definition.getColumnSpecs(); + if (CollectionUtils.isNotEmpty(columnSpecs)) { int commentIndex = -1; - for (int csIndex = 0; columnSpecs != null && csIndex < columnSpecs.size(); csIndex++) { + for (int csIndex = 0; csIndex < columnSpecs.size(); csIndex++) { String spec = columnSpecs.get(csIndex); if (spec.toUpperCase().startsWith("COMMENT")) { commentIndex = csIndex; @@ -865,16 +869,9 @@ private List parseFieldsBySql(String sql) throws JSQLParserExceptio if (-1 != commentIndex && columnSpecs.size() > commentIndex + 1) { comment = columnSpecs.get(commentIndex + 1).replaceAll("['\"]", ""); } - - StreamField streamField = new StreamField(); - streamField.setFieldName(columnName); - streamField.setFieldType(type); streamField.setFieldComment(comment); - fields.add(streamField); } - } else { - throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, - "The SQL statement must be a table creation statement"); + fields.add(streamField); } return fields; } @@ -882,10 +879,9 @@ private List parseFieldsBySql(String sql) throws JSQLParserExceptio private List parseFieldsByJson(String statement) throws JsonProcessingException { return objectMapper.readValue(statement, new TypeReference>>() { }).stream().map(line -> { - String name = line.get(STREAM_FILED_JSON_NAME_PROP); - String type = line.get(STREAM_FILED_JSON_TYPE_PROP); - String desc = line.get(STREAM_FILED_JSON_COMMENT_PROP); - Map.Entry next = line.entrySet().iterator().next(); + String name = line.get(BATCH_PARSING_FILED_JSON_NAME_PROP); + String type = line.get(BATCH_PARSING_FILED_JSON_TYPE_PROP); + String desc = line.get(BATCH_PARSING_FILED_JSON_COMMENT_PROP); StreamField streamField = new StreamField(); streamField.setFieldName(name); streamField.setFieldType(type); diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java index b51eafe1b5..502c2afdd0 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java @@ -62,14 +62,14 @@ public void testParseStreamFieldsByJson() { @Test public void testParseSinkFieldsByJson() { String sinkFieldsJson = - "[{\"name\":\"sinkFieldName0\",\"type\":\"string\",\"desc\":\"desc0\"},{\"name\":\"sinkFieldName1\",\"type\":\"string\"}]"; + "[{\"name\":\"sinkFieldName0\",\"type\":\"string\",\"desc\":\"desc0 content\"},{\"name\":\"sinkFieldName1\",\"type\":\"string\"}]"; List expectSinkFields = new ArrayList<>(); for (int i = 0; i < 2; i++) { SinkField field = new SinkField(); field.setFieldName("sinkFieldName" + i); field.setFieldType("string"); if (i == 0) { - field.setFieldComment("desc0"); + field.setFieldComment("desc0 content"); } expectSinkFields.add(field); } @@ -83,14 +83,14 @@ public void testParseSinkFieldsByJson() { @Test public void testParseStreamFieldsBySql() { - String streamFieldsSql = "CREATE TABLE my_table (name0 VARCHAR(50) comment 'desc0', name1 VARCHAR(50))"; + String streamFieldsSql = "CREATE TABLE my_table (name0 VARCHAR(50) comment 'desc0 content', name1 VARCHAR(50))"; List expectStreamFields = new ArrayList<>(); for (int i = 0; i < 2; i++) { StreamField field = new StreamField(); field.setFieldName("name" + i); field.setFieldType("string"); if (i == 0) { - field.setFieldComment("desc0"); + field.setFieldComment("desc0 content"); } expectStreamFields.add(field); } @@ -105,14 +105,14 @@ public void testParseStreamFieldsBySql() { @Test public void testParseSinkFieldsBySql() { String sinkFieldsSql = - "CREATE TABLE my_table (sinkFieldName0 VARCHAR(50) comment 'desc0', sinkFieldName1 VARCHAR(50))"; + "CREATE TABLE my_table (sinkFieldName0 VARCHAR(50) comment 'desc0 content', sinkFieldName1 VARCHAR(50))"; List expectSinkFields = new ArrayList<>(); for (int i = 0; i < 2; i++) { SinkField field = new SinkField(); field.setFieldName("sinkFieldName" + i); field.setFieldType("varchar"); if (i == 0) { - field.setFieldComment("desc0"); + field.setFieldComment("desc0 content"); } expectSinkFields.add(field); }