diff --git a/pom.xml b/pom.xml
index 1846cc7..59b8359 100644
--- a/pom.xml
+++ b/pom.xml
@@ -107,7 +107,7 @@
- 0.1.0
+ 0.1.1
19.0
4.2.3
diff --git a/tef-impl/src/main/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogic.java b/tef-impl/src/main/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogic.java
new file mode 100644
index 0000000..8d47d3c
--- /dev/null
+++ b/tef-impl/src/main/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogic.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright [2021] [The Original Author]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package flipkart.tef.bizlogics;
+
+import flipkart.tef.exception.TefExecutionException;
+
+import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * A DataAdapter which emits a future. During the injection phase the caller can either
+ * 1) do a blocking get (thread will stall till the result is available)
+ * 2) do a done and get check (to consume the result only if its available)
+ * 3) do a get with timeout (to wait for the result before consuming them)
+ *
+ * The semantics of the Generic Type parameter expect a `Future>` explicitly
+ * instead of just because during the flow building phase, the complete signature of the
+ * generic type interface needs to be present at impl or superclass.
+ * Only taking the final result type as input from implementation classes (for the value of generic parameter)
+ * will break that contract.
+ *
+ * Since the flow builder uses reflection to get generic params to know what will the return type of data adapter,
+ * taking only final result type (say X) from implementation class, will appear to TEF as if
+ * the implementation classes return 'X' rather than the Future.
+ *
+ *
+ * Date: 18/06/22
+ * Time: 7:42 PM
+ */
+public abstract class AsyncDataAdapterBizlogic>, U> extends DataAdapterBizlogic {
+
+ private final ThreadPoolExecutor threadPoolExecutor;
+ private final boolean bubbleException;
+
+ /**
+ * @param threadPoolExecutor Threadpool executor to which to task will be submitted
+ */
+ public AsyncDataAdapterBizlogic(ThreadPoolExecutor threadPoolExecutor) {
+ this(threadPoolExecutor, false);
+ }
+
+ /**
+ * @param threadPoolExecutor Threadpool executor to which to task will be submitted
+ * @param bubbleException if true, any exception thrown as part of computing the result will be rethrown,
+ * else `Optional.empty` will be returned.
+ */
+ public AsyncDataAdapterBizlogic(ThreadPoolExecutor threadPoolExecutor, boolean bubbleException) {
+ this.threadPoolExecutor = threadPoolExecutor;
+ this.bubbleException = bubbleException;
+ }
+
+ @Override
+ public final T adapt(final TefContext tefContext) {
+ /*
+ Submit the task to the threadpool
+ The `bubbleException` flag will be used to decide the behavior in case of an exception,
+ either to return an empty value, or rethrow the exception
+ */
+ return (T) threadPoolExecutor.submit(() -> getResultImpl(tefContext));
+ }
+
+ /**
+ * This method should compute and return the result. The flow execution will not be blocked on this method.
+ * i.e. This method will run in async
+ *
+ * @param tefContext TefContext for callers
+ * @return The result
+ * @throws TefExecutionException The retriability error codes are not honoured when executing this bizlogic
+ */
+ public abstract U getResult(TefContext tefContext) throws TefExecutionException;
+
+ private Optional getResultImpl(TefContext tefContext) throws Exception {
+ try {
+ U result = getResult(tefContext);
+ return Optional.ofNullable(result);
+ } catch (Exception e) {
+ // Catch-all block to minimize side effects
+ if (bubbleException) {
+ throw e;
+ } else {
+ return Optional.empty();
+ }
+ }
+ }
+}
diff --git a/tef-impl/src/main/java/flipkart/tef/execution/FlowBuilder.java b/tef-impl/src/main/java/flipkart/tef/execution/FlowBuilder.java
index 105876c..733f176 100644
--- a/tef-impl/src/main/java/flipkart/tef/execution/FlowBuilder.java
+++ b/tef-impl/src/main/java/flipkart/tef/execution/FlowBuilder.java
@@ -212,7 +212,7 @@ private void convertDataDependencyToBizLogicDependency() {
}
Class extends IDataBizlogic>> adapter = dataAdapterMap.get(injectedData);
Preconditions.checkArgument(adapter != null, String.format(Messages.DATA_ADAPTER_NOT_RESOLVED_FOR,
- injectedData.getResultClass().getCanonicalName()));
+ injectedData.getResultClass().getCanonicalName(), entry.getKey().getName()));
addDependencies(entry.getKey(), adapter);
}
}
@@ -244,7 +244,7 @@ private void populateDataAdapterMap(Class extends IBizlogic> bizlogic) {
throw new AdapterConflictRuntimeException(returnType, bizlogic, dataAdapterMap.get(key));
}
}
- } catch (AdapterConflictRuntimeException|UnableToResolveDataFromAdapterRuntimeException e) {
+ } catch (AdapterConflictRuntimeException | UnableToResolveDataFromAdapterRuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
@@ -266,7 +266,7 @@ private Class> getReturnTypeFromBizlogicUsingSunApi(Class extends DataAdapte
// This could be a case of a data adapter being a subclass of another
// data adapter where type parameter is specified on the superclass
if(DataAdapterBizlogic.class.isAssignableFrom(dataAdapterBizLogic.getSuperclass())) {
- return getReturnTypeFromBizlogicUsingSunApi((Class extends DataAdapterBizlogic>>)dataAdapterBizLogic.getSuperclass(), classHierarchy);
+ return getReturnTypeFromBizlogicUsingSunApi((Class extends DataAdapterBizlogic>>) dataAdapterBizLogic.getSuperclass(), classHierarchy);
}
}
@@ -278,7 +278,7 @@ static class Messages {
public static final String A_BIZLOGIC_CANNOT_DEPEND_ON_SELF = "A bizlogic cannot depend on Self";
public static final String MORE_THAN_1_DEPENDS_ON_ANNOTATIONS_FOUND = "More than 1 @DependsOn annotations found";
public static final String COULD_NOT_DEDUCE_THE_STARTING_STEP = "Could not deduce the starting step";
- public static final String DATA_ADAPTER_NOT_RESOLVED_FOR = "Data Adapter not resolved for %s";
+ public static final String DATA_ADAPTER_NOT_RESOLVED_FOR = "Data Adapter not resolved for %s in bizlogic %s";
}
private void addDependencies(Class extends IBizlogic> bizlogic, Class extends IBizlogic>... dependencies) {
diff --git a/tef-impl/src/test/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogicTest.java b/tef-impl/src/test/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogicTest.java
new file mode 100644
index 0000000..e371a07
--- /dev/null
+++ b/tef-impl/src/test/java/flipkart/tef/bizlogics/AsyncDataAdapterBizlogicTest.java
@@ -0,0 +1,246 @@
+package flipkart.tef.bizlogics;
+
+import com.google.common.collect.Queues;
+import flipkart.tef.TestTefContext;
+import flipkart.tef.annotations.EmitData;
+import flipkart.tef.annotations.InjectData;
+import flipkart.tef.exception.TefExecutionException;
+import flipkart.tef.execution.DataContext;
+import flipkart.tef.execution.DataDependencyException;
+import flipkart.tef.execution.FlowExecutor;
+import flipkart.tef.execution.FluentCapabilityBuilder;
+import flipkart.tef.execution.MyFlowExecutionListener;
+import flipkart.tef.flow.SimpleFlow;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class AsyncDataAdapterBizlogicTest {
+
+ private static ThreadPoolExecutor threadPoolExecutor;
+
+ private FluentCapabilityBuilder flowBuilder;
+
+ @Before
+ public void setUp() {
+ flowBuilder = new FluentCapabilityBuilder();
+ BlockingQueue blockingQueue = Queues.newArrayBlockingQueue(10);
+ threadPoolExecutor = new ThreadPoolExecutor(10, 10, 100, TimeUnit.MILLISECONDS, blockingQueue, (r, executor) -> {
+ r.run();
+ });
+ }
+
+ @Test
+ public void testAsyncDataInjection() throws TefExecutionException, DataDependencyException, IllegalAccessException, InstantiationException {
+
+
+ flowBuilder.withAdapter(Sample1AsyncDataAdapterBizlogic.class);
+ flowBuilder.withAdapter(Sample2AsyncDataAdapterBizlogic.class);
+ flowBuilder.withAdapter(Sample3AsyncDataAdapterBizlogic.class);
+ flowBuilder.withAdapter(Sample4AsyncDataAdapterBizlogic.class);
+ flowBuilder.withAdapter(Sample5AsyncDataAdapterBizlogic.class);
+ flowBuilder.withAdapter(Sample6AsyncDataAdapterBizlogic.class);
+ flowBuilder.withBizlogic(SimpleBizlogic.class);
+ SimpleFlow dataflow = flowBuilder.dataflow();
+
+ assertEquals(7, dataflow.getBizlogics().size());
+ assertTrue(dataflow.getBizlogics().contains(Sample1AsyncDataAdapterBizlogic.class));
+ assertTrue(dataflow.getBizlogics().contains(Sample2AsyncDataAdapterBizlogic.class));
+ assertTrue(dataflow.getBizlogics().contains(Sample3AsyncDataAdapterBizlogic.class));
+ assertTrue(dataflow.getBizlogics().contains(Sample4AsyncDataAdapterBizlogic.class));
+ assertTrue(dataflow.getBizlogics().contains(Sample5AsyncDataAdapterBizlogic.class));
+ assertTrue(dataflow.getBizlogics().contains(Sample6AsyncDataAdapterBizlogic.class));
+ assertTrue(dataflow.getBizlogics().contains(SimpleBizlogic.class));
+
+ DataContext dataContext = new DataContext();
+ FlowExecutor executor = new FlowExecutor(dataflow, dataContext, new TestTefContext());
+
+ MyFlowExecutionListener listener = new MyFlowExecutionListener();
+ executor.addListener(listener);
+ executor.execute();
+ }
+
+ static class SimpleBizlogic implements IBizlogic {
+
+ // Sleeps and returns
+ @InjectData(name = "1")
+ Future> asyncResult1;
+
+ // Sleeps and returns with bubbleException=true, but does not throw an exception
+ @InjectData(name = "2")
+ Future> asyncResult2;
+
+ // throws exception with bubbleException=true
+ @InjectData(name = "3")
+ Future> asyncResult3;
+
+ // throws exception without bubbleException=true
+ @InjectData(name = "4")
+ Future> asyncResult4;
+
+ // returns null with bubbleException=true
+ @InjectData(name = "5")
+ Future> asyncResult5;
+
+ // returns null with bubbleException=false
+ @InjectData(name = "6")
+ Future> asyncResult6;
+
+ @Override
+ public void execute(TefContext tefContext) throws TefExecutionException {
+ // 100ms sleep in workers ensures result is not ready instantly
+ assertFalse(asyncResult1.isDone());
+ assertFalse(asyncResult2.isDone());
+ assertFalse(asyncResult3.isDone());
+ assertFalse(asyncResult4.isDone());
+ assertFalse(asyncResult5.isDone());
+ assertFalse(asyncResult6.isDone());
+
+ // 200ms sleep ensures results are ready
+ sleep(200);
+ assertTrue(asyncResult1.isDone());
+ assertTrue(asyncResult2.isDone());
+ assertTrue(asyncResult3.isDone());
+ assertTrue(asyncResult4.isDone());
+ assertTrue(asyncResult5.isDone());
+ assertTrue(asyncResult6.isDone());
+
+ try {
+ // assert on results
+ assertEquals("1", asyncResult1.get().get());
+ assertEquals("2", asyncResult2.get().get());
+ // #4 does not return a result since it throws an exception
+ assertFalse(asyncResult4.get().isPresent());
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Unexpected exception");
+ }
+
+ try {
+ // #3 throws an exception with bubbleException=true
+ asyncResult3.get();
+ fail("Runtime exception was expected");
+ } catch (RuntimeException | InterruptedException | ExecutionException e) {
+ assertEquals("java.lang.RuntimeException: 3", e.getMessage());
+ }
+
+ try {
+ // irrespective of bubbleException state,
+ // data adapters returning nulls, should land as empty data injections
+ assertFalse(asyncResult5.get().isPresent());
+ assertFalse(asyncResult6.get().isPresent());
+ } catch (InterruptedException | ExecutionException e) {
+ e.printStackTrace();
+ fail("Unexpected exception");
+ }
+ }
+ }
+
+ static void sleep(long t) {
+ try {
+ Thread.sleep(t);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ static void sleep() {
+ sleep(100);
+ }
+
+
+ @EmitData(name = "1")
+ static class Sample1AsyncDataAdapterBizlogic extends AsyncDataAdapterBizlogic>, String> {
+
+ public Sample1AsyncDataAdapterBizlogic() {
+ super(AsyncDataAdapterBizlogicTest.threadPoolExecutor);
+ }
+
+ @Override
+ public String getResult(TefContext tefContext) throws TefExecutionException {
+ sleep();
+ return "1";
+ }
+ }
+
+ @EmitData(name = "2")
+ static class Sample2AsyncDataAdapterBizlogic extends AsyncDataAdapterBizlogic>, String> {
+
+ public Sample2AsyncDataAdapterBizlogic() {
+ super(AsyncDataAdapterBizlogicTest.threadPoolExecutor, true);
+ }
+
+ @Override
+ public String getResult(TefContext tefContext) throws TefExecutionException {
+ sleep();
+ return "2";
+ }
+ }
+
+ @EmitData(name = "3")
+ static class Sample3AsyncDataAdapterBizlogic extends AsyncDataAdapterBizlogic>, String> {
+
+ public Sample3AsyncDataAdapterBizlogic() {
+ super(AsyncDataAdapterBizlogicTest.threadPoolExecutor, true);
+ }
+
+ @Override
+ public String getResult(TefContext tefContext) throws TefExecutionException {
+ sleep();
+ throw new RuntimeException("3");
+ }
+ }
+
+ @EmitData(name = "4")
+ static class Sample4AsyncDataAdapterBizlogic extends AsyncDataAdapterBizlogic>, String> {
+
+ public Sample4AsyncDataAdapterBizlogic() {
+ super(AsyncDataAdapterBizlogicTest.threadPoolExecutor);
+ }
+
+ @Override
+ public String getResult(TefContext tefContext) throws TefExecutionException {
+ sleep();
+ throw new RuntimeException("4");
+ }
+ }
+
+ @EmitData(name = "5")
+ static class Sample5AsyncDataAdapterBizlogic extends AsyncDataAdapterBizlogic>, String> {
+
+ public Sample5AsyncDataAdapterBizlogic() {
+ super(AsyncDataAdapterBizlogicTest.threadPoolExecutor, true);
+ }
+
+ @Override
+ public String getResult(TefContext tefContext) throws TefExecutionException {
+ sleep();
+ return null;
+ }
+ }
+
+ @EmitData(name = "6")
+ static class Sample6AsyncDataAdapterBizlogic extends AsyncDataAdapterBizlogic>, String> {
+
+ public Sample6AsyncDataAdapterBizlogic() {
+ super(AsyncDataAdapterBizlogicTest.threadPoolExecutor);
+ }
+
+ @Override
+ public String getResult(TefContext tefContext) throws TefExecutionException {
+ sleep();
+ return null;
+ }
+ }
+}
\ No newline at end of file
diff --git a/tef-impl/src/test/java/flipkart/tef/execution/FlowBuilderTest.java b/tef-impl/src/test/java/flipkart/tef/execution/FlowBuilderTest.java
index 8f4e179..310f9d8 100644
--- a/tef-impl/src/test/java/flipkart/tef/execution/FlowBuilderTest.java
+++ b/tef-impl/src/test/java/flipkart/tef/execution/FlowBuilderTest.java
@@ -308,7 +308,7 @@ public void testDataDependencyAbsentInFlow() {
flowBuilder.build();
fail("Validation Error was expected");
} catch (IllegalArgumentException e) {
- assertEquals("Data Adapter not resolved for flipkart.tef.execution.FlowBuilderTest.SimpleData", e.getMessage());
+ assertEquals("Data Adapter not resolved for flipkart.tef.execution.FlowBuilderTest.SimpleData in bizlogic flipkart.tef.execution.FlowBuilderTest$SimpleEnricher2", e.getMessage());
}
}