Skip to content

Commit

Permalink
Merge branch 'develop' into gh-3342-handle-different-groups-in-view-f…
Browse files Browse the repository at this point in the history
…ederated-poc
  • Loading branch information
tb06904 authored Dec 4, 2024
2 parents b2ccc09 + 7726ff3 commit 5417985
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import uk.gov.gchq.gaffer.operation.impl.get.GetAllElements;
import uk.gov.gchq.gaffer.operation.impl.get.GetElements;
import uk.gov.gchq.gaffer.operation.io.Input;
import uk.gov.gchq.gaffer.operation.io.Output;
import uk.gov.gchq.gaffer.store.operation.GetSchema;
import uk.gov.gchq.gaffer.store.schema.Schema;
import uk.gov.gchq.gaffer.tinkerpop.generator.GafferEdgeGenerator;
Expand Down Expand Up @@ -86,6 +87,10 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static uk.gov.gchq.gaffer.tinkerpop.GafferPopGraphVariables.DEFAULT_GET_ELEMENTS_LIMIT;
import static uk.gov.gchq.gaffer.tinkerpop.GafferPopGraphVariables.DEFAULT_HAS_STEP_FILTER_STAGE;


/**
* A <code>GafferPopGraph</code> is an implementation of
* {@link org.apache.tinkerpop.gremlin.structure.Graph}.
Expand Down Expand Up @@ -187,29 +192,25 @@ public class GafferPopGraph implements org.apache.tinkerpop.gremlin.structure.Gr
*/
public static final String GET_ELEMENTS_LIMIT = "gaffer.elements.getlimit";

/**
* Default value for the max number of elements returned by getElements
*/
public static final int DEFAULT_GET_ELEMENTS_LIMIT = 5000;

/**
* Configuration key for when to apply HasStep filtering
*/
public static final String HAS_STEP_FILTER_STAGE = "gaffer.elements.hasstepfilterstage";

public enum HasStepFilterStage {
PRE_AGGREGATION,
POST_AGGREGATION,
POST_TRANSFORM
}

/**
* Default to pre-aggregation filtering for HasStep predicates
* Configuration key controlling whether orphaned vertices (i.e. vertices without an entity)
* are included in the result by default
*/
public static final HasStepFilterStage DEFAULT_HAS_STEP_FILTER_STAGE = HasStepFilterStage.PRE_AGGREGATION;
public static final String INCLUDE_ORPHANED_VERTICES = "gaffer.includeOrphanedVertices";

/**
* Set default user ID to use if not set by the user factory.
*/
public static final String USER_ID = "gaffer.userId";

/**
* Set default data auths if not set by the user factory.
*/
public static final String DATA_AUTHS = "gaffer.dataAuths";

