From 292efc9e8bd8da36a1694a21f6d7934053080b60 Mon Sep 17 00:00:00 2001 From: zhouyao Date: Mon, 31 Jul 2023 10:18:48 +0800 Subject: [PATCH 1/2] [BUG][Connector-V2][Mongo-cdc] Incremental data kind error in snapshot phase --- ...MongoDBConnectorDeserializationSchema.java | 12 +----- .../source/fetch/MongodbFetchTaskContext.java | 23 ++++++++++- .../cdc/mongodb/utils/MongodbRecordUtils.java | 38 ++++++++++++++++++- 3 files changed, 60 insertions(+), 13 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java index 75f3564c6c6..6f36f4be830 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java @@ -65,6 +65,7 @@ import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ENCODE_VALUE_FIELD; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.FULL_DOCUMENT; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ID_FIELD; +import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.extractBsonDocument; import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull; public class MongoDBConnectorDeserializationSchema @@ -154,17 +155,6 @@ private SeaTunnelRow extractRowData(BsonDocument document) { return (SeaTunnelRow) physicalConverter.convert(document); } - private BsonDocument extractBsonDocument( - Struct value, @Nonnull Schema valueSchema, String fieldName) { - if (valueSchema.field(fieldName) != null) { - String docString = value.getString(fieldName); - if (docString != null) { - return BsonDocument.parse(docString); - } - } - return null; - } - // ------------------------------------------------------------------------------------- // Runtime Converters // ------------------------------------------------------------------------------------- diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java index 534baa72abd..28c3fc919e3 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java @@ -27,10 +27,12 @@ import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffset; import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils; +import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.bson.BsonDocument; +import org.bson.BsonString; import org.bson.BsonType; import org.bson.BsonValue; @@ -50,12 +52,16 @@ import java.util.stream.Collectors; import static org.apache.seatunnel.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT; +import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.FULL_DOCUMENT; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.OPERATION_TYPE; +import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.OPERATION_TYPE_INSERT; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SNAPSHOT_FIELD; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SNAPSHOT_TRUE; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SOURCE_FIELD; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.TS_MS_FIELD; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.BsonUtils.compareBsonValue; +import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.buildSourceRecord; +import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.extractBsonDocument; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.getDocumentKey; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.getResumeToken; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.createMongoClient; @@ -172,9 +178,24 @@ public void rewriteOutputBuffer( switch (OperationType.fromString(operationType)) { case INSERT: + outputBuffer.put(key, changeRecord); + break; case UPDATE: case REPLACE: - outputBuffer.put(key, changeRecord); + Schema valueSchema = changeRecord.valueSchema(); + BsonDocument fullDocument = + extractBsonDocument(value, valueSchema, FULL_DOCUMENT); + fullDocument.put(OPERATION_TYPE, new BsonString(OPERATION_TYPE_INSERT)); + SourceRecord record = + buildSourceRecord( + changeRecord.sourcePartition(), + changeRecord.sourceOffset(), + changeRecord.topic(), + changeRecord.kafkaPartition(), + changeRecord.keySchema(), + changeRecord.key(), + fullDocument); + outputBuffer.put(key, record); break; case DELETE: outputBuffer.remove(key); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbRecordUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbRecordUtils.java index c4d51c59e41..1e9ab577229 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbRecordUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbRecordUtils.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; @@ -66,7 +67,18 @@ public static BsonDocument getResumeToken(SourceRecord sourceRecord) { public static BsonDocument getDocumentKey(@Nonnull SourceRecord sourceRecord) { Struct value = (Struct) sourceRecord.value(); - return BsonDocument.parse(value.getString(DOCUMENT_KEY)); + return extractBsonDocument(value, sourceRecord.valueSchema(), DOCUMENT_KEY); + } + + public static BsonDocument extractBsonDocument( + Struct value, @Nonnull Schema valueSchema, String fieldName) { + if (valueSchema.field(fieldName) != null) { + String docString = value.getString(fieldName); + if (docString != null) { + return BsonDocument.parse(docString); + } + } + return null; } public static String getOffsetValue(@Nonnull SourceRecord sourceRecord, String key) { @@ -139,6 +151,30 @@ public static String getOffsetValue(@Nonnull SourceRecord sourceRecord, String k valueSchemaAndValue.value()); } + public static @Nonnull SourceRecord buildSourceRecord( + Map sourcePartition, + Map sourceOffset, + String topicName, + Integer partition, + Schema keySchema, + Object key, + BsonDocument valueDocument) { + BsonValueToSchemaAndValue schemaAndValue = + new BsonValueToSchemaAndValue(new DefaultJson().getJsonWriterSettings()); + SchemaAndValue valueSchemaAndValue = + schemaAndValue.toSchemaAndValue(fromJson(OUTPUT_SCHEMA), valueDocument); + + return new SourceRecord( + sourcePartition, + sourceOffset, + topicName, + partition, + keySchema, + key, + valueSchemaAndValue.schema(), + valueSchemaAndValue.value()); + } + public static @Nonnull Map createSourceOffsetMap( @Nonnull BsonDocument idDocument, boolean isSnapshotRecord) { Map sourceOffset = new HashMap<>(); From 1ab5fc8f2f88f6485f0e30e157d4d782c7cbda76 Mon Sep 17 00:00:00 2001 From: zhouyao Date: Thu, 10 Aug 2023 20:50:47 +0800 Subject: [PATCH 2/2] [Bug][Connector-V2][mongo-cdc] mongo --- .../source/fetch/MongodbFetchTaskContext.java | 37 ++++++++++++++++++- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java index 96afa90d851..fa0931a8070 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java @@ -32,6 +32,7 @@ import org.apache.kafka.connect.source.SourceRecord; import org.bson.BsonDocument; +import org.bson.BsonInt64; import org.bson.BsonString; import org.bson.BsonType; import org.bson.BsonValue; @@ -52,7 +53,12 @@ import java.util.stream.Collectors; import static org.apache.seatunnel.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT; +import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.COLL_FIELD; +import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DB_FIELD; +import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DOCUMENT_KEY; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.FULL_DOCUMENT; +import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ID_FIELD; +import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.NS_FIELD; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.OPERATION_TYPE; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.OPERATION_TYPE_INSERT; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SNAPSHOT_FIELD; @@ -185,7 +191,10 @@ public void rewriteOutputBuffer( Schema valueSchema = changeRecord.valueSchema(); BsonDocument fullDocument = extractBsonDocument(value, valueSchema, FULL_DOCUMENT); - fullDocument.put(OPERATION_TYPE, new BsonString(OPERATION_TYPE_INSERT)); + if (fullDocument == null) { + break; + } + BsonDocument valueDocument = normalizeSnapshotDocument(fullDocument, value); SourceRecord record = buildSourceRecord( changeRecord.sourcePartition(), @@ -194,7 +203,7 @@ public void rewriteOutputBuffer( changeRecord.kafkaPartition(), changeRecord.keySchema(), changeRecord.key(), - fullDocument); + valueDocument); outputBuffer.put(key, record); break; case DELETE: @@ -223,6 +232,30 @@ record -> { .collect(Collectors.toList()); } + private BsonDocument normalizeSnapshotDocument( + @Nonnull final BsonDocument fullDocument, Struct value) { + return new BsonDocument() + .append(ID_FIELD, new BsonString(value.getString(DOCUMENT_KEY))) + .append(OPERATION_TYPE, new BsonString(OPERATION_TYPE_INSERT)) + .append( + NS_FIELD, + new BsonDocument( + DB_FIELD, + new BsonString( + value.getStruct(NS_FIELD).getString(DB_FIELD))) + .append( + COLL_FIELD, + new BsonString( + value.getStruct(NS_FIELD).getString(COLL_FIELD)))) + .append(DOCUMENT_KEY, new BsonString(value.getString(DOCUMENT_KEY))) + .append(FULL_DOCUMENT, fullDocument) + .append(TS_MS_FIELD, new BsonInt64(value.getInt64(TS_MS_FIELD))) + .append( + SOURCE_FIELD, + new BsonDocument(SNAPSHOT_FIELD, new BsonString(SNAPSHOT_TRUE)) + .append(TS_MS_FIELD, new BsonInt64(0L))); + } + @Override public void close() { Runtime.getRuntime()