Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
tb06904 committed Dec 4, 2024
1 parent 5417985 commit a99dad8
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package uk.gov.gchq.gaffer.tinkerpop;

import org.apache.commons.collections4.IterableUtils;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Property;
Expand All @@ -36,6 +37,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -215,8 +217,7 @@ private Vertex getVertex(final GafferPopVertex vertex) {
.build())
.build();

Iterable<? extends Element> result = graph().execute(findBasedOnID);

final Set<Element> result = new HashSet<>(IterableUtils.toList(graph().execute(findBasedOnID)));
final GafferPopElementGenerator generator = new GafferPopElementGenerator(graph());

Optional<Vertex> foundEntity = StreamSupport.stream(result.spliterator(), false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.slf4j.LoggerFactory;

import uk.gov.gchq.gaffer.data.elementdefinition.view.View;
import uk.gov.gchq.gaffer.federated.simple.operation.handler.FederatedOperationHandler;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.operation.Operation;
import uk.gov.gchq.gaffer.operation.OperationChain;
Expand Down Expand Up @@ -77,9 +78,10 @@ public static boolean doGraphsShareGroups(final List<GraphSerialisable> graphs)
* @param operation The operation.
* @param graphSerialisable The graph.
* @param depth Current recursion depth of this method.
* @param depthLimit Limit to the recursion depth.
* @return A valid version of the operation chain.
*/
public static OperationChain getValidOperationForGraph(final Operation operation, final GraphSerialisable graphSerialisable, final int depth) {
public static OperationChain getValidOperationForGraph(final Operation operation, final GraphSerialisable graphSerialisable, final int depth, final int depthLimit) {
LOGGER.debug("Creating valid operation for graph, depth is: {}", depth);
final Collection<Operation> updatedOperations = new ArrayList<>();

Expand All @@ -98,10 +100,13 @@ public static OperationChain getValidOperationForGraph(final Operation operation
} else if (operation instanceof OperationChain) {
for (final Operation op : ((OperationChain<?>) operation).getOperations()) {
// Resolve if haven't hit the depth limit for validation
if (depth < 5) {
updatedOperations.addAll(getValidOperationForGraph(op, graphSerialisable, depth + 1).getOperations());
if (depth < depthLimit) {
updatedOperations.addAll(getValidOperationForGraph(op, graphSerialisable, depth + 1, depthLimit).getOperations());
} else {
LOGGER.warn("Hit depth limit of 5 whilst making the operation valid for graph. The View may be invalid for Graph: {}", graphSerialisable.getGraphId());
LOGGER.warn(
"Hit depth limit of {} making the operation valid for graph. The View may be invalid for Graph: {}",
depthLimit,
graphSerialisable.getGraphId());
updatedOperations.add(op);
}
}
Expand Down Expand Up @@ -138,11 +143,20 @@ public static View getValidViewForGraph(final View view, final GraphSerialisable
// Need to make changes to the view so start by cloning the view
// and clearing all the edges and entities
final View.Builder builder = new View.Builder()
.merge(view)
.entities(Collections.emptyMap())
.edges(Collections.emptyMap());
.merge(view)
.entities(Collections.emptyMap())
.edges(Collections.emptyMap());
validEntities.forEach(e -> builder.entity(e, view.getEntity(e)));
validEdges.forEach(e -> builder.edge(e, view.getEdge(e)));
final View newView = builder.build();
// If the View has no groups left after fixing then this is likely an issue so throw
if (!newView.hasEntities() && !newView.hasEdges()) {
throw new IllegalArgumentException(String.format(
"No groups specified in View are relevant to Graph: '%1$s'. " +
"Please refine your Graphs/View or specify following option to skip execution on offending Graph: '%2$s' ",
graphSerialisable.getGraphId(),
FederatedOperationHandler.OPT_SKIP_FAILED_EXECUTE));
}
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,16 @@ public class FederatedOperationHandler<P extends Operation> implements Operation
*/
public static final String OPT_SEPARATE_RESULTS = "federated.separateResults";

/**
* Depth should go to when making an operation chain relevant to specified
* graphs e.g. fix the View.
*/
public static final String OPT_FIX_OP_LIMIT = "federated.fixOperationLimit";

@Override
public Object doOperation(final P operation, final Context context, final Store store) throws OperationException {
LOGGER.debug("Running operation: {}", operation);
final int fixLimit = Integer.parseInt(operation.getOption(OPT_FIX_OP_LIMIT, "5"));

// If the operation has output wrap and return using sub class handler
if (operation instanceof Output) {
Expand All @@ -113,7 +120,9 @@ public Object doOperation(final P operation, final Context context, final Store
// Execute the operation chain on each graph
for (final GraphSerialisable gs : graphsToExecute) {
try {
gs.getGraph().execute(FederatedUtils.getValidOperationForGraph(operation, gs, 0), context.getUser());
gs.getGraph().execute(
FederatedUtils.getValidOperationForGraph(operation, gs, 0, fixLimit),
context.getUser());
} catch (final OperationException | UnsupportedOperationException | IllegalArgumentException e) {
// Optionally skip this error if user has specified to do so
LOGGER.error("Operation failed on graph: {}", gs.getGraphId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ public class FederatedOutputHandler<P extends Output<O>, O>

@Override
public O doOperation(final P operation, final Context context, final Store store) throws OperationException {
final int fixLimit = Integer.parseInt(operation.getOption(OPT_FIX_OP_LIMIT, "5"));
List<GraphSerialisable> graphsToExecute = this.getGraphsToExecuteOn(operation, context, (FederatedStore) store);

// No-op
if (graphsToExecute.isEmpty()) {
return null;
}
Expand All @@ -56,7 +58,7 @@ public O doOperation(final P operation, final Context context, final Store store
List<O> graphResults = new ArrayList<>();
for (final GraphSerialisable gs : graphsToExecute) {
try {
OperationChain<O> fixedChain = FederatedUtils.getValidOperationForGraph(operation, gs, 0);
OperationChain<O> fixedChain = FederatedUtils.getValidOperationForGraph(operation, gs, 0, fixLimit);
graphResults.add(gs.getGraph().execute(fixedChain, context.getUser()));
} catch (final OperationException | UnsupportedOperationException | IllegalArgumentException e) {
// Optionally skip this error if user has specified to do so
Expand All @@ -79,9 +81,34 @@ public O doOperation(final P operation, final Context context, final Store store
combinedProps.putAll(operation.getOptions());
}

// Set up the result accumulator
FederatedResultAccumulator<O> resultAccumulator = getResultAccumulator((FederatedStore) store, operation, graphsToExecute);

// Should now have a list of <O> objects so need to reduce to just one
return graphResults.stream().reduce(resultAccumulator::apply).orElse(graphResults.get(0));
}


/**
* Sets up a {@link FederatedResultAccumulator} for the specified operation
* and graphs.
*
* @param store The federated store.
* @param operation The original operation.
* @param graphsToExecute The graphs executed on.
* @return A set up accumulator.
*/
protected FederatedResultAccumulator<O> getResultAccumulator(final FederatedStore store, final P operation, final List<GraphSerialisable> graphsToExecute) {
// Merge the store props with the operation options for setting up the
// accumulator
Properties combinedProps = store.getProperties().getProperties();
if (operation.getOptions() != null) {
combinedProps.putAll(operation.getOptions());
}

// Set up the result accumulator
FederatedResultAccumulator<O> resultAccumulator = new DefaultResultAccumulator<>(combinedProps);
resultAccumulator.setSchema(((FederatedStore) store).getSchema(graphsToExecute));
resultAccumulator.setSchema(store.getSchema(graphsToExecute));

// Check if user has specified to aggregate
if (operation.containsOption(OPT_AGGREGATE_ELEMENTS)) {
Expand All @@ -92,8 +119,7 @@ public O doOperation(final P operation, final Context context, final Store store
resultAccumulator.setAggregateElements(false);
}

// Should now have a list of <O> objects so need to reduce to just one
return graphResults.stream().reduce(resultAccumulator::apply).orElse(graphResults.get(0));
return resultAccumulator;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import uk.gov.gchq.gaffer.store.schema.SchemaEntityDefinition;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;

import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -64,6 +65,25 @@ void shouldRemoveGroupFromViewIfNotInSchema() {
assertThat(fixedView.getEdgeGroups()).containsOnly(edgeInSchema);
}

@Test
void shouldPreventExecutionIfNoGroupsInViewAreRelevant() {
// Given
View testView = new View.Builder()
.entity("entityNotInSchema")
.edge("edgeNotInSchema")
.build();
GraphSerialisable graph = new GraphSerialisable(
new GraphConfig("test"),
new Schema.Builder()
.entity("entityInSchema", new SchemaEntityDefinition())
.edge("edgeInSchema", new SchemaEdgeDefinition()).build(),
new StoreProperties());

// When
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> FederatedUtils.getValidViewForGraph(testView, graph));
}

@Test
void shouldChangeViewsOfNestedOperations() {
String entityInSchema = "entityInSchema";
Expand Down Expand Up @@ -95,7 +115,7 @@ void shouldChangeViewsOfNestedOperations() {
.build();

// Get a fixed operation chain
List<Operation> newChain = FederatedUtils.getValidOperationForGraph(nestedViewChain, graph, 0).flatten();
List<Operation> newChain = FederatedUtils.getValidOperationForGraph(nestedViewChain, graph, 0, 5).flatten();
List<OperationView> fixedOperations = newChain.stream()
.filter(op -> op instanceof OperationView)
.map(op -> (OperationView) op)
Expand Down

0 comments on commit a99dad8

Please sign in to comment.