Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Async Data Adapters #6

Merged
merged 4 commits into from
Jun 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
</build>

<properties>
<revision>0.1.0</revision>
<revision>0.1.1</revision>
<guava.version>19.0</guava.version>
<guice.version>4.2.3</guice.version>
</properties>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright [2021] [The Original Author]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package flipkart.tef.bizlogics;

import flipkart.tef.exception.TefExecutionException;

import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

/**
* A DataAdapter which emits a future. During the injection phase the caller can either
* 1) do a blocking get (thread will stall till the result is available)
* 2) do a done and get check (to consume the result only if its available)
* 3) do a get with timeout (to wait for the result before consuming them)
* <p>
* The semantics of the Generic Type parameter expect a `Future<Optional<X>>` explicitly
* instead of just <X> because during the flow building phase, the complete signature of the
* generic type interface needs to be present at impl or superclass.
* Only taking the final result type as input from implementation classes (for the value of generic parameter)
* will break that contract.
* <p>
* Since the flow builder uses reflection to get generic params to know what will the return type of data adapter,
* taking only final result type (say X) from implementation class, will appear to TEF as if
* the implementation classes return 'X' rather than the Future.
* <p>
* <p>
* Date: 18/06/22
* Time: 7:42 PM
*/
public abstract class AsyncDataAdapterBizlogic<T extends Future<Optional<U>>, U> extends DataAdapterBizlogic<T> {

private final ThreadPoolExecutor threadPoolExecutor;
private final boolean bubbleException;

/**
* @param threadPoolExecutor Threadpool executor to which to task will be submitted
*/
public AsyncDataAdapterBizlogic(ThreadPoolExecutor threadPoolExecutor) {
this(threadPoolExecutor, false);
}

/**
* @param threadPoolExecutor Threadpool executor to which to task will be submitted
* @param bubbleException if true, any exception thrown as part of computing the result will be rethrown,
* else `Optional.empty` will be returned.
*/
public AsyncDataAdapterBizlogic(ThreadPoolExecutor threadPoolExecutor, boolean bubbleException) {
this.threadPoolExecutor = threadPoolExecutor;
this.bubbleException = bubbleException;
}

@Override
public final T adapt(final TefContext tefContext) {
/*
Submit the task to the threadpool
The `bubbleException` flag will be used to decide the behavior in case of an exception,
either to return an empty value, or rethrow the exception
*/
return (T) threadPoolExecutor.submit(() -> getResultImpl(tefContext));
}

/**
* This method should compute and return the result. The flow execution will not be blocked on this method.
* i.e. This method will run in async
*
* @param tefContext TefContext for callers
* @return The result
* @throws TefExecutionException The retriability error codes are not honoured when executing this bizlogic
*/
public abstract U getResult(TefContext tefContext) throws TefExecutionException;

private Optional<U> getResultImpl(TefContext tefContext) throws Exception {
try {
U result = getResult(tefContext);
return Optional.ofNullable(result);
} catch (Exception e) {
// Catch-all block to minimize side effects
if (bubbleException) {
throw e;
} else {
return Optional.empty();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ private void convertDataDependencyToBizLogicDependency() {
}
Class<? extends IDataBizlogic<?>> adapter = dataAdapterMap.get(injectedData);
Preconditions.checkArgument(adapter != null, String.format(Messages.DATA_ADAPTER_NOT_RESOLVED_FOR,
injectedData.getResultClass().getCanonicalName()));
injectedData.getResultClass().getCanonicalName(), entry.getKey().getName()));
addDependencies(entry.getKey(), adapter);
}
}
Expand Down Expand Up @@ -244,7 +244,7 @@ private void populateDataAdapterMap(Class<? extends IBizlogic> bizlogic) {
throw new AdapterConflictRuntimeException(returnType, bizlogic, dataAdapterMap.get(key));
}
}
} catch (AdapterConflictRuntimeException|UnableToResolveDataFromAdapterRuntimeException e) {
} catch (AdapterConflictRuntimeException | UnableToResolveDataFromAdapterRuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
Expand All @@ -266,7 +266,7 @@ private Class<?> getReturnTypeFromBizlogicUsingSunApi(Class<? extends DataAdapte
// This could be a case of a data adapter being a subclass of another
// data adapter where type parameter is specified on the superclass
if(DataAdapterBizlogic.class.isAssignableFrom(dataAdapterBizLogic.getSuperclass())) {
return getReturnTypeFromBizlogicUsingSunApi((Class<? extends DataAdapterBizlogic<?>>)dataAdapterBizLogic.getSuperclass(), classHierarchy);
return getReturnTypeFromBizlogicUsingSunApi((Class<? extends DataAdapterBizlogic<?>>) dataAdapterBizLogic.getSuperclass(), classHierarchy);
}
}

Expand All @@ -278,7 +278,7 @@ static class Messages {
public static final String A_BIZLOGIC_CANNOT_DEPEND_ON_SELF = "A bizlogic cannot depend on Self";
public static final String MORE_THAN_1_DEPENDS_ON_ANNOTATIONS_FOUND = "More than 1 @DependsOn annotations found";
public static final String COULD_NOT_DEDUCE_THE_STARTING_STEP = "Could not deduce the starting step";
public static final String DATA_ADAPTER_NOT_RESOLVED_FOR = "Data Adapter not resolved for %s";
public static final String DATA_ADAPTER_NOT_RESOLVED_FOR = "Data Adapter not resolved for %s in bizlogic %s";
}

private void addDependencies(Class<? extends IBizlogic> bizlogic, Class<? extends IBizlogic>... dependencies) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
package flipkart.tef.bizlogics;

import com.google.common.collect.Queues;
import flipkart.tef.TestTefContext;
import flipkart.tef.annotations.EmitData;
import flipkart.tef.annotations.InjectData;
import flipkart.tef.exception.TefExecutionException;
import flipkart.tef.execution.DataContext;
import flipkart.tef.execution.DataDependencyException;
import flipkart.tef.execution.FlowExecutor;
import flipkart.tef.execution.FluentCapabilityBuilder;
import flipkart.tef.execution.MyFlowExecutionListener;
import flipkart.tef.flow.SimpleFlow;
import org.junit.Before;
import org.junit.Test;

import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class AsyncDataAdapterBizlogicTest {

private static ThreadPoolExecutor threadPoolExecutor;

private FluentCapabilityBuilder flowBuilder;

@Before
public void setUp() {
flowBuilder = new FluentCapabilityBuilder();
BlockingQueue<Runnable> blockingQueue = Queues.newArrayBlockingQueue(10);
threadPoolExecutor = new ThreadPoolExecutor(10, 10, 100, TimeUnit.MILLISECONDS, blockingQueue, (r, executor) -> {
r.run();
});
}

@Test
public void testAsyncDataInjection() throws TefExecutionException, DataDependencyException, IllegalAccessException, InstantiationException {


flowBuilder.withAdapter(Sample1AsyncDataAdapterBizlogic.class);
flowBuilder.withAdapter(Sample2AsyncDataAdapterBizlogic.class);
flowBuilder.withAdapter(Sample3AsyncDataAdapterBizlogic.class);
flowBuilder.withAdapter(Sample4AsyncDataAdapterBizlogic.class);
flowBuilder.withAdapter(Sample5AsyncDataAdapterBizlogic.class);
flowBuilder.withAdapter(Sample6AsyncDataAdapterBizlogic.class);
flowBuilder.withBizlogic(SimpleBizlogic.class);
SimpleFlow dataflow = flowBuilder.dataflow();

assertEquals(7, dataflow.getBizlogics().size());
assertTrue(dataflow.getBizlogics().contains(Sample1AsyncDataAdapterBizlogic.class));
assertTrue(dataflow.getBizlogics().contains(Sample2AsyncDataAdapterBizlogic.class));
assertTrue(dataflow.getBizlogics().contains(Sample3AsyncDataAdapterBizlogic.class));
assertTrue(dataflow.getBizlogics().contains(Sample4AsyncDataAdapterBizlogic.class));
assertTrue(dataflow.getBizlogics().contains(Sample5AsyncDataAdapterBizlogic.class));
assertTrue(dataflow.getBizlogics().contains(Sample6AsyncDataAdapterBizlogic.class));
assertTrue(dataflow.getBizlogics().contains(SimpleBizlogic.class));

DataContext dataContext = new DataContext();
FlowExecutor executor = new FlowExecutor(dataflow, dataContext, new TestTefContext());

MyFlowExecutionListener listener = new MyFlowExecutionListener();
executor.addListener(listener);
executor.execute();
}

static class SimpleBizlogic implements IBizlogic {

// Sleeps and returns
@InjectData(name = "1")
Future<Optional<String>> asyncResult1;

// Sleeps and returns with bubbleException=true, but does not throw an exception
@InjectData(name = "2")
Future<Optional<String>> asyncResult2;

// throws exception with bubbleException=true
@InjectData(name = "3")
Future<Optional<String>> asyncResult3;

// throws exception without bubbleException=true
@InjectData(name = "4")
Future<Optional<String>> asyncResult4;

// returns null with bubbleException=true
@InjectData(name = "5")
Future<Optional<String>> asyncResult5;

// returns null with bubbleException=false
@InjectData(name = "6")
Future<Optional<String>> asyncResult6;

@Override
public void execute(TefContext tefContext) throws TefExecutionException {
// 100ms sleep in workers ensures result is not ready instantly
assertFalse(asyncResult1.isDone());
assertFalse(asyncResult2.isDone());
assertFalse(asyncResult3.isDone());
assertFalse(asyncResult4.isDone());
assertFalse(asyncResult5.isDone());
assertFalse(asyncResult6.isDone());

// 200ms sleep ensures results are ready
sleep(200);
assertTrue(asyncResult1.isDone());
assertTrue(asyncResult2.isDone());
assertTrue(asyncResult3.isDone());
assertTrue(asyncResult4.isDone());
assertTrue(asyncResult5.isDone());
assertTrue(asyncResult6.isDone());

try {
// assert on results
assertEquals("1", asyncResult1.get().get());
assertEquals("2", asyncResult2.get().get());
// #4 does not return a result since it throws an exception
assertFalse(asyncResult4.get().isPresent());
} catch (Exception e) {
e.printStackTrace();
fail("Unexpected exception");
}

try {
// #3 throws an exception with bubbleException=true
asyncResult3.get();
fail("Runtime exception was expected");
} catch (RuntimeException | InterruptedException | ExecutionException e) {
assertEquals("java.lang.RuntimeException: 3", e.getMessage());
}

try {
// irrespective of bubbleException state,
// data adapters returning nulls, should land as empty data injections
assertFalse(asyncResult5.get().isPresent());
assertFalse(asyncResult6.get().isPresent());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
fail("Unexpected exception");
}
}
}

static void sleep(long t) {
try {
Thread.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

static void sleep() {
sleep(100);
}


@EmitData(name = "1")
static class Sample1AsyncDataAdapterBizlogic extends AsyncDataAdapterBizlogic<Future<Optional<String>>, String> {

public Sample1AsyncDataAdapterBizlogic() {
super(AsyncDataAdapterBizlogicTest.threadPoolExecutor);
}

@Override
public String getResult(TefContext tefContext) throws TefExecutionException {
sleep();
return "1";
}
}

@EmitData(name = "2")
static class Sample2AsyncDataAdapterBizlogic extends AsyncDataAdapterBizlogic<Future<Optional<String>>, String> {

public Sample2AsyncDataAdapterBizlogic() {
super(AsyncDataAdapterBizlogicTest.threadPoolExecutor, true);
}

@Override
public String getResult(TefContext tefContext) throws TefExecutionException {
sleep();
return "2";
}
}

@EmitData(name = "3")
static class Sample3AsyncDataAdapterBizlogic extends AsyncDataAdapterBizlogic<Future<Optional<String>>, String> {

public Sample3AsyncDataAdapterBizlogic() {
super(AsyncDataAdapterBizlogicTest.threadPoolExecutor, true);
}

@Override
public String getResult(TefContext tefContext) throws TefExecutionException {
sleep();
throw new RuntimeException("3");
}
}

@EmitData(name = "4")
static class Sample4AsyncDataAdapterBizlogic extends AsyncDataAdapterBizlogic<Future<Optional<String>>, String> {

public Sample4AsyncDataAdapterBizlogic() {
super(AsyncDataAdapterBizlogicTest.threadPoolExecutor);
}

@Override
public String getResult(TefContext tefContext) throws TefExecutionException {
sleep();
throw new RuntimeException("4");
}
}

@EmitData(name = "5")
static class Sample5AsyncDataAdapterBizlogic extends AsyncDataAdapterBizlogic<Future<Optional<String>>, String> {

public Sample5AsyncDataAdapterBizlogic() {
super(AsyncDataAdapterBizlogicTest.threadPoolExecutor, true);
}

@Override
public String getResult(TefContext tefContext) throws TefExecutionException {
sleep();
return null;
}
}

@EmitData(name = "6")
static class Sample6AsyncDataAdapterBizlogic extends AsyncDataAdapterBizlogic<Future<Optional<String>>, String> {

public Sample6AsyncDataAdapterBizlogic() {
super(AsyncDataAdapterBizlogicTest.threadPoolExecutor);
}

@Override
public String getResult(TefContext tefContext) throws TefExecutionException {
sleep();
return null;
}
}
}
Loading