Skip to content

Commit

Permalink
Job IDs are now incremented
Browse files Browse the repository at this point in the history
Signed-off-by: Norman Jordan <norman.jordan@improving.com>
  • Loading branch information
normanj-bitquill committed Jan 13, 2025
1 parent c43791f commit 63e6b91
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 4 deletions.
2 changes: 1 addition & 1 deletion docker/integ-test/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ services:
- plugins.security.system_indices.permission.enabled=false
- plugins.security.ssl.http.enabled=false
- plugins.query.datasources.encryption.masterkey=9a515c99d4313f140a6607053502f4d6
- OPENSEARCH_JAVA_OPTS=-Xms${OPENSEARCH_NODE_MEMORY:-512m} -Xmx${OPENSEARCH_NODE_MEMORY:-512m} -DEMR_SERVERLESS_CLIENT_FACTORY_CLASS=org.opensearch.sql.spark.client.DockerEMRServerlessClientFactory
- OPENSEARCH_JAVA_OPTS=-Xms${OPENSEARCH_NODE_MEMORY:-512m} -Xmx${OPENSEARCH_NODE_MEMORY:-512m}
- OPENSEARCH_INITIAL_ADMIN_PASSWORD=${OPENSEARCH_ADMIN_PASSWORD}
ulimits:
memlock:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,12 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class DockerEMRServerlessClient extends AmazonWebServiceClient implements AWSEMRServerless {
private static final AtomicInteger JOB_ID = new AtomicInteger(1);

public DockerEMRServerlessClient(ClientConfiguration clientConfiguration) {
super(clientConfiguration);
setEndpointPrefix("emr");
Expand Down Expand Up @@ -114,12 +117,14 @@ public StartJobRunResult startJobRun(final StartJobRunRequest startJobRunRequest
List<String> entryPointArguments = startJobRunRequest.getJobDriver().getSparkSubmit().getEntryPointArguments();
String sparkSubmitParameters = startJobRunRequest.getJobDriver().getSparkSubmit().getSparkSubmitParameters();

final int jobId = JOB_ID.getAndIncrement();

List<String> runContainerCmd = new ArrayList<>();
runContainerCmd.add("run");
runContainerCmd.add("-d");
runContainerCmd.add("--rm");
runContainerCmd.add("--env");
runContainerCmd.add("SERVERLESS_EMR_JOB_ID=1");
runContainerCmd.add("SERVERLESS_EMR_JOB_ID=" + jobId);
runContainerCmd.add("--network");
runContainerCmd.add("integ-test_opensearch-net");
runContainerCmd.add("integ-test-spark-submit:latest");
Expand Down Expand Up @@ -171,8 +176,8 @@ public StartJobRunResult startJobRun(final StartJobRunRequest startJobRunRequest
if (exitCodeFile.exists()) {
StartJobRunResult startJobResult = new StartJobRunResult();
startJobResult.setApplicationId(startJobRunRequest.getApplicationId());
startJobResult.setArn("arn:aws:emr-containers:foo:123456789012:/virtualclusters/0/jobruns/1");
startJobResult.setJobRunId("1");
startJobResult.setArn("arn:aws:emr-containers:foo:123456789012:/virtualclusters/0/jobruns/" + jobId);
startJobResult.setJobRunId(Integer.toString(jobId));
return startJobResult;
}
} catch (IOException | InterruptedException e) {
Expand Down

0 comments on commit 63e6b91

Please sign in to comment.