From 93a3a3d990915a30138cfb02079a2910768969a8 Mon Sep 17 00:00:00 2001 From: ms9698 Date: Mon, 11 Nov 2024 21:29:54 +0000 Subject: [PATCH] update operation job details without jobtracker --- .../java/uk/gov/gchq/gaffer/store/Store.java | 25 +++++++++++++++++++ .../rest/service/v2/OperationServiceV2IT.java | 2 +- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/core/store/src/main/java/uk/gov/gchq/gaffer/store/Store.java b/core/store/src/main/java/uk/gov/gchq/gaffer/store/Store.java index 81384a25a17..8b658eba7be 100644 --- a/core/store/src/main/java/uk/gov/gchq/gaffer/store/Store.java +++ b/core/store/src/main/java/uk/gov/gchq/gaffer/store/Store.java @@ -23,6 +23,7 @@ import uk.gov.gchq.gaffer.cache.Cache; import uk.gov.gchq.gaffer.cache.CacheServiceLoader; +import uk.gov.gchq.gaffer.cache.exception.CacheOperationException; import uk.gov.gchq.gaffer.commonutil.CloseableUtil; import uk.gov.gchq.gaffer.commonutil.ExecutorService; import uk.gov.gchq.gaffer.core.exception.GafferRuntimeException; @@ -186,7 +187,10 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.stream.StreamSupport; import static java.util.Collections.unmodifiableList; @@ -384,8 +388,29 @@ public void execute(final Operation operation, final Context context) throws Ope public O execute(final Output operation, final Context context) throws OperationException { return execute(OperationChain.wrap(operation), context); } + private final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>()); + + private void validateJobDetail(final JobDetail jobDetail) { + if (null == jobDetail) { + throw new IllegalArgumentException("JobDetail is required"); + } + + if (null == jobDetail.getJobId() || jobDetail.getJobId().isEmpty()) { + throw new IllegalArgumentException("jobId is required"); + } + } + protected O execute(final OperationChain operation, final Context context) throws OperationException { + final JobDetail newJobDetail = new JobDetail(context.getJobId(), context.getUser(), operation, JobStatus.RUNNING, null); final O result = (O) handleOperation(operation, context); + validateJobDetail(newJobDetail); + executor.submit(() -> { + try { + super.addToCache(newJobDetail.getJobId(), newJobDetail, true); + } catch (final CacheOperationException e) { + LOGGER.error("Failed to add jobDetail " + newJobDetail.toString() + " to the cache", e); + } + }); return result; } diff --git a/rest-api/core-rest/src/test/java/uk/gov/gchq/gaffer/rest/service/v2/OperationServiceV2IT.java b/rest-api/core-rest/src/test/java/uk/gov/gchq/gaffer/rest/service/v2/OperationServiceV2IT.java index ea5f944f055..59a0103c4d6 100644 --- a/rest-api/core-rest/src/test/java/uk/gov/gchq/gaffer/rest/service/v2/OperationServiceV2IT.java +++ b/rest-api/core-rest/src/test/java/uk/gov/gchq/gaffer/rest/service/v2/OperationServiceV2IT.java @@ -119,7 +119,7 @@ public void shouldReturnSameJobIdInHeaderAsGetAllJobDetailsOperation() throws IO // When final Response response = client.executeOperation(new GetAllJobDetails()); - + // System.out.println(response.readEntity(String.class)); // Then assertTrue(response.readEntity(String.class).contains(response.getHeaderString("job-id"))); }