diff --git a/LICENSE b/LICENSE index bd06a03806b3..adabba50de63 100644 --- a/LICENSE +++ b/LICENSE @@ -219,6 +219,7 @@ seatunnel-connectors-v2/connector-cdc/connector-base/src/main/java/org/apache/se seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql from https://github.com/ververica/flink-cdc-connectors seatunnel-connectors-v2/connector-cdc/connector-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium from https://github.com/ververica/flink-cdc-connectors seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java from https://github.com/debezium/debezium +seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb from https://github.com/ververica/flink-cdc-connectors generate_client_protocol.sh from https://github.com/hazelcast/hazelcast seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java from https://github.com/hazelcast/hazelcast seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java from https://github.com/hazelcast/hazelcast @@ -239,4 +240,4 @@ seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLEngine.java from https://github.com/JSQLParser/JSqlParser seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLType.java from https://github.com/JSQLParser/JSqlParser seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFilter.java from https://github.com/JSQLParser/JSqlParser -seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFunction.java from https://github.com/JSQLParser/JSqlParser \ No newline at end of file +seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFunction.java from https://github.com/JSQLParser/JSqlParser diff --git a/docs/en/connector-v2/source/MongoDB-CDC.md b/docs/en/connector-v2/source/MongoDB-CDC.md index fca1116a31a7..955040917607 100644 --- a/docs/en/connector-v2/source/MongoDB-CDC.md +++ b/docs/en/connector-v2/source/MongoDB-CDC.md @@ -127,7 +127,7 @@ For specific types in MongoDB, we use Extended JSON format to map them to Seatun ## How to Create a MongoDB Data Synchronization Jobs -The following example demonstrates how to create a data synchronization job that reads data from MongoDB and prints it on the local client: +The following example demonstrates how to create a data synchronization job that reads cdc data from MongoDB and prints it on the local client: ```hocon env { @@ -163,7 +163,7 @@ sink { } ``` -The following example demonstrates how to create a data synchronization job that reads data from MongoDB and cdc write to mysql database: +The following example demonstrates how to create a data synchronization job that reads cdc data from MongoDB and write to mysql database: ```hocon env { @@ -199,7 +199,7 @@ sink { } ``` -The following example demonstrates how to create a data synchronization job that read the data of multiple library tables mongodb and prints it on the local client: +The following example demonstrates how to create a data synchronization job that read the cdc data of multiple library tables mongodb and prints it on the local client: ```hocon env { @@ -267,7 +267,7 @@ sink { ## Changelog -- Add MongoDB CDC Source Connector +- [Feature]Add MongoDB CDC Source Connector([4923](https://github.com/apache/seatunnel/pull/4923)) ### next version diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbScanFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbScanFetchTask.java index 99cac90ef152..1c5eea075d72 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbScanFetchTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbScanFetchTask.java @@ -169,7 +169,12 @@ public void execute(Context context) throws Exception { BsonDocument startKey = (BsonDocument) snapshotSplit.getSplitStart()[1]; BsonDocument endKey = (BsonDocument) snapshotSplit.getSplitEnd()[1]; BsonDocument hint = (BsonDocument) snapshotSplit.getSplitStart()[0]; - + log.info( + "Initializing snapshot split processing: TableId={}, StartKey={}, EndKey={}, Hint={}", + snapshotSplit.getTableId(), + startKey, + endKey, + hint); return collection .find() .min(startKey) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/pom.xml index 38eda0a913d6..a8814c11ee6c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/pom.xml @@ -26,9 +26,7 @@ SeaTunnel : E2E : Connector V2 : CDC Mongodb - 8 - 8 - UTF-8 + 8.0.16 @@ -55,6 +53,19 @@ org.testcontainers mysql ${testcontainer.version} + test + + + org.apache.seatunnel + connector-jdbc + ${project.version} + test + + + mysql + mysql-connector-java + ${mysql.version} + test diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java index 966f031d62f6..c78f54993049 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java @@ -92,7 +92,7 @@ public class MongodbCDCIT extends TestSuiteBase implements TestResource { private static final String SINK_SQL = "select name,description,weight from products"; private static final String MYSQL_DRIVER_JAR = - "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar"; + "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar"; private final UniqueDatabase inventoryDatabase = new UniqueDatabase(MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw"); @@ -107,7 +107,7 @@ private static MySqlContainer createMySqlContainer() { mySqlContainer.withUsername(MYSQL_USER_NAME); mySqlContainer.withPassword(MYSQL_USER_PASSWORD); // For local test use - mySqlContainer.setPortBindings(Collections.singletonList("3308:3306")); + // mySqlContainer.setPortBindings(Collections.singletonList("3308:3306")); return mySqlContainer; } @@ -118,7 +118,7 @@ private static MySqlContainer createMySqlContainer() { container.execInContainer( "bash", "-c", - "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + "mkdir -p /tmp/seatunnel/Jdbc/lib && cd /tmp/seatunnel/Jdbc/lib && curl -O " + MYSQL_DRIVER_JAR); Assertions.assertEquals(0, extraCommands.getExitCode()); }; @@ -176,31 +176,27 @@ public void testMongodbCdcToMysqlCheckDataE2e(TestContainer container) { }); // insert update delete - // upsertDeleteSourceTable(); - // - // await().atMost(240000, TimeUnit.MILLISECONDS) - // .untilAsserted( - // () -> { - // Assertions.assertIterableEquals( - // readMongodbData().stream() - // .peek(e -> e.remove("_id")) - // .map(Document::entrySet) - // .map(Set::stream) - // .map( - // entryStream -> - // entryStream - // - // .map(Map.Entry::getValue) - // .collect( - // - // Collectors.toCollection( - // - // ArrayList - // - // ::new))) - // .collect(Collectors.toList()), - // querySql()); - // }); + upsertDeleteSourceTable(); + + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertIterableEquals( + readMongodbData().stream() + .peek(e -> e.remove("_id")) + .map(Document::entrySet) + .map(Set::stream) + .map( + entryStream -> + entryStream + .map(Map.Entry::getValue) + .collect( + Collectors.toCollection( + ArrayList + ::new))) + .collect(Collectors.toList()), + querySql()); + }); } private Connection getJdbcConnection() throws SQLException {