diff --git a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/meta/split/SourceSplitState.java b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/meta/split/SourceSplitState.java
index ad842fd86c7..df1ef39f52b 100644
--- a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/meta/split/SourceSplitState.java
+++ b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/meta/split/SourceSplitState.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.cdc.base.source.meta.split;
 
+import org.apache.flink.annotation.Internal;
+
 /** State of the reader, essentially a mutable version of the {@link SourceSplitBase}.
  * Copy from com.ververica:flink-cdc-base:2.3.0.
  * */
@@ -50,4 +52,10 @@ public final StreamSplitState asStreamSplitState() {
 
     /** Use the current split state to create a new SourceSplit. */
     public abstract SourceSplitBase toSourceSplit();
+
+    /** Get the current split. */
+    @Internal
+    public SourceSplitBase getSourceSplitBase() {
+        return split;
+    }
 }
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/OracleRecordEmitter.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/OracleRecordEmitter.java
index 4fd0ae252b6..df7e0d2ee3e 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/OracleRecordEmitter.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/OracleRecordEmitter.java
@@ -27,8 +27,11 @@
 import io.debezium.connector.AbstractSourceInfo;
 import io.debezium.data.Envelope;
 import io.debezium.document.Array;
+import io.debezium.relational.TableId;
 import io.debezium.relational.history.HistoryRecord;
 import io.debezium.relational.history.TableChanges;
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.Map;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.util.Collector;
 import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
@@ -37,6 +40,7 @@
 import org.apache.inlong.sort.cdc.base.source.meta.split.SourceSplitState;
 import org.apache.inlong.sort.cdc.base.source.metrics.SourceReaderMetrics;
 import org.apache.inlong.sort.cdc.base.source.reader.IncrementalSourceRecordEmitter;
+import org.apache.inlong.sort.cdc.base.util.RecordUtils;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
@@ -76,7 +80,7 @@ protected void processElement(
                 splitState.asStreamSplitState().recordSchema(tableChange.getId(), tableChange);
             }
             if (includeSchemaChanges) {
-                emitElement(element, output, splitState);
+                emitElement(element, output);
             }
         } else if (isDataChangeRecord(element)) {
             if (splitState.isStreamSplitState()) {
@@ -84,33 +88,33 @@ protected void processElement(
                 splitState.asStreamSplitState().setStartingOffset(position);
             }
             reportMetrics(element);
-            emitElement(element, output, splitState);
+            final Map<TableId, TableChange> tableSchemas =
+                    splitState.getSourceSplitBase().getTableSchemas();
+            final TableChange tableSchema =
+                    tableSchemas.getOrDefault(RecordUtils.getTableId(element), null);
+            debeziumDeserializationSchema.deserialize(element, new Collector<T>() {
+
+                @Override
+                public void collect(T record) {
+                    Struct value = (Struct) element.value();
+                    Struct source = value.getStruct(Envelope.FieldName.SOURCE);
+                    String dbName = source.getString(AbstractSourceInfo.DATABASE_NAME_KEY);
+                    String schemaName = source.getString(AbstractSourceInfo.SCHEMA_NAME_KEY);
+                    String tableName = source.getString(AbstractSourceInfo.TABLE_NAME_KEY);
+                    sourceReaderMetrics
+                            .outputMetrics(dbName, schemaName, tableName, splitState.isSnapshotSplitState(), value);
+                    output.collect(record);
+                }
+
+                @Override
+                public void close() {
+
+                }
+            }, tableSchema);
         } else {
             // unknown element
             LOG.info("Meet unknown element {}, just skip.", element);
         }
     }
 
-    protected void emitElement(SourceRecord element, SourceOutput<T> output,
-            SourceSplitState splitState) throws Exception {
-        debeziumDeserializationSchema.deserialize(element, new Collector<T>() {
-
-            @Override
-            public void collect(T record) {
-                Struct value = (Struct) element.value();
-                Struct source = value.getStruct(Envelope.FieldName.SOURCE);
-                String dbName = source.getString(AbstractSourceInfo.DATABASE_NAME_KEY);
-                String schemaName = source.getString(AbstractSourceInfo.SCHEMA_NAME_KEY);
-                String tableName = source.getString(AbstractSourceInfo.TABLE_NAME_KEY);
-                sourceReaderMetrics
-                        .outputMetrics(dbName, schemaName, tableName, splitState.isSnapshotSplitState(), value);
-                output.collect(record);
-            }
-
-            @Override
-            public void close() {
-
-            }
-        });
-    }
 }