diff --git a/release-note.md b/release-note.md index 1c09b9af266..cbf9b562a24 100644 --- a/release-note.md +++ b/release-note.md @@ -61,6 +61,7 @@ - [Zeta] Fix task `notifyTaskStatusToMaster` failed when job not running or failed before run (#4847) - [Zeta] Fix cpu load problem (#4828) - [zeta] Fix the deadlock issue with JDBC driver loading (#4878) +- [zeta] dynamically replace the value of the variable at runtime (#4950) ### E2E diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java index 9504c9cb2c1..eefcc1a0a62 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Objects; @EqualsAndHashCode(callSuper = true) @Data @@ -102,6 +103,7 @@ public class ClientCommandArgs extends AbstractCommandArgs { @Override public Command buildCommand() { Common.setDeployMode(getDeployMode()); + userParamsToSysEnv(); if (checkConfig) { return new SeaTunnelConfValidateCommand(this); } @@ -114,6 +116,16 @@ public Command buildCommand() { return new ClientExecuteCommand(this); } + private void userParamsToSysEnv() { + if (!this.variables.isEmpty()) { + variables.stream() + .filter(Objects::nonNull) + .map(variable -> variable.split("=", 2)) + .filter(pair -> pair.length == 2) + .forEach(pair -> System.setProperty(pair[0], pair[1])); + } + } + public DeployMode getDeployMode() { return DeployMode.CLIENT; } diff --git a/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java b/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java new file mode 100644 index 00000000000..5f197367d0d --- /dev/null +++ b/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.core.starter.seatunnel.args; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigObject; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions; + +import org.apache.seatunnel.core.starter.utils.CommandLineUtils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Paths; +import java.util.List; + +public class ClientCommandArgsTest { + @Test + public void testUserDefinedParamsCommand() throws URISyntaxException { + int fakeParallelism = 16; + String username = "seatunnel=2.3.1"; + String password = "dsjr42=4wfskahdsd=w1chh"; + String fakeSourceTable = "fake"; + String fakeSinkTable = "sink"; + String[] args = { + "-c", + "/args/user_defined_params.conf", + "-e", + "local", + "-i", + "fake_source_table=" + fakeSourceTable, + "-i", + "fake_parallelism=" + fakeParallelism, + "-i", + "fake_sink_table=" + fakeSinkTable, + "-i", + "password=" + password, + "-i", + "username=" + username + }; + ClientCommandArgs clientCommandArgs = + CommandLineUtils.parse(args, new ClientCommandArgs(), "seatunnel-zeta", true); + clientCommandArgs.buildCommand(); + URL resource = ClientCommandArgsTest.class.getResource("/args/user_defined_params.conf"); + + Config config = + ConfigFactory.parseFile(Paths.get(resource.toURI()).toFile()) + .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true)) + .resolveWith( + ConfigFactory.systemProperties(), + ConfigResolveOptions.defaults().setAllowUnresolved(true)); + List sourceConfigs = config.getObjectList("source"); + for (ConfigObject configObject : sourceConfigs) { + Config sourceConfig = configObject.toConfig(); + + String tableName = sourceConfig.getString("result_table_name"); + Assertions.assertEquals(tableName, fakeSourceTable); + + int parallelism = Integer.parseInt(sourceConfig.getString("parallelism")); + Assertions.assertEquals(fakeParallelism, parallelism); + + Assertions.assertEquals(sourceConfig.getString("username"), username); + Assertions.assertEquals(sourceConfig.getString("password"), password); + } + List sinkConfigs = config.getObjectList("sink"); + for (ConfigObject sinkObject : sinkConfigs) { + Config sinkConfig = sinkObject.toConfig(); + String tableName = sinkConfig.getString("result_table_name"); + Assertions.assertEquals(tableName, fakeSinkTable); + + Assertions.assertEquals(sinkConfig.getString("username"), username); + Assertions.assertEquals(sinkConfig.getString("password"), password); + } + } +} diff --git a/seatunnel-core/seatunnel-starter/src/test/resources/args/user_defined_params.conf b/seatunnel-core/seatunnel-starter/src/test/resources/args/user_defined_params.conf new file mode 100644 index 00000000000..9dfde35dd6a --- /dev/null +++ b/seatunnel-core/seatunnel-starter/src/test/resources/args/user_defined_params.conf @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set engine configuration here + execution.parallelism = 1 + job.mode = "BATCH" + checkpoint.interval = 5000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + result_table_name = ${fake_source_table} + parallelism = ${fake_parallelism} + username = ${username} + password = ${password} + schema = { + fields { + name = "string" + age = "int" + } + } + } +} + +sink { + console { + result_table_name = ${fake_sink_table} + username = ${username} + password = ${password} + } +} \ No newline at end of file