Skip to content

Commit

Permalink
[INLONG-8988][Manager] Supports multiple wrap types for message body (#…
Browse files Browse the repository at this point in the history
…8989)

* [INLONG-8988][Manager] Supports multiple wrap types for message body
  • Loading branch information
fuweng11 authored Sep 26, 2023
1 parent 144f139 commit ae9990c
Show file tree
Hide file tree
Showing 17 changed files with 88 additions and 33 deletions.
8 changes: 8 additions & 0 deletions inlong-manager/manager-dao/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun</groupId>
<artifactId>tools</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun</groupId>
<artifactId>jconsole</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class InlongStreamEntity implements Serializable {
private String mqResource;

private String dataType;
private String wrapType;
private String dataEncoding;
private String dataSeparator;
private String dataEscapeChar;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
<result column="mq_resource" jdbcType="VARCHAR" property="mqResource"/>

<result column="data_type" jdbcType="VARCHAR" property="dataType"/>
<result column="wrap_type" jdbcType="VARCHAR" property="wrapType"/>
<result column="data_encoding" jdbcType="VARCHAR" property="dataEncoding"/>
<result column="data_separator" jdbcType="VARCHAR" property="dataSeparator"/>
<result column="data_escape_char" jdbcType="VARCHAR" property="dataEscapeChar"/>
Expand All @@ -52,7 +53,7 @@
</resultMap>
<sql id="Base_Column_List">
id, inlong_group_id, inlong_stream_id, name, description, mq_resource,
data_type, data_encoding, data_separator, data_escape_char, sync_send,
data_type, wrap_type, data_encoding, data_separator, data_escape_char, sync_send,
daily_records, daily_storage, peak_records, max_length, storage_period, ext_params,
status, previous_status, is_deleted, creator, modifier, create_time, modify_time, version
</sql>
Expand All @@ -61,18 +62,20 @@
parameterType="org.apache.inlong.manager.dao.entity.InlongStreamEntity">
insert into inlong_stream (id, inlong_group_id, inlong_stream_id,
name, description, mq_resource,
data_type, data_encoding, data_separator,
data_escape_char, sync_send, daily_records,
daily_storage, peak_records, max_length,
storage_period, ext_params, status,
previous_status, creator, modifier)
data_type, wrap_type, data_encoding,
data_separator, data_escape_char, sync_send,
daily_records, daily_storage, peak_records,
max_length,storage_period, ext_params,
status, previous_status, creator,
modifier)
values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
#{name,jdbcType=VARCHAR}, #{description,jdbcType=VARCHAR}, #{mqResource,jdbcType=VARCHAR},
#{dataType,jdbcType=VARCHAR}, #{dataEncoding,jdbcType=VARCHAR}, #{dataSeparator,jdbcType=VARCHAR},
#{dataEscapeChar,jdbcType=VARCHAR}, #{syncSend,jdbcType=INTEGER}, #{dailyRecords,jdbcType=INTEGER},
#{dailyStorage,jdbcType=INTEGER}, #{peakRecords,jdbcType=INTEGER}, #{maxLength,jdbcType=INTEGER},
#{storagePeriod,jdbcType=INTEGER}, #{extParams,jdbcType=LONGVARCHAR}, #{status,jdbcType=INTEGER},
#{previousStatus,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
#{dataType,jdbcType=VARCHAR}, #{wrapType,jdbcType=VARCHAR}, #{dataEncoding,jdbcType=VARCHAR},
#{dataSeparator,jdbcType=VARCHAR}, #{dataEscapeChar,jdbcType=VARCHAR}, #{syncSend,jdbcType=INTEGER},
#{dailyRecords,jdbcType=INTEGER}, #{dailyStorage,jdbcType=INTEGER}, #{peakRecords,jdbcType=INTEGER},
#{maxLength,jdbcType=INTEGER}, #{storagePeriod,jdbcType=INTEGER}, #{extParams,jdbcType=LONGVARCHAR},
#{status,jdbcType=INTEGER}, #{previousStatus,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR},
#{modifier,jdbcType=VARCHAR})
</insert>
<insert id="insertSelective" useGeneratedKeys="true" keyProperty="id"
parameterType="org.apache.inlong.manager.dao.entity.InlongStreamEntity">
Expand All @@ -99,6 +102,9 @@
<if test="dataType != null">
data_type,
</if>
<if test="wrapType != null">
wrap_type,
</if>
<if test="dataEncoding != null">
data_encoding,
</if>
Expand Down Expand Up @@ -164,6 +170,9 @@
<if test="dataType != null">
#{dataType,jdbcType=VARCHAR},
</if>
<if test="wrapType != null">
#{wrapType,jdbcType=VARCHAR},
</if>
<if test="dataEncoding != null">
#{dataEncoding,jdbcType=VARCHAR},
</if>
Expand Down Expand Up @@ -234,7 +243,7 @@
parameterType="org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest">
select
distinct stream.id, stream.inlong_group_id, stream.inlong_stream_id, stream.name,
stream.description, stream.mq_resource, stream.data_type, stream.data_encoding,
stream.description, stream.mq_resource, stream.data_type, stream.wrap_type, stream.data_encoding,
stream.data_separator, stream.data_escape_char, stream.sync_send, stream.daily_records,
stream.daily_storage, stream.peak_records, stream.max_length, stream.storage_period,
stream.status, stream.creator, stream.modifier, stream.create_time, stream.modify_time, stream.version
Expand Down Expand Up @@ -286,8 +295,7 @@
and is_deleted = 0
</select>
<select id="selectAllStreams" resultType="org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo">
select
inlong_group_id,
select inlong_group_id,
inlong_stream_id,
mq_resource,
ext_params
Expand All @@ -311,6 +319,7 @@
description = #{description, jdbcType=VARCHAR},
mq_resource = #{mqResource, jdbcType=VARCHAR},
data_type = #{dataType, jdbcType=VARCHAR},
wrap_type = #{wrapType, jdbcType=VARCHAR},
data_encoding = #{dataEncoding, jdbcType=VARCHAR},
data_separator = #{dataSeparator, jdbcType=VARCHAR},
data_escape_char = #{dataEscapeChar, jdbcType=VARCHAR},
Expand Down Expand Up @@ -351,6 +360,9 @@
<if test="dataType != null">
data_type = #{dataType, jdbcType=VARCHAR},
</if>
<if test="wrapType != null">
wrap_type = #{wrapType, jdbcType=VARCHAR},
</if>
<if test="dataEncoding != null">
data_encoding = #{dataEncoding, jdbcType=VARCHAR},
</if>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.inlong.manager.pojo.sort.node.base;

