From e16aad05e16a34a6252663fc4475a62bb4369140 Mon Sep 17 00:00:00 2001
From: bageshwar <2353296+bageshwar@users.noreply.github.com>
Date: Mon, 20 Jun 2022 06:46:19 +0530
Subject: [PATCH 1/4] Async Data Adapters
---
.../bizlogics/AsyncDataAdapterBizlogic.java | 94 +++++++++
.../flipkart/tef/execution/FlowBuilder.java | 8 +-
.../AsyncDataAdapterBizlogicTest.java | 191 ++++++++++++++++++
.../tef/execution/FlowBuilderTest.java | 2 +-
4 files changed, 290 insertions(+), 5 deletions(-)
create mode 100644 tef-impl/src/main/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogic.java
create mode 100644 tef-impl/src/test/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogicTest.java
diff --git a/tef-impl/src/main/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogic.java b/tef-impl/src/main/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogic.java
new file mode 100644
index 0000000..0939f16
--- /dev/null
+++ b/tef-impl/src/main/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogic.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright [2021] [The Original Author]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package flipkart.tef.bizlogics;
+
+import flipkart.tef.exception.TefExecutionException;
+
+import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * A DataAdapter which emits a future. During the injection phase the caller can either
+ * 1) do a blocking get (thread will stall till the result is available)
+ * 2) do a done and get check (to consume the result only if its available)
+ * 3) do a get with timeout (to wait for the result before consuming them)
+ *
+ * The semantics of the Generic Type parameter expect a `Future>` explicitly
+ * because during the flow building phase, reflection is used to figure the Data Injection metadata, and at that time
+ * the complete signature of the generic type interface needs to be present at impl or superclass.
+ * Only taking the final result type as generic parameter from implementation classes will break that contract.
+ *
+ * Date: 18/06/22
+ * Time: 7:42 PM
+ */
+public abstract class AsyncDataAdapterBizlogic>, U> extends DataAdapterBizlogic {
+
+ private final ThreadPoolExecutor threadPoolExecutor;
+ private final boolean bubbleException;
+
+ /**
+ * @param threadPoolExecutor Threadpool executor to which to task will be submitted
+ */
+ public AsyncDataAdapterBizlogic(ThreadPoolExecutor threadPoolExecutor) {
+ this(threadPoolExecutor, false);
+ }
+
+ /**
+ * @param threadPoolExecutor Threadpool executor to which to task will be submitted
+ * @param bubbleException if true, any exception thrown as part of computing the result will be rethrown,
+ * else `Optional.empty` will be returned.
+ */
+ public AsyncDataAdapterBizlogic(ThreadPoolExecutor threadPoolExecutor, boolean bubbleException) {
+ this.threadPoolExecutor = threadPoolExecutor;
+ this.bubbleException = bubbleException;
+ }
+
+ @Override
+ public final T adapt(final TefContext tefContext) {
+ /*
+ Submit the task to the threadpool
+ The `bubbleException` flag will be used to decide the behavior in case of an exception,
+ either to return an empty value, or rethrow the exception
+ */
+ return (T) threadPoolExecutor.submit(() -> getResultImpl(tefContext));
+ }
+
+ /**
+ * This method should compute and return the result. The flow execution will not be blocked on this method.
+ * i.e. This method will run in async
+ *
+ * @param tefContext TefContext for callers
+ * @return The result
+ * @throws TefExecutionException The retriability error codes are not honoured when executing this bizlogic
+ */
+ public abstract U getResult(TefContext tefContext) throws TefExecutionException;
+
+ private Optional getResultImpl(TefContext tefContext) throws Exception {
+ try {
+ U result = getResult(tefContext);
+ return Optional.of(result);
+ } catch (Exception e) {
+ // Catch-all block to minimize side effects
+ if (bubbleException) {
+ throw e;
+ } else {
+ return Optional.empty();
+ }
+ }
+ }
+}
diff --git a/tef-impl/src/main/java/flipkart/tef/execution/FlowBuilder.java b/tef-impl/src/main/java/flipkart/tef/execution/FlowBuilder.java
index 105876c..733f176 100644
--- a/tef-impl/src/main/java/flipkart/tef/execution/FlowBuilder.java
+++ b/tef-impl/src/main/java/flipkart/tef/execution/FlowBuilder.java
@@ -212,7 +212,7 @@ private void convertDataDependencyToBizLogicDependency() {
}
Class extends IDataBizlogic>> adapter = dataAdapterMap.get(injectedData);
Preconditions.checkArgument(adapter != null, String.format(Messages.DATA_ADAPTER_NOT_RESOLVED_FOR,
- injectedData.getResultClass().getCanonicalName()));
+ injectedData.getResultClass().getCanonicalName(), entry.getKey().getName()));
addDependencies(entry.getKey(), adapter);
}
}
@@ -244,7 +244,7 @@ private void populateDataAdapterMap(Class extends IBizlogic> bizlogic) {
throw new AdapterConflictRuntimeException(returnType, bizlogic, dataAdapterMap.get(key));
}
}
- } catch (AdapterConflictRuntimeException|UnableToResolveDataFromAdapterRuntimeException e) {
+ } catch (AdapterConflictRuntimeException | UnableToResolveDataFromAdapterRuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
@@ -266,7 +266,7 @@ private Class> getReturnTypeFromBizlogicUsingSunApi(Class extends DataAdapte
// This could be a case of a data adapter being a subclass of another
// data adapter where type parameter is specified on the superclass
if(DataAdapterBizlogic.class.isAssignableFrom(dataAdapterBizLogic.getSuperclass())) {
- return getReturnTypeFromBizlogicUsingSunApi((Class extends DataAdapterBizlogic>>)dataAdapterBizLogic.getSuperclass(), classHierarchy);
+ return getReturnTypeFromBizlogicUsingSunApi((Class extends DataAdapterBizlogic>>) dataAdapterBizLogic.getSuperclass(), classHierarchy);
}
}
@@ -278,7 +278,7 @@ static class Messages {
public static final String A_BIZLOGIC_CANNOT_DEPEND_ON_SELF = "A bizlogic cannot depend on Self";
public static final String MORE_THAN_1_DEPENDS_ON_ANNOTATIONS_FOUND = "More than 1 @DependsOn annotations found";
public static final String COULD_NOT_DEDUCE_THE_STARTING_STEP = "Could not deduce the starting step";
- public static final String DATA_ADAPTER_NOT_RESOLVED_FOR = "Data Adapter not resolved for %s";
+ public static final String DATA_ADAPTER_NOT_RESOLVED_FOR = "Data Adapter not resolved for %s in bizlogic %s";
}
private void addDependencies(Class extends IBizlogic> bizlogic, Class extends IBizlogic>... dependencies) {
diff --git a/tef-impl/src/test/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogicTest.java b/tef-impl/src/test/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogicTest.java
new file mode 100644
index 0000000..9804695
--- /dev/null
+++ b/tef-impl/src/test/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogicTest.java
@@ -0,0 +1,191 @@
+package flipkart.tef.bizlogics;
+
+import com.google.common.collect.Queues;
+import flipkart.tef.TestTefContext;
+import flipkart.tef.annotations.EmitData;
+import flipkart.tef.annotations.InjectData;
+import flipkart.tef.exception.TefExecutionException;
+import flipkart.tef.execution.DataContext;
+import flipkart.tef.execution.DataDependencyException;
+import flipkart.tef.execution.FlowExecutor;
+import flipkart.tef.execution.FluentCapabilityBuilder;
+import flipkart.tef.execution.MyFlowExecutionListener;
+import flipkart.tef.flow.SimpleFlow;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class AsyncDataAdapterBizlogicTest {
+
+ private static ThreadPoolExecutor threadPoolExecutor;
+
+ private FluentCapabilityBuilder flowBuilder;
+
+ @Before
+ public void setUp() {
+ flowBuilder = new FluentCapabilityBuilder();
+ BlockingQueue blockingQueue = Queues.newArrayBlockingQueue(5);
+ threadPoolExecutor = new ThreadPoolExecutor(5, 5, 100, TimeUnit.MILLISECONDS, blockingQueue, (r, executor) -> {
+ r.run();
+ });
+ }
+
+ @Test
+ public void testAsyncDataInjection() throws TefExecutionException, DataDependencyException, IllegalAccessException, InstantiationException {
+
+
+ flowBuilder.withAdapter(Sample1AsyncDataAdapterBizlogic.class);
+ flowBuilder.withAdapter(Sample2AsyncDataAdapterBizlogic.class);
+ flowBuilder.withAdapter(Sample3AsyncDataAdapterBizlogic.class);
+ flowBuilder.withAdapter(Sample4AsyncDataAdapterBizlogic.class);
+ flowBuilder.withBizlogic(SimpleBizlogic.class);
+ SimpleFlow dataflow = flowBuilder.dataflow();
+
+ assertEquals(5, dataflow.getBizlogics().size());
+ assertTrue(dataflow.getBizlogics().contains(Sample1AsyncDataAdapterBizlogic.class));
+ assertTrue(dataflow.getBizlogics().contains(Sample2AsyncDataAdapterBizlogic.class));
+ assertTrue(dataflow.getBizlogics().contains(Sample3AsyncDataAdapterBizlogic.class));
+ assertTrue(dataflow.getBizlogics().contains(Sample4AsyncDataAdapterBizlogic.class));
+ assertTrue(dataflow.getBizlogics().contains(SimpleBizlogic.class));
+
+ DataContext dataContext = new DataContext();
+ FlowExecutor executor = new FlowExecutor(dataflow, dataContext, new TestTefContext());
+
+ MyFlowExecutionListener listener = new MyFlowExecutionListener();
+ executor.addListener(listener);
+ executor.execute();
+ }
+
+ static class SimpleBizlogic implements IBizlogic {
+
+ // Sleeps and returns
+ @InjectData(name = "1")
+ Future> asyncResult1;
+
+ // Sleeps and returns with bubbleException=true, but does not throw an exception
+ @InjectData(name = "2")
+ Future> asyncResult2;
+
+ // throws exception with bubbleException=true
+ @InjectData(name = "3")
+ Future> asyncResult3;
+
+ // throws exception without bubbleException=true
+ @InjectData(name = "4")
+ Future> asyncResult4;
+
+ @Override
+ public void execute(TefContext tefContext) throws TefExecutionException {
+ // 100ms sleep in workers ensures result is not ready instantly
+ assertFalse(asyncResult1.isDone());
+ assertFalse(asyncResult2.isDone());
+ assertFalse(asyncResult3.isDone());
+ assertFalse(asyncResult4.isDone());
+
+ // 200ms sleep ensures results are ready
+ sleep(200);
+ assertTrue(asyncResult1.isDone());
+ assertTrue(asyncResult2.isDone());
+ assertTrue(asyncResult3.isDone());
+ assertTrue(asyncResult4.isDone());
+
+ try {
+ // assert on results
+ assertEquals("1", asyncResult1.get().get());
+ assertEquals("2", asyncResult2.get().get());
+ // #4 does not return a result since it throws an exception
+ assertFalse(asyncResult4.get().isPresent());
+ } catch (Exception e) {
+ fail("Unexpected exception");
+ }
+
+ try {
+ // #3 throws an exception with bubbleException=3
+ asyncResult3.get();
+ fail("Runtime exception was expected");
+ } catch (RuntimeException | InterruptedException | ExecutionException e) {
+ assertEquals("java.lang.RuntimeException: 3", e.getMessage());
+ }
+ }
+ }
+
+ static void sleep(long t) {
+ try {
+ Thread.sleep(t);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ static void sleep() {
+ sleep(100);
+ }
+
+
+ @EmitData(name = "1")
+ static class Sample1AsyncDataAdapterBizlogic extends AsyncDataAdapterBizlogic>, String> {
+
+ public Sample1AsyncDataAdapterBizlogic() {
+ super(AsyncDataAdapterBizlogicTest.threadPoolExecutor);
+ }
+
+ @Override
+ public String getResult(TefContext tefContext) throws TefExecutionException {
+ sleep();
+ return "1";
+ }
+ }
+
+ @EmitData(name = "2")
+ static class Sample2AsyncDataAdapterBizlogic extends AsyncDataAdapterBizlogic>, String> {
+
+ public Sample2AsyncDataAdapterBizlogic() {
+ super(AsyncDataAdapterBizlogicTest.threadPoolExecutor, true);
+ }
+
+ @Override
+ public String getResult(TefContext tefContext) throws TefExecutionException {
+ sleep();
+ return "2";
+ }
+ }
+
+ @EmitData(name = "3")
+ static class Sample3AsyncDataAdapterBizlogic extends AsyncDataAdapterBizlogic>, String> {
+
+ public Sample3AsyncDataAdapterBizlogic() {
+ super(AsyncDataAdapterBizlogicTest.threadPoolExecutor, true);
+ }
+
+ @Override
+ public String getResult(TefContext tefContext) throws TefExecutionException {
+ sleep();
+ throw new RuntimeException("3");
+ }
+ }
+
+ @EmitData(name = "4")
+ static class Sample4AsyncDataAdapterBizlogic extends AsyncDataAdapterBizlogic>, String> {
+
+ public Sample4AsyncDataAdapterBizlogic() {
+ super(AsyncDataAdapterBizlogicTest.threadPoolExecutor);
+ }
+
+ @Override
+ public String getResult(TefContext tefContext) throws TefExecutionException {
+ sleep();
+ throw new RuntimeException("4");
+ }
+ }
+}
\ No newline at end of file
diff --git a/tef-impl/src/test/java/flipkart/tef/execution/FlowBuilderTest.java b/tef-impl/src/test/java/flipkart/tef/execution/FlowBuilderTest.java
index 8f4e179..310f9d8 100644
--- a/tef-impl/src/test/java/flipkart/tef/execution/FlowBuilderTest.java
+++ b/tef-impl/src/test/java/flipkart/tef/execution/FlowBuilderTest.java
@@ -308,7 +308,7 @@ public void testDataDependencyAbsentInFlow() {
flowBuilder.build();
fail("Validation Error was expected");
} catch (IllegalArgumentException e) {
- assertEquals("Data Adapter not resolved for flipkart.tef.execution.FlowBuilderTest.SimpleData", e.getMessage());
+ assertEquals("Data Adapter not resolved for flipkart.tef.execution.FlowBuilderTest.SimpleData in bizlogic flipkart.tef.execution.FlowBuilderTest$SimpleEnricher2", e.getMessage());
}
}
From 335b762ebecd5a183f10a5bf3df358a942137e66 Mon Sep 17 00:00:00 2001
From: bageshwar <2353296+bageshwar@users.noreply.github.com>
Date: Mon, 20 Jun 2022 09:14:35 +0530
Subject: [PATCH 2/4] version bump up
---
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index 1846cc7..59b8359 100644
--- a/pom.xml
+++ b/pom.xml
@@ -107,7 +107,7 @@
- 0.1.0
+ 0.1.1
19.0
4.2.3
From 83bb4010db09f6fe5c325bf21ed3fcbe55ab2400 Mon Sep 17 00:00:00 2001
From: bageshwar <2353296+bageshwar@users.noreply.github.com>
Date: Mon, 20 Jun 2022 12:26:50 +0530
Subject: [PATCH 3/4] Comments fix.
---
.../tef/bizlogics/AsyncDataAdapterBizlogic.java | 14 ++++++++++----
.../bizlogics/AsyncDataAdapterBizlogicTest.java | 2 +-
2 files changed, 11 insertions(+), 5 deletions(-)
diff --git a/tef-impl/src/main/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogic.java b/tef-impl/src/main/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogic.java
index 0939f16..d763ee3 100644
--- a/tef-impl/src/main/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogic.java
+++ b/tef-impl/src/main/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogic.java
@@ -28,10 +28,16 @@
* 2) do a done and get check (to consume the result only if its available)
* 3) do a get with timeout (to wait for the result before consuming them)
*
- * The semantics of the Generic Type parameter expect a `Future>` explicitly
- * because during the flow building phase, reflection is used to figure the Data Injection metadata, and at that time
- * the complete signature of the generic type interface needs to be present at impl or superclass.
- * Only taking the final result type as generic parameter from implementation classes will break that contract.
+ * The semantics of the Generic Type parameter expect a `Future>` explicitly
+ * instead of just because during the flow building phase, the complete signature of the
+ * generic type interface needs to be present at impl or superclass.
+ * Only taking the final result type as input from implementation classes (for the value of generic parameter)
+ * will break that contract.
+ *
+ * Since the flow builder uses reflection to get generic params to know what will the return type of data adapter,
+ * taking only final result type (say X) from implementation class, will appear to TEF as if
+ * the implementation classes return 'X' rather than the Future.
+ *
*
* Date: 18/06/22
* Time: 7:42 PM
diff --git a/tef-impl/src/test/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogicTest.java b/tef-impl/src/test/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogicTest.java
index 9804695..61e367b 100644
--- a/tef-impl/src/test/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogicTest.java
+++ b/tef-impl/src/test/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogicTest.java
@@ -111,7 +111,7 @@ public void execute(TefContext tefContext) throws TefExecutionException {
}
try {
- // #3 throws an exception with bubbleException=3
+ // #3 throws an exception with bubbleException=true
asyncResult3.get();
fail("Runtime exception was expected");
} catch (RuntimeException | InterruptedException | ExecutionException e) {
From d50baa25a09d441858e29344fc93e4390f522953 Mon Sep 17 00:00:00 2001
From: bageshwar <2353296+bageshwar@users.noreply.github.com>
Date: Mon, 20 Jun 2022 13:15:08 +0530
Subject: [PATCH 4/4] Handle data adapters returning null.
---
.../bizlogics/AsyncDataAdapterBizlogic.java | 2 +-
.../AsyncDataAdapterBizlogicTest.java | 61 ++++++++++++++++++-
2 files changed, 59 insertions(+), 4 deletions(-)
diff --git a/tef-impl/src/main/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogic.java b/tef-impl/src/main/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogic.java
index d763ee3..8d47d3c 100644
--- a/tef-impl/src/main/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogic.java
+++ b/tef-impl/src/main/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogic.java
@@ -87,7 +87,7 @@ public final T adapt(final TefContext tefContext) {
private Optional getResultImpl(TefContext tefContext) throws Exception {
try {
U result = getResult(tefContext);
- return Optional.of(result);
+ return Optional.ofNullable(result);
} catch (Exception e) {
// Catch-all block to minimize side effects
if (bubbleException) {
diff --git a/tef-impl/src/test/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogicTest.java b/tef-impl/src/test/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogicTest.java
index 61e367b..e371a07 100644
--- a/tef-impl/src/test/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogicTest.java
+++ b/tef-impl/src/test/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogicTest.java
@@ -35,8 +35,8 @@ public class AsyncDataAdapterBizlogicTest {
@Before
public void setUp() {
flowBuilder = new FluentCapabilityBuilder();
- BlockingQueue blockingQueue = Queues.newArrayBlockingQueue(5);
- threadPoolExecutor = new ThreadPoolExecutor(5, 5, 100, TimeUnit.MILLISECONDS, blockingQueue, (r, executor) -> {
+ BlockingQueue blockingQueue = Queues.newArrayBlockingQueue(10);
+ threadPoolExecutor = new ThreadPoolExecutor(10, 10, 100, TimeUnit.MILLISECONDS, blockingQueue, (r, executor) -> {
r.run();
});
}
@@ -49,14 +49,18 @@ public void testAsyncDataInjection() throws TefExecutionException, DataDependenc
flowBuilder.withAdapter(Sample2AsyncDataAdapterBizlogic.class);
flowBuilder.withAdapter(Sample3AsyncDataAdapterBizlogic.class);
flowBuilder.withAdapter(Sample4AsyncDataAdapterBizlogic.class);
+ flowBuilder.withAdapter(Sample5AsyncDataAdapterBizlogic.class);
+ flowBuilder.withAdapter(Sample6AsyncDataAdapterBizlogic.class);
flowBuilder.withBizlogic(SimpleBizlogic.class);
SimpleFlow dataflow = flowBuilder.dataflow();
- assertEquals(5, dataflow.getBizlogics().size());
+ assertEquals(7, dataflow.getBizlogics().size());
assertTrue(dataflow.getBizlogics().contains(Sample1AsyncDataAdapterBizlogic.class));
assertTrue(dataflow.getBizlogics().contains(Sample2AsyncDataAdapterBizlogic.class));
assertTrue(dataflow.getBizlogics().contains(Sample3AsyncDataAdapterBizlogic.class));
assertTrue(dataflow.getBizlogics().contains(Sample4AsyncDataAdapterBizlogic.class));
+ assertTrue(dataflow.getBizlogics().contains(Sample5AsyncDataAdapterBizlogic.class));
+ assertTrue(dataflow.getBizlogics().contains(Sample6AsyncDataAdapterBizlogic.class));
assertTrue(dataflow.getBizlogics().contains(SimpleBizlogic.class));
DataContext dataContext = new DataContext();
@@ -85,6 +89,14 @@ static class SimpleBizlogic implements IBizlogic {
@InjectData(name = "4")
Future> asyncResult4;
+ // returns null with bubbleException=true
+ @InjectData(name = "5")
+ Future> asyncResult5;
+
+ // returns null with bubbleException=false
+ @InjectData(name = "6")
+ Future> asyncResult6;
+
@Override
public void execute(TefContext tefContext) throws TefExecutionException {
// 100ms sleep in workers ensures result is not ready instantly
@@ -92,6 +104,8 @@ public void execute(TefContext tefContext) throws TefExecutionException {
assertFalse(asyncResult2.isDone());
assertFalse(asyncResult3.isDone());
assertFalse(asyncResult4.isDone());
+ assertFalse(asyncResult5.isDone());
+ assertFalse(asyncResult6.isDone());
// 200ms sleep ensures results are ready
sleep(200);
@@ -99,6 +113,8 @@ public void execute(TefContext tefContext) throws TefExecutionException {
assertTrue(asyncResult2.isDone());
assertTrue(asyncResult3.isDone());
assertTrue(asyncResult4.isDone());
+ assertTrue(asyncResult5.isDone());
+ assertTrue(asyncResult6.isDone());
try {
// assert on results
@@ -107,6 +123,7 @@ public void execute(TefContext tefContext) throws TefExecutionException {
// #4 does not return a result since it throws an exception
assertFalse(asyncResult4.get().isPresent());
} catch (Exception e) {
+ e.printStackTrace();
fail("Unexpected exception");
}
@@ -117,6 +134,16 @@ public void execute(TefContext tefContext) throws TefExecutionException {
} catch (RuntimeException | InterruptedException | ExecutionException e) {
assertEquals("java.lang.RuntimeException: 3", e.getMessage());
}
+
+ try {
+ // irrespective of bubbleException state,
+ // data adapters returning nulls, should land as empty data injections
+ assertFalse(asyncResult5.get().isPresent());
+ assertFalse(asyncResult6.get().isPresent());
+ } catch (InterruptedException | ExecutionException e) {
+ e.printStackTrace();
+ fail("Unexpected exception");
+ }
}
}
@@ -188,4 +215,32 @@ public String getResult(TefContext tefContext) throws TefExecutionException {
throw new RuntimeException("4");
}
}
+
+ @EmitData(name = "5")
+ static class Sample5AsyncDataAdapterBizlogic extends AsyncDataAdapterBizlogic>, String> {
+
+ public Sample5AsyncDataAdapterBizlogic() {
+ super(AsyncDataAdapterBizlogicTest.threadPoolExecutor, true);
+ }
+
+ @Override
+ public String getResult(TefContext tefContext) throws TefExecutionException {
+ sleep();
+ return null;
+ }
+ }
+
+ @EmitData(name = "6")
+ static class Sample6AsyncDataAdapterBizlogic extends AsyncDataAdapterBizlogic>, String> {
+
+ public Sample6AsyncDataAdapterBizlogic() {
+ super(AsyncDataAdapterBizlogicTest.threadPoolExecutor);
+ }
+
+ @Override
+ public String getResult(TefContext tefContext) throws TefExecutionException {
+ sleep();
+ return null;
+ }
+ }
}
\ No newline at end of file