/**
Expand Down Expand Up @@ -410,45 +411,54 @@ public void addEdge(final GafferPopEdge edge) {
*/
@Override
public Iterator<Vertex> vertices(final Object... vertexIds) {
LOGGER.debug(GET_DEBUG_MSG, variables.getElementsLimit());
final boolean getAll = null == vertexIds || 0 == vertexIds.length;
final OperationChain<Iterable<? extends Element>> getOperation;
final Output<Iterable<? extends Element>> getOperation;

LOGGER.debug(GET_DEBUG_MSG, variables.getElementsLimit());
if (getAll) {
getOperation = new Builder()
.first(new GetAllElements.Builder()
.view(createAllEntitiesView())
.build())
.then(new Limit<Element>(variables.getElementsLimit(), true))
.build();
final GetAllElements.Builder builder = new GetAllElements.Builder();
// If we are not including orphans then apply the all entities view
if (!variables.getIncludeOrphanedVertices()) {
builder.view(createAllEntitiesView());
}
getOperation = builder.build();

} else {
getOperation = new Builder()
.first(new GetElements.Builder()
.input(getElementSeeds(Arrays.asList(vertexIds)))
.build())
.then(new Limit<Element>(variables.getElementsLimit(), true))
.build();
final GetElements.Builder builder = new GetElements.Builder()
.input(getElementSeeds(Arrays.asList(vertexIds)));
// If we are not including orphans then apply the all entities view
if (!variables.getIncludeOrphanedVertices()) {
builder.view(createAllEntitiesView());
}
getOperation = builder.build();
}

// Run requested chain on the graph and buffer the result into a set to avoid reusing the iterator
final Set<Element> result = new HashSet<>(IterableUtils.toList(execute(getOperation)));
final OperationChain<Iterable<? extends Element>> chain = new Builder()
.first(getOperation)
.then(new Limit<>(variables.getElementsLimit(), true))
.build();
final Set<Element> result = new HashSet<>(IterableUtils.toList(execute(chain)));

// Warn of truncation
if (result.size() >= variables.getElementsLimit()) {
LOGGER.warn(
"Result size is greater than or equal to configured limit ({}). Results may have been truncated",
variables.getElementsLimit());
}

// Translate results to Gafferpop elements
final GafferPopElementGenerator generator = new GafferPopElementGenerator(this);
final Set<Vertex> translatedResults = StreamSupport.stream(result.spliterator(), false)
.map(generator::_apply)
.filter(Vertex.class::isInstance)
.map(e -> (Vertex) e)
.limit(variables.getElementsLimit())
.collect(Collectors.toSet());

if (translatedResults.size() >= variables.getElementsLimit()) {
LOGGER.warn(
"Result size is greater than or equal to configured limit ({}). Results may have been truncated",
variables.getElementsLimit());
}

// Check for seeds that are not entities but are vertices on an edge (orphan vertices)
translatedResults.addAll(GafferVertexUtils.getOrphanVertices(result, this, vertexIds));
if (variables.getIncludeOrphanedVertices()) {
translatedResults.addAll(GafferVertexUtils.getOrphanVertices(result, this, vertexIds));
}

return translatedResults.iterator();
}
Expand Down Expand Up @@ -756,6 +766,8 @@ public void setDefaultVariables(final GafferPopGraphVariables variables) {
configuration().getInteger(GET_ELEMENTS_LIMIT, DEFAULT_GET_ELEMENTS_LIMIT));
variables.set(GafferPopGraphVariables.HAS_STEP_FILTER_STAGE,
configuration().getString(HAS_STEP_FILTER_STAGE, DEFAULT_HAS_STEP_FILTER_STAGE.toString()));
variables.set(GafferPopGraphVariables.INCLUDE_ORPHANED_VERTICES,
configuration().getBoolean(INCLUDE_ORPHANED_VERTICES, false));
variables.set(GafferPopGraphVariables.LAST_OPERATION_CHAIN, new OperationChain<Object>());
}

Expand All @@ -782,24 +794,15 @@ private Iterator<GafferPopVertex> verticesWithSeedsAndView(final List<ElementSee
.build();
}

