Skip to content

Commit

Permalink
move back to serialisables
Browse files Browse the repository at this point in the history
  • Loading branch information
tb06904 committed Jan 23, 2025
1 parent 5492d81 commit 338fe55
Show file tree
Hide file tree
Showing 14 changed files with 136 additions and 252 deletions.
9 changes: 0 additions & 9 deletions core/graph/src/main/java/uk/gov/gchq/gaffer/graph/Graph.java
Original file line number Diff line number Diff line change
Expand Up @@ -470,15 +470,6 @@ public List<Class <? extends GraphHook>> getGraphHooks() {
return config.getHooks().stream().map(GraphHook::getClass).collect(Collectors.toList());
}

/**
* Returns the {@link Store} for this graph.
*
* @return The store
*/
public Store getStore() {
return store;
}

/**
* @return a collection of all the supported {@link Operation}s.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,7 @@
import uk.gov.gchq.gaffer.federated.simple.operation.handler.misc.ChangeGraphAccessHandler;
import uk.gov.gchq.gaffer.federated.simple.operation.handler.misc.ChangeGraphIdHandler;
import uk.gov.gchq.gaffer.federated.simple.operation.handler.misc.RemoveGraphHandler;
import uk.gov.gchq.gaffer.graph.Graph;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.graph.hook.GraphHook;
import uk.gov.gchq.gaffer.graph.hook.NamedOperationResolver;
import uk.gov.gchq.gaffer.graph.hook.NamedViewResolver;
import uk.gov.gchq.gaffer.named.operation.AddNamedOperation;
import uk.gov.gchq.gaffer.named.operation.DeleteNamedOperation;
import uk.gov.gchq.gaffer.named.operation.GetAllNamedOperations;
Expand Down Expand Up @@ -134,7 +130,7 @@ public class FederatedStore extends Store {

// Quick access in memory cache of graphs added to this instance whist it was running.
// Will use this first then resort to the full graph cache if not found.
private final Map<String, Pair<Graph, GraphAccess>> quickAccessGraphs = new HashMap<>();
private final Map<String, Pair<GraphSerialisable, GraphAccess>> quickAccessGraphs = new HashMap<>();

// Gaffer cache of available graphs
private Cache<String, Pair<GraphSerialisable, GraphAccess>> graphCache;
Expand All @@ -160,21 +156,20 @@ public class FederatedStore extends Store {
*
* @throws IllegalArgumentException If there is already a graph with the supplied ID
*/
public void addGraph(final Graph graph, final GraphAccess graphAccess) {
public void addGraph(final GraphSerialisable graph, final GraphAccess graphAccess) {
String graphId = graph.getGraphId();
LOGGER.debug("Adding Graph: '{}'", graphId);
// Pair the graph with its access
Pair<Graph, GraphAccess> graphAndAccessPair = new ImmutablePair<>(graph, graphAccess);
Pair<GraphSerialisable, GraphAccess> gsAndAccessPair = new ImmutablePair<>(new GraphSerialisable(graph), graphAccess);
Pair<GraphSerialisable, GraphAccess> graphAndAccessPair = new ImmutablePair<>(graph, graphAccess);
try {
// Add safely to the cache
graphCache.getCache().putSafe(graphId, gsAndAccessPair);
graphCache.getCache().putSafe(graphId, graphAndAccessPair);
// Add to quick access
if (quickAccessGraphs.containsKey(graphId)) {
throw new OverwritingException("Graph with ID: '" + graphId + "' already exists");
} else {
quickAccessGraphs.put(graphId, graphAndAccessPair);
}

quickAccessGraphs.put(graphId, graphAndAccessPair);
} catch (final CacheOperationException e) {
// Unknown issue adding to cache
throw new GafferRuntimeException(e.getMessage(), e);
Expand Down Expand Up @@ -211,7 +206,7 @@ public void removeGraph(final String graphId) {
* @throws CacheOperationException If issue getting from cache.
* @throws IllegalArgumentException If graph not found.
*/
public Graph getGraph(final String graphId) throws CacheOperationException {
public GraphSerialisable getGraph(final String graphId) throws CacheOperationException {
return getGraphAccessPair(graphId).getLeft();
}

Expand All @@ -237,7 +232,7 @@ public GraphAccess getGraphAccess(final String graphId) throws CacheOperationExc
* @throws CacheOperationException If issue getting from cache.
* @throws IllegalArgumentException If graph not found.
*/
public Pair<Graph, GraphAccess> getGraphAccessPair(final String graphId) throws CacheOperationException {
public Pair<GraphSerialisable, GraphAccess> getGraphAccessPair(final String graphId) throws CacheOperationException {
if (quickAccessGraphs.containsKey(graphId)) {
return quickAccessGraphs.get(graphId);
}
Expand All @@ -248,16 +243,9 @@ public Pair<Graph, GraphAccess> getGraphAccessPair(final String graphId) throws
String.format(GRAPH_ID_ERROR, graphId));
}
// Save to quick access for next time
Graph graph = new Graph.Builder()
.config(graphAndAccess.getLeft().getConfig())
.addSchema(graphAndAccess.getLeft().getSchema())
.storeProperties(graphAndAccess.getLeft().getStoreProperties())
.addToLibrary(false)
.build();
Pair<Graph, GraphAccess> pair = new ImmutablePair<>(graph, graphAndAccess.getRight());
quickAccessGraphs.putIfAbsent(graphId, pair);

return pair;
quickAccessGraphs.putIfAbsent(graphId, graphAndAccess);

return graphAndAccess;
}

/**
Expand Down Expand Up @@ -308,8 +296,8 @@ public void setDefaultGraphIds(final List<String> defaultGraphIds) {
*/
public void changeGraphId(final String graphToUpdateId, final String newGraphId) throws StoreException, CacheOperationException {
// Get existing graph and access
final Pair<Graph, GraphAccess> graphPairToUpdate = getGraphAccessPair(graphToUpdateId);
final Graph graphToUpdate = graphPairToUpdate.getLeft();
final Pair<GraphSerialisable, GraphAccess> graphPairToUpdate = getGraphAccessPair(graphToUpdateId);
final GraphSerialisable graphToUpdate = graphPairToUpdate.getLeft();
final GraphAccess graphAccess = graphPairToUpdate.getRight();

// Remove from cache
Expand All @@ -318,28 +306,18 @@ public void changeGraphId(final String graphToUpdateId, final String newGraphId)
// For accumulo update the table with the new graph ID
if (graphToUpdate.getStoreProperties().getStoreClass().startsWith(AccumuloStore.class.getPackage().getName())) {
renameTable((AccumuloProperties) graphToUpdate.getStoreProperties(), graphToUpdateId, newGraphId);
// Update id in the original graph
graphToUpdate.getConfig().setGraphId(newGraphId);
GraphSerialisable updatedGraphSerialisable = new GraphSerialisable.Builder(graphToUpdate)
.config(graphToUpdate.getConfig())
.build();
// Add graph with new id back to cache
addGraph(updatedGraphSerialisable, graphAccess);
} else {
// For other stores just re-add with new graph ID
graphToUpdate.getConfig().setGraphId(newGraphId);
addGraph(graphToUpdate, graphAccess);
}

// Need to remove old cache hooks in the config so the hooks update with the correct suffix
List<GraphHook> graphHooks = new ArrayList<>(graphToUpdate
.getConfig()
.getHooks());
graphHooks.removeIf(NamedViewResolver.class::isInstance);
graphHooks.removeIf(NamedOperationResolver.class::isInstance);
graphToUpdate.getConfig().setHooks(null);
graphToUpdate.getConfig().setHooks(graphHooks);

// Re-initialise so the store reference gets updated with the new ID
graphToUpdate.getConfig().setGraphId(newGraphId);
Graph updatedGraph = new Graph.Builder()
.config(graphToUpdate.getConfig())
.addSchema(graphToUpdate.getSchema())
.storeProperties(graphToUpdate.getStoreProperties())
.store(graphToUpdate.getStore())
.build();

// Add graph with new id back to cache
addGraph(updatedGraph, graphAccess);
}

/**
Expand All @@ -351,12 +329,10 @@ public void changeGraphId(final String graphToUpdateId, final String newGraphId)
* @throws CacheOperationException If issue updating the cache.
*/
public void changeGraphAccess(final String graphId, final GraphAccess newAccess) throws CacheOperationException {
final Graph graph = getGraph(graphId);
// Create the new pairs
final Pair<Graph, GraphAccess> graphAndAccessPair = new ImmutablePair<>(graph, newAccess);
final Pair<GraphSerialisable, GraphAccess> gsAndAccessPair = new ImmutablePair<>(new GraphSerialisable(graph), newAccess);
final Pair<GraphSerialisable, GraphAccess> graphAndAccessPair = new ImmutablePair<>(getGraph(graphId), newAccess);
// Add to the cache this will overwrite any existing value
graphCache.getCache().put(graphId, gsAndAccessPair);
graphCache.getCache().put(graphId, graphAndAccessPair);
// Add to quick access
quickAccessGraphs.put(graphId, graphAndAccessPair);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import uk.gov.gchq.gaffer.data.elementdefinition.view.View;
import uk.gov.gchq.gaffer.federated.simple.operation.handler.FederatedOperationHandler;
import uk.gov.gchq.gaffer.graph.Graph;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.operation.Operation;
import uk.gov.gchq.gaffer.operation.OperationChain;
import uk.gov.gchq.gaffer.operation.graph.OperationView;
Expand Down Expand Up @@ -50,7 +50,7 @@ private FederatedUtils() {
* @param graphs Graphs to check.
* @return Do they share groups.
*/
public static boolean doGraphsShareGroups(final List<Graph> graphs) {
public static boolean doGraphsShareGroups(final List<GraphSerialisable> graphs) {
// Compare all schemas against each other
for (int i = 0; i < graphs.size() - 1; i++) {
for (int j = i + 1; j < graphs.size(); j++) {
Expand All @@ -75,7 +75,7 @@ public static boolean doGraphsShareGroups(final List<Graph> graphs) {
* @param depthLimit Limit to the recursion depth.
* @return A valid version of the operation chain.
*/
public static OperationChain getValidOperationForGraph(final Operation operation, final Graph graph, final int depth, final int depthLimit) {
public static OperationChain getValidOperationForGraph(final Operation operation, final GraphSerialisable graph, final int depth, final int depthLimit) {
LOGGER.debug("Creating valid operation for graph, depth is: {}", depth);
final Collection<Operation> updatedOperations = new ArrayList<>();

Expand Down Expand Up @@ -123,7 +123,7 @@ public static OperationChain getValidOperationForGraph(final Operation operation
* @param graph The relevant graph.
* @return A version of the view valid for the graph.
*/
public static View getValidViewForGraph(final View view, final Graph graph) {
public static View getValidViewForGraph(final View view, final GraphSerialisable graph) {
final Schema schema = graph.getSchema();

// Figure out all the groups relevant to the graph
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
import uk.gov.gchq.gaffer.federated.simple.FederatedStore;
import uk.gov.gchq.gaffer.federated.simple.FederatedUtils;
import uk.gov.gchq.gaffer.federated.simple.access.GraphAccess;
import uk.gov.gchq.gaffer.graph.Graph;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.operation.Operation;
import uk.gov.gchq.gaffer.operation.OperationChain;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.operation.io.Output;
import uk.gov.gchq.gaffer.store.Context;
Expand Down Expand Up @@ -114,18 +115,17 @@ public Object doOperation(final P operation, final Context context, final Store
return new FederatedOutputHandler<>().doOperation((Output) operation, context, store);
}

List<Graph> graphsToExecute = getGraphsToExecuteOn(operation, context, (FederatedStore) store);
List<GraphSerialisable> graphsToExecute = getGraphsToExecuteOn(operation, context, (FederatedStore) store);
// No-op
if (graphsToExecute.isEmpty()) {
return null;
}

// Execute the operation chain on each graph
for (final Graph graph : graphsToExecute) {
for (final GraphSerialisable graph : graphsToExecute) {
try {
graph.execute(
FederatedUtils.getValidOperationForGraph(operation, graph, 0, fixLimit),
context.getUser());
OperationChain<Void> fixedChain = FederatedUtils.getValidOperationForGraph(operation, graph, 0, fixLimit);
graph.getGraph().execute(fixedChain, context.getUser());
} catch (final OperationException | UnsupportedOperationException | IllegalArgumentException e) {
// Optionally skip this error if user has specified to do so
LOGGER.error("Operation failed on graph: {}", graph.getGraphId());
Expand Down Expand Up @@ -154,11 +154,11 @@ public Object doOperation(final P operation, final Context context, final Store
* @return List of {@link Graph}s to execute on.
* @throws OperationException Fail to get Graphs.
*/
protected List<Graph> getGraphsToExecuteOn(final Operation operation, final Context context,
protected List<GraphSerialisable> getGraphsToExecuteOn(final Operation operation, final Context context,
final FederatedStore store) throws OperationException {

List<String> specifiedGraphIds = new ArrayList<>();
List<Graph> graphsToExecute = new ArrayList<>();
List<GraphSerialisable> graphsToExecute = new ArrayList<>();

// If user specified graph IDs for this chain parse as comma separated list
if (operation.containsOption(OPT_SHORT_GRAPH_IDS)) {
Expand All @@ -181,7 +181,7 @@ protected List<Graph> getGraphsToExecuteOn(final Operation operation, final Cont
// Get the corresponding graph serialisables
for (final String id : specifiedGraphIds) {
try {
Pair<Graph, GraphAccess> pair = store.getGraphAccessPair(id);
Pair<GraphSerialisable, GraphAccess> pair = store.getGraphAccessPair(id);

// Check the user has access to the graph
if (pair.getRight().hasReadAccess(context.getUser(), store.getProperties().getAdminAuth())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import uk.gov.gchq.gaffer.federated.simple.merge.DefaultResultAccumulator;
import uk.gov.gchq.gaffer.federated.simple.merge.FederatedResultAccumulator;
import uk.gov.gchq.gaffer.federated.simple.operation.handler.get.GetSchemaHandler;
import uk.gov.gchq.gaffer.graph.Graph;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.operation.OperationChain;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.operation.io.Output;
Expand All @@ -50,7 +50,7 @@ public class FederatedOutputHandler<P extends Output<O>, O>
@Override
public O doOperation(final P operation, final Context context, final Store store) throws OperationException {
final int fixLimit = Integer.parseInt(operation.getOption(OPT_FIX_OP_LIMIT, String.valueOf(DFLT_FIX_OP_LIMIT)));
List<Graph> graphsToExecute = this.getGraphsToExecuteOn(operation, context, (FederatedStore) store);
List<GraphSerialisable> graphsToExecute = this.getGraphsToExecuteOn(operation, context, (FederatedStore) store);

// No-op
if (graphsToExecute.isEmpty()) {
Expand All @@ -59,10 +59,10 @@ public O doOperation(final P operation, final Context context, final Store store

// Execute the operation chain on each graph
List<O> graphResults = new ArrayList<>();
for (final Graph graph : graphsToExecute) {
for (final GraphSerialisable graph : graphsToExecute) {
try {
OperationChain<O> fixedChain = FederatedUtils.getValidOperationForGraph(operation, graph, 0, fixLimit);
graphResults.add(graph.execute(fixedChain, context.getUser()));
graphResults.add(graph.getGraph().execute(fixedChain, context.getUser()));
} catch (final OperationException | UnsupportedOperationException | IllegalArgumentException e) {
// Optionally skip this error if user has specified to do so
LOGGER.error("Operation failed on graph: {}", graph.getGraphId());
Expand Down Expand Up @@ -110,7 +110,7 @@ public O doOperation(final P operation, final Context context, final Store store
protected FederatedResultAccumulator<O> getResultAccumulator(
final P operation,
final FederatedStore store,
final List<Graph> graphsToExecute) throws OperationException {
final List<GraphSerialisable> graphsToExecute) throws OperationException {
// Merge the store props with the operation options for setting up the
// accumulator
Properties combinedProps = store.getProperties().getProperties();
Expand All @@ -131,7 +131,7 @@ protected FederatedResultAccumulator<O> getResultAccumulator(
}
// Set the merged schema if we are aggregating
if (resultAccumulator.aggregateElements()) {
List<Schema> schemas = graphsToExecute.stream().map(Graph::getSchema).collect(Collectors.toList());
List<Schema> schemas = graphsToExecute.stream().map(GraphSerialisable::getSchema).collect(Collectors.toList());
resultAccumulator.setSchema(GetSchemaHandler.getMergedSchema(schemas));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import uk.gov.gchq.gaffer.federated.simple.FederatedStore;
import uk.gov.gchq.gaffer.federated.simple.FederatedUtils;
import uk.gov.gchq.gaffer.graph.Graph;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.operation.OperationChain;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.operation.io.Output;
Expand All @@ -42,7 +42,7 @@ public class SeparateOutputHandler<P extends Output<O>, O> extends FederatedOper
@Override
public Map<String, O> doOperation(final P operation, final Context context, final Store store) throws OperationException {
final int fixLimit = Integer.parseInt(operation.getOption(OPT_FIX_OP_LIMIT, String.valueOf(DFLT_FIX_OP_LIMIT)));
List<Graph> graphsToExecute = this.getGraphsToExecuteOn(operation, context, (FederatedStore) store);
List<GraphSerialisable> graphsToExecute = this.getGraphsToExecuteOn(operation, context, (FederatedStore) store);

if (graphsToExecute.isEmpty()) {
return new HashMap<>();
Expand All @@ -51,10 +51,10 @@ public Map<String, O> doOperation(final P operation, final Context context, fina
// Execute the operation chain on each graph
LOGGER.debug("Returning separated graph results");
Map<String, O> results = new HashMap<>();
for (final Graph graph : graphsToExecute) {
for (final GraphSerialisable graph : graphsToExecute) {
try {
OperationChain<O> fixedChain = FederatedUtils.getValidOperationForGraph(operation, graph, 0, fixLimit);
results.put(graph.getGraphId(), graph.execute(fixedChain, context.getUser()));
results.put(graph.getGraphId(), graph.getGraph().execute(fixedChain, context.getUser()));
} catch (final OperationException | UnsupportedOperationException e) {
// Optionally skip this error if user has specified to do so
LOGGER.error("Operation failed on graph: {}", graph.getGraphId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public Void doOperation(final AddGraph operation, final Context context, final S
}

// Add the graph
((FederatedStore) store).addGraph(newGraph.getGraph(), access);
((FederatedStore) store).addGraph(newGraph, access);

return null;
}
Expand Down
Loading

0 comments on commit 338fe55

Please sign in to comment.