Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gh-3342: Handle different groups in View federated poc #3343

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package uk.gov.gchq.gaffer.tinkerpop;

import org.apache.commons.collections4.IterableUtils;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Property;
Expand All @@ -36,6 +37,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -215,8 +217,7 @@ private Vertex getVertex(final GafferPopVertex vertex) {
.build())
.build();

Iterable<? extends Element> result = graph().execute(findBasedOnID);

final Set<Element> result = new HashSet<>(IterableUtils.toList(graph().execute(findBasedOnID)));
final GafferPopElementGenerator generator = new GafferPopElementGenerator(graph());

Optional<Vertex> foundEntity = StreamSupport.stream(result.spliterator(), false)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright 2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package uk.gov.gchq.gaffer.federated.simple;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import uk.gov.gchq.gaffer.data.elementdefinition.view.View;
import uk.gov.gchq.gaffer.federated.simple.operation.handler.FederatedOperationHandler;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.operation.Operation;
import uk.gov.gchq.gaffer.operation.OperationChain;
import uk.gov.gchq.gaffer.operation.graph.OperationView;
import uk.gov.gchq.gaffer.store.schema.Schema;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Utility class with static methods to help support the federated store.
*/
public final class FederatedUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(FederatedUtils.class);

private FederatedUtils() {
// utility class
}

/**
 * Checks if graphs share any groups between their schemas.
 * <p>
 * The schemas are walked once while a running set of every group seen so far
 * is maintained; as soon as a schema contains a group that an earlier schema
 * already declared the graphs are known to share groups.
 *
 * @param graphs Graphs to check.
 * @return {@code true} if at least one group appears in the schema of more
 *         than one of the graphs, otherwise {@code false}.
 */
public static boolean doGraphsShareGroups(final List<GraphSerialisable> graphs) {
    // Resolve every schema up front
    final List<Schema> schemas = graphs.stream()
            .map(GraphSerialisable::getSchema)
            .collect(Collectors.toList());

    // Fewer than two graphs means there is nothing to share groups with
    if (schemas.size() < 2) {
        return false;
    }

    // Accumulate the groups seen so far and test each schema against them,
    // this avoids comparing every schema against every other schema
    final Set<String> seenGroups = new HashSet<>();
    for (final Schema schema : schemas) {
        final Collection<String> groups = schema.getGroups();
        if (!Collections.disjoint(seenGroups, groups)) {
            LOGGER.debug("Found common schema groups between requested graphs");
            return true;
        }
        seenGroups.addAll(groups);
    }
    return false;
}

/**
* Get a version of the operation chain that satisfies the schema for the
* requested graph. This will ensure the operation will pass validation on
* the sub graph end as there is no universal way to skip validation.
*
* @param operation The operation.
* @param graphSerialisable The graph.
* @param depth Current recursion depth of this method.
* @param depthLimit Limit to the recursion depth.
* @return A valid version of the operation chain.
*/
public static OperationChain getValidOperationForGraph(final Operation operation, final GraphSerialisable graphSerialisable, final int depth, final int depthLimit) {
LOGGER.debug("Creating valid operation for graph, depth is: {}", depth);
final Collection<Operation> updatedOperations = new ArrayList<>();

// Fix the view so it will pass schema validation
if (operation instanceof OperationView
&& ((OperationView) operation).getView() != null
&& ((OperationView) operation).getView().hasGroups()) {

// Update the view for the graph
((OperationView) operation).setView(
getValidViewForGraph(((OperationView) operation).getView(), graphSerialisable));

updatedOperations.add(operation);

// Recursively go into operation chains to make sure everything is fixed
} else if (operation instanceof OperationChain) {
for (final Operation op : ((OperationChain<?>) operation).getOperations()) {
// Resolve if haven't hit the depth limit for validation
if (depth < depthLimit) {
updatedOperations.addAll(getValidOperationForGraph(op, graphSerialisable, depth + 1, depthLimit).getOperations());
} else {
LOGGER.warn(

Check warning on line 106 in store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/FederatedUtils.java

View check run for this annotation

Codecov / codecov/patch

store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/FederatedUtils.java#L106

Added line #L106 was not covered by tests
"Hit depth limit of {} making the operation valid for graph. The View may be invalid for Graph: {}",
depthLimit,
graphSerialisable.getGraphId());
updatedOperations.add(op);

Check warning on line 110 in store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/FederatedUtils.java

View check run for this annotation

Codecov / codecov/patch

store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/FederatedUtils.java#L108-L110

Added lines #L108 - L110 were not covered by tests
}
}
} else {
updatedOperations.add(operation);
}

// Create and return the fixed chain for the graph
OperationChain<?> newChain = new OperationChain<>();
newChain.setOptions(operation.getOptions());
newChain.updateOperations(updatedOperations);

return newChain;
}

