diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java index b9d649d6210..fc68f0f356f 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java @@ -73,6 +73,10 @@ public abstract class ExtractNode implements Node { public static final String INLONG_MSG = "inlong-msg"; + public static final String INLONG_MSG_AUDIT_TIME = "value.data-time"; + + public static final String CONSUME_AUDIT_TIME = "consume_time"; + @JsonProperty("id") private String id; @JsonInclude(Include.NON_NULL) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java index d6634013da7..bc09b52e2c3 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java @@ -23,6 +23,7 @@ import org.apache.inlong.sort.protocol.Metadata; import org.apache.inlong.sort.protocol.node.ExtractNode; import org.apache.inlong.sort.protocol.node.format.Format; +import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat; import org.apache.inlong.sort.protocol.transformation.WatermarkField; import com.google.common.base.Preconditions; @@ -146,10 +147,14 @@ public String getMetadataKey(MetaField metaField) { String metadataKey; switch (metaField) { case AUDIT_DATA_TIME: - metadataKey = "value.data-time"; + if (format instanceof InLongMsgFormat) { + metadataKey = INLONG_MSG_AUDIT_TIME; + } else { + metadataKey = CONSUME_AUDIT_TIME; + } break; default: - throw new 
UnsupportedOperationException(String.format("Unsupport meta field for %s: %s", + throw new UnsupportedOperationException(String.format("Unsupported meta field for %s: %s", this.getClass().getSimpleName(), metaField)); } return metadataKey; diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java index 05277ff5a2b..6cbfea2689d 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java @@ -47,6 +47,7 @@ import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL; import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME; +import static org.apache.flink.table.factories.FactoryUtil.FORMAT; import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM; import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS; import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; @@ -93,6 +94,8 @@ public class PulsarTableFactory implements DynamicTableSourceFactory { public static final boolean UPSERT_DISABLED = false; + public static boolean innerFormat = false; + @Override public DynamicTableSource createDynamicTableSource(Context context) { FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); @@ -103,6 +106,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { getValueDecodingFormat(helper); ReadableConfig tableOptions = helper.getOptions(); + innerFormat = 
ExtractNode.INLONG_MSG.equals(tableOptions.get(FORMAT)); + // Validate configs are not conflict; each options is consumed; no unwanted configs // PulsarOptions, PulsarSourceOptions and PulsarSinkOptions is not part of the validation. helper.validateExcept( @@ -154,6 +159,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { valueDecodingFormat, valueProjection, UPSERT_DISABLED, + innerFormat, inlongMetric, auditHostAndPorts, auditKeys); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java index 2e0c1ab1dd2..6740e6111f4 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java @@ -17,6 +17,8 @@ package org.apache.inlong.sort.pulsar.table; +import org.apache.inlong.sort.protocol.node.ExtractNode; + import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.data.GenericMapData; import org.apache.flink.table.data.GenericRowData; @@ -115,6 +117,11 @@ public enum ReadableMetadata { DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(), message -> TimestampData.fromEpochMillis(message.getEventTime())), + CONSUME_TIME( + ExtractNode.CONSUME_AUDIT_TIME, + DataTypes.BIGINT().notNull(), + message -> System.currentTimeMillis()), + PROPERTIES( "properties", // key and value of the map are nullable to make handling easier in queries diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java index 4792d7b2870..2dae39e8283 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java @@ -57,6 +57,8 @@ public class PulsarTableDeserializationSchema implements PulsarDeserializationSc private final boolean upsertMode; + private final boolean innerFormat; + private SourceMetricData sourceMetricData; private MetricOption metricOption; @@ -67,6 +69,7 @@ public PulsarTableDeserializationSchema( TypeInformation producedTypeInfo, PulsarRowDataConverter rowDataConverter, boolean upsertMode, + boolean innerFormat, MetricOption metricOption) { if (upsertMode) { checkNotNull(keyDeserialization, "upsert mode must specify a key format"); @@ -76,6 +79,7 @@ public PulsarTableDeserializationSchema( this.rowDataConverter = checkNotNull(rowDataConverter); this.producedTypeInfo = checkNotNull(producedTypeInfo); this.upsertMode = upsertMode; + this.innerFormat = innerFormat; this.metricOption = metricOption; } @@ -109,8 +113,15 @@ public void deserialize(Message message, Collector collector) return; } - valueDeserialization.deserialize(message.getData(), - new MetricsCollector<>(new ListCollector<>(valueRowData), sourceMetricData)); + MetricsCollector metricsCollector = + new MetricsCollector<>(new ListCollector<>(valueRowData), sourceMetricData); + + // reset the timestamp if the deserialization schema has no inner format + if (!innerFormat) { + metricsCollector.resetTimestamp(System.currentTimeMillis()); + } + + valueDeserialization.deserialize(message.getData(), metricsCollector); rowDataConverter.projectToProducedRowAndCollect( message, keyRowData, valueRowData, collector); 
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java index c063e265399..c193a348577 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java @@ -80,6 +80,8 @@ public class PulsarTableDeserializationSchemaFactory implements Serializable { private final int[] valueProjection; + private final boolean innerFormat; + // -------------------------------------------------------------------------------------------- // Mutable attributes. Will be updated after the applyReadableMetadata() // -------------------------------------------------------------------------------------------- @@ -101,6 +103,7 @@ public PulsarTableDeserializationSchemaFactory( DecodingFormat> valueDecodingFormat, int[] valueProjection, boolean upsertMode, + boolean innerFormat, String inlongMetric, String auditHostAndPorts, String auditKeys) { @@ -116,6 +119,7 @@ public PulsarTableDeserializationSchemaFactory( this.producedDataType = physicalDataType; this.connectorMetadataKeys = Collections.emptyList(); this.upsertMode = upsertMode; + this.innerFormat = innerFormat; this.inlongMetric = inlongMetric; this.auditHostAndPorts = auditHostAndPorts; @@ -178,6 +182,7 @@ public PulsarDeserializationSchema createPulsarDeserialization( producedTypeInfo, rowDataConverter, upsertMode, + innerFormat, metricOption); }