From e3e50b81437ab1e7d968a274ef6ae8b2057b6025 Mon Sep 17 00:00:00 2001 From: vernedeng Date: Fri, 10 Nov 2023 15:24:28 +0800 Subject: [PATCH 1/5] [INLONG-9246][Sort] Pulsar source support audit when the deserialized type is not InlongMsg --- .../protocol/node/extract/PulsarExtractNode.java | 7 ++++++- .../inlong/sort/pulsar/PulsarTableFactory.java | 7 +++++++ .../sort/pulsar/table/PulsarReadableMetadata.java | 5 +++++ .../table/PulsarTableDeserializationSchema.java | 15 +++++++++++++-- .../PulsarTableDeserializationSchemaFactory.java | 5 +++++ 5 files changed, 36 insertions(+), 3 deletions(-) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java index d6634013da7..7f16efb45c1 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java @@ -23,6 +23,7 @@ import org.apache.inlong.sort.protocol.Metadata; import org.apache.inlong.sort.protocol.node.ExtractNode; import org.apache.inlong.sort.protocol.node.format.Format; +import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat; import org.apache.inlong.sort.protocol.transformation.WatermarkField; import com.google.common.base.Preconditions; @@ -146,7 +147,11 @@ public String getMetadataKey(MetaField metaField) { String metadataKey; switch (metaField) { case AUDIT_DATA_TIME: - metadataKey = "value.data-time"; + if (format instanceof InLongMsgFormat) { + metadataKey = "value.data-time"; + } else { + metadataKey = "consume_time"; + } break; default: throw new UnsupportedOperationException(String.format("Unsupport meta field for %s: %s", diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java index 05277ff5a2b..70d525b6565 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java @@ -47,6 +47,7 @@ import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL; import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME; +import static org.apache.flink.table.factories.FactoryUtil.FORMAT; import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM; import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS; import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; @@ -93,6 +94,9 @@ public class PulsarTableFactory implements DynamicTableSourceFactory { public static final boolean UPSERT_DISABLED = false; + public static final String INNER_FORMAT_TYPE = "inlong-msg"; + public static boolean innerFormat = false; + @Override public DynamicTableSource createDynamicTableSource(Context context) { FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); @@ -103,6 +107,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { getValueDecodingFormat(helper); ReadableConfig tableOptions = helper.getOptions(); + innerFormat = INNER_FORMAT_TYPE.equals(tableOptions.get(FORMAT)); + // Validate configs are not conflict; each options is consumed; no unwanted configs // PulsarOptions, PulsarSourceOptions and PulsarSinkOptions is not part of the validation. helper.validateExcept( @@ -154,6 +160,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { valueDecodingFormat, valueProjection, UPSERT_DISABLED, + innerFormat, inlongMetric, auditHostAndPorts, auditKeys); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java index 2e0c1ab1dd2..b3f4c608f27 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java @@ -115,6 +115,11 @@ public enum ReadableMetadata { DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(), message -> TimestampData.fromEpochMillis(message.getEventTime())), + CONSUME_TIME( + "consume_time", + DataTypes.BIGINT().notNull(), + message -> System.currentTimeMillis()), + PROPERTIES( "properties", // key and value of the map are nullable to make handling easier in queries diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java index 4792d7b2870..4d2bf0992b8 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java @@ -57,6 +57,8 @@ public class PulsarTableDeserializationSchema implements PulsarDeserializationSc private final boolean upsertMode; + private final boolean innerFormat; + private SourceMetricData sourceMetricData; private MetricOption metricOption; @@ -67,6 +69,7 @@ public PulsarTableDeserializationSchema( TypeInformation producedTypeInfo, PulsarRowDataConverter rowDataConverter, boolean upsertMode, + boolean innerFormat, MetricOption metricOption) { if (upsertMode) { checkNotNull(keyDeserialization, "upsert mode must specify a key format"); @@ -76,6 +79,7 @@ public PulsarTableDeserializationSchema( this.rowDataConverter = checkNotNull(rowDataConverter); this.producedTypeInfo = checkNotNull(producedTypeInfo); this.upsertMode = upsertMode; + this.innerFormat = innerFormat; this.metricOption = metricOption; } @@ -109,8 +113,15 @@ public void deserialize(Message message, Collector collector) return; } - valueDeserialization.deserialize(message.getData(), - new MetricsCollector<>(new ListCollector<>(valueRowData), sourceMetricData)); + MetricsCollector metricsCollector = + new MetricsCollector<>(new ListCollector<>(valueRowData), sourceMetricData); + + // reset time stamp if the deserialize schema has not inner format + if (!innerFormat) { + metricsCollector.resetTimestamp(message.getEventTime()); + } + + valueDeserialization.deserialize(message.getData(), metricsCollector); rowDataConverter.projectToProducedRowAndCollect( message, keyRowData, valueRowData, collector); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java index c063e265399..c193a348577 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java @@ -80,6 +80,8 @@ public class PulsarTableDeserializationSchemaFactory implements Serializable { private final int[] valueProjection; + private final boolean innerFormat; + // -------------------------------------------------------------------------------------------- // Mutable attributes. Will be updated after the applyReadableMetadata() // -------------------------------------------------------------------------------------------- @@ -101,6 +103,7 @@ public PulsarTableDeserializationSchemaFactory( DecodingFormat> valueDecodingFormat, int[] valueProjection, boolean upsertMode, + boolean innerFormat, String inlongMetric, String auditHostAndPorts, String auditKeys) { @@ -116,6 +119,7 @@ public PulsarTableDeserializationSchemaFactory( this.producedDataType = physicalDataType; this.connectorMetadataKeys = Collections.emptyList(); this.upsertMode = upsertMode; + this.innerFormat = innerFormat; this.inlongMetric = inlongMetric; this.auditHostAndPorts = auditHostAndPorts; @@ -178,6 +182,7 @@ public PulsarDeserializationSchema createPulsarDeserialization( producedTypeInfo, rowDataConverter, upsertMode, + innerFormat, metricOption); } From 0f334cf801f70c81ada74178eb27644e24a7046f Mon Sep 17 00:00:00 2001 From: vernedeng Date: Fri, 10 Nov 2023 16:33:09 +0800 Subject: [PATCH 2/5] fix --- .../org/apache/inlong/sort/protocol/node/ExtractNode.java | 4 ++++ .../sort/protocol/node/extract/PulsarExtractNode.java | 6 +++--- .../org/apache/inlong/sort/pulsar/PulsarTableFactory.java | 3 +-- .../inlong/sort/pulsar/table/PulsarReadableMetadata.java | 4 +++- 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java index b9d649d6210..fc68f0f356f 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java @@ -73,6 +73,10 @@ public abstract class ExtractNode implements Node { public static final String INLONG_MSG = "inlong-msg"; + public static final String INLONG_MSG_AUDIT_TIME = "value.data-time"; + + public static final String CONSUME_AUDIT_TIME = "consume_time"; + @JsonProperty("id") private String id; @JsonInclude(Include.NON_NULL) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java index 7f16efb45c1..bc09b52e2c3 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java @@ -148,13 +148,13 @@ public String getMetadataKey(MetaField metaField) { switch (metaField) { case AUDIT_DATA_TIME: if (format instanceof InLongMsgFormat) { - metadataKey = "value.data-time"; + metadataKey = INLONG_MSG_AUDIT_TIME; } else { - metadataKey = "consume_time"; + metadataKey = CONSUME_AUDIT_TIME; } break; default: - throw new UnsupportedOperationException(String.format("Unsupport meta field for %s: %s", + throw new UnsupportedOperationException(String.format("Unsupported meta field for %s: %s", this.getClass().getSimpleName(), metaField)); } return metadataKey; diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java index 70d525b6565..6cbfea2689d 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java @@ -94,7 +94,6 @@ public class PulsarTableFactory implements DynamicTableSourceFactory { public static final boolean UPSERT_DISABLED = false; - public static final String INNER_FORMAT_TYPE = "inlong-msg"; public static boolean innerFormat = false; @Override @@ -107,7 +106,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { getValueDecodingFormat(helper); ReadableConfig tableOptions = helper.getOptions(); - innerFormat = INNER_FORMAT_TYPE.equals(tableOptions.get(FORMAT)); + innerFormat = ExtractNode.INLONG_MSG.equals(tableOptions.get(FORMAT)); // Validate configs are not conflict; each options is consumed; no unwanted configs // PulsarOptions, PulsarSourceOptions and PulsarSinkOptions is not part of the validation. diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java index b3f4c608f27..c53e514d24c 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java @@ -22,7 +22,9 @@ import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.planner.expressions.Extract; import org.apache.flink.table.types.DataType; +import org.apache.inlong.sort.protocol.node.ExtractNode; import org.apache.pulsar.client.api.Message; import java.io.Serializable; @@ -116,7 +118,7 @@ public enum ReadableMetadata { message -> TimestampData.fromEpochMillis(message.getEventTime())), CONSUME_TIME( - "consume_time", + ExtractNode.CONSUME_AUDIT_TIME, DataTypes.BIGINT().notNull(), message -> System.currentTimeMillis()), From 7154bbc3a79dc7eb04fe1a210479760ee48fbd24 Mon Sep 17 00:00:00 2001 From: vernedeng Date: Fri, 10 Nov 2023 16:34:47 +0800 Subject: [PATCH 3/5] fix --- .../inlong/sort/pulsar/table/PulsarReadableMetadata.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java index c53e514d24c..6740e6111f4 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java @@ -17,14 +17,14 @@ package org.apache.inlong.sort.pulsar.table; +import org.apache.inlong.sort.protocol.node.ExtractNode; + import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.data.GenericMapData; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; -import org.apache.flink.table.planner.expressions.Extract; import org.apache.flink.table.types.DataType; -import org.apache.inlong.sort.protocol.node.ExtractNode; import org.apache.pulsar.client.api.Message; import java.io.Serializable; From a473c5a71197c1dadfe3a6fb72eb81f7290c610c Mon Sep 17 00:00:00 2001 From: vernedeng Date: Fri, 10 Nov 2023 16:37:32 +0800 Subject: [PATCH 4/5] fix --- .../sort/pulsar/table/PulsarTableDeserializationSchema.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java index 4d2bf0992b8..ac8dbb29813 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java @@ -118,7 +118,7 @@ public void deserialize(Message message, Collector collector) // reset time stamp if the deserialize schema has not inner format if (!innerFormat) { - metricsCollector.resetTimestamp(message.getEventTime()); + metricsCollector.resetTimestamp(System.currentTimeMillis()); } valueDeserialization.deserialize(message.getData(), metricsCollector); From e181cbafd513f0fb93b3fd2b76e33166ca745620 Mon Sep 17 00:00:00 2001 From: vernedeng Date: Mon, 13 Nov 2023 10:45:16 +0800 Subject: [PATCH 5/5] fix --- .../sort/pulsar/table/PulsarTableDeserializationSchema.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java index ac8dbb29813..2dae39e8283 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java @@ -116,7 +116,7 @@ public void deserialize(Message message, Collector collector) MetricsCollector metricsCollector = new MetricsCollector<>(new ListCollector<>(valueRowData), sourceMetricData); - // reset time stamp if the deserialize schema has not inner format + // reset timestamp if the deserialize schema has not inner format if (!innerFormat) { metricsCollector.resetTimestamp(System.currentTimeMillis()); }