Skip to content

Commit

Permalink
refactor abstract AbstractJobEnvironment for JobImmutableInformationE…
Browse files Browse the repository at this point in the history
…nv and JobExecutionEnvironment
  • Loading branch information
zhibinF committed Aug 4, 2023
1 parent 6c7ffe8 commit d214815
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 171 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,55 +18,19 @@
package org.apache.seatunnel.engine.client.job;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator;
import org.apache.seatunnel.engine.core.job.AbstractJobEnvironment;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;

import org.apache.commons.lang3.tuple.ImmutablePair;

import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class JobExecutionEnvironment {

private static final ILogger LOGGER = Logger.getLogger(JobExecutionEnvironment.class);

private final boolean isStartWithSavePoint;

private final JobConfig jobConfig;

private final List<Action> actions = new ArrayList<>();

private final Set<URL> jarUrls = new HashSet<>();

private final List<URL> commonPluginJars = new ArrayList<>();
public class JobExecutionEnvironment extends AbstractJobEnvironment {

private final String jobFilePath;

private final IdGenerator idGenerator;

private final SeaTunnelHazelcastClient seaTunnelHazelcastClient;

private final JobClient jobClient;
Expand All @@ -78,35 +42,12 @@ public JobExecutionEnvironment(
SeaTunnelHazelcastClient seaTunnelHazelcastClient,
boolean isStartWithSavePoint,
Long jobId) {
this.jobConfig = jobConfig;
super(jobConfig, isStartWithSavePoint);
this.jobFilePath = jobFilePath;
this.idGenerator = new IdGenerator();
this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
this.jobClient = new JobClient(seaTunnelHazelcastClient);
this.isStartWithSavePoint = isStartWithSavePoint;
this.jobConfig.setJobContext(
new JobContext(isStartWithSavePoint ? jobId : jobClient.getNewJobId()));
this.commonPluginJars.addAll(searchPluginJars());
this.commonPluginJars.addAll(
new ArrayList<>(
Common.getThirdPartyJars(
jobConfig
.getEnvOptions()
.getOrDefault(EnvCommonOptions.JARS.key(), "")
.toString())
.stream()
.map(Path::toUri)
.map(
uri -> {
try {
return uri.toURL();
} catch (MalformedURLException e) {
throw new SeaTunnelEngineException(
"the uri of jar illegal:" + uri, e);
}
})
.collect(Collectors.toList())));
LOGGER.info("add common jar in plugins :" + commonPluginJars);
}

public JobExecutionEnvironment(
Expand All @@ -117,27 +58,12 @@ public JobExecutionEnvironment(
}

/** Search all jars in SEATUNNEL_HOME/plugins */
private Set<URL> searchPluginJars() {
try {
if (Files.exists(Common.pluginRootDir())) {
return new HashSet<>(FileUtils.searchJarFiles(Common.pluginRootDir()));
}
} catch (IOException | SeaTunnelEngineException e) {
LOGGER.warning(
String.format("Can't search plugin jars in %s.", Common.pluginRootDir()), e);
}
return Collections.emptySet();
}

private MultipleTableJobConfigParser getJobConfigParser() {
@Override
protected MultipleTableJobConfigParser getJobConfigParser() {
return new MultipleTableJobConfigParser(
jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
}

private LogicalDagGenerator getLogicalDagGenerator() {
return new LogicalDagGenerator(actions, jobConfig, idGenerator);
}

public ClientJobProxy execute() throws ExecutionException, InterruptedException {
JobImmutableInformation jobImmutableInformation =
new JobImmutableInformation(
Expand All @@ -150,11 +76,4 @@ public ClientJobProxy execute() throws ExecutionException, InterruptedException

return jobClient.createJobProxy(jobImmutableInformation);
}

private LogicalDag getLogicalDag() {
ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
actions.addAll(immutablePair.getLeft());
jarUrls.addAll(immutablePair.getRight());
return getLogicalDagGenerator().generate();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package org.apache.seatunnel.engine.core.job;

import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator;
import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;

import org.apache.commons.lang3.tuple.ImmutablePair;

import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Collectors;

public abstract class AbstractJobEnvironment {
protected static ILogger LOGGER = null;

protected final boolean isStartWithSavePoint;

protected final List<Action> actions = new ArrayList<>();
protected final Set<URL> jarUrls = new HashSet<>();

protected final JobConfig jobConfig;

protected final IdGenerator idGenerator;

protected final List<URL> commonPluginJars = new ArrayList<>();

public AbstractJobEnvironment(JobConfig jobConfig, boolean isStartWithSavePoint) {
LOGGER = Logger.getLogger(getClass().getName());
this.jobConfig = jobConfig;
this.isStartWithSavePoint = isStartWithSavePoint;
this.idGenerator = new IdGenerator();
this.commonPluginJars.addAll(searchPluginJars());
this.commonPluginJars.addAll(
new ArrayList<>(
Common.getThirdPartyJars(
jobConfig
.getEnvOptions()
.getOrDefault(EnvCommonOptions.JARS.key(), "")
.toString())
.stream()
.map(Path::toUri)
.map(
uri -> {
try {
return uri.toURL();
} catch (MalformedURLException e) {
throw new SeaTunnelEngineException(
"the uri of jar illegal:" + uri, e);
}
})
.collect(Collectors.toList())));
LOGGER.info("add common jar in plugins :" + commonPluginJars);
}

protected Set<URL> searchPluginJars() {
try {
if (Files.exists(Common.pluginRootDir())) {
return new HashSet<>(FileUtils.searchJarFiles(Common.pluginRootDir()));
}
} catch (IOException | SeaTunnelEngineException e) {
LOGGER.warning(
String.format("Can't search plugin jars in %s.", Common.pluginRootDir()), e);
}
return Collections.emptySet();
}

protected abstract MultipleTableJobConfigParser getJobConfigParser();

protected LogicalDagGenerator getLogicalDagGenerator() {
return new LogicalDagGenerator(actions, jobConfig, idGenerator);
}

protected LogicalDag getLogicalDag() {
ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
actions.addAll(immutablePair.getLeft());
jarUrls.addAll(immutablePair.getRight());
return getLogicalDagGenerator().generate();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,55 +20,20 @@
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator;
import org.apache.seatunnel.engine.core.job.AbstractJobEnvironment;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;

import org.apache.commons.lang3.tuple.ImmutablePair;

import com.hazelcast.instance.impl.Node;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.spi.impl.NodeEngineImpl;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class JobImmutableInformationEnv {
private static final ILogger LOGGER = Logger.getLogger(JobImmutableInformationEnv.class);

private final boolean isStartWithSavePoint;

private final JobConfig jobConfig;

private final List<Action> actions = new ArrayList<>();

private final Set<URL> jarUrls = new HashSet<>();

private final List<URL> commonPluginJars = new ArrayList<>();

public class JobImmutableInformationEnv extends AbstractJobEnvironment {
private final Config seaTunnelJobConfig;

private final IdGenerator idGenerator;

private final NodeEngineImpl nodeEngine;

private final Long jobId;
Expand All @@ -79,9 +44,7 @@ public JobImmutableInformationEnv(
Node node,
boolean isStartWithSavePoint,
Long jobId) {
this.jobConfig = jobConfig;
this.isStartWithSavePoint = isStartWithSavePoint;
this.idGenerator = new IdGenerator();
super(jobConfig, isStartWithSavePoint);
this.seaTunnelJobConfig = seaTunnelJobConfig;
this.nodeEngine = node.getNodeEngine();
this.jobConfig.setJobContext(
Expand All @@ -93,61 +56,18 @@ public JobImmutableInformationEnv(
.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME)
.newId()));
this.jobId = Long.valueOf(jobConfig.getJobContext().getJobId());
this.commonPluginJars.addAll(searchPluginJars());
this.commonPluginJars.addAll(
new ArrayList<>(
Common.getThirdPartyJars(
jobConfig
.getEnvOptions()
.getOrDefault(EnvCommonOptions.JARS.key(), "")
.toString())
.stream()
.map(Path::toUri)
.map(
uri -> {
try {
return uri.toURL();
} catch (MalformedURLException e) {
throw new SeaTunnelEngineException(
"the uri of jar illegal:" + uri, e);
}
})
.collect(Collectors.toList())));
LOGGER.info("add common jar in plugins :" + commonPluginJars);
}

public Long getJobId() {
return jobId;
}

private Set<URL> searchPluginJars() {
try {
if (Files.exists(Common.pluginRootDir())) {
return new HashSet<>(FileUtils.searchJarFiles(Common.pluginRootDir()));
}
} catch (IOException | SeaTunnelEngineException e) {
LOGGER.warning(
String.format("Can't search plugin jars in %s.", Common.pluginRootDir()), e);
}
return Collections.emptySet();
}

private MultipleTableJobConfigParser getJobConfigParser() {
@Override
protected MultipleTableJobConfigParser getJobConfigParser() {
return new MultipleTableJobConfigParser(
seaTunnelJobConfig, idGenerator, jobConfig, commonPluginJars);
}

private LogicalDagGenerator getLogicalDagGenerator() {
return new LogicalDagGenerator(actions, jobConfig, idGenerator);
}

private LogicalDag getLogicalDag() {
ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
actions.addAll(immutablePair.getLeft());
jarUrls.addAll(immutablePair.getRight());
return getLogicalDagGenerator().generate();
}

public JobImmutableInformation build() {
return new JobImmutableInformation(
Long.parseLong(jobConfig.getJobContext().getJobId()),
Expand Down

0 comments on commit d214815

Please sign in to comment.