Skip to content

Commit

Permalink
[INLONG-9246][Sort] Pulsar source support audit when the deserialized…
Browse files Browse the repository at this point in the history
… type is not InlongMsg (#9255)
  • Loading branch information
vernedeng authored Nov 13, 2023
1 parent bdf63d5 commit b88685b
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.inlong.sort.protocol.Metadata;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.node.format.Format;
import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
import org.apache.inlong.sort.protocol.transformation.WatermarkField;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -146,10 +147,14 @@ public String getMetadataKey(MetaField metaField) {
String metadataKey;
switch (metaField) {
case AUDIT_DATA_TIME:
metadataKey = "value.data-time";
if (format instanceof InLongMsgFormat) {
metadataKey = INLONG_MSG_AUDIT_TIME;
} else {
metadataKey = CONSUME_AUDIT_TIME;
}
break;
default:
throw new UnsupportedOperationException(String.format("Unsupport meta field for %s: %s",
throw new UnsupportedOperationException(String.format("Unsupported meta field for %s: %s",
this.getClass().getSimpleName(), metaField));
}
return metadataKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
Expand Down Expand Up @@ -93,6 +94,8 @@ public class PulsarTableFactory implements DynamicTableSourceFactory {

public static final boolean UPSERT_DISABLED = false;

public static boolean innerFormat = false;

@Override
public DynamicTableSource createDynamicTableSource(Context context) {
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
Expand All @@ -103,6 +106,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
getValueDecodingFormat(helper);
ReadableConfig tableOptions = helper.getOptions();

innerFormat = ExtractNode.INLONG_MSG.equals(tableOptions.get(FORMAT));

// Validate configs are not conflict; each options is consumed; no unwanted configs
// PulsarOptions, PulsarSourceOptions and PulsarSinkOptions is not part of the validation.
helper.validateExcept(
Expand Down Expand Up @@ -154,6 +159,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
valueDecodingFormat,
valueProjection,
UPSERT_DISABLED,
innerFormat,
inlongMetric,
auditHostAndPorts,
auditKeys);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.sort.pulsar.table;

import org.apache.inlong.sort.protocol.node.ExtractNode;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
Expand Down Expand Up @@ -115,6 +117,11 @@ public enum ReadableMetadata {
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
message -> TimestampData.fromEpochMillis(message.getEventTime())),

CONSUME_TIME(
ExtractNode.CONSUME_AUDIT_TIME,
DataTypes.BIGINT().notNull(),
message -> System.currentTimeMillis()),

PROPERTIES(
"properties",
// key and value of the map are nullable to make handling easier in queries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public class PulsarTableDeserializationSchema implements PulsarDeserializationSc

private final boolean upsertMode;

private final boolean innerFormat;

private SourceMetricData sourceMetricData;

private MetricOption metricOption;
Expand All @@ -67,6 +69,7 @@ public PulsarTableDeserializationSchema(
TypeInformation<RowData> producedTypeInfo,
PulsarRowDataConverter rowDataConverter,
boolean upsertMode,
boolean innerFormat,
MetricOption metricOption) {
if (upsertMode) {
checkNotNull(keyDeserialization, "upsert mode must specify a key format");
Expand All @@ -76,6 +79,7 @@ public PulsarTableDeserializationSchema(
this.rowDataConverter = checkNotNull(rowDataConverter);
this.producedTypeInfo = checkNotNull(producedTypeInfo);
this.upsertMode = upsertMode;
this.innerFormat = innerFormat;
this.metricOption = metricOption;
}

Expand Down Expand Up @@ -109,8 +113,15 @@ public void deserialize(Message<byte[]> message, Collector<RowData> collector)
return;
}

valueDeserialization.deserialize(message.getData(),
new MetricsCollector<>(new ListCollector<>(valueRowData), sourceMetricData));
MetricsCollector<RowData> metricsCollector =
new MetricsCollector<>(new ListCollector<>(valueRowData), sourceMetricData);

// reset timestamp if the deserialize schema has not inner format
if (!innerFormat) {
metricsCollector.resetTimestamp(System.currentTimeMillis());
}

valueDeserialization.deserialize(message.getData(), metricsCollector);

rowDataConverter.projectToProducedRowAndCollect(
message, keyRowData, valueRowData, collector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public class PulsarTableDeserializationSchemaFactory implements Serializable {

private final int[] valueProjection;

private final boolean innerFormat;

// --------------------------------------------------------------------------------------------
// Mutable attributes. Will be updated after the applyReadableMetadata()
// --------------------------------------------------------------------------------------------
Expand All @@ -101,6 +103,7 @@ public PulsarTableDeserializationSchemaFactory(
DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
int[] valueProjection,
boolean upsertMode,
boolean innerFormat,
String inlongMetric,
String auditHostAndPorts,
String auditKeys) {
Expand All @@ -116,6 +119,7 @@ public PulsarTableDeserializationSchemaFactory(
this.producedDataType = physicalDataType;
this.connectorMetadataKeys = Collections.emptyList();
this.upsertMode = upsertMode;
this.innerFormat = innerFormat;

this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
Expand Down Expand Up @@ -178,6 +182,7 @@ public PulsarDeserializationSchema<RowData> createPulsarDeserialization(
producedTypeInfo,
rowDataConverter,
upsertMode,
innerFormat,
metricOption);
}

Expand Down

0 comments on commit b88685b

Please sign in to comment.