diff --git a/library/tinkerpop/src/main/java/uk/gov/gchq/gaffer/tinkerpop/GafferPopEdge.java b/library/tinkerpop/src/main/java/uk/gov/gchq/gaffer/tinkerpop/GafferPopEdge.java index be0a7fdf10c..a70792f6062 100644 --- a/library/tinkerpop/src/main/java/uk/gov/gchq/gaffer/tinkerpop/GafferPopEdge.java +++ b/library/tinkerpop/src/main/java/uk/gov/gchq/gaffer/tinkerpop/GafferPopEdge.java @@ -16,6 +16,7 @@ package uk.gov.gchq.gaffer.tinkerpop; +import org.apache.commons.collections4.IterableUtils; import org.apache.tinkerpop.gremlin.structure.Direction; import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Property; @@ -36,6 +37,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Optional; @@ -215,8 +217,7 @@ private Vertex getVertex(final GafferPopVertex vertex) { .build()) .build(); - Iterable result = graph().execute(findBasedOnID); - + final Set result = new HashSet<>(IterableUtils.toList(graph().execute(findBasedOnID))); final GafferPopElementGenerator generator = new GafferPopElementGenerator(graph()); Optional foundEntity = StreamSupport.stream(result.spliterator(), false) diff --git a/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/FederatedUtils.java b/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/FederatedUtils.java new file mode 100644 index 00000000000..e2ac2e20efb --- /dev/null +++ b/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/FederatedUtils.java @@ -0,0 +1,167 @@ +/* + * Copyright 2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package uk.gov.gchq.gaffer.federated.simple; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import uk.gov.gchq.gaffer.data.elementdefinition.view.View; +import uk.gov.gchq.gaffer.federated.simple.operation.handler.FederatedOperationHandler; +import uk.gov.gchq.gaffer.graph.GraphSerialisable; +import uk.gov.gchq.gaffer.operation.Operation; +import uk.gov.gchq.gaffer.operation.OperationChain; +import uk.gov.gchq.gaffer.operation.graph.OperationView; +import uk.gov.gchq.gaffer.store.schema.Schema; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Utility class with static methods to help support the federated store. + */ +public final class FederatedUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(FederatedUtils.class); + + private FederatedUtils() { + // utility class + } + + /** + * Checks if graphs share any groups between their schemas. + * + * @param graphs Graphs to check. + * @return Do they share groups. + */ + public static boolean doGraphsShareGroups(final List graphs) { + // Check if any of the graphs have common groups in their schemas + List schemas = graphs.stream() + .map(GraphSerialisable::getSchema) + .collect(Collectors.toList()); + + // Compare all schemas against each other + for (int i = 0; i < schemas.size() - 1; i++) { + for (int j = i + 1; j < schemas.size(); j++) { + // Compare each schema against the others to see if common groups + if (!Collections.disjoint(schemas.get(i).getGroups(), schemas.get(j).getGroups())) { + LOGGER.debug("Found common schema groups between requested graphs"); + return true; + } + } + } + return false; + } + + /** + * Get a version of the operation chain that satisfies the schema for the + * requested graph. This will ensure the operation will pass validation on + * the sub graph end as there is no universal way to skip validation. + * + * @param operation The operation. + * @param graphSerialisable The graph. + * @param depth Current recursion depth of this method. + * @param depthLimit Limit to the recursion depth. + * @return A valid version of the operation chain. + */ + public static OperationChain getValidOperationForGraph(final Operation operation, final GraphSerialisable graphSerialisable, final int depth, final int depthLimit) { + LOGGER.debug("Creating valid operation for graph, depth is: {}", depth); + final Collection updatedOperations = new ArrayList<>(); + + // Fix the view so it will pass schema validation + if (operation instanceof OperationView + && ((OperationView) operation).getView() != null + && ((OperationView) operation).getView().hasGroups()) { + + // Update the view for the graph + ((OperationView) operation).setView( + getValidViewForGraph(((OperationView) operation).getView(), graphSerialisable)); + + updatedOperations.add(operation); + + // Recursively go into operation chains to make sure everything is fixed + } else if (operation instanceof OperationChain) { + for (final Operation op : ((OperationChain) operation).getOperations()) { + // Resolve if haven't hit the depth limit for validation + if (depth < depthLimit) { + updatedOperations.addAll(getValidOperationForGraph(op, graphSerialisable, depth + 1, depthLimit).getOperations()); + } else { + LOGGER.warn( + "Hit depth limit of {} making the operation valid for graph. The View may be invalid for Graph: {}", + depthLimit, + graphSerialisable.getGraphId()); + updatedOperations.add(op); + } + } + } else { + updatedOperations.add(operation); + } + + // Create and return the fixed chain for the graph + OperationChain newChain = new OperationChain<>(); + newChain.setOptions(operation.getOptions()); + newChain.updateOperations(updatedOperations); + + return newChain; + } + + /** + * Returns a {@link View} that contains groups only relevant to the graph. If + * the supplied view does not require modification it will just be returned. + * + * @param view The view to make valid. + * @param graphSerialisable The relevant graph. + * @return A version of the view valid for the graph. + */ + public static View getValidViewForGraph(final View view, final GraphSerialisable graphSerialisable) { + final Schema schema = graphSerialisable.getSchema(); + + // Figure out all the groups relevant to the graph + final Set validEntities = new HashSet<>(view.getEntityGroups()); + final Set validEdges = new HashSet<>(view.getEdgeGroups()); + validEntities.retainAll(schema.getEntityGroups()); + validEdges.retainAll(schema.getEdgeGroups()); + + if (!validEntities.equals(view.getEntityGroups()) || !validEdges.equals(view.getEdgeGroups())) { + // Need to make changes to the view so start by cloning the view + // and clearing all the edges and entities + final View.Builder builder = new View.Builder() + .merge(view) + .entities(Collections.emptyMap()) + .edges(Collections.emptyMap()); + validEntities.forEach(e -> builder.entity(e, view.getEntity(e))); + validEdges.forEach(e -> builder.edge(e, view.getEdge(e))); + final View newView = builder.build(); + // If the View has no groups left after fixing then this is likely an issue so throw + if (!newView.hasEntities() && !newView.hasEdges()) { + throw new IllegalArgumentException(String.format( + "No groups specified in View are relevant to Graph: '%1$s'. " + + "Please refine your Graphs/View or specify following option to skip execution on offending Graph: '%2$s' ", + graphSerialisable.getGraphId(), + FederatedOperationHandler.OPT_SKIP_FAILED_EXECUTE)); + } + return newView; + } + + // Nothing to do return unmodified view + return view; + } + +} diff --git a/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/FederatedOperationHandler.java b/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/FederatedOperationHandler.java index 734fb3f7ee0..2f4e06932b8 100644 --- a/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/FederatedOperationHandler.java +++ b/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/FederatedOperationHandler.java @@ -22,6 +22,7 @@ import uk.gov.gchq.gaffer.cache.exception.CacheOperationException; import uk.gov.gchq.gaffer.federated.simple.FederatedStore; +import uk.gov.gchq.gaffer.federated.simple.FederatedUtils; import uk.gov.gchq.gaffer.federated.simple.access.GraphAccess; import uk.gov.gchq.gaffer.graph.GraphSerialisable; import uk.gov.gchq.gaffer.operation.Operation; @@ -90,9 +91,16 @@ public class FederatedOperationHandler

implements Operation */ public static final String OPT_SEPARATE_RESULTS = "federated.separateResults"; + /** + * Depth should go to when making an operation chain relevant to specified + * graphs e.g. fix the View. + */ + public static final String OPT_FIX_OP_LIMIT = "federated.fixOperationLimit"; + @Override public Object doOperation(final P operation, final Context context, final Store store) throws OperationException { LOGGER.debug("Running operation: {}", operation); + final int fixLimit = Integer.parseInt(operation.getOption(OPT_FIX_OP_LIMIT, "5")); // If the operation has output wrap and return using sub class handler if (operation instanceof Output) { @@ -112,20 +120,17 @@ public Object doOperation(final P operation, final Context context, final Store // Execute the operation chain on each graph for (final GraphSerialisable gs : graphsToExecute) { try { - gs.getGraph().execute(operation, context.getUser()); - } catch (final OperationException | UnsupportedOperationException e) { + gs.getGraph().execute( + FederatedUtils.getValidOperationForGraph(operation, gs, 0, fixLimit), + context.getUser()); + } catch (final OperationException | UnsupportedOperationException | IllegalArgumentException e) { // Optionally skip this error if user has specified to do so LOGGER.error("Operation failed on graph: {}", gs.getGraphId()); if (!Boolean.parseBoolean(operation.getOption(OPT_SKIP_FAILED_EXECUTE, "false"))) { throw e; } LOGGER.info("Continuing operation execution on sub graphs"); - } catch (final IllegalArgumentException e) { - // An operation may fail validation for a sub graph this is not really an error. - // We can just continue to execute on the rest of the graphs - LOGGER.warn("Operation contained invalid arguments for a sub graph, skipped execution on graph: {}", gs.getGraphId()); } - } // Assume no output, we've already checked above diff --git a/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/FederatedOutputHandler.java b/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/FederatedOutputHandler.java index 457377e7e0a..99adb3c5e81 100644 --- a/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/FederatedOutputHandler.java +++ b/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/FederatedOutputHandler.java @@ -20,9 +20,11 @@ import org.slf4j.LoggerFactory; import uk.gov.gchq.gaffer.federated.simple.FederatedStore; +import uk.gov.gchq.gaffer.federated.simple.FederatedUtils; import uk.gov.gchq.gaffer.federated.simple.merge.DefaultResultAccumulator; import uk.gov.gchq.gaffer.federated.simple.merge.FederatedResultAccumulator; import uk.gov.gchq.gaffer.graph.GraphSerialisable; +import uk.gov.gchq.gaffer.operation.OperationChain; import uk.gov.gchq.gaffer.operation.OperationException; import uk.gov.gchq.gaffer.operation.io.Output; import uk.gov.gchq.gaffer.store.Context; @@ -44,8 +46,10 @@ public class FederatedOutputHandler

, O> @Override public O doOperation(final P operation, final Context context, final Store store) throws OperationException { + final int fixLimit = Integer.parseInt(operation.getOption(OPT_FIX_OP_LIMIT, "5")); List graphsToExecute = this.getGraphsToExecuteOn(operation, context, (FederatedStore) store); + // No-op if (graphsToExecute.isEmpty()) { return null; } @@ -54,18 +58,15 @@ public O doOperation(final P operation, final Context context, final Store store List graphResults = new ArrayList<>(); for (final GraphSerialisable gs : graphsToExecute) { try { - graphResults.add(gs.getGraph().execute(operation, context.getUser())); - } catch (final OperationException | UnsupportedOperationException e) { + OperationChain fixedChain = FederatedUtils.getValidOperationForGraph(operation, gs, 0, fixLimit); + graphResults.add(gs.getGraph().execute(fixedChain, context.getUser())); + } catch (final OperationException | UnsupportedOperationException | IllegalArgumentException e) { // Optionally skip this error if user has specified to do so LOGGER.error("Operation failed on graph: {}", gs.getGraphId()); if (!Boolean.parseBoolean(operation.getOption(OPT_SKIP_FAILED_EXECUTE, "false"))) { throw e; } LOGGER.info("Continuing operation execution on sub graphs"); - } catch (final IllegalArgumentException e) { - // An operation may fail validation for a sub graph this is not really an error. - // We can just continue to execute on the rest of the graphs - LOGGER.warn("Operation contained invalid arguments for a sub graph, skipped execution on graph: {}", gs.getGraphId()); } } @@ -80,15 +81,45 @@ public O doOperation(final P operation, final Context context, final Store store combinedProps.putAll(operation.getOptions()); } + // Set up the result accumulator + FederatedResultAccumulator resultAccumulator = getResultAccumulator((FederatedStore) store, operation, graphsToExecute); + + // Should now have a list of objects so need to reduce to just one + return graphResults.stream().reduce(resultAccumulator::apply).orElse(graphResults.get(0)); + } + + + /** + * Sets up a {@link FederatedResultAccumulator} for the specified operation + * and graphs. + * + * @param store The federated store. + * @param operation The original operation. + * @param graphsToExecute The graphs executed on. + * @return A set up accumulator. + */ + protected FederatedResultAccumulator getResultAccumulator(final FederatedStore store, final P operation, final List graphsToExecute) { + // Merge the store props with the operation options for setting up the + // accumulator + Properties combinedProps = store.getProperties().getProperties(); + if (operation.getOptions() != null) { + combinedProps.putAll(operation.getOptions()); + } + // Set up the result accumulator FederatedResultAccumulator resultAccumulator = new DefaultResultAccumulator<>(combinedProps); - resultAccumulator.setSchema(((FederatedStore) store).getSchema(graphsToExecute)); + resultAccumulator.setSchema(store.getSchema(graphsToExecute)); + // Check if user has specified to aggregate if (operation.containsOption(OPT_AGGREGATE_ELEMENTS)) { resultAccumulator.setAggregateElements(Boolean.parseBoolean(operation.getOption(OPT_AGGREGATE_ELEMENTS))); } - // Should now have a list of objects so need to reduce to just one - return graphResults.stream().reduce(resultAccumulator::apply).orElse(graphResults.get(0)); + // Turn aggregation off if there are no shared groups + if (!FederatedUtils.doGraphsShareGroups(graphsToExecute)) { + resultAccumulator.setAggregateElements(false); + } + + return resultAccumulator; } } diff --git a/store-implementation/simple-federated-store/src/test/java/uk/gov/gchq/gaffer/federated/simple/FederatedUtilsTest.java b/store-implementation/simple-federated-store/src/test/java/uk/gov/gchq/gaffer/federated/simple/FederatedUtilsTest.java new file mode 100644 index 00000000000..b7197b4df25 --- /dev/null +++ b/store-implementation/simple-federated-store/src/test/java/uk/gov/gchq/gaffer/federated/simple/FederatedUtilsTest.java @@ -0,0 +1,156 @@ +/* + * Copyright 2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package uk.gov.gchq.gaffer.federated.simple; + +import org.junit.jupiter.api.Test; + +import uk.gov.gchq.gaffer.data.elementdefinition.view.View; +import uk.gov.gchq.gaffer.graph.GraphConfig; +import uk.gov.gchq.gaffer.graph.GraphSerialisable; +import uk.gov.gchq.gaffer.operation.Operation; +import uk.gov.gchq.gaffer.operation.OperationChain; +import uk.gov.gchq.gaffer.operation.graph.OperationView; +import uk.gov.gchq.gaffer.operation.impl.get.GetAllElements; +import uk.gov.gchq.gaffer.store.StoreProperties; +import uk.gov.gchq.gaffer.store.schema.Schema; +import uk.gov.gchq.gaffer.store.schema.SchemaEdgeDefinition; +import uk.gov.gchq.gaffer.store.schema.SchemaEntityDefinition; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +class FederatedUtilsTest { + + @Test + void shouldRemoveGroupFromViewIfNotInSchema() { + // Given + String entityInSchema = "entityInSchema"; + String edgeInSchema = "edgeInSchema"; + View testView = new View.Builder() + .entity(entityInSchema) + .entity("entityNotInSchema") + .edge(edgeInSchema) + .edge("edgeNotInSchema") + .build(); + GraphSerialisable graph = new GraphSerialisable( + new GraphConfig("test"), + new Schema.Builder() + .entity(entityInSchema, new SchemaEntityDefinition()) + .edge(edgeInSchema, new SchemaEdgeDefinition()).build(), + new StoreProperties()); + + // When + View fixedView = FederatedUtils.getValidViewForGraph(testView, graph); + + // Then + assertThat(fixedView.getEntityGroups()).containsOnly(entityInSchema); + assertThat(fixedView.getEdgeGroups()).containsOnly(edgeInSchema); + } + + @Test + void shouldPreventExecutionIfNoGroupsInViewAreRelevant() { + // Given + View testView = new View.Builder() + .entity("entityNotInSchema") + .edge("edgeNotInSchema") + .build(); + GraphSerialisable graph = new GraphSerialisable( + new GraphConfig("test"), + new Schema.Builder() + .entity("entityInSchema", new SchemaEntityDefinition()) + .edge("edgeInSchema", new SchemaEdgeDefinition()).build(), + new StoreProperties()); + + // When + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> FederatedUtils.getValidViewForGraph(testView, graph)); + } + + @Test + void shouldChangeViewsOfNestedOperations() { + String entityInSchema = "entityInSchema"; + String edgeInSchema = "edgeInSchema"; + GraphSerialisable graph = new GraphSerialisable( + new GraphConfig("test"), + new Schema.Builder() + .entity(entityInSchema, new SchemaEntityDefinition()) + .edge(edgeInSchema, new SchemaEdgeDefinition()).build(), + new StoreProperties()); + + // View with some mixed groups in + View testView = new View.Builder() + .entity(entityInSchema) + .entity("entityNotInSchema") + .edge(edgeInSchema) + .edge("edgeNotInSchema") + .build(); + + // Build nested operation chain with multiple views + OperationChain nestedViewChain = new OperationChain.Builder() + .first(new GetAllElements.Builder().view(testView).build()) + .then(new OperationChain.Builder() + .first(new GetAllElements.Builder().view(testView).build()) + .then(new OperationChain.Builder() + .first(new GetAllElements.Builder().view(testView).build()) + .build()) + .build()) + .build(); + + // Get a fixed operation chain + List newChain = FederatedUtils.getValidOperationForGraph(nestedViewChain, graph, 0, 5).flatten(); + List fixedOperations = newChain.stream() + .filter(op -> op instanceof OperationView) + .map(op -> (OperationView) op) + .collect(Collectors.toList()); + + // Check all the views only contain relevant groups + fixedOperations.stream() + .map(OperationView::getView) + .forEach(v -> { + assertThat(v.getEntityGroups()).containsOnly(entityInSchema); + assertThat(v.getEdgeGroups()).containsOnly(edgeInSchema); + }); + } + + @Test + void shouldDetectIfGraphsShareGroups() { + // Given + String sharedGroup = "sharedGroup"; + + GraphSerialisable graph1 = new GraphSerialisable( + new GraphConfig("graph1"), + new Schema.Builder().entity(sharedGroup, new SchemaEntityDefinition()).build(), + new StoreProperties()); + GraphSerialisable graph2 = new GraphSerialisable( + new GraphConfig("graph2"), + new Schema.Builder().entity(sharedGroup, new SchemaEntityDefinition()).build(), + new StoreProperties()); + GraphSerialisable graph3 = new GraphSerialisable( + new GraphConfig("graph3"), + new Schema.Builder().entity("notShared", new SchemaEntityDefinition()).build(), + new StoreProperties()); + + // When/Then + assertThat(FederatedUtils.doGraphsShareGroups(Arrays.asList(graph1, graph2))).isTrue(); + assertThat(FederatedUtils.doGraphsShareGroups(Arrays.asList(graph1, graph3))).isFalse(); + } + +}