Skip to content

Commit

Permalink
Merge pull request #10 from flipkart-incubator/reliance
Browse files Browse the repository at this point in the history
Support for Optional Data Dependency
  • Loading branch information
bageshwar authored Sep 19, 2022
2 parents eb3d7ae + 64bcad1 commit 00660e6
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 17 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
</build>

<properties>
<revision>0.1.3</revision>
<revision>0.1.4</revision>
<guava.version>19.0</guava.version>
<guice.version>4.2.3</guice.version>
</properties>
Expand Down
99 changes: 88 additions & 11 deletions tef-impl/src/main/java/flipkart/tef/execution/FlowBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -48,20 +49,46 @@
/**
* Flow Builder takes in the list of bizlogics with any explicit dependencies and exclusions.
* Any implicit dependencies are discovered and a Execution DAG is generated.
*
*
* <p>
* <p>
* Date: 19/06/20
* Time: 5:02 PM
*/
public class FlowBuilder {
private final List<Class<? extends IBizlogic>> bizlogics;

/**
* A map keyed a bizlogic where the value represents the list of bizlogics
* that are dependent on the keyed bizlogic
*/
private final Multimap<Class<? extends IBizlogic>, Class<? extends IBizlogic>> bizlogicDependencyMap;

/**
* A map keyed by a bizlogic where the value represents the list of bizlogics
* that take the keyed bizlogic as a dependency
*/
private final Multimap<Class<? extends IBizlogic>, Class<? extends IBizlogic>> reverseBizlogicDependencyMap;
private final Multimap<Class<? extends IBizlogic>, DataAdapterKey<?>> dataDependencyMap;

/**
* A map keyed by a bizlogic where the value represents the list of data dependencies that the key has.
*/
private final Multimap<Class<? extends IBizlogic>, DataDependencyDetail> dataDependencyMap;

/**
* A 2-way map where the tuple represents the Data against the DataAdapter that emits it.
*/
private final BiMap<DataAdapterKey<?>, Class<? extends IDataBizlogic<?>>> dataAdapterMap;

private final Set<DataAdapterKey<?>> implicitDataBindings;
private final Set<Class<? extends IBizlogic>> excludedBizlogics;

/**
* This is used to sort the order of execution of bizlogic for the starting class.
* The logic defines any bizlogics with 0 dependencies to get picked first.
* This comparator ensures that set of bizlogics has a predictable order.
*/
private static final Comparator<Class<? extends IBizlogic>> classNameComparator = new ClassNameComparator();

FlowBuilder() {
bizlogics = new ArrayList<>();
bizlogicDependencyMap = ArrayListMultimap.create();
Expand Down Expand Up @@ -105,7 +132,7 @@ BiMap<DataAdapterKey<?>, Class<? extends IDataBizlogic<?>>> getDataAdapterMap()
}

@VisibleForTesting
Multimap<Class<? extends IBizlogic>, DataAdapterKey<?>> getDataDependencyMap() {
Multimap<Class<? extends IBizlogic>, DataDependencyDetail> getDataDependencyMap() {
return dataDependencyMap;
}

Expand Down Expand Up @@ -160,6 +187,8 @@ SimpleFlow build() {
}

Preconditions.checkArgument(startNodes.size() > 0, Messages.COULD_NOT_DEDUCE_THE_STARTING_STEP);
// To ensure we get a predictable start
startNodes.sort(classNameComparator);

// Clone for Mutation
Multimap<Class<? extends IBizlogic>, Class<? extends IBizlogic>> clonedBizlogicDependencyMap = ArrayListMultimap.create(bizlogicDependencyMap);
Expand All @@ -176,7 +205,8 @@ SimpleFlow build() {
}
}

Preconditions.checkArgument(bizlogicsInFlow.size() == bizlogics.size(), Messages.CYCLIC_GRAPHS_ARE_NOT_SUPPORTED);
Preconditions.checkArgument(bizlogicsInFlow.size() == bizlogics.size(),
String.format(Messages.CYCLIC_GRAPHS_ARE_NOT_SUPPORTED, clonedBizlogicDependencyMap));

return new SimpleFlow(bizlogicsInFlow, dataAdapterMap);
}
Expand All @@ -201,8 +231,9 @@ private void handleControlDependency(Class<? extends IBizlogic> bizlogic) {

private void convertDataDependencyToBizLogicDependency() {

for (Map.Entry<Class<? extends IBizlogic>, Collection<DataAdapterKey<?>>> entry : dataDependencyMap.asMap().entrySet()) {
for (DataAdapterKey<?> injectedData : entry.getValue()) {
for (Map.Entry<Class<? extends IBizlogic>, Collection<DataDependencyDetail>> entry : dataDependencyMap.asMap().entrySet()) {
for (DataDependencyDetail dependencyDetail : entry.getValue()) {
DataAdapterKey<?> injectedData = dependencyDetail.getDataAdapterKey();
if (implicitDataBindings.contains(injectedData)) {
/**
* If there are implicit bindings - i.e. data that will be passed as part of the data context
Expand All @@ -211,7 +242,15 @@ private void convertDataDependencyToBizLogicDependency() {
continue;
}
Class<? extends IDataBizlogic<?>> adapter = dataAdapterMap.get(injectedData);

// If there is an optional dependency, we don't have to fail if the data adapter is missing
// During injection, null would be injected in the bizlogic as expected.
if (dependencyDetail.getInjection().optional() && adapter == null) {
continue;
}

Preconditions.checkArgument(adapter != null, String.format(Messages.DATA_ADAPTER_NOT_RESOLVED_FOR,
injectedData.getName(),
injectedData.getResultClass().getCanonicalName(), entry.getKey().getName()));
addDependencies(entry.getKey(), adapter);
}
Expand Down Expand Up @@ -274,11 +313,11 @@ private Class<?> getReturnTypeFromBizlogicUsingSunApi(Class<? extends DataAdapte
}

static class Messages {
public static final String CYCLIC_GRAPHS_ARE_NOT_SUPPORTED = "Cyclic Graphs are not supported";
public static final String CYCLIC_GRAPHS_ARE_NOT_SUPPORTED = "Cyclic Graphs are not supported. Dependency chain: %s";
public static final String A_BIZLOGIC_CANNOT_DEPEND_ON_SELF = "A bizlogic cannot depend on Self";
public static final String MORE_THAN_1_DEPENDS_ON_ANNOTATIONS_FOUND = "More than 1 @DependsOn annotations found";
public static final String COULD_NOT_DEDUCE_THE_STARTING_STEP = "Could not deduce the starting step";
public static final String DATA_ADAPTER_NOT_RESOLVED_FOR = "Data Adapter not resolved for %s in bizlogic %s";
public static final String DATA_ADAPTER_NOT_RESOLVED_FOR = "Data Adapter not resolved for %s %s in bizlogic %s";
}

private void addDependencies(Class<? extends IBizlogic> bizlogic, Class<? extends IBizlogic>... dependencies) {
Expand All @@ -301,9 +340,47 @@ private void handleDataDependency(Class<? extends IBizlogic> bizlogic) {

for (Field field : fieldList) {
InjectData injectable = field.getAnnotation(InjectData.class);
if (injectable != null && !injectable.optional()) {
dataDependencyMap.put(bizlogic, new DataAdapterKey<>(injectable.name(), field.getType()));
if (injectable != null) {
dataDependencyMap.put(bizlogic, new DataDependencyDetail(injectable, new DataAdapterKey<>(injectable.name(), field.getType())));
}
}
}

/**
* To sort classes in lexicographic order
*/
private static class ClassNameComparator implements Comparator<Class<? extends IBizlogic>> {
@Override
public int compare(Class<? extends IBizlogic> o1, Class<? extends IBizlogic> o2) {
return o2.getName().compareTo(o1.getName());
}
}

static class DataDependencyDetail {
private final InjectData injection;
private final DataAdapterKey<?> dataAdapterKey;

public DataDependencyDetail(InjectData injection, DataAdapterKey<?> dataAdapterKey) {
this.injection = injection;
this.dataAdapterKey = dataAdapterKey;
}

@Override
public boolean equals(Object o) {
return dataAdapterKey.equals(o);
}

@Override
public int hashCode() {
return dataAdapterKey.hashCode();
}

public InjectData getInjection() {
return injection;
}

public DataAdapterKey<?> getDataAdapterKey() {
return dataAdapterKey;
}
}
}
90 changes: 85 additions & 5 deletions tef-impl/src/test/java/flipkart/tef/execution/FlowBuilderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ public void testBizlogicDataDependency() {
assertTrue(flowBuilder.getBizlogics().size() == 2);
assertTrue(flowBuilder.getBizlogics().contains(SimpleEnricher2.class));
assertTrue(flowBuilder.getDataDependencyMap().get(SimpleEnricher2.class).size() == 1);
assertTrue(flowBuilder.getDataDependencyMap().get(SimpleEnricher2.class).contains(getResultKey(SimpleData.class)));
assertTrue(flowBuilder.getDataDependencyMap().get(SimpleEnricher2.class).stream()
.findFirst().get().getDataAdapterKey()
.equals(getResultKey(SimpleData.class)));
}

@Test
Expand All @@ -157,16 +159,22 @@ public void testCyclicDependency() {
FlowBuilder flowBuilder = new FlowBuilder();
flowBuilder.add(SimpleValidator2.class);
flowBuilder.add(SimpleValidator4.class);
flowBuilder.add(SimpleDataAdapter.class);
flowBuilder.add(SimpleEnricher2.class);

assertEquals(2, flowBuilder.getBizlogics().size());
assertEquals(4, flowBuilder.getBizlogics().size());
assertTrue(flowBuilder.getBizlogics().contains(SimpleValidator2.class));
assertTrue(flowBuilder.getBizlogics().contains(SimpleValidator4.class));
assertTrue(flowBuilder.getBizlogics().contains(SimpleDataAdapter.class));
assertTrue(flowBuilder.getBizlogics().contains(SimpleEnricher2.class));

try {
flowBuilder.build();
fail("Validation Error was expected");
} catch (IllegalArgumentException e) {
assertEquals(FlowBuilder.Messages.CYCLIC_GRAPHS_ARE_NOT_SUPPORTED, e.getMessage());
assertTrue(e.getMessage().contains("Cyclic Graphs are not supported."));
assertTrue(e.getMessage().contains("flipkart.tef.execution.FlowBuilderTest$SimpleValidator3"));
assertTrue(e.getMessage().contains("flipkart.tef.execution.FlowBuilderTest$SimpleValidator2"));
}
}

Expand All @@ -186,7 +194,9 @@ public void testCyclicDataDependency() {
flowBuilder.build();
fail("Validation Error was expected");
} catch (IllegalArgumentException e) {
assertEquals(FlowBuilder.Messages.CYCLIC_GRAPHS_ARE_NOT_SUPPORTED, e.getMessage());
assertTrue(e.getMessage().contains("Cyclic Graphs are not supported."));
assertTrue(e.getMessage().contains("flipkart.tef.execution.FlowBuilderTest$DataAdapter2"));
assertTrue(e.getMessage().contains("flipkart.tef.execution.FlowBuilderTest$DataAdapter1"));
}
}

Expand Down Expand Up @@ -308,7 +318,23 @@ public void testDataDependencyAbsentInFlow() {
flowBuilder.build();
fail("Validation Error was expected");
} catch (IllegalArgumentException e) {
assertEquals("Data Adapter not resolved for flipkart.tef.execution.FlowBuilderTest.SimpleData in bizlogic flipkart.tef.execution.FlowBuilderTest$SimpleEnricher2", e.getMessage());
assertEquals("Data Adapter not resolved for flipkart.tef.execution.FlowBuilderTest.SimpleData in bizlogic flipkart.tef.execution.FlowBuilderTest$SimpleEnricher2", e.getMessage());
}
}

@Test
public void testNamedDataDependencyAbsentInFlow() {
FlowBuilder flowBuilder = new FlowBuilder();
flowBuilder.add(SimpleValidator7.class);

assertEquals(1, flowBuilder.getBizlogics().size());
assertTrue(flowBuilder.getBizlogics().contains(SimpleValidator7.class));

try {
flowBuilder.build();
fail("Validation Error was expected");
} catch (IllegalArgumentException e) {
assertEquals("Data Adapter not resolved for simpleData flipkart.tef.execution.FlowBuilderTest.SimpleData in bizlogic flipkart.tef.execution.FlowBuilderTest$SimpleValidator7", e.getMessage());
}
}

Expand Down Expand Up @@ -368,6 +394,24 @@ public void testDataAdapterExtensionWithoutTypeParam() {
}
}

/**
* This test creates a scenario where there is am ambiguity on the start node that will be picked (B or C)
* Since both of them have 0 dependencies. Test then asserts that the
* lexicographical order of the class name is picked
*/
@Test
public void testSortingAtStart() {
FlowBuilder flowBuilder = new FlowBuilder();
flowBuilder.add(BizlogicA.class);
flowBuilder.add(BizlogicB.class);
flowBuilder.add(BizlogicC.class);
SimpleFlow simpleFlow = flowBuilder.build();
assertEquals(3, simpleFlow.getBizlogics().size());
assertEquals(BizlogicB.class, simpleFlow.getBizlogics().get(0));
assertEquals(BizlogicC.class, simpleFlow.getBizlogics().get(1));
assertEquals(BizlogicA.class, simpleFlow.getBizlogics().get(2));
}


class SimpleData extends MapBaseData {

Expand Down Expand Up @@ -457,6 +501,17 @@ public void execute(TefContext tefContext) {
}
}

class SimpleValidator7 implements IBizlogic {

@InjectData(name = "simpleData")
private SimpleData simpleData;

@Override
public void execute(TefContext tefContext) {

}
}

class CyclicData1 {

}
Expand Down Expand Up @@ -529,4 +584,29 @@ public Object adapt(TefContext tefContext) throws TefExecutionException {
}
}

@DependsOn(BizlogicC.class)
class BizlogicA implements IBizlogic {

@Override
public void execute(TefContext tefContext) throws TefExecutionException {

}
}

class BizlogicB implements IBizlogic {

@Override
public void execute(TefContext tefContext) throws TefExecutionException {

}
}

class BizlogicC implements IBizlogic {

@Override
public void execute(TefContext tefContext) throws TefExecutionException {

}
}

}

0 comments on commit 00660e6

Please sign in to comment.