import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeMappingStrategy;
import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
import org.apache.inlong.manager.pojo.stream.StreamField;
Expand Down Expand Up @@ -92,7 +93,7 @@ default List<FieldInfo> parseStreamFieldInfos(List<StreamField> streamFields, St
*/
default Format parsingFormat(
String serializationType,
boolean wrapWithInlongMsg,
String wrapType,
String separatorStr,
boolean ignoreParseErrors) {
Format format;
Expand Down Expand Up @@ -129,7 +130,7 @@ default Format parsingFormat(
default:
throw new IllegalArgumentException(String.format("Unsupported dataType=%s", dataType));
}
if (wrapWithInlongMsg) {
if (Objects.equals(wrapType, MessageWrapType.INLONG_MSG_V0.getName())) {
Format innerFormat = format;
format = new InLongMsgFormat(innerFormat, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public ExtractNode createExtractNode(StreamNode streamNodeInfo) {

Format format = parsingFormat(
kafkaSource.getSerializationType(),
kafkaSource.isWrapWithInlongMsg(),
kafkaSource.getWrapType(),
kafkaSource.getDataSeparator(),
kafkaSource.isIgnoreParseErrors());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
pulsarSource.getPulsarTenant() + "/" + pulsarSource.getNamespace() + "/" + pulsarSource.getTopic();

Format format = parsingFormat(pulsarSource.getSerializationType(),
pulsarSource.isWrapWithInlongMsg(),
pulsarSource.getWrapType(),
pulsarSource.getDataSeparator(),
pulsarSource.isIgnoreParseError());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.manager.pojo.source.kafka;

import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
Expand Down Expand Up @@ -92,8 +93,8 @@ public class KafkaSource extends StreamSource {
@ApiModelProperty(value = "Data field escape symbol")
private String dataEscapeChar;

@ApiModelProperty("Whether wrap content with InlongMsg")
private boolean wrapWithInlongMsg = true;
@ApiModelProperty(value = "The message body wrap wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc")
private String wrapType = MessageWrapType.INLONG_MSG_V0.getName();

public KafkaSource() {
this.setSourceType(SourceType.KAFKA);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.manager.pojo.source.pulsar;

import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
Expand Down Expand Up @@ -83,9 +84,9 @@ public class PulsarSource extends StreamSource {
@Builder.Default
private boolean isInlongComponent = false;

@ApiModelProperty("Whether wrap content with InlongMsg")
@ApiModelProperty(value = "The message body wrap wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc")
@Builder.Default
private boolean wrapWithInlongMsg = true;
private String wrapType = MessageWrapType.INLONG_MSG_V0.getName();

public PulsarSource() {
this.setSourceType(SourceType.PULSAR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ public class InlongStreamBriefInfo {
@ApiModelProperty(value = "Data storage period, unit: day")
private Integer storagePeriod;

@ApiModelProperty(value = "Whether the message body wrapped with InlongMsg")
private Boolean wrapWithInlongMsg;
@ApiModelProperty(value = "The message body wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc")
private String wrapType;

@ApiModelProperty(value = "Whether to ignore the parse errors of field value")
private Boolean ignoreParseError;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ public class InlongStreamExtParam implements Serializable {
@ApiModelProperty(value = "Whether to ignore the parse errors of field value")
private boolean ignoreParseError;

@ApiModelProperty(value = "Whether the message body wrapped with InlongMsg")
private boolean wrapWithInlongMsg;

/**
* Pack extended attributes into ExtParams
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ public class InlongStreamInfo extends BaseInlongStream {
@ApiModelProperty(value = "Version number")
private Integer version;

@ApiModelProperty(value = "Whether the message body wrapped with InlongMsg")
private Boolean wrapWithInlongMsg = true;
@ApiModelProperty(value = "The message body wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc")
private String wrapType;

@ApiModelProperty(value = "Whether to ignore the parse errors of field value")
private Boolean ignoreParseError = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public class InlongStreamRequest extends BaseInlongStream {
@ApiModelProperty(value = "Whether to ignore the parse errors of field value")
private boolean ignoreParseError = true;

@ApiModelProperty(value = "Whether the message body wrapped with InlongMsg")
private boolean wrapWithInlongMsg = true;
@ApiModelProperty(value = "The message body wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, PB, etc")
private String wrapType;

}
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo,
}
}

kafkaSource.setWrapWithInlongMsg(streamInfo.getWrapWithInlongMsg());
kafkaSource.setWrapType(streamInfo.getWrapType());

kafkaSource.setAutoOffsetReset(KafkaOffset.EARLIEST.getName());
kafkaSource.setFieldList(streamInfo.getFieldList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo,
String serializationType = DataTypeEnum.forType(streamInfo.getDataType()).getType();
pulsarSource.setSerializationType(serializationType);
}
pulsarSource.setWrapWithInlongMsg(streamInfo.getWrapWithInlongMsg());
pulsarSource.setWrapType(streamInfo.getWrapType());
pulsarSource.setIgnoreParseError(streamInfo.getIgnoreParseError());

// set the token info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ CREATE TABLE IF NOT EXISTS `inlong_stream`
`description` varchar(256) DEFAULT '' COMMENT 'Description of inlong stream',
`mq_resource` varchar(128) DEFAULT NULL COMMENT 'MQ resource, in one stream, corresponding to the filter ID of TubeMQ, corresponding to the topic of Pulsar',
`data_type` varchar(20) DEFAULT NULL COMMENT 'Data type, including: CSV, KEY-VALUE, JSON, AVRO, etc.',
`wrap_type` varchar(256) DEFAULT 'INLONG_MSG_V0' COMMENT 'The message body wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc',
`data_encoding` varchar(8) DEFAULT 'UTF-8' COMMENT 'Data encoding format, including: UTF-8, GBK, etc.',
`data_separator` varchar(8) DEFAULT NULL COMMENT 'The source data field separator',
`data_escape_char` varchar(8) DEFAULT NULL COMMENT 'Source data field escape character, the default is NULL (NULL), stored as 1 character',
Expand Down
1 change: 1 addition & 0 deletions inlong-manager/manager-web/sql/apache_inlong_manager.sql
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ CREATE TABLE IF NOT EXISTS `inlong_stream`
`description` varchar(256) DEFAULT '' COMMENT 'Description of inlong stream',
`mq_resource` varchar(128) DEFAULT NULL COMMENT 'MQ resource, in one stream, corresponding to the filter ID of TubeMQ, corresponding to the topic of Pulsar',
`data_type` varchar(20) DEFAULT NULL COMMENT 'Data type, including: CSV, KEY-VALUE, JSON, AVRO, etc.',
`wrap_type` varchar(256) DEFAULT 'INLONG_MSG_V0' COMMENT 'The message body wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc',
`data_encoding` varchar(8) DEFAULT 'UTF-8' COMMENT 'Data encoding format, including: UTF-8, GBK, etc.',
`data_separator` varchar(8) DEFAULT NULL COMMENT 'The source data field separator',
`data_escape_char` varchar(8) DEFAULT NULL COMMENT 'Source data field escape character, the default is NULL (NULL), stored as 1 character',
Expand Down
31 changes: 31 additions & 0 deletions inlong-manager/manager-web/sql/changes-1.10.0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- This is the SQL change file from version 1.9.0 to the current version 1.10.0.
-- When upgrading to version 1.10.0, please execute those SQLs in the DB (such as MySQL) used by the Manager module.

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

USE `apache_inlong_manager`;

ALTER TABLE `inlong_stream`
ADD COLUMN `wrap_type` varchar(256) DEFAULT 'INLONG_MSG_V0' COMMENT 'The message body wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc';




0 comments on commit ae9990c

Please sign in to comment.