Skip to content

Commit

Permalink
[INLONG-7695][Sort] Fix NPE when running Oracle all database migratio…
Browse files Browse the repository at this point in the history
…n jobs (apache#7696)
  • Loading branch information
e-mhui authored Mar 27, 2023
1 parent 24909e6 commit b7fd7f6
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.sort.cdc.base.source.meta.split;

import org.apache.flink.annotation.Internal;

/** State of the reader, essentially a mutable version of the {@link SourceSplitBase}.
* Copy from com.ververica:flink-cdc-base:2.3.0.
* */
Expand Down Expand Up @@ -50,4 +52,10 @@ public final StreamSplitState asStreamSplitState() {

/** Use the current split state to create a new SourceSplit. */
public abstract SourceSplitBase toSourceSplit();

/** Get the current split. */
@Internal
public SourceSplitBase getSourceSplitBase() {
return split;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
import io.debezium.document.Array;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.TableChanges;
import io.debezium.relational.history.TableChanges.TableChange;
import java.util.Map;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.util.Collector;
import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
Expand All @@ -37,6 +40,7 @@
import org.apache.inlong.sort.cdc.base.source.meta.split.SourceSplitState;
import org.apache.inlong.sort.cdc.base.source.metrics.SourceReaderMetrics;
import org.apache.inlong.sort.cdc.base.source.reader.IncrementalSourceRecordEmitter;
import org.apache.inlong.sort.cdc.base.util.RecordUtils;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
Expand Down Expand Up @@ -76,41 +80,41 @@ protected void processElement(
splitState.asStreamSplitState().recordSchema(tableChange.getId(), tableChange);
}
if (includeSchemaChanges) {
emitElement(element, output, splitState);
emitElement(element, output);
}
} else if (isDataChangeRecord(element)) {
if (splitState.isStreamSplitState()) {
Offset position = getOffsetPosition(element);
splitState.asStreamSplitState().setStartingOffset(position);
}
reportMetrics(element);
emitElement(element, output, splitState);
final Map<TableId, TableChange> tableSchemas =
splitState.getSourceSplitBase().getTableSchemas();
final TableChange tableSchema =
tableSchemas.getOrDefault(RecordUtils.getTableId(element), null);
debeziumDeserializationSchema.deserialize(element, new Collector<T>() {

@Override
public void collect(T record) {
Struct value = (Struct) element.value();
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
String dbName = source.getString(AbstractSourceInfo.DATABASE_NAME_KEY);
String schemaName = source.getString(AbstractSourceInfo.SCHEMA_NAME_KEY);
String tableName = source.getString(AbstractSourceInfo.TABLE_NAME_KEY);
sourceReaderMetrics
.outputMetrics(dbName, schemaName, tableName, splitState.isSnapshotSplitState(), value);
output.collect(record);
}

@Override
public void close() {

}
}, tableSchema);
} else {
// unknown element
LOG.info("Meet unknown element {}, just skip.", element);
}
}

protected void emitElement(SourceRecord element, SourceOutput<T> output,
SourceSplitState splitState) throws Exception {
debeziumDeserializationSchema.deserialize(element, new Collector<T>() {

@Override
public void collect(T record) {
Struct value = (Struct) element.value();
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
String dbName = source.getString(AbstractSourceInfo.DATABASE_NAME_KEY);
String schemaName = source.getString(AbstractSourceInfo.SCHEMA_NAME_KEY);
String tableName = source.getString(AbstractSourceInfo.TABLE_NAME_KEY);
sourceReaderMetrics
.outputMetrics(dbName, schemaName, tableName, splitState.isSnapshotSplitState(), value);
output.collect(record);
}

@Override
public void close() {

}
});
}
}

0 comments on commit b7fd7f6

Please sign in to comment.