Skip to content

Commit

Permalink
Gh-3334: Fix Gremlin Reusing Accumulo Iterators (#3335)
Browse files Browse the repository at this point in the history
* move to set where applicable

* update test

* Fix gremlin select step

* remove  deprecated parameter

* use safe add of labels
  • Loading branch information
tb06904 authored Nov 21, 2024
1 parent 960c599 commit 3b93f97
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,6 @@ public void addEdge(final GafferPopEdge edge) {
public Iterator<Vertex> vertices(final Object... vertexIds) {
final boolean getAll = null == vertexIds || 0 == vertexIds.length;
final OperationChain<Iterable<? extends Element>> getOperation;
final Iterable<Vertex> orphanVertices;

LOGGER.debug(GET_DEBUG_MSG, variables.getElementsLimit());
if (getAll) {
Expand All @@ -430,29 +429,28 @@ public Iterator<Vertex> vertices(final Object... vertexIds) {
.then(new Limit<Element>(variables.getElementsLimit(), true))
.build();
}
// Run requested chain on the graph
final Iterable<? extends Element> result = execute(getOperation);
// Run requested chain on the graph and buffer result to set to avoid reusing iterator
final Set<Element> result = new HashSet<>(IterableUtils.toList(execute(getOperation)));

// Translate results to Gafferpop elements
final GafferPopElementGenerator generator = new GafferPopElementGenerator(this);
final Iterable<Vertex> translatedResults = () -> StreamSupport.stream(result.spliterator(), false)
final Set<Vertex> translatedResults = StreamSupport.stream(result.spliterator(), false)
.map(generator::_apply)
.filter(Vertex.class::isInstance)
.map(e -> (Vertex) e)
.limit(variables.getElementsLimit())
.iterator();
.collect(Collectors.toSet());

if (IterableUtils.size(translatedResults) >= variables.getElementsLimit()) {
if (translatedResults.size() >= variables.getElementsLimit()) {
LOGGER.warn(
"Result size is greater than or equal to configured limit ({}). Results may have been truncated",
variables.getElementsLimit());
}

// Check for seeds that are not entities but are vertices on an edge (orphan vertices)
orphanVertices = GafferVertexUtils.getOrphanVertices(result, this, vertexIds);
Iterable<Vertex> chainedIterable = IterableUtils.chainedIterable(translatedResults, orphanVertices);
translatedResults.addAll(GafferVertexUtils.getOrphanVertices(result, this, vertexIds));

return chainedIterable.iterator();
return translatedResults.iterator();
}

/**
Expand Down Expand Up @@ -597,19 +595,19 @@ public Iterator<Edge> edges(final Object... elementIds) {
.build();
}

// Run requested chain on the graph
final Iterable<? extends Element> result = execute(getOperation);
// Run requested chain on the graph and buffer to set to avoid reusing iterator
final Set<Element> result = new HashSet<>(IterableUtils.toList(execute(getOperation)));

// Translate results to Gafferpop elements
final GafferPopElementGenerator generator = new GafferPopElementGenerator(this);
final Iterable<Edge> translatedResults = () -> StreamSupport.stream(result.spliterator(), false)
final Set<Edge> translatedResults = StreamSupport.stream(result.spliterator(), false)
.map(generator::_apply)
.filter(Edge.class::isInstance)
.map(e -> (Edge) e)
.limit(variables.getElementsLimit())
.iterator();
.collect(Collectors.toSet());

if (IterableUtils.size(translatedResults) >= variables.getElementsLimit()) {
if (translatedResults.size() >= variables.getElementsLimit()) {
LOGGER.warn(
"Result size is greater than or equal to configured limit ({}). Results may have been truncated",
variables.getElementsLimit());
Expand Down Expand Up @@ -811,20 +809,19 @@ private Iterator<GafferPopVertex> verticesWithSeedsAndView(final List<ElementSee
}
}

// Run operation on graph
final Iterable<? extends Element> result = execute(getOperation);
// Run operation on graph buffer to set to avoid reusing iterator
final Set<Element> result = new HashSet<>(IterableUtils.toList(execute(getOperation)));

// Translate results to Gafferpop elements
final GafferPopElementGenerator generator = new GafferPopElementGenerator(this);
final Iterable<GafferPopVertex> translatedResults = () -> StreamSupport.stream(result.spliterator(), false)
final Set<GafferPopVertex> translatedResults = StreamSupport.stream(result.spliterator(), false)
.map(generator::_apply)
.filter(GafferPopVertex.class::isInstance)
.map(e -> (GafferPopVertex) e)
.limit(variables.getElementsLimit())
.iterator();
.collect(Collectors.toSet());

return translatedResults.iterator();

}

private Iterator<Vertex> adjVerticesWithSeedsAndView(final List<ElementSeed> seeds, final Direction direction, final View view) {
Expand All @@ -840,30 +837,29 @@ private Iterator<Vertex> adjVerticesWithSeedsAndView(final List<ElementSeed> see
.build())
.build());

List<EntityId> seedList = StreamSupport.stream(getAdjEntitySeeds.spliterator(), false).collect(Collectors.toList());
List<? extends EntityId> seedList = IterableUtils.toList(getAdjEntitySeeds);

// GetAdjacentIds provides list of entity seeds so run a GetElements to get the actual Entities
final Iterable<? extends Element> result = execute(new OperationChain.Builder()
.first(new GetElements.Builder()
.input(seedList)
.build())
.build());
final Set<Element> result = new HashSet<>(IterableUtils.toList(
execute(new OperationChain.Builder()
.first(new GetElements.Builder()
.input(seedList)
.build())
.build())));

// Translate results to Gafferpop elements
final GafferPopElementGenerator generator = new GafferPopElementGenerator(this);
final Iterable<Vertex> translatedResults = () -> StreamSupport.stream(result.spliterator(), false)
final Set<Vertex> translatedResults = StreamSupport.stream(result.spliterator(), false)
.map(generator::_apply)
.filter(Vertex.class::isInstance)
.map(e -> (Vertex) e)
.iterator();
.collect(Collectors.toSet());

// Check for seeds that are not entities but are vertices on an edge (orphan vertices)
Iterable<Vertex> chainedIterable = translatedResults;
for (final EntityId seed : seedList) {
Iterable<Vertex> orphanVertices = GafferVertexUtils.getOrphanVertices(result, this, seed.getVertex());
chainedIterable = IterableUtils.chainedIterable(chainedIterable, orphanVertices);
translatedResults.addAll(GafferVertexUtils.getOrphanVertices(result, this, seed.getVertex()));
}
return chainedIterable.iterator();
return translatedResults.iterator();
}

private Iterator<Edge> edgesWithSeedsAndView(final List<ElementSeed> seeds, final Direction direction, final View view) {
Expand Down Expand Up @@ -900,16 +896,16 @@ private Iterator<Edge> edgesWithSeedsAndView(final List<ElementSeed> seeds, fina
}

// Run requested chain on the graph
final Iterable<? extends Element> result = execute(getOperation);
final Set<Element> result = new HashSet<>(IterableUtils.toList(execute(getOperation)));

// Translate results to Gafferpop elements
final GafferPopElementGenerator generator = new GafferPopElementGenerator(this, true);
final Iterable<Edge> translatedResults = () -> StreamSupport.stream(result.spliterator(), false)
final Set<Edge> translatedResults = StreamSupport.stream(result.spliterator(), false)
.map(generator::_apply)
.filter(Edge.class::isInstance)
.map(e -> (Edge) e)
.limit(variables.getElementsLimit())
.iterator();
.collect(Collectors.toSet());

return translatedResults.iterator();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class GafferPopHasStep<S extends Element> extends HasStep<S> {
public GafferPopHasStep(final HasStep<S> originalHasStep) {
super(originalHasStep.getTraversal());
LOGGER.debug("Running custom HasStep on GafferPopGraph");

originalHasStep.getLabels().forEach(this::addLabel);
originalHasStep.getHasContainers().forEach(this::addHasContainer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
* (GafferPop) g.V().out() = [v2, v3, v4, v5]
* </pre>
*/
public class GafferPopVertexStep<E extends Element> extends FlatMapStep<Iterable<Vertex>, E>
public class GafferPopVertexStep<E extends Element> extends FlatMapStep<List<Vertex>, E>
implements AutoCloseable, Configuring {
private static final Logger LOGGER = LoggerFactory.getLogger(GafferPopVertexStep.class);

Expand All @@ -77,6 +77,7 @@ public GafferPopVertexStep(final VertexStep<E> originalVertexStep) {
this.edgeLabels = originalVertexStep.getEdgeLabels();
this.returnClass = originalVertexStep.getReturnClass();
this.traversal = originalVertexStep.getTraversal();
originalVertexStep.getLabels().forEach(this::addLabel);
}

@Override
Expand All @@ -90,7 +91,7 @@ public void configure(final Object... keyValues) {
}

@Override
protected Iterator<E> flatMap(final Traverser.Admin<Iterable<Vertex>> traverser) {
protected Iterator<E> flatMap(final Traverser.Admin<List<Vertex>> traverser) {
return Vertex.class.isAssignableFrom(returnClass) ?
(Iterator<E>) this.vertices(traverser.get()) :
(Iterator<E>) this.edges(traverser.get());
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.tinkerpop.gremlin.process.traversal.Traversal.Admin;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.FoldStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
Expand All @@ -27,7 +28,8 @@
import org.slf4j.LoggerFactory;

import uk.gov.gchq.gaffer.tinkerpop.process.traversal.step.GafferPopVertexStep;
import uk.gov.gchq.gaffer.tinkerpop.process.traversal.step.LazyFoldStep;

import java.util.List;

/**
* Optimisation strategy to reduce the number of Gaffer operations performed.
Expand All @@ -49,14 +51,13 @@ public void apply(final Admin<?, ?> traversal) {
LOGGER.debug("Inserting FoldStep and replacing VertexStep");

// Replace vertex step
final GafferPopVertexStep<? extends Element> listVertexStep = new GafferPopVertexStep<>(
originalVertexStep);
final GafferPopVertexStep<? extends Element> listVertexStep = new GafferPopVertexStep<>(originalVertexStep);
TraversalHelper.replaceStep(originalVertexStep, listVertexStep, traversal);

// Add in a fold step before the new VertexStep so that the input is the list of
// all vertices
LazyFoldStep<Vertex> lazyFoldStep = new LazyFoldStep<>(originalVertexStep.getTraversal());
TraversalHelper.insertBeforeStep(lazyFoldStep, listVertexStep, traversal);
FoldStep<Vertex, List<Vertex>> foldStep = new FoldStep<>(originalVertexStep.getTraversal());
TraversalHelper.insertBeforeStep(foldStep, listVertexStep, traversal);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import uk.gov.gchq.gaffer.tinkerpop.GafferPopGraph;
import uk.gov.gchq.gaffer.tinkerpop.GafferPopVertex;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
Expand All @@ -42,18 +42,19 @@ private GafferVertexUtils() {
}

/**
* Util method to extract vertices that are vertices on an edge but do not have an
* Util method to extract vertices that are vertices on an edge but do not have
* an
* associated {@link Vertex} or {@link Entity} the current graph.
* These vertices exist only on an edge.
*
*
* @param result The results from a Gaffer query
* @param graph The GafferPop graph being queried
* @param result The results from a Gaffer query
* @param graph The GafferPop graph being queried
* @param vertexIds The vertexIds that have been used as seeds in the query
* @return Iterable of 'orphan' {@link Vertex}'s
* @return {@link Collection} of 'orphan' {@link Vertex}'s
*/

public static Iterable<Vertex> getOrphanVertices(final Iterable<? extends Element> result, final GafferPopGraph graph, final Object... vertexIds) {
public static Collection<Vertex> getOrphanVertices(final Iterable<? extends Element> result, final GafferPopGraph graph, final Object... vertexIds) {
// Check for any vertex ID seeds that are not returned as Entities
List<Object> orphanVertexIds = Arrays.stream(vertexIds)
.filter(id -> StreamSupport.stream(result.spliterator(), false)
Expand All @@ -73,21 +74,23 @@ public static Iterable<Vertex> getOrphanVertices(final Iterable<? extends Elemen
* @param result The results of a Gaffer query
* @param graph The GafferPop graph being queried
* @param orphanVertexIds Any seeds that were not found to have an entity
* @return Iterable of 'orphan' {@link Vertex}'s
* @return {@link Collection} of 'orphan' {@link Vertex}'s
*/
private static Iterable<Vertex> extractOrphanVerticesFromEdges(final Iterable<? extends Element> result, final GafferPopGraph graph, final List<Object> orphanVertexIds) {
List<Vertex> orphanVertices = new ArrayList<>();
StreamSupport.stream(result.spliterator(), false)
.filter(Edge.class::isInstance)
.map(e -> (Edge) e)
.forEach(e -> {
if (orphanVertexIds.contains(e.getSource()) || orphanVertexIds.equals(e.getSource())) {
orphanVertices.add(new GafferPopVertex(GafferPopGraph.ID_LABEL, GafferCustomTypeFactory.parseForGraphSONv3(e.getSource()), graph));
}
if (orphanVertexIds.contains(e.getDestination()) || orphanVertexIds.equals(e.getDestination())) {
orphanVertices.add(new GafferPopVertex(GafferPopGraph.ID_LABEL, GafferCustomTypeFactory.parseForGraphSONv3(e.getDestination()), graph));
}
});
return orphanVertices;
private static Collection<Vertex> extractOrphanVerticesFromEdges(final Iterable<? extends Element> result, final GafferPopGraph graph, final List<Object> orphanVertexIds) {
return StreamSupport.stream(result.spliterator(), false)
.filter(Edge.class::isInstance)
.map(e -> (Edge) e)
.map(e -> {
if (orphanVertexIds.contains(e.getSource()) || orphanVertexIds.equals(e.getSource())) {
return new GafferPopVertex(GafferPopGraph.ID_LABEL, GafferCustomTypeFactory.parseForGraphSONv3(e.getSource()), graph);
}
if (orphanVertexIds.contains(e.getDestination()) || orphanVertexIds.equals(e.getDestination())) {
return new GafferPopVertex(GafferPopGraph.ID_LABEL, GafferCustomTypeFactory.parseForGraphSONv3(e.getDestination()), graph);
}
return e;
})
.filter(Vertex.class::isInstance)
.map(v -> (Vertex) v)
.collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ void shouldGetAllEdges() {
final Iterator<Edge> edges = graph.edges();

// Then
assertThat(edges).toIterable().containsExactly(createdEdge, knowsEdge);
assertThat(edges).toIterable().containsExactlyInAnyOrder(createdEdge, knowsEdge);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class GafferPopGraphStepStrategyCypherIT {

@BeforeAll
public static void beforeAll() {
GafferPopGraph gafferPopGraph = GafferPopModernTestUtils.createModernGraph(GafferPopGraphStepStrategyCypherIT.class, StoreType.MAP);
GafferPopGraph gafferPopGraph = GafferPopModernTestUtils.createModernGraph(GafferPopGraphStepStrategyCypherIT.class, StoreType.ACCUMULO);
g = gafferPopGraph.traversal();
}

Expand Down
Loading

0 comments on commit 3b93f97

Please sign in to comment.