final OperationChain<Iterable<? extends Element>> getOperation;
final Output<Iterable<? extends Element>> getOperation;
LOGGER.debug(GET_DEBUG_MSG, variables.getElementsLimit());
if (getAll) {
getOperation = new Builder()
.first(new GetAllElements.Builder()
.view(entitiesView)
.build())
.then(new Limit<>(variables.getElementsLimit(), true))
.build();
getOperation = new GetAllElements.Builder().view(entitiesView).build();
} else {
getOperation = new Builder()
.first(new GetElements.Builder()
.input(seeds)
.view(entitiesView)
.build())
.then(new Limit<>(variables.getElementsLimit(), true))
.build();

getOperation = new GetElements.Builder()
.input(seeds)
.view(entitiesView)
.build();
if (null == entitiesView || entitiesView.getEntityGroups().contains(ID_LABEL)) {
seeds.forEach(seed -> {
if (seed instanceof EntitySeed) {
Expand All @@ -810,15 +813,18 @@ private Iterator<GafferPopVertex> verticesWithSeedsAndView(final List<ElementSee
}

// Run operation on the graph and buffer the result into a set to avoid reusing the iterator
final Set<Element> result = new HashSet<>(IterableUtils.toList(execute(getOperation)));
final OperationChain<Iterable<? extends Element>> chain = new Builder()
.first(getOperation)
.then(new Limit<>(variables.getElementsLimit(), true))
.build();
final Set<Element> result = new HashSet<>(IterableUtils.toList(execute(chain)));

// Translate results to Gafferpop elements
final GafferPopElementGenerator generator = new GafferPopElementGenerator(this);
final Set<GafferPopVertex> translatedResults = StreamSupport.stream(result.spliterator(), false)
.map(generator::_apply)
.filter(GafferPopVertex.class::isInstance)
.map(e -> (GafferPopVertex) e)
.limit(variables.getElementsLimit())
.collect(Collectors.toSet());

return translatedResults.iterator();
Expand All @@ -829,7 +835,8 @@ private Iterator<Vertex> adjVerticesWithSeedsAndView(final List<ElementSeed> see
throw new UnsupportedOperationException("There could be a lot of vertices, so please add some seeds");
}

final Iterable<? extends EntityId> getAdjEntitySeeds = execute(new OperationChain.Builder()
final Iterable<? extends EntityId> getAdjEntitySeeds = execute(
new OperationChain.Builder()
.first(new GetAdjacentIds.Builder()
.input(seeds)
.view(view)
Expand All @@ -839,13 +846,15 @@ private Iterator<Vertex> adjVerticesWithSeedsAndView(final List<ElementSeed> see

List<? extends EntityId> seedList = IterableUtils.toList(getAdjEntitySeeds);

final GetElements.Builder builder = new GetElements.Builder().input(seedList);
// If we are not including orphans then apply the all entities view
if (!variables.getIncludeOrphanedVertices()) {
builder.view(createAllEntitiesView());
}

// GetAdjacentIds provides list of entity seeds so run a GetElements to get the actual Entities
final Set<Element> result = new HashSet<>(IterableUtils.toList(
execute(new OperationChain.Builder()
.first(new GetElements.Builder()
.input(seedList)
.build())
.build())));
execute(new OperationChain.Builder().first(builder.build()).build())));

// Translate results to Gafferpop elements
final GafferPopElementGenerator generator = new GafferPopElementGenerator(this);
Expand All @@ -856,9 +865,11 @@ private Iterator<Vertex> adjVerticesWithSeedsAndView(final List<ElementSeed> see
.collect(Collectors.toSet());

// Check for seeds that are not entities but are vertices on an edge (orphan vertices)
for (final EntityId seed : seedList) {
translatedResults.addAll(GafferVertexUtils.getOrphanVertices(result, this, seed.getVertex()));
if (variables.getIncludeOrphanedVertices()) {
translatedResults.addAll(
GafferVertexUtils.getOrphanVertices(result, this, seedList.stream().map(EntityId::getVertex).toArray(Object[]::new)));
}

return translatedResults.iterator();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public final class GafferPopGraphVariables implements Graph.Variables {
private static final Logger LOGGER = LoggerFactory.getLogger(GafferPopGraphVariables.class);
private static final String VAR_UPDATE_ERROR_STRING = "Ignoring update variable: {}, incorrect value type: {}";

// KEYS

/**
* Variable key for the {@link Map} of Gaffer operation options.
*/
Expand All @@ -54,7 +56,8 @@ public final class GafferPopGraphVariables implements Graph.Variables {
public static final String USER = "user";

/**
* The max number of elements that can be returned by GetElements
* The max number of elements that can be returned by a single GetElements
* or GetAllElements
*/
public static final String GET_ELEMENTS_LIMIT = "getElementsLimit";

Expand All @@ -73,6 +76,29 @@ public final class GafferPopGraphVariables implements Graph.Variables {
*/
public static final String LAST_OPERATION_CHAIN = "lastOperation";

/**
* The key controlling whether orphaned vertices (i.e. vertices without an entity)
* are included in the result
*/
public static final String INCLUDE_ORPHANED_VERTICES = "includeOrphanedVertices";

// DEFAULTS

/**
* Default value for the max number of elements returned by getElements
*/
public static final int DEFAULT_GET_ELEMENTS_LIMIT = 20000;

/**
* Default to pre-aggregation filtering for HasStep predicates
*/
public static final HasStepFilterStage DEFAULT_HAS_STEP_FILTER_STAGE = HasStepFilterStage.PRE_AGGREGATION;

/**
 * The stages at which HasStep predicate filtering can be applied.
 * <p>
 * Values are resolved by name (via {@code valueOf}) from the configured
 * filter stage string, so the constant names must match the strings
 * users supply in configuration.
 */
public enum HasStepFilterStage {
PRE_AGGREGATION,
POST_AGGREGATION,
POST_TRANSFORM
}

private final Map<String, Object> variables;

Expand Down Expand Up @@ -121,6 +147,10 @@ public void set(final String key, final Object value) {
}
break;

case INCLUDE_ORPHANED_VERTICES:
variables.put(key, Boolean.valueOf(value.toString()));
break;

default:
variables.put(key, value);
break;
Expand Down Expand Up @@ -169,6 +199,10 @@ public OperationChain<?> getLastOperationChain() {
return (OperationChain) variables.get(LAST_OPERATION_CHAIN);
}

/**
 * Gets whether orphaned vertices (i.e. vertices without an entity) should be
 * included in results.
 *
 * @return the value stored under {@link #INCLUDE_ORPHANED_VERTICES} parsed
 *         as a boolean, or {@code false} if the variable has not been set
 */
public boolean getIncludeOrphanedVertices() {
    // Guard against an unset variable: calling toString() on a missing map
    // entry would throw a NullPointerException. Fall back to false, which is
    // the default the graph configuration applies for this setting.
    final Object includeOrphans = variables.get(INCLUDE_ORPHANED_VERTICES);
    return includeOrphans != null && Boolean.parseBoolean(includeOrphans.toString());
}

public String toString() {
return StringFactory.graphVariablesString(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
import uk.gov.gchq.gaffer.data.elementdefinition.view.View;
import uk.gov.gchq.gaffer.data.elementdefinition.view.ViewElementDefinition;
import uk.gov.gchq.gaffer.tinkerpop.GafferPopGraph;
import uk.gov.gchq.gaffer.tinkerpop.GafferPopGraph.HasStepFilterStage;
import uk.gov.gchq.gaffer.tinkerpop.GafferPopGraphVariables;
import uk.gov.gchq.gaffer.tinkerpop.GafferPopGraphVariables.HasStepFilterStage;
import uk.gov.gchq.gaffer.tinkerpop.process.traversal.step.util.GafferPopHasContainer;
import uk.gov.gchq.koryphe.impl.predicate.Exists;

Expand Down Expand Up @@ -248,8 +248,8 @@ private ViewElementDefinition createElementDefFromPredicates(final String filter
hasStepFilterStage = HasStepFilterStage.valueOf(filterStage);
} catch (final IllegalArgumentException e) {
LOGGER.warn("Unknown hasStepFilterStage: {}. Defaulting to {}",
filterStage, GafferPopGraph.DEFAULT_HAS_STEP_FILTER_STAGE);
hasStepFilterStage = GafferPopGraph.DEFAULT_HAS_STEP_FILTER_STAGE;
filterStage, GafferPopGraphVariables.DEFAULT_HAS_STEP_FILTER_STAGE);
hasStepFilterStage = GafferPopGraphVariables.DEFAULT_HAS_STEP_FILTER_STAGE;
}

switch (hasStepFilterStage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,8 @@ void shouldSeedWithVertexOnlyEdge() throws OperationException {
// Edge has a vertex but not an entity in the graph - Gaffer only feature
getGraph().addEdge(new GafferPopEdge("knows", GafferPopModernTestUtils.MARKO.getId(), "7", getGraph()));

List<Vertex> result = g.V("7").toList();
// Need to enable orphaned vertices on the query
List<Vertex> result = g.with(GafferPopGraphVariables.INCLUDE_ORPHANED_VERTICES, "true").V("7").toList();
assertThat(result)
.extracting(r -> r.id())
.contains("7");
Expand All @@ -305,9 +306,12 @@ void shouldTraverseEdgeWithVertexOnlySeed() throws OperationException {
// Edge has a vertex but not an entity in the graph - Gaffer only feature
getGraph().addEdge(new GafferPopEdge("knows", GafferPopModernTestUtils.MARKO.getId(), "7", getGraph()));

List<Map<Object, Object>> result = g.V("7").inE().outV().elementMap().toList();
assertThat(result)
.containsExactly(MARKO.getPropertyMap());
// Need to enable orphaned vertices on the query
List<Map<Object, Object>> result = g.with(GafferPopGraphVariables.INCLUDE_ORPHANED_VERTICES, "true")
.V("7").inE().outV().elementMap().toList();

assertThat(result).containsExactly(MARKO.getPropertyMap());

reset();
}

Expand Down
Loading

0 comments on commit 5417985

Please sign in to comment.