Skip to content

Commit

Permalink
Gh-3342: Handle different groups in View federated poc (#3343)
Browse files Browse the repository at this point in the history
* New utils class

* integrate checks for views and schema compatibility

* update unit testing for new class

* tidy

* address comments

* typo
  • Loading branch information
tb06904 authored Dec 5, 2024
1 parent 7726ff3 commit 1a91fcf
Show file tree
Hide file tree
Showing 5 changed files with 378 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package uk.gov.gchq.gaffer.tinkerpop;

import org.apache.commons.collections4.IterableUtils;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Property;
Expand All @@ -36,6 +37,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -215,8 +217,7 @@ private Vertex getVertex(final GafferPopVertex vertex) {
.build())
.build();

Iterable<? extends Element> result = graph().execute(findBasedOnID);

final Set<Element> result = new HashSet<>(IterableUtils.toList(graph().execute(findBasedOnID)));
final GafferPopElementGenerator generator = new GafferPopElementGenerator(graph());

Optional<Vertex> foundEntity = StreamSupport.stream(result.spliterator(), false)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright 2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package uk.gov.gchq.gaffer.federated.simple;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import uk.gov.gchq.gaffer.data.elementdefinition.view.View;
import uk.gov.gchq.gaffer.federated.simple.operation.handler.FederatedOperationHandler;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.operation.Operation;
import uk.gov.gchq.gaffer.operation.OperationChain;
import uk.gov.gchq.gaffer.operation.graph.OperationView;
import uk.gov.gchq.gaffer.store.schema.Schema;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Utility class with static methods to help support the federated store.
 * <p>
 * The helpers here adapt an incoming operation (and any {@link View} it
 * carries) to the schema of an individual sub graph before it is executed
 * on that graph.
 */
public final class FederatedUtils {
    private static final Logger LOGGER = LoggerFactory.getLogger(FederatedUtils.class);

    private FederatedUtils() {
        // utility class
    }

    /**
     * Checks if graphs share any groups between their schemas.
     *
     * @param graphs Graphs to check.
     * @return {@code true} if at least two of the graphs have a group in
     *         common, otherwise {@code false}.
     */
    public static boolean doGraphsShareGroups(final List<GraphSerialisable> graphs) {
        final List<Schema> schemas = graphs.stream()
                .map(GraphSerialisable::getSchema)
                .collect(Collectors.toList());

        // Two schemas share a group exactly when some schema overlaps the
        // union of the groups seen before it, so a single pass over a running
        // set gives the same answer as comparing every pair.
        final Set<String> seenGroups = new HashSet<>();
        for (final Schema schema : schemas) {
            final Collection<String> groups = schema.getGroups();
            if (!Collections.disjoint(seenGroups, groups)) {
                LOGGER.debug("Found common schema groups between requested graphs");
                return true;
            }
            seenGroups.addAll(groups);
        }
        return false;
    }

    /**
     * Get a version of the operation chain that satisfies the schema for the
     * requested graph. This will ensure the operation will pass validation on
     * the sub graph end as there is no universal way to skip validation.
     * <p>
     * The supplied operation is not modified: when a View has to be narrowed
     * it is set on a shallow clone of the operation. This matters because
     * callers pass the same operation instance for every graph they execute
     * on, so narrowing the View in place for one graph would leak into the
     * next graph's execution.
     *
     * @param operation The operation.
     * @param graphSerialisable The graph.
     * @param depth Current recursion depth of this method.
     * @param depthLimit Limit to the recursion depth.
     * @return A valid version of the operation chain.
     * @throws IllegalArgumentException If a View in the operation has no
     *         groups relevant to the graph.
     */
    public static OperationChain getValidOperationForGraph(final Operation operation, final GraphSerialisable graphSerialisable, final int depth, final int depthLimit) {
        LOGGER.debug("Creating valid operation for graph, depth is: {}", depth);
        final Collection<Operation> updatedOperations = new ArrayList<>();

        // Fix the view so it will pass schema validation
        if (operation instanceof OperationView
                && ((OperationView) operation).getView() != null
                && ((OperationView) operation).getView().hasGroups()) {

            // Work on a clone so the caller's operation keeps its original View
            // NOTE(review): relies on shallowClone() returning the same operation type - confirm
            final Operation clonedOperation = operation.shallowClone();
            ((OperationView) clonedOperation).setView(
                    getValidViewForGraph(((OperationView) operation).getView(), graphSerialisable));

            updatedOperations.add(clonedOperation);

        // Recursively go into operation chains to make sure everything is fixed
        } else if (operation instanceof OperationChain) {
            final List<Operation> nestedOperations = ((OperationChain<?>) operation).getOperations();
            if (depth < depthLimit) {
                for (final Operation op : nestedOperations) {
                    updatedOperations.addAll(getValidOperationForGraph(op, graphSerialisable, depth + 1, depthLimit).getOperations());
                }
            } else {
                // Depth limit reached, pass the nested operations through untouched
                // and warn once for the whole chain rather than once per operation
                LOGGER.warn(
                        "Hit depth limit of {} making the operation valid for graph. The View may be invalid for Graph: {}",
                        depthLimit,
                        graphSerialisable.getGraphId());
                updatedOperations.addAll(nestedOperations);
            }
        } else {
            updatedOperations.add(operation);
        }

        // Create and return the fixed chain for the graph
        final OperationChain<?> newChain = new OperationChain<>();
        newChain.setOptions(operation.getOptions());
        newChain.updateOperations(updatedOperations);

        return newChain;
    }

    /**
     * Returns a {@link View} that contains groups only relevant to the graph. If
     * the supplied view does not require modification it will just be returned.
     *
     * @param view The view to make valid.
     * @param graphSerialisable The relevant graph.
     * @return A version of the view valid for the graph.
     * @throws IllegalArgumentException If none of the groups in the view
     *         exist in the graph's schema.
     */
    public static View getValidViewForGraph(final View view, final GraphSerialisable graphSerialisable) {
        final Schema schema = graphSerialisable.getSchema();

        // Figure out all the groups relevant to the graph
        final Set<String> validEntities = new HashSet<>(view.getEntityGroups());
        final Set<String> validEdges = new HashSet<>(view.getEdgeGroups());
        validEntities.retainAll(schema.getEntityGroups());
        validEdges.retainAll(schema.getEdgeGroups());

        // Nothing to do return unmodified view
        if (validEntities.equals(view.getEntityGroups()) && validEdges.equals(view.getEdgeGroups())) {
            return view;
        }

        // Need to make changes to the view so start by cloning the view
        // and clearing all the edges and entities
        final View.Builder builder = new View.Builder()
                .merge(view)
                .entities(Collections.emptyMap())
                .edges(Collections.emptyMap());
        validEntities.forEach(e -> builder.entity(e, view.getEntity(e)));
        validEdges.forEach(e -> builder.edge(e, view.getEdge(e)));
        final View newView = builder.build();

        // If the View has no groups left after fixing then this is likely an issue so throw
        if (!newView.hasEntities() && !newView.hasEdges()) {
            throw new IllegalArgumentException(String.format(
                    "No groups specified in View are relevant to Graph: '%1$s'. " +
                    "Please refine your Graphs/View or specify following option to skip execution on offending Graph: '%2$s' ",
                    graphSerialisable.getGraphId(),
                    FederatedOperationHandler.OPT_SKIP_FAILED_EXECUTE));
        }
        return newView;
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import uk.gov.gchq.gaffer.cache.exception.CacheOperationException;
import uk.gov.gchq.gaffer.federated.simple.FederatedStore;
import uk.gov.gchq.gaffer.federated.simple.FederatedUtils;
import uk.gov.gchq.gaffer.federated.simple.access.GraphAccess;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.operation.Operation;
Expand Down Expand Up @@ -90,9 +91,16 @@ public class FederatedOperationHandler<P extends Operation> implements Operation
*/
public static final String OPT_SEPARATE_RESULTS = "federated.separateResults";

/**
 * Maximum depth to recurse to when making an operation chain relevant to
 * the specified graphs, e.g. fixing the View.
 */
public static final String OPT_FIX_OP_LIMIT = "federated.fixOperationLimit";

@Override
public Object doOperation(final P operation, final Context context, final Store store) throws OperationException {
LOGGER.debug("Running operation: {}", operation);
final int fixLimit = Integer.parseInt(operation.getOption(OPT_FIX_OP_LIMIT, "5"));

// If the operation has output wrap and return using sub class handler
if (operation instanceof Output) {
Expand All @@ -112,20 +120,17 @@ public Object doOperation(final P operation, final Context context, final Store
// Execute the operation chain on each graph
for (final GraphSerialisable gs : graphsToExecute) {
try {
gs.getGraph().execute(operation, context.getUser());
} catch (final OperationException | UnsupportedOperationException e) {
gs.getGraph().execute(
FederatedUtils.getValidOperationForGraph(operation, gs, 0, fixLimit),
context.getUser());
} catch (final OperationException | UnsupportedOperationException | IllegalArgumentException e) {
// Optionally skip this error if user has specified to do so
LOGGER.error("Operation failed on graph: {}", gs.getGraphId());
if (!Boolean.parseBoolean(operation.getOption(OPT_SKIP_FAILED_EXECUTE, "false"))) {
throw e;
}
LOGGER.info("Continuing operation execution on sub graphs");
} catch (final IllegalArgumentException e) {
// An operation may fail validation for a sub graph this is not really an error.
// We can just continue to execute on the rest of the graphs
LOGGER.warn("Operation contained invalid arguments for a sub graph, skipped execution on graph: {}", gs.getGraphId());
}

}

// Assume no output, we've already checked above
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import org.slf4j.LoggerFactory;

import uk.gov.gchq.gaffer.federated.simple.FederatedStore;
import uk.gov.gchq.gaffer.federated.simple.FederatedUtils;
import uk.gov.gchq.gaffer.federated.simple.merge.DefaultResultAccumulator;
import uk.gov.gchq.gaffer.federated.simple.merge.FederatedResultAccumulator;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.operation.OperationChain;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.operation.io.Output;
import uk.gov.gchq.gaffer.store.Context;
Expand All @@ -44,8 +46,10 @@ public class FederatedOutputHandler<P extends Output<O>, O>

@Override
public O doOperation(final P operation, final Context context, final Store store) throws OperationException {
final int fixLimit = Integer.parseInt(operation.getOption(OPT_FIX_OP_LIMIT, "5"));
List<GraphSerialisable> graphsToExecute = this.getGraphsToExecuteOn(operation, context, (FederatedStore) store);

// No-op
if (graphsToExecute.isEmpty()) {
return null;
}
Expand All @@ -54,18 +58,15 @@ public O doOperation(final P operation, final Context context, final Store store
List<O> graphResults = new ArrayList<>();
for (final GraphSerialisable gs : graphsToExecute) {
try {
graphResults.add(gs.getGraph().execute(operation, context.getUser()));
} catch (final OperationException | UnsupportedOperationException e) {
OperationChain<O> fixedChain = FederatedUtils.getValidOperationForGraph(operation, gs, 0, fixLimit);
graphResults.add(gs.getGraph().execute(fixedChain, context.getUser()));
} catch (final OperationException | UnsupportedOperationException | IllegalArgumentException e) {
// Optionally skip this error if user has specified to do so
LOGGER.error("Operation failed on graph: {}", gs.getGraphId());
if (!Boolean.parseBoolean(operation.getOption(OPT_SKIP_FAILED_EXECUTE, "false"))) {
throw e;
}
LOGGER.info("Continuing operation execution on sub graphs");
} catch (final IllegalArgumentException e) {
// An operation may fail validation for a sub graph this is not really an error.
// We can just continue to execute on the rest of the graphs
LOGGER.warn("Operation contained invalid arguments for a sub graph, skipped execution on graph: {}", gs.getGraphId());
}
}

Expand All @@ -80,15 +81,45 @@ public O doOperation(final P operation, final Context context, final Store store
combinedProps.putAll(operation.getOptions());
}

// Set up the result accumulator
FederatedResultAccumulator<O> resultAccumulator = getResultAccumulator((FederatedStore) store, operation, graphsToExecute);

// Should now have a list of <O> objects so need to reduce to just one
return graphResults.stream().reduce(resultAccumulator::apply).orElse(graphResults.get(0));
}


/**
 * Sets up a {@link FederatedResultAccumulator} for the specified operation
 * and graphs.
 * <p>
 * The accumulator is configured from the store properties overlaid with the
 * operation options. Element aggregation follows the
 * {@code OPT_AGGREGATE_ELEMENTS} option when the operation supplies it, and
 * is always turned off when the graphs share no groups.
 *
 * @param store The federated store.
 * @param operation The original operation.
 * @param graphsToExecute The graphs executed on.
 * @return A set up accumulator.
 */
protected FederatedResultAccumulator<O> getResultAccumulator(final FederatedStore store, final P operation, final List<GraphSerialisable> graphsToExecute) {
    // Merge the store props with the operation options for setting up the
    // accumulator. Merge into a fresh Properties so the options of this
    // operation are never written into the store's own properties.
    // NOTE(review): assumes the store properties hold no 'defaults' chain, as
    // putAll only copies direct entries - confirm
    final Properties combinedProps = new Properties();
    combinedProps.putAll(store.getProperties().getProperties());
    if (operation.getOptions() != null) {
        combinedProps.putAll(operation.getOptions());
    }

    // Set up the result accumulator
    final FederatedResultAccumulator<O> resultAccumulator = new DefaultResultAccumulator<>(combinedProps);
    resultAccumulator.setSchema(store.getSchema(graphsToExecute));

    // Check if user has specified to aggregate
    if (operation.containsOption(OPT_AGGREGATE_ELEMENTS)) {
        resultAccumulator.setAggregateElements(Boolean.parseBoolean(operation.getOption(OPT_AGGREGATE_ELEMENTS)));
    }
    // Turn aggregation off if there are no shared groups
    if (!FederatedUtils.doGraphsShareGroups(graphsToExecute)) {
        resultAccumulator.setAggregateElements(false);
    }

    return resultAccumulator;
}

}
Loading

0 comments on commit 1a91fcf

Please sign in to comment.