/**
 * Returns a {@link View} restricted to the groups that exist in the schema
 * of the supplied graph. When every group in the view is already known to
 * the graph the very same view instance is handed back.
 *
 * @param view The view to make valid.
 * @param graphSerialisable The relevant graph.
 * @return A version of the view valid for the graph.
 * @throws IllegalArgumentException If the view had to be changed and no
 *         entity or edge groups remain afterwards.
 */
public static View getValidViewForGraph(final View view, final GraphSerialisable graphSerialisable) {
    final Schema graphSchema = graphSerialisable.getSchema();

    // Intersect the groups requested in the view with those the graph has
    final Set<String> entityGroups = new HashSet<>(view.getEntityGroups());
    final Set<String> edgeGroups = new HashSet<>(view.getEdgeGroups());
    entityGroups.retainAll(graphSchema.getEntityGroups());
    edgeGroups.retainAll(graphSchema.getEdgeGroups());

    // Nothing was filtered out so the view is already valid for the graph
    if (entityGroups.equals(view.getEntityGroups()) && edgeGroups.equals(view.getEdgeGroups())) {
        return view;
    }

    // Copy the view, wipe its element definitions then add back only the
    // definitions for the groups that survived the intersection
    final View.Builder trimmed = new View.Builder()
            .merge(view)
            .entities(Collections.emptyMap())
            .edges(Collections.emptyMap());
    for (final String group : entityGroups) {
        trimmed.entity(group, view.getEntity(group));
    }
    for (final String group : edgeGroups) {
        trimmed.edge(group, view.getEdge(group));
    }
    final View trimmedView = trimmed.build();

    if (trimmedView.hasEntities() || trimmedView.hasEdges()) {
        return trimmedView;
    }

    // A view left with no groups at all is most likely a user error so fail
    throw new IllegalArgumentException(String.format(
            "No groups specified in View are relevant to Graph: '%1$s'. " +
            "Please refine your Graphs/View or specify following option to skip execution on offending Graph: '%2$s' ",
            graphSerialisable.getGraphId(),
            FederatedOperationHandler.OPT_SKIP_FAILED_EXECUTE));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import uk.gov.gchq.gaffer.cache.exception.CacheOperationException;
import uk.gov.gchq.gaffer.federated.simple.FederatedStore;
import uk.gov.gchq.gaffer.federated.simple.FederatedUtils;
import uk.gov.gchq.gaffer.federated.simple.access.GraphAccess;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.operation.Operation;
Expand Down Expand Up @@ -90,9 +91,16 @@ public class FederatedOperationHandler<P extends Operation> implements Operation
*/
public static final String OPT_SEPARATE_RESULTS = "federated.separateResults";

/**
* Maximum depth to recurse to when making an operation chain relevant to the
* specified graphs, e.g. when fixing the View.
*/
public static final String OPT_FIX_OP_LIMIT = "federated.fixOperationLimit";

@Override
public Object doOperation(final P operation, final Context context, final Store store) throws OperationException {
LOGGER.debug("Running operation: {}", operation);
final int fixLimit = Integer.parseInt(operation.getOption(OPT_FIX_OP_LIMIT, "5"));

// If the operation has output wrap and return using sub class handler
if (operation instanceof Output) {
Expand All @@ -112,20 +120,17 @@ public Object doOperation(final P operation, final Context context, final Store
// Execute the operation chain on each graph
for (final GraphSerialisable gs : graphsToExecute) {
try {
gs.getGraph().execute(operation, context.getUser());
} catch (final OperationException | UnsupportedOperationException e) {
gs.getGraph().execute(
FederatedUtils.getValidOperationForGraph(operation, gs, 0, fixLimit),
context.getUser());
} catch (final OperationException | UnsupportedOperationException | IllegalArgumentException e) {
// Optionally skip this error if user has specified to do so
LOGGER.error("Operation failed on graph: {}", gs.getGraphId());
if (!Boolean.parseBoolean(operation.getOption(OPT_SKIP_FAILED_EXECUTE, "false"))) {
throw e;
}
LOGGER.info("Continuing operation execution on sub graphs");
} catch (final IllegalArgumentException e) {
// An operation may fail validation for a sub graph this is not really an error.
// We can just continue to execute on the rest of the graphs
LOGGER.warn("Operation contained invalid arguments for a sub graph, skipped execution on graph: {}", gs.getGraphId());
}

}

// Assume no output, we've already checked above
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import org.slf4j.LoggerFactory;

import uk.gov.gchq.gaffer.federated.simple.FederatedStore;
import uk.gov.gchq.gaffer.federated.simple.FederatedUtils;
import uk.gov.gchq.gaffer.federated.simple.merge.DefaultResultAccumulator;
import uk.gov.gchq.gaffer.federated.simple.merge.FederatedResultAccumulator;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.operation.OperationChain;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.operation.io.Output;
import uk.gov.gchq.gaffer.store.Context;
Expand All @@ -44,8 +46,10 @@ public class FederatedOutputHandler<P extends Output<O>, O>

@Override
public O doOperation(final P operation, final Context context, final Store store) throws OperationException {
final int fixLimit = Integer.parseInt(operation.getOption(OPT_FIX_OP_LIMIT, "5"));
List<GraphSerialisable> graphsToExecute = this.getGraphsToExecuteOn(operation, context, (FederatedStore) store);

// No-op
if (graphsToExecute.isEmpty()) {
return null;
}
Expand All @@ -54,18 +58,15 @@ public O doOperation(final P operation, final Context context, final Store store
List<O> graphResults = new ArrayList<>();
for (final GraphSerialisable gs : graphsToExecute) {
try {
graphResults.add(gs.getGraph().execute(operation, context.getUser()));
} catch (final OperationException | UnsupportedOperationException e) {
OperationChain<O> fixedChain = FederatedUtils.getValidOperationForGraph(operation, gs, 0, fixLimit);
graphResults.add(gs.getGraph().execute(fixedChain, context.getUser()));
} catch (final OperationException | UnsupportedOperationException | IllegalArgumentException e) {
// Optionally skip this error if user has specified to do so
LOGGER.error("Operation failed on graph: {}", gs.getGraphId());
if (!Boolean.parseBoolean(operation.getOption(OPT_SKIP_FAILED_EXECUTE, "false"))) {
throw e;
}
LOGGER.info("Continuing operation execution on sub graphs");
} catch (final IllegalArgumentException e) {
// An operation may fail validation for a sub graph this is not really an error.
// We can just continue to execute on the rest of the graphs
LOGGER.warn("Operation contained invalid arguments for a sub graph, skipped execution on graph: {}", gs.getGraphId());
}
}

Expand All @@ -80,15 +81,45 @@ public O doOperation(final P operation, final Context context, final Store store
combinedProps.putAll(operation.getOptions());
}

// Set up the result accumulator
FederatedResultAccumulator<O> resultAccumulator = getResultAccumulator((FederatedStore) store, operation, graphsToExecute);

// Should now have a list of <O> objects so need to reduce to just one
return graphResults.stream().reduce(resultAccumulator::apply).orElse(graphResults.get(0));
}


/**
 * Sets up a {@link FederatedResultAccumulator} for the specified operation
 * and graphs.
 * <p>
 * The accumulator is configured from the store properties overlaid with the
 * operation options, and is given the schema the store reports for the
 * graphs. Element aggregation follows the aggregate option when the
 * operation sets it, but is always switched off when the graphs share no
 * groups.
 *
 * @param store The federated store.
 * @param operation The original operation.
 * @param graphsToExecute The graphs executed on.
 * @return A set up accumulator.
 */
protected FederatedResultAccumulator<O> getResultAccumulator(final FederatedStore store, final P operation, final List<GraphSerialisable> graphsToExecute) {
    // Merge the store props with the operation options for setting up the
    // accumulator. The merge is done into a fresh copy so the options of a
    // single operation are never written into the store's own properties.
    // NOTE(review): assumes getProperties() exposes the store's live
    // Properties object and that it has no defaults chain - confirm.
    final Properties combinedProps = new Properties();
    combinedProps.putAll(store.getProperties().getProperties());
    if (operation.getOptions() != null) {
        combinedProps.putAll(operation.getOptions());
    }

    // Set up the result accumulator
    final FederatedResultAccumulator<O> resultAccumulator = new DefaultResultAccumulator<>(combinedProps);
    resultAccumulator.setSchema(store.getSchema(graphsToExecute));

    // Check if user has specified to aggregate
    if (operation.containsOption(OPT_AGGREGATE_ELEMENTS)) {
        resultAccumulator.setAggregateElements(Boolean.parseBoolean(operation.getOption(OPT_AGGREGATE_ELEMENTS)));
    }
    // Turn aggregation off if there are no shared groups
    if (!FederatedUtils.doGraphsShareGroups(graphsToExecute)) {
        resultAccumulator.setAggregateElements(false);
    }

    return resultAccumulator;
}

}
Loading
Loading