Skip to content

Commit

Permalink
Merge branch 'develop' into gh-3322-cache-updates-federated-poc
Browse files Browse the repository at this point in the history
  • Loading branch information
p29876 authored Oct 22, 2024
2 parents 63f10b0 + a5a67ac commit 3468700
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@

public final class OtelUtil {

public static final String USER_ATTRIBUTE = "enduser.id";
public static final String JOB_ID_ATTRIBUTE = "gaffer.jobId";
public static final String GRAPH_ID_ATTRIBUTE = "gaffer.graphId";
public static final String VIEW_ATTRIBUTE = "gaffer.view";
public static final String GREMLIN_QUERY_ATTRIBUTE = "gaffer.gremlin.query";

private static boolean openTelemetryActive = false;

private OtelUtil() {
Expand Down
6 changes: 3 additions & 3 deletions core/graph/src/main/java/uk/gov/gchq/gaffer/graph/Graph.java
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,9 @@ private <O> GraphResult<O> _execute(final StoreExecuter<O> storeExecuter, final
Span span = OtelUtil.startSpan(
this.getClass().getName(),
"Graph Request: " + clonedOpChain.toOverviewString());
span.setAttribute("gaffer.graphId", getGraphId());
span.setAttribute("gaffer.jobId", clonedContext.getJobId());
span.setAttribute("gaffer.user", clonedContext.getUser().getUserId());
span.setAttribute(OtelUtil.GRAPH_ID_ATTRIBUTE, getGraphId());
span.setAttribute(OtelUtil.JOB_ID_ATTRIBUTE, clonedContext.getJobId());
span.setAttribute(OtelUtil.USER_ATTRIBUTE, clonedContext.getUser().getUserId());

O result = null;
// Sets the span to current so parent child spans are auto linked
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ public OUT doOperation(final OperationChain<OUT> operationChain, final Context c
for (final Operation op : preparedOperationChain.getOperations()) {
// OpenTelemetry hooks
Span span = OtelUtil.startSpan(this.getClass().getName(), op.getClass().getName());
span.setAttribute("jobId", context.getJobId());
span.setAttribute(OtelUtil.JOB_ID_ATTRIBUTE, context.getJobId());
if (op instanceof OperationView && ((OperationView) op).getView() != null) {
span.setAttribute("view", ((OperationView) op).getView().toString());
span.setAttribute(OtelUtil.VIEW_ATTRIBUTE, ((OperationView) op).getView().toString());
}

// Sets the span to current so parent child spans are auto linked
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.slf4j.LoggerFactory;

import uk.gov.gchq.gaffer.data.element.Element;
import uk.gov.gchq.gaffer.data.element.id.EntityId;
import uk.gov.gchq.gaffer.data.elementdefinition.view.View;
import uk.gov.gchq.gaffer.graph.Graph;
import uk.gov.gchq.gaffer.graph.GraphConfig;
Expand All @@ -51,6 +52,7 @@
import uk.gov.gchq.gaffer.operation.impl.get.GetAllElements;
import uk.gov.gchq.gaffer.operation.impl.get.GetElements;
import uk.gov.gchq.gaffer.operation.io.Input;
import uk.gov.gchq.gaffer.store.operation.GetSchema;
import uk.gov.gchq.gaffer.store.schema.Schema;
import uk.gov.gchq.gaffer.tinkerpop.generator.GafferEdgeGenerator;
import uk.gov.gchq.gaffer.tinkerpop.generator.GafferEntityGenerator;
Expand Down Expand Up @@ -80,6 +82,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -540,7 +543,6 @@ public Iterator<Vertex> adjVerticesWithView(final Object vertexId, final Directi
* Given an iterable of vertex ids, adjacent vertices will be returned.
* If you provide any optional labels then you must provide edge labels and the vertex
* labels - any missing labels will cause the elements to be filtered out.
* This method will not return 'id' vertices, only vertices that exist as entities in Gaffer.
*
* @param vertexIds the iterable of vertex ids to start at.
* @param direction the direction along edges to travel
Expand Down Expand Up @@ -830,16 +832,21 @@ private Iterator<Vertex> adjVerticesWithSeedsAndView(final List<ElementSeed> see
throw new UnsupportedOperationException("There could be a lot of vertices, so please add some seeds");
}

final Iterable<? extends Element> result = execute(new OperationChain.Builder()
final Iterable<? extends EntityId> getAdjEntitySeeds = execute(new OperationChain.Builder()
.first(new GetAdjacentIds.Builder()
.input(seeds)
.view(view)
.inOutType(getInOutType(direction))
.build())
// GetAdjacentIds provides list of entity seeds so run a GetElements to get the actual Entities
.then(new GetElements.Builder()
.view(createAllEntitiesView())
.build())
.input(seeds)
.view(view)
.inOutType(getInOutType(direction))
.build())
.build());

List<EntityId> seedList = StreamSupport.stream(getAdjEntitySeeds.spliterator(), false).collect(Collectors.toList());

// GetAdjacentIds provides list of entity seeds so run a GetElements to get the actual Entities
final Iterable<? extends Element> result = execute(new OperationChain.Builder()
.first(new GetElements.Builder()
.input(seedList)
.build())
.build());

// Translate results to Gafferpop elements
Expand All @@ -850,7 +857,13 @@ private Iterator<Vertex> adjVerticesWithSeedsAndView(final List<ElementSeed> see
.map(e -> (Vertex) e)
.iterator();

return translatedResults.iterator();
// Check for seeds that are not entities but are vertices on an edge (orphan vertices)
Iterable<Vertex> chainedIterable = translatedResults;
for (final EntityId seed : seedList) {
Iterable<Vertex> orphanVertices = GafferVertexUtils.getOrphanVertices(result, this, seed.getVertex());
chainedIterable = IterableUtils.chainedIterable(chainedIterable, orphanVertices);
}
return chainedIterable.iterator();
}

private Iterator<Edge> edgesWithSeedsAndView(final List<ElementSeed> seeds, final Direction direction, final View view) {
Expand Down Expand Up @@ -922,7 +935,12 @@ private View createView(final String... labels) {
View view = null;
if (null != labels && 0 < labels.length) {
final View.Builder viewBuilder = new View.Builder();
final Schema schema = graph.getSchema();
final Schema schema = execute(new OperationChain.Builder()
.first(new GetSchema.Builder()
.compact(true)
.build())
.options(opOptions)
.build());
for (final String label : labels) {
if (schema.isEntity(label)) {
viewBuilder.entity(label);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,24 @@

import org.apache.tinkerpop.gremlin.structure.Vertex;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import uk.gov.gchq.gaffer.data.element.Edge;
import uk.gov.gchq.gaffer.data.element.Element;
import uk.gov.gchq.gaffer.data.element.Entity;
import uk.gov.gchq.gaffer.tinkerpop.GafferPopGraph;
import uk.gov.gchq.gaffer.tinkerpop.GafferPopVertex;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public final class GafferVertexUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(GafferVertexUtils.class);

private GafferVertexUtils() {
// Utility class
Expand All @@ -58,6 +62,7 @@ public static Iterable<Vertex> getOrphanVertices(final Iterable<? extends Elemen
.noneMatch(e -> e.equals(id)))
.collect(Collectors.toList());

orphanVertexIds.forEach(id -> LOGGER.debug("Getting orphan vertices for vertex {}", id));
return (orphanVertexIds.isEmpty()) ? Collections.emptyList() : extractOrphanVerticesFromEdges(result, graph, orphanVertexIds);
}

Expand All @@ -71,19 +76,18 @@ public static Iterable<Vertex> getOrphanVertices(final Iterable<? extends Elemen
* @return Iterable of 'orphan' {@link Vertex}'s
*/
private static Iterable<Vertex> extractOrphanVerticesFromEdges(final Iterable<? extends Element> result, final GafferPopGraph graph, final List<Object> orphanVertexIds) {
return StreamSupport.stream(result.spliterator(), false)
List<Vertex> orphanVertices = new ArrayList<>();
StreamSupport.stream(result.spliterator(), false)
.filter(Edge.class::isInstance)
.map(e -> (Edge) e)
.map(e -> {
if (orphanVertexIds.contains(e.getSource())) {
return new GafferPopVertex(GafferPopGraph.ID_LABEL, GafferCustomTypeFactory.parseForGraphSONv3(e.getSource()), graph);
} else if (orphanVertexIds.contains(e.getDestination())) {
return new GafferPopVertex(GafferPopGraph.ID_LABEL, GafferCustomTypeFactory.parseForGraphSONv3(e.getDestination()), graph);
} else {
return null;
.forEach(e -> {
if (orphanVertexIds.contains(e.getSource()) || orphanVertexIds.equals(e.getSource())) {
orphanVertices.add(new GafferPopVertex(GafferPopGraph.ID_LABEL, GafferCustomTypeFactory.parseForGraphSONv3(e.getSource()), graph));
}
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (orphanVertexIds.contains(e.getDestination()) || orphanVertexIds.equals(e.getDestination())) {
orphanVertices.add(new GafferPopVertex(GafferPopGraph.ID_LABEL, GafferCustomTypeFactory.parseForGraphSONv3(e.getDestination()), graph));
}
});
return orphanVertices;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
*/
class GafferPopGraphIT {
private static final String TEST_NAME_FORMAT = "({0}) {displayName}";
private static final String VERTEX_ONLY_ID_STRING = "7";
private static GafferPopGraph mapStore;
private static GafferPopGraph accumuloStore;

Expand Down Expand Up @@ -303,28 +304,45 @@ void shouldAddE(String graph, GraphTraversalSource g) {
@MethodSource("provideTraversals")
void shouldSeedWithVertexOnlyEdge(String graph, GraphTraversalSource g) {
// Edge has a vertex but not an entity in the graph - Gaffer only feature
String vertexOnlyId = "7";
mapStore.addEdge(new GafferPopEdge("knows", GafferPopModernTestUtils.MARKO.getId(), vertexOnlyId, mapStore));
accumuloStore.addEdge(new GafferPopEdge("knows", GafferPopModernTestUtils.MARKO.getId(), vertexOnlyId, accumuloStore));
// [1 - knows -> 7]
mapStore.addEdge(new GafferPopEdge("knows", GafferPopModernTestUtils.MARKO.getId(), VERTEX_ONLY_ID_STRING, mapStore));
accumuloStore.addEdge(new GafferPopEdge("knows", GafferPopModernTestUtils.MARKO.getId(), VERTEX_ONLY_ID_STRING, accumuloStore));

List<Vertex> result = g.V("7").toList();
List<Vertex> result = g.V(VERTEX_ONLY_ID_STRING).toList();
assertThat(result)
.extracting(r -> r.id())
.contains(vertexOnlyId);
.contains(VERTEX_ONLY_ID_STRING);
reset();
}

@ParameterizedTest(name = TEST_NAME_FORMAT)
@MethodSource("provideTraversals")
void shouldTraverseEdgeWithVertexOnlySeed(String graph, GraphTraversalSource g) {
// Edge has a vertex but not an entity in the graph - Gaffer only feature
String vertexOnlyId = "7";
mapStore.addEdge(new GafferPopEdge("knows", GafferPopModernTestUtils.MARKO.getId(), vertexOnlyId, mapStore));
accumuloStore.addEdge(new GafferPopEdge("knows", GafferPopModernTestUtils.MARKO.getId(), vertexOnlyId, accumuloStore));
void shouldTraverseEdgeWithVertexOnlyEdge(String graph, GraphTraversalSource g) {
// Edge has a two vertices with no entities in the graph - Gaffer only feature
// [8 - knows -> 7]
mapStore.addEdge(new GafferPopEdge("knows", "8", VERTEX_ONLY_ID_STRING, mapStore));
accumuloStore.addEdge(new GafferPopEdge("knows", "8", VERTEX_ONLY_ID_STRING, accumuloStore));

List<Map<Object, Object>> result = g.V("7").inE().outV().elementMap().toList();
List<Vertex> result = g.V(VERTEX_ONLY_ID_STRING).inE().inV().toList();
assertThat(result)
.containsExactly(MARKO.getPropertyMap());
.extracting(r -> r.id())
.contains(VERTEX_ONLY_ID_STRING);
List<Vertex> result2 = g.V(VERTEX_ONLY_ID_STRING).inE().outV().toList();
assertThat(result2)
.extracting(r -> r.id())
.contains("8");
List<Vertex> result3 = g.V("8").outE().inV().toList();
assertThat(result3)
.extracting(r -> r.id())
.contains(VERTEX_ONLY_ID_STRING);
List<Vertex> result4 = g.V("8").outE().outV().toList();
assertThat(result4)
.extracting(r -> r.id())
.contains("8");
List<Vertex> resultLabel = g.V("8").out("knows").toList();
assertThat(resultLabel)
.extracting(r -> r.id())
.containsOnly(VERTEX_ONLY_ID_STRING);
reset();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import uk.gov.gchq.gaffer.rest.factory.spring.AbstractUserFactory;
import uk.gov.gchq.gaffer.tinkerpop.GafferPopGraph;
import uk.gov.gchq.gaffer.tinkerpop.GafferPopGraphVariables;
import uk.gov.gchq.gaffer.user.User;
import uk.gov.gchq.koryphe.tuple.n.Tuple2;

import java.io.IOException;
Expand Down Expand Up @@ -309,7 +310,17 @@ private Tuple2<Object, JSONObject> runGremlinQuery(final String gremlinQuery) {
// OpenTelemetry hooks
Span span = OtelUtil.startSpan(
this.getClass().getName(), "Gremlin Request: " + UUID.nameUUIDFromBytes(gremlinQuery.getBytes(StandardCharsets.UTF_8)));
span.setAttribute("gaffer.gremlin.query", gremlinQuery);
span.setAttribute(OtelUtil.GREMLIN_QUERY_ATTRIBUTE, gremlinQuery);

User user = ((GafferPopGraphVariables) gafferPopGraph.variables()).getUser();
String userId;
if (user != null) {
userId = user.getUserId();
} else {
LOGGER.warn("Could not find Gaffer user for OTEL. Using default.");
userId = "unknownGremlinUser";
}
span.setAttribute(OtelUtil.USER_ATTRIBUTE, userId);

// tuple to hold the result and explain
Tuple2<Object, JSONObject> pair = new Tuple2<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import uk.gov.gchq.gaffer.rest.factory.spring.AbstractUserFactory;
import uk.gov.gchq.gaffer.tinkerpop.GafferPopGraph;
import uk.gov.gchq.gaffer.tinkerpop.GafferPopGraphVariables;
import uk.gov.gchq.gaffer.user.User;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -145,14 +146,16 @@ private ResponseMessage handleGremlinRequest(final WebSocketSession session, fin

// OpenTelemetry hooks
Span span = OtelUtil.startSpan(this.getClass().getName(), "Gremlin Request: " + requestId.toString());
span.setAttribute("gaffer.gremlin.query", request.getArgs().get(Tokens.ARGS_GREMLIN).toString());
span.setAttribute(OtelUtil.GREMLIN_QUERY_ATTRIBUTE, request.getArgs().get(Tokens.ARGS_GREMLIN).toString());

// Execute the query
try (Scope scope = span.makeCurrent();
GremlinExecutor gremlinExecutor = getGremlinExecutor()) {
// Set current headers for potential authorisation then set the user
userFactory.setHttpHeaders(session.getHandshakeHeaders());
graph.variables().set(GafferPopGraphVariables.USER, userFactory.createUser());
User user = userFactory.createUser();
graph.variables().set(GafferPopGraphVariables.USER, user);
span.setAttribute(OtelUtil.USER_ATTRIBUTE, user.getUserId());

// Run the query using the gremlin executor service
Object result = gremlinExecutor.eval(
Expand Down

0 comments on commit 3468700

Please sign in to comment.