diff --git a/tis-datax/pom.xml b/tis-datax/pom.xml
index 9b4d95419..6894cfe1a 100644
--- a/tis-datax/pom.xml
+++ b/tis-datax/pom.xml
@@ -67,6 +67,7 @@
tis-ds-mysql-plugin
tis-ds-mysql-v5-plugin
tis-ds-mysql-v8-plugin
+ tis-ds-mysql-mariadb-plugin
tis-datax-local-executor
tis-datax-hudi-dependency
diff --git a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/reader/ReaderFilterNormalQuery.java b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/reader/ReaderFilterNormalQuery.java
index 936bb81d5..e2a298790 100644
--- a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/reader/ReaderFilterNormalQuery.java
+++ b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/reader/ReaderFilterNormalQuery.java
@@ -27,6 +27,7 @@ public class ReaderFilterNormalQuery extends ReaderFilter {
@Override
public Document createFilter() {
Document fitler = Document.parse(query);
+ logger.info("create MongoDB collection filter:{}", fitler.toJson());
return fitler;
}
diff --git a/tis-datax/tis-ds-mysql-mariadb-plugin/pom.xml b/tis-datax/tis-ds-mysql-mariadb-plugin/pom.xml
new file mode 100644
index 000000000..411348474
--- /dev/null
+++ b/tis-datax/tis-ds-mysql-mariadb-plugin/pom.xml
@@ -0,0 +1,58 @@
+
+
+
+
+
+ com.qlangtech.tis.plugins
+ tis-datax
+ ${revision}
+ ../pom.xml
+
+ 4.0.0
+ tpi
+
+ 3.4.1
+
+
+ tis-ds-mysql-mariadb-plugin
+
+
+ com.qlangtech.tis.plugins
+ tis-ds-mysql-plugin
+ ${project.version}
+
+
+
+
+ org.mariadb.jdbc
+ mariadb-java-client
+ ${mariadb-java-client.version}
+
+
+
+
+
+
+
+
+
+
+
diff --git a/tis-datax/tis-ds-mysql-mariadb-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MariaDBDataSourceFactory.java b/tis-datax/tis-ds-mysql-mariadb-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MariaDBDataSourceFactory.java
new file mode 100644
index 000000000..a8cb59cc4
--- /dev/null
+++ b/tis-datax/tis-ds-mysql-mariadb-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MariaDBDataSourceFactory.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.qlangtech.tis.plugin.ds.mysql;
+
+import com.google.common.collect.Lists;
+import com.qlangtech.tis.annotation.Public;
+import com.qlangtech.tis.extension.TISExtension;
+import com.qlangtech.tis.manage.common.TisUTF8;
+import com.qlangtech.tis.plugin.ds.DBConfig;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+
+/**
+ * @author: 百岁(baisui@qlangtech.com)
+ * @create: 2024-06-08 21:47
+ **/
+@Public
+public class MariaDBDataSourceFactory extends MySQLDataSourceFactory {
+
+ protected static final String DS_TYPE_MARIA_DB = "MariaDB";
+
+ private transient org.mariadb.jdbc.Driver driver;
+
+ @Override
+ public JDBCConnection getConnection(String jdbcUrl, boolean verify) throws SQLException {
+ if (driver == null) {
+ driver = new org.mariadb.jdbc.Driver();
+ }
+ java.util.Properties info = new java.util.Properties();
+ if (this.userName != null) {
+ info.put("user", this.userName);
+ }
+ if (this.password != null) {
+ info.put("password", this.password);
+ }
+ if (verify) {
+ info.put("connectTimeout", "3000");
+ info.put("socketTimeout", "3000");
+ info.put("autoReconnect", "false");
+ }
+
+ return new JDBCConnection(driver.connect(jdbcUrl, info), jdbcUrl);
+ }
+
+ @Override
+ public String buidJdbcUrl(DBConfig db, String ip, String dbName) {
+
+ // https://mariadb.com/kb/en/about-mariadb-connector-j/#java-compatibility
+// StringBuffer jdbcUrl = new StringBuffer("jdbc:mariadb://" + ip + ":" + this.port + "/" + dbName +
+// "?useUnicode=yes&useCursorFetch=true&useSsl=false&serverTimezone=" + URLEncoder.encode(DEFAULT_SERVER_TIME_ZONE.getId(), TisUTF8.getName()));
+
+ StringBuffer jdbcUrl = new StringBuffer("jdbc:mariadb://" + ip + ":" + this.port + "/" + dbName);
+ if (this.useCompression != null) {
+ jdbcUrl.append("&useCompression=").append(this.useCompression);
+ }
+// if (org.apache.commons.lang.StringUtils.isNotEmpty(this.encode)) {
+// jdbcUrl.append("&characterEncoding=").append(this.encode);
+// }
+ if (org.apache.commons.lang.StringUtils.isNotEmpty(this.extraParams)) {
+ jdbcUrl.append("&" + this.extraParams);
+ }
+ return jdbcUrl.toString();
+
+ }
+
+ @Override
+ public void setReaderStatement(Statement stmt) throws SQLException {
+ com.mysql.jdbc.Statement statement = (com.mysql.jdbc.Statement) stmt;
+ statement.enableStreamingResults();
+ //statement.setFetchSize(0);
+ }
+
+ @TISExtension
+ public static class MariaDBDescriptor extends DefaultDescriptor {
+ @Override
+ protected String getDataSourceName() {
+ return DS_TYPE_MARIA_DB;
+ }
+
+ @Override
+ public final EndType getEndType() {
+ return EndType.MariaDB;
+ }
+
+ @Override
+ public List facadeSourceTypes() {
+ return Lists.newArrayList(DS_TYPE_MARIA_DB);
+ }
+
+ @Override
+ protected boolean validateMySQLVer(String mysqlVer) {
+ return StringUtils.containsIgnoreCase(mysqlVer, DS_TYPE_MARIA_DB);
+ }
+ }
+}
diff --git a/tis-datax/tis-ds-mysql-mariadb-plugin/src/main/resources/com/qlangtech/tis/plugin/ds/mysql/MariaDBDataSourceFactory.json b/tis-datax/tis-ds-mysql-mariadb-plugin/src/main/resources/com/qlangtech/tis/plugin/ds/mysql/MariaDBDataSourceFactory.json
new file mode 100644
index 000000000..95b85bae0
--- /dev/null
+++ b/tis-datax/tis-ds-mysql-mariadb-plugin/src/main/resources/com/qlangtech/tis/plugin/ds/mysql/MariaDBDataSourceFactory.json
@@ -0,0 +1,5 @@
+{
+ "encode": {
+ "disable": true
+ }
+}
\ No newline at end of file
diff --git a/tis-datax/tis-ds-mysql-mariadb-plugin/src/main/resources/description.md b/tis-datax/tis-ds-mysql-mariadb-plugin/src/main/resources/description.md
new file mode 100644
index 000000000..bebb07112
--- /dev/null
+++ b/tis-datax/tis-ds-mysql-mariadb-plugin/src/main/resources/description.md
@@ -0,0 +1,3 @@
+* 封装`MariaDB`数据源驱动
+
+ 驱动版本为[MariaDB Connector/J is for Java 8:3.4.1](https://mariadb.com/downloads/connectors/connectors-data-access/java8-connector), 支持`MariaDB`数据源以JDBC的方式连接
diff --git a/tis-datax/tis-ds-mysql-mariadb-plugin/src/test/java/TestAll.java b/tis-datax/tis-ds-mysql-mariadb-plugin/src/test/java/TestAll.java
new file mode 100644
index 000000000..5b3786be6
--- /dev/null
+++ b/tis-datax/tis-ds-mysql-mariadb-plugin/src/test/java/TestAll.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.qlangtech.tis.plugin.ds.mysql.TestMariaDBDataSourceFactory;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+/**
+ * @author: baisui 百岁
+ * @create: 2021-01-07 18:52
+ **/
+public class TestAll extends TestCase {
+
+ public static Test suite() {
+ TestSuite suite = new TestSuite();
+ suite.addTestSuite(TestMariaDBDataSourceFactory.class);
+
+ return suite;
+ }
+}
diff --git a/tis-datax/tis-ds-mysql-mariadb-plugin/src/test/java/com/qlangtech/tis/plugin/ds/mysql/TestMariaDBDataSourceFactory.java b/tis-datax/tis-ds-mysql-mariadb-plugin/src/test/java/com/qlangtech/tis/plugin/ds/mysql/TestMariaDBDataSourceFactory.java
new file mode 100644
index 000000000..ecf63c898
--- /dev/null
+++ b/tis-datax/tis-ds-mysql-mariadb-plugin/src/test/java/com/qlangtech/tis/plugin/ds/mysql/TestMariaDBDataSourceFactory.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.qlangtech.tis.plugin.ds.mysql;
+
+import com.qlangtech.tis.plugin.ds.ColumnMetaData;
+import com.qlangtech.tis.sql.parser.tuple.creator.EntityName;
+import com.qlangtech.tis.trigger.util.JsonUtil;
+import junit.framework.TestCase;
+import org.junit.Ignore;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * @author: 百岁(baisui@qlangtech.com)
+ * @create: 2021-12-30 10:56
+ **/
+public class TestMariaDBDataSourceFactory extends TestCase {
+ public void testGetTableMetadata() {
+ MariaDBDataSourceFactory dataSourceFactory = new MariaDBDataSourceFactory();
+ dataSourceFactory.useCompression = true;
+ dataSourceFactory.password = "123456";
+ dataSourceFactory.dbName = "order2";
+ dataSourceFactory.encode = "utf8";
+ dataSourceFactory.port = 3306;
+ dataSourceFactory.userName = "root";
+ dataSourceFactory.nodeDesc = "192.168.28.200";
+
+
+ List baseColsMeta = dataSourceFactory.getTableMetadata(false, EntityName.parse("base"));
+ assertEquals(8, baseColsMeta.size());
+
+
+ JsonUtil.assertJSONEqual(TestMariaDBDataSourceFactory.class, "base-cols-meta.json"
+ , JsonUtil.toString(Collections.singletonMap("cols", baseColsMeta)), (msg, e, a) -> {
+ assertEquals(msg, e, a);
+ });
+ }
+
+
+ @Ignore
+ public void testAliyunAdsSource() {
+ MariaDBDataSourceFactory dataSourceFactory = new MariaDBDataSourceFactory();
+ dataSourceFactory.useCompression = false;
+ dataSourceFactory.password = "SLK_20221218";
+ dataSourceFactory.dbName = "local-life";
+ dataSourceFactory.encode = "utf8";
+ dataSourceFactory.port = 3306;
+ dataSourceFactory.userName = "slk";
+ dataSourceFactory.nodeDesc = "am-2ev66ttmhd5ys6k3l167320o.ads.aliyuncs.com";
+
+ dataSourceFactory.visitFirstConnection((connection) -> {
+ try {
+// connection.execute(
+// "CREATE TABLE `cloudcanal_heartbeat_baisui` (\n" +
+// " `id` varchar(32) NOT NULL COMMENT '主键ID',\n" +
+// " `name` varchar(50) NOT NULL COMMENT '姓名',\n" +
+// " `is_valid` int(1) NOT NULL DEFAULT '1' COMMENT '是否有效,1:有效 0 无效',\n" +
+// " `create_time` bigint(20) NOT NULL COMMENT '创建时间',\n" +
+// " `op_time` bigint(20) DEFAULT NULL COMMENT '操作时间',\n" +
+// " `last_ver` int(11) NOT NULL DEFAULT '1' COMMENT '版本号',\n" +
+// " `op_user_id` varchar(32) NOT NULL COMMENT '操作人',\n" +
+// " `ext` varchar(1000) DEFAULT NULL,\n" +
+// " PRIMARY KEY (`id`)\n" +
+// ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4"
+// );
+
+// connection.execute(
+// "INSERT INTO `cloudcanal_heartbeat_baisui`(`id`, `name`, `is_valid`, `create_time`, `op_time`, `last_ver`, `op_user_id`, `ext`)\n" +
+// "VALUES ('3', 'cloudcanal_heartbeat', '1', 1690275535280, 1690791060013, '2', '', NULL)"
+// );
+
+ Connection cnn = connection.getConnection();
+ try (PreparedStatement prep = cnn.prepareStatement("insert into `cloudcanal_heartbeat_baisui`(`id`, `name`, `is_valid`, `create_time`, `op_time`, `last_ver`, `op_user_id`, `ext`)" +
+ " values (?,?,?,?,?,?,?,?)")) {
+
+ prep.setString(1, "11");
+ prep.setString(2, "baisui");
+ prep.setInt(3, 1);
+ prep.setLong(4, System.currentTimeMillis());
+ prep.setLong(5, System.currentTimeMillis());
+ prep.setInt(6, 5);
+ prep.setString(7, "22");
+ prep.setString(8, "xxx");
+
+ prep.execute();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ }
+}
diff --git a/tis-datax/tis-ds-mysql-mariadb-plugin/src/test/java/com/qlangtech/tis/plugin/ds/mysql/TestMariaDBDataSourceReader.java b/tis-datax/tis-ds-mysql-mariadb-plugin/src/test/java/com/qlangtech/tis/plugin/ds/mysql/TestMariaDBDataSourceReader.java
new file mode 100644
index 000000000..7bfc9826f
--- /dev/null
+++ b/tis-datax/tis-ds-mysql-mariadb-plugin/src/test/java/com/qlangtech/tis/plugin/ds/mysql/TestMariaDBDataSourceReader.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.qlangtech.tis.plugin.ds.mysql;
+
+import com.alibaba.datax.plugin.rdbms.util.DBUtil;
+import com.qlangtech.tis.plugin.ds.DataSourceFactory;
+import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter;
+import junit.framework.TestCase;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+/**
+ * @author: 百岁(baisui@qlangtech.com)
+ * @create: 2023-05-08 10:41
+ **/
+public class TestMariaDBDataSourceReader extends TestCase {
+ public void testReader() {
+ MariaDBDataSourceFactory dataSourceFactory = new MariaDBDataSourceFactory();
+ dataSourceFactory.useCompression = false;
+ dataSourceFactory.password = "123456";
+ dataSourceFactory.dbName = "item_center";
+ dataSourceFactory.encode = "utf8";
+ dataSourceFactory.port = 3306;
+ dataSourceFactory.userName = "root";
+ dataSourceFactory.nodeDesc = "192.168.28.200";
+
+
+ IDataSourceFactoryGetter dsGetter = new IDataSourceFactoryGetter() {
+ @Override
+ public DataSourceFactory getDataSourceFactory() {
+ return dataSourceFactory;
+ }
+
+ @Override
+ public Integer getRowFetchSize() {
+ return 2000;
+ }
+ };
+
+
+ dataSourceFactory.visitFirstConnection((conn) -> {
+ Connection connection = conn.getConnection();
+//com.alibaba.datax.plugin.reader.mysqlreader.MysqlReader
+ String querySql = "select * from item";
+
+ Pair query = DBUtil.query(connection, querySql, 2000, dsGetter);
+ try {
+ int all = 0;
+ int count = 0;
+ try (ResultSet rs = query.getRight()) {
+ while (rs.next()) {
+ count++;
+ all++;
+ if (count > 50000) {
+ count = 0;
+ System.out.println(all);
+ }
+ }
+ }
+ } finally {
+ query.getLeft().close();
+ }
+
+
+ });
+
+ }
+}
diff --git a/tis-datax/tis-ds-mysql-mariadb-plugin/src/test/resources/com/qlangtech/tis/plugin/ds/mysql/base-cols-meta.json b/tis-datax/tis-ds-mysql-mariadb-plugin/src/test/resources/com/qlangtech/tis/plugin/ds/mysql/base-cols-meta.json
new file mode 100644
index 000000000..2e0502588
--- /dev/null
+++ b/tis-datax/tis-ds-mysql-mariadb-plugin/src/test/resources/com/qlangtech/tis/plugin/ds/mysql/base-cols-meta.json
@@ -0,0 +1,101 @@
+{
+ "cols":[
+ {
+ "index":0,
+ "key":"base_id",
+ "name":"base_id",
+ "pk":true,
+ "type":{
+ "collapse":"Long",
+ "columnSize":10,
+ "type":4
+ },
+ "value":"base_id"
+ },
+ {
+ "index":1,
+ "key":"start_time",
+ "name":"start_time",
+ "pk":false,
+ "type":{
+ "collapse":"Date",
+ "columnSize":19,
+ "type":93
+ },
+ "value":"start_time"
+ },
+ {
+ "index":2,
+ "key":"update_date",
+ "name":"update_date",
+ "pk":false,
+ "type":{
+ "collapse":"Date",
+ "columnSize":10,
+ "type":91
+ },
+ "value":"update_date"
+ },
+ {
+ "index":3,
+ "key":"update_time",
+ "name":"update_time",
+ "pk":false,
+ "type":{
+ "collapse":"Date",
+ "columnSize":19,
+ "type":93
+ },
+ "value":"update_time"
+ },
+ {
+ "index":4,
+ "key":"price",
+ "name":"price",
+ "pk":false,
+ "type":{
+ "collapse":"Double",
+ "columnSize":5,
+ "decimalDigits":2,
+ "type":3
+ },
+ "value":"price"
+ },
+ {
+ "index":5,
+ "key":"json_content",
+ "name":"json_content",
+ "pk":false,
+ "type":{
+ "collapse":"STRING",
+ "columnSize":2000,
+ "type":12
+ },
+ "value":"json_content"
+ },
+ {
+ "index":6,
+ "key":"col_blob",
+ "name":"col_blob",
+ "pk":false,
+ "type":{
+ "collapse":"Bytes",
+ "columnSize":65535,
+ "type":-4
+ },
+ "value":"col_blob"
+ },
+ {
+ "index":7,
+ "key":"col_text",
+ "name":"col_text",
+ "pk":false,
+ "type":{
+ "collapse":"STRING",
+ "columnSize":65535,
+ "type":-1
+ },
+ "value":"col_text"
+ }
+ ]
+}
diff --git a/tis-datax/tis-ds-mysql-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MySQLDataSourceFactory.java b/tis-datax/tis-ds-mysql-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MySQLDataSourceFactory.java
index a4f19e578..86c3f1929 100644
--- a/tis-datax/tis-ds-mysql-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MySQLDataSourceFactory.java
+++ b/tis-datax/tis-ds-mysql-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MySQLDataSourceFactory.java
@@ -501,7 +501,7 @@ public Optional getDefaultDataXReaderDescName() {
}
@Override
- public final EndType getEndType() {
+ public EndType getEndType() {
return EndType.MySQL;
}
diff --git a/tis-datax/tis-ds-mysql-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/DataxMySQLReader.json b/tis-datax/tis-ds-mysql-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/DataxMySQLReader.json
index b65e3dfc8..131899769 100644
--- a/tis-datax/tis-ds-mysql-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/DataxMySQLReader.json
+++ b/tis-datax/tis-ds-mysql-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/DataxMySQLReader.json
@@ -1,6 +1,6 @@
{
"dbName": {
- "enum": "com.qlangtech.tis.util.PluginItems.getExistDbs(\"MySQL-V5\",\"MySQL-V8\")",
+ "enum": "com.qlangtech.tis.util.PluginItems.getExistDbs(\"MySQL-V5\",\"MySQL-V8\",\"MariaDB\")",
"creator": {
"plugin": [
{
@@ -8,6 +8,9 @@
},
{
"descName": "MySQL-V8"
+ },
+ {
+ "descName": "MariaDB"
}
]
}
diff --git a/tis-split-table-strategy-plugin/src/main/java/com/qlangtech/tis/plugin/ds/manipulate/CloneDefaultDataXProcessor.java b/tis-split-table-strategy-plugin/src/main/java/com/qlangtech/tis/plugin/ds/manipulate/CloneDefaultDataXProcessor.java
index 827f5ef45..8d3d8faaa 100644
--- a/tis-split-table-strategy-plugin/src/main/java/com/qlangtech/tis/plugin/ds/manipulate/CloneDefaultDataXProcessor.java
+++ b/tis-split-table-strategy-plugin/src/main/java/com/qlangtech/tis/plugin/ds/manipulate/CloneDefaultDataXProcessor.java
@@ -20,6 +20,8 @@
import com.alibaba.citrus.turbine.Context;
import com.qlangtech.tis.datax.DefaultDataXProcessorManipulate;
+import com.qlangtech.tis.datax.IDataxProcessor;
+import com.qlangtech.tis.datax.impl.DataxProcessor;
import com.qlangtech.tis.extension.TISExtension;
import com.qlangtech.tis.manage.IAppSource;
import com.qlangtech.tis.plugin.IPluginStore.AfterPluginSaved;
@@ -35,6 +37,7 @@
import org.apache.commons.lang3.StringUtils;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
/**
@@ -71,11 +74,14 @@ public void afterSaved(IPluginContext pluginContext, Optional context)
/**
* 先拷贝所有文件,与下面一步执行前后顺序不能颠倒
*/
- IPluginWithStore storePlugins = itemsProcessor.getStorePlugins();
- List pipelines = storePlugins.listPlugins();
- for (IAppSource pipeline : pipelines) {
- pipeline.copy(this.name);
- }
+ //IPluginWithStore storePlugins = itemsProcessor.getStorePlugins();
+ DataxProcessor copyFromPipeline = (DataxProcessor) DataxProcessor.load(null, originId[0]);
+ Objects.requireNonNull(copyFromPipeline, "name:" + originId[0] + " relevant pipeline can not be null");
+ copyFromPipeline.copy(this.name);
+// List pipelines = storePlugins.listPlugins();
+// for (IAppSource pipeline : pipelines) {
+// pipeline.copy(this.name);
+// }
/**
* 再将新的带有替换后的identityName名的实例保存
diff --git a/tis-split-table-strategy-plugin/src/main/java/com/qlangtech/tis/plugin/ds/manipulate/ManipuldateUtils.java b/tis-split-table-strategy-plugin/src/main/java/com/qlangtech/tis/plugin/ds/manipulate/ManipuldateUtils.java
index 4082ee05b..0900f74a7 100644
--- a/tis-split-table-strategy-plugin/src/main/java/com/qlangtech/tis/plugin/ds/manipulate/ManipuldateUtils.java
+++ b/tis-split-table-strategy-plugin/src/main/java/com/qlangtech/tis/plugin/ds/manipulate/ManipuldateUtils.java
@@ -39,12 +39,15 @@
* @create: 2024-07-10 23:16
**/
public class ManipuldateUtils {
+
+
+
public static IPluginItemsProcessor cloneInstance(IPluginContext pluginContext, Context context, String newIdentityName
, Consumer pluginMetaConsumer
, Consumer originIdentityIdConsumer) {
Objects.requireNonNull(context, "param content can not be null");
JSONObject postContent = Objects.requireNonNull(pluginContext, "pluginContext can not be null").getJSONPostContent();
- JSONObject manipulateTarget = postContent.getJSONObject("manipulateTarget");
+ JSONObject manipulateTarget = postContent.getJSONObject(IUploadPluginMeta.KEY_JSON_MANIPULATE_TARGET);
final String keyManipulatePluginMeta = "manipulatePluginMeta";
String pluginType = postContent.getString(keyManipulatePluginMeta);
if (StringUtils.isEmpty(pluginType)) {
@@ -58,6 +61,7 @@ public static IPluginItemsProcessor cloneInstance(IPluginContext pluginContext,
throw new IllegalStateException("pluginMeta can not be empty");
}
for (IUploadPluginMeta meta : pluginMeta) {
+ meta.putExtraParams(DBIdentity.KEY_UPDATE, Boolean.FALSE.toString());
pluginMetaConsumer.accept(meta);
JSONArray itemsArray = new JSONArray();
diff --git a/tis-split-table-strategy-plugin/src/test/java/TestAll.java b/tis-split-table-strategy-plugin/src/test/java/TestAll.java
index f81bfae37..3bf95f392 100644
--- a/tis-split-table-strategy-plugin/src/test/java/TestAll.java
+++ b/tis-split-table-strategy-plugin/src/test/java/TestAll.java
@@ -16,6 +16,7 @@
* limitations under the License.
*/
+import com.qlangtech.tis.plugin.ds.manipulate.TestManipuldateUtils;
import com.qlangtech.tis.plugin.ds.split.TestDefaultSplitTableStrategy;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -26,7 +27,8 @@
**/
@RunWith(Suite.class)
@Suite.SuiteClasses({
- TestDefaultSplitTableStrategy.class})
+ TestDefaultSplitTableStrategy.class
+ , TestManipuldateUtils.class})
public class TestAll //extends TestCase
{
diff --git a/tis-split-table-strategy-plugin/src/test/java/com/qlangtech/tis/plugin/ds/manipulate/TestManipuldateUtils.java b/tis-split-table-strategy-plugin/src/test/java/com/qlangtech/tis/plugin/ds/manipulate/TestManipuldateUtils.java
new file mode 100644
index 000000000..e6f05dc39
--- /dev/null
+++ b/tis-split-table-strategy-plugin/src/test/java/com/qlangtech/tis/plugin/ds/manipulate/TestManipuldateUtils.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.qlangtech.tis.plugin.ds.manipulate;
+
+import com.alibaba.citrus.turbine.Context;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.google.common.collect.Lists;
+import com.qlangtech.tis.extension.impl.PropValRewrite;
+import com.qlangtech.tis.runtime.module.misc.IPostContent;
+import com.qlangtech.tis.test.TISEasyMock;
+import com.qlangtech.tis.trigger.util.JsonUtil;
+import com.qlangtech.tis.util.IPluginContext;
+import com.qlangtech.tis.util.IPluginItemsProcessor;
+import com.qlangtech.tis.util.IUploadPluginMeta;
+import com.qlangtech.tis.util.UploadPluginMeta;
+import org.apache.commons.lang3.tuple.Pair;
+import org.easymock.EasyMock;
+import org.easymock.IExpectationSetters;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+/**
+ * @author: 百岁(baisui@qlangtech.com)
+ * @create: 2024-07-20 08:28
+ **/
+public class TestManipuldateUtils implements TISEasyMock {
+
+ @Test
+ public void testCloneInstance() {
+
+ IPluginContext pluginContext = mock("pluginContext", IPluginContext.class);
+ IPluginItemsProcessor itemProcessor = mock("itemProcessor", IPluginItemsProcessor.class);
+ JSONObject postJson = JsonUtil.loadJSON(ManipuldateUtils.class, "post-manipulate-body.json");
+ EasyMock.expect(pluginContext.getJSONPostContent()).andReturn(postJson);
+
+ String meta = "appSource:require,update_true,justGetItemRelevant_true,dataxName_mysql_mysql,processModel_createDatax";
+ List pluginMetas = (UploadPluginMeta.parse(pluginContext, new String[]{meta}, false));
+ EasyMock.expect(pluginContext.parsePluginMeta(new String[]{meta}, false))
+ .andReturn(pluginMetas);
+
+
+ Context context = mock("context", Context.class);
+ EasyMock.expect(context.hasErrors()).andReturn(true);
+
+ for (IUploadPluginMeta m : pluginMetas) {
+
+ JSONArray itemsArray = new JSONArray();
+ itemsArray.add(postJson.getJSONObject(IUploadPluginMeta.KEY_JSON_MANIPULATE_TARGET));
+
+ pluginContext.getPluginItems(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.eq(0), EasyMock.anyObject(), EasyMock.eq(false), EasyMock.anyObject());
+ IExpectationSetters> getPluginItemsSetters = EasyMock.expectLastCall();
+ getPluginItemsSetters.andStubDelegateTo(new DelegatePostContent(itemProcessor));
+
+ }
+
+ String newIdentityName = "test";
+ Consumer pluginMetaConsumer = (mt) -> {
+ };
+ Consumer originIdentityIdConsumer = (id) -> {
+ };
+
+ replay();
+ IPluginItemsProcessor itemsProcessor
+ = ManipuldateUtils.cloneInstance(pluginContext, context, newIdentityName, pluginMetaConsumer, originIdentityIdConsumer);
+ Assert.assertNull("because newIdentityName is duplicate, result itemsProcessor shall be null", itemsProcessor);
+
+ verifyAll();
+ }
+
+ private class DelegatePostContent implements IPostContent {
+ private final IPluginItemsProcessor itemsProcessor;
+
+ public DelegatePostContent(IPluginItemsProcessor itemsProcessor) {
+ this.itemsProcessor = itemsProcessor;
+ }
+
+ @Override
+ public Pair getPluginItems(
+ IUploadPluginMeta pluginMeta, Context context, int pluginIndex, JSONArray itemsArray, boolean verify, PropValRewrite propValRewrite) {
+ UploadPluginMeta meta = (UploadPluginMeta) pluginMeta;
+ // 要保证是insert 操作
+ Assert.assertFalse("shall not be update", meta.isUpdate());
+ return Pair.of(false, itemsProcessor);
+ }
+
+ @Override
+ public List parsePluginMeta(String[] plugins, boolean useCache) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public JSONObject getJSONPostContent() {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
diff --git a/tis-split-table-strategy-plugin/src/test/resources/com/qlangtech/tis/plugin/ds/manipulate/post-manipulate-body.json b/tis-split-table-strategy-plugin/src/test/resources/com/qlangtech/tis/plugin/ds/manipulate/post-manipulate-body.json
new file mode 100644
index 000000000..529cd414c
--- /dev/null
+++ b/tis-split-table-strategy-plugin/src/test/resources/com/qlangtech/tis/plugin/ds/manipulate/post-manipulate-body.json
@@ -0,0 +1,247 @@
+{
+ "items": [
+ [{
+ "updateModel": false,
+ "impl": "com.qlangtech.tis.plugin.ds.manipulate.CloneDefaultDataXProcessor",
+ "vals": {
+ "name": {
+ "updateModel": false,
+ "_primaryVal": "test",
+ "has_set_primaryVal": false,
+ "disabled": false,
+ "key": "name",
+ "pk": true,
+ "_eprops": {
+ "help": "填写新实例名称,不能与已存在的数据管道实例重名",
+ "label": "新实例ID"
+ },
+ "placeholder": "",
+ "dateTimeFormat": "yyyy-MM-dd HH:mm:ss",
+ "required": true,
+ "type": 1
+ }
+ },
+ "displayName": "Clone",
+ "showAllField": false,
+ "dspt": {
+ "impl": "com.qlangtech.tis.plugin.ds.manipulate.CloneDefaultDataXProcessor",
+ "pkField": "name",
+ "implUrl": "http://tis.pub/docs/plugin/plugins/#comqlangtechtisplugindsmanipulateclonedefaultdataxprocessor",
+ "displayName": "Clone",
+ "extendPoint": "com.qlangtech.tis.datax.DefaultDataXProcessorManipulate",
+ "containAdvance": false,
+ "veriflable": false,
+ "extractProps": {
+ "notebook": {
+ "activate": false,
+ "ability": false
+ }
+ },
+ "attrs": [{
+ "ord": 0,
+ "eprops": {
+ "help": "填写新实例名称,不能与已存在的数据管道实例重名",
+ "label": "新实例ID"
+ },
+ "describable": false,
+ "pk": true,
+ "type": 1,
+ "key": "name",
+ "required": true
+ }]
+ },
+ "implUrl": "http://tis.pub/docs/plugin/plugins/#comqlangtechtisplugindsmanipulateclonedefaultdataxprocessor",
+ "_propVals": [{
+ "updateModel": false,
+ "_primaryVal": "test",
+ "has_set_primaryVal": false,
+ "disabled": false,
+ "key": "name",
+ "pk": true,
+ "_eprops": {
+ "help": "填写新实例名称,不能与已存在的数据管道实例重名",
+ "label": "新实例ID"
+ },
+ "placeholder": "",
+ "dateTimeFormat": "yyyy-MM-dd HH:mm:ss",
+ "required": true,
+ "type": 1
+ }]
+ }]
+ ],
+ "manipulateTarget": {
+ "updateModel": false,
+ "impl": "com.qlangtech.tis.plugin.datax.DefaultDataxProcessor",
+ "vals": {
+ "name": {
+ "updateModel": true,
+ "_primaryVal": "mysql_mysql",
+ "has_set_primaryVal": false,
+ "disabled": false,
+ "key": "name",
+ "pk": true,
+ "_eprops": {
+ "label": "实例名称",
+ "placeholder": "MySQL-import"
+ },
+ "placeholder": "MySQL-import",
+ "dateTimeFormat": "yyyy-MM-dd HH:mm:ss",
+ "required": true,
+ "type": 1
+ },
+ "globalCfg": {
+ "updateModel": true,
+ "_primaryVal": "datax-global-config",
+ "has_set_primaryVal": false,
+ "disabled": false,
+ "key": "globalCfg",
+ "pk": false,
+ "_eprops": {
+ "creator": {
+ "plugin": [{
+ "hetero": "params-cfg",
+ "descName": "DataX-global"
+ }],
+ "label": "配置"
+ },
+ "dftVal": "datax-global-config",
+ "label": "全局配置"
+ },
+ "dftVal": "datax-global-config",
+ "placeholder": "",
+ "dateTimeFormat": "yyyy-MM-dd HH:mm:ss",
+ "required": true,
+ "type": 6,
+ "options": [{
+ "impl": "com.qlangtech.tis.plugin.datax.DataXGlobalConfig",
+ "name": "datax-global-config"
+ }]
+ },
+ "dptId": {
+ "updateModel": true,
+ "_primaryVal": "2",
+ "has_set_primaryVal": false,
+ "disabled": false,
+ "key": "dptId",
+ "pk": false,
+ "_eprops": {
+ "creator": {
+ "routerLink": "/base/departmentlist",
+ "label": "部门管理"
+ },
+ "label": "所属部门",
+ "enum": [{
+ "val": "2",
+ "label": "/tis/default"
+ }]
+ },
+ "placeholder": "",
+ "dateTimeFormat": "yyyy-MM-dd HH:mm:ss",
+ "required": true,
+ "type": 5
+ },
+ "recept": {
+ "updateModel": true,
+ "_primaryVal": "小明",
+ "has_set_primaryVal": false,
+ "disabled": false,
+ "key": "recept",
+ "pk": false,
+ "_eprops": {
+ "label": "接口人",
+ "placeholder": "小明"
+ },
+ "placeholder": "小明",
+ "dateTimeFormat": "yyyy-MM-dd HH:mm:ss",
+ "required": true,
+ "type": 1
+ }
+ },
+ "displayName": "DataxProcessor",
+ "showAllField": false,
+ "dspt": {
+ "impl": "com.qlangtech.tis.plugin.datax.DefaultDataxProcessor",
+ "pkField": "name",
+ "implUrl": "http://tis.pub/docs/plugin/plugins/#comqlangtechtisplugindataxdefaultdataxprocessor",
+ "displayName": "DataxProcessor",
+ "extendPoint": "com.qlangtech.tis.manage.IAppSource",
+ "containAdvance": false,
+ "veriflable": false,
+ "extractProps": {
+ "manipulate": {
+ "extendPoint": "com.qlangtech.tis.datax.DefaultDataXProcessorManipulate"
+ },
+ "notebook": {
+ "activate": false,
+ "ability": false
+ }
+ },
+ "attrs": [{
+ "ord": 0,
+ "eprops": {
+ "label": "实例名称",
+ "placeholder": "MySQL-import"
+ },
+ "describable": false,
+ "pk": true,
+ "type": 1,
+ "key": "name",
+ "required": true
+ }, {
+ "ord": 1,
+ "eprops": {
+ "creator": {
+ "plugin": [{
+ "hetero": "params-cfg",
+ "descName": "DataX-global"
+ }],
+ "label": "配置"
+ },
+ "dftVal": "datax-global-config",
+ "label": "全局配置"
+ },
+ "describable": false,
+ "options": [{
+ "impl": "com.qlangtech.tis.plugin.datax.DataXGlobalConfig",
+ "name": "datax-global-config"
+ }],
+ "pk": false,
+ "type": 6,
+ "key": "globalCfg",
+ "required": true
+ }, {
+ "ord": 2,
+ "eprops": {
+ "creator": {
+ "routerLink": "/base/departmentlist",
+ "label": "部门管理"
+ },
+ "label": "所属部门",
+ "enum": [{
+ "val": "2",
+ "label": "/tis/default"
+ }]
+ },
+ "describable": false,
+ "pk": false,
+ "type": 5,
+ "key": "dptId",
+ "required": true
+ }, {
+ "ord": 3,
+ "eprops": {
+ "label": "接口人",
+ "placeholder": "小明"
+ },
+ "describable": false,
+ "pk": false,
+ "type": 1,
+ "key": "recept",
+ "required": true
+ }]
+ },
+ "identityName": "mysql_mysql",
+ "implUrl": "http://tis.pub/docs/plugin/plugins/#comqlangtechtisplugindataxdefaultdataxprocessor"
+ },
+ "manipulatePluginMeta": "appSource:require,update_true,justGetItemRelevant_true,dataxName_mysql_mysql,processModel_createDatax"
+}
\ No newline at end of file