Skip to content

Commit

Permalink
Gh-3322: Cache updates for federated POC (#3323)
Browse files Browse the repository at this point in the history
* better handle cache related operations

* basic testing to ensure named ops work

* sonar smells

* whitespace

* add option to let results stay separate #3118

* add option to specify to use default graph ids

* typo

* check style

---------

Co-authored-by: p29876 <165825455+p29876@users.noreply.github.com>
  • Loading branch information
tb06904 and p29876 authored Oct 23, 2024
1 parent c8f0732 commit 5e2b804
Show file tree
Hide file tree
Showing 8 changed files with 375 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ private Collection<Operation> resolveNamedOperations(final Operation operation,
.getOperationChain(namedOperation.getParameters());
// Update the operation inputs and add operation chain to the updated list
OperationHandlerUtil.updateOperationInput(namedOperationChain, namedOperation.getInput());
namedOperationChain.setOptions(namedOperation.getOptions());

// Run again to resolve any nested operations in the chain before adding
namedOperationChain.updateOperations(resolveNamedOperations(namedOperationChain, user, depth + 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import uk.gov.gchq.gaffer.federated.simple.operation.GetAllGraphIds;
import uk.gov.gchq.gaffer.federated.simple.operation.GetAllGraphInfo;
import uk.gov.gchq.gaffer.federated.simple.operation.RemoveGraph;
import uk.gov.gchq.gaffer.federated.simple.operation.handler.EitherOperationHandler;
import uk.gov.gchq.gaffer.federated.simple.operation.handler.FederatedOperationHandler;
import uk.gov.gchq.gaffer.federated.simple.operation.handler.FederatedOutputHandler;
import uk.gov.gchq.gaffer.federated.simple.operation.handler.add.AddGraphHandler;
Expand All @@ -46,6 +47,13 @@
import uk.gov.gchq.gaffer.federated.simple.operation.handler.misc.ChangeGraphIdHandler;
import uk.gov.gchq.gaffer.federated.simple.operation.handler.misc.RemoveGraphHandler;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.named.operation.AddNamedOperation;
import uk.gov.gchq.gaffer.named.operation.DeleteNamedOperation;
import uk.gov.gchq.gaffer.named.operation.GetAllNamedOperations;
import uk.gov.gchq.gaffer.named.operation.NamedOperation;
import uk.gov.gchq.gaffer.named.view.AddNamedView;
import uk.gov.gchq.gaffer.named.view.DeleteNamedView;
import uk.gov.gchq.gaffer.named.view.GetAllNamedViews;
import uk.gov.gchq.gaffer.operation.Operation;
import uk.gov.gchq.gaffer.operation.OperationChain;
import uk.gov.gchq.gaffer.operation.OperationException;
Expand All @@ -54,6 +62,7 @@
import uk.gov.gchq.gaffer.operation.impl.get.GetAdjacentIds;
import uk.gov.gchq.gaffer.operation.impl.get.GetAllElements;
import uk.gov.gchq.gaffer.operation.impl.get.GetElements;
import uk.gov.gchq.gaffer.operation.impl.get.GetGraphCreatedTime;
import uk.gov.gchq.gaffer.serialisation.Serialiser;
import uk.gov.gchq.gaffer.serialisation.ToBytesSerialiser;
import uk.gov.gchq.gaffer.store.Context;
Expand All @@ -65,8 +74,17 @@
import uk.gov.gchq.gaffer.store.operation.GetSchema;
import uk.gov.gchq.gaffer.store.operation.GetTraits;
import uk.gov.gchq.gaffer.store.operation.OperationChainValidator;
import uk.gov.gchq.gaffer.store.operation.handler.GetGraphCreatedTimeHandler;
import uk.gov.gchq.gaffer.store.operation.handler.OperationChainHandler;
import uk.gov.gchq.gaffer.store.operation.handler.OperationHandler;
import uk.gov.gchq.gaffer.store.operation.handler.OutputOperationHandler;
import uk.gov.gchq.gaffer.store.operation.handler.named.AddNamedOperationHandler;
import uk.gov.gchq.gaffer.store.operation.handler.named.AddNamedViewHandler;
import uk.gov.gchq.gaffer.store.operation.handler.named.DeleteNamedOperationHandler;
import uk.gov.gchq.gaffer.store.operation.handler.named.DeleteNamedViewHandler;
import uk.gov.gchq.gaffer.store.operation.handler.named.GetAllNamedOperationsHandler;
import uk.gov.gchq.gaffer.store.operation.handler.named.GetAllNamedViewsHandler;
import uk.gov.gchq.gaffer.store.operation.handler.named.NamedOperationHandler;
import uk.gov.gchq.gaffer.store.schema.Schema;
import uk.gov.gchq.gaffer.store.schema.ViewValidator;

Expand All @@ -84,6 +102,7 @@
import static uk.gov.gchq.gaffer.accumulostore.utils.TableUtils.renameTable;
import static uk.gov.gchq.gaffer.cache.CacheServiceLoader.DEFAULT_SERVICE_NAME;
import static uk.gov.gchq.gaffer.federated.simple.FederatedStoreProperties.PROP_DEFAULT_GRAPH_IDS;
import static uk.gov.gchq.gaffer.federated.simple.FederatedStoreProperties.PROP_GRAPH_CACHE_NAME;

/**
* The federated store implementation. Provides the set up and required
Expand Down Expand Up @@ -293,16 +312,6 @@ public Schema getSchema(final List<GraphSerialisable> graphs) {
}
}

/**
* Access to getting the operations that have handlers specific to this
* store.
*
* @return The Operation classes handled by this store.
*/
public Set<Class<? extends Operation>> getStoreSpecificOperations() {
return storeHandlers.keySet();
}

@Override
public void initialise(final String graphId, final Schema unused, final StoreProperties properties) throws StoreException {
if (unused != null) {
Expand All @@ -311,7 +320,7 @@ public void initialise(final String graphId, final Schema unused, final StorePro
super.initialise(graphId, new Schema(), properties);

// Init the cache for graphs
graphCache = new Cache<>("federatedGraphCache-" + graphId);
graphCache = new Cache<>(properties.get(PROP_GRAPH_CACHE_NAME, "federatedGraphCache_" + graphId));

// Get and set default graph IDs from properties
if (properties.containsKey(PROP_DEFAULT_GRAPH_IDS)) {
Expand Down Expand Up @@ -347,11 +356,34 @@ protected Object doUnhandledOperation(final Operation operation, final Context c
@Override
protected void addAdditionalOperationHandlers() {
storeHandlers.forEach(this::addOperationHandler);

final String namedOpCacheSuffix = getProperties().getCacheServiceNamedOperationSuffix(getGraphId());
final String namedViewCacheSuffix = getProperties().getCacheServiceNamedViewSuffix(getGraphId());
final Boolean nestedNamedOpsAllowed = getProperties().isNestedNamedOperationAllow();

// Add overrides as cache operations could be run locally or on sub graphs
if (getProperties().getNamedOperationEnabled()) {
addOperationHandler(NamedOperation.class, new EitherOperationHandler<>(new NamedOperationHandler()));
addOperationHandler(AddNamedOperation.class, new EitherOperationHandler<>(
new AddNamedOperationHandler(namedOpCacheSuffix, nestedNamedOpsAllowed)));
addOperationHandler(GetAllNamedOperations.class, new EitherOperationHandler<>(new GetAllNamedOperationsHandler(namedOpCacheSuffix)));
addOperationHandler(DeleteNamedOperation.class, new EitherOperationHandler<>(new DeleteNamedOperationHandler(namedOpCacheSuffix)));
}

// Named Views could be either
if (getProperties().getNamedViewEnabled()) {
addOperationHandler(AddNamedView.class, new EitherOperationHandler<>(new AddNamedViewHandler(namedViewCacheSuffix)));
addOperationHandler(GetAllNamedViews.class, new EitherOperationHandler<>(new GetAllNamedViewsHandler(namedViewCacheSuffix)));
addOperationHandler(DeleteNamedView.class, new EitherOperationHandler<>(new DeleteNamedViewHandler(namedViewCacheSuffix)));
}

// Misc operations that could be for sub graphs or not
addOperationHandler(GetGraphCreatedTime.class, new EitherOperationHandler<>(new GetGraphCreatedTimeHandler()));
}

@Override
protected OperationHandler<? extends OperationChain<?>> getOperationChainHandler() {
return new FederatedOperationHandler<>();
return new EitherOperationHandler<>(new OperationChainHandler<>(getOperationChainValidator(), getOperationChainOptimisers()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public class FederatedStoreProperties extends StoreProperties {
* Property key for setting if public graphs can be added to the store or not
*/
public static final String PROP_ALLOW_PUBLIC_GRAPHS = "gaffer.store.federated.allowPublicGraphs";
/**
* Property key for setting a custom name for the graph cache, by default
* this will be "federatedGraphCache_" followed by the federated graph ID.
*/
public static final String PROP_GRAPH_CACHE_NAME = "gaffer.store.federated.graphCache.name";
/**
* Property key for the class to use when merging number results
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package uk.gov.gchq.gaffer.federated.simple.operation.handler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import uk.gov.gchq.gaffer.operation.Operation;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.store.Context;
import uk.gov.gchq.gaffer.store.Store;
import uk.gov.gchq.gaffer.store.operation.handler.OperationHandler;

/**
* Custom handler for operations that could in theory target sub graphs or the
* federated store directly.
*/
public class EitherOperationHandler<O extends Operation> implements OperationHandler<O> {
private static final Logger LOGGER = LoggerFactory.getLogger(EitherOperationHandler.class);

private final OperationHandler<O> standardHandler;

public EitherOperationHandler(final OperationHandler<O> standardHandler) {
this.standardHandler = standardHandler;
}

/**
* If graph IDs are in the options the operation will be handled by a
* {@link FederatedOperationHandler}, otherwise the original handler will be
* used e.g. executed on the federated store directly.
*/
@Override
public Object doOperation(final O operation, final Context context, final Store store) throws OperationException {
LOGGER.debug("Checking if Operation should be handled locally or on sub graphs: {}", operation);

// If we have graph IDs then run as a federated operation
if (operation.containsOption(FederatedOperationHandler.OPT_GRAPH_IDS) ||
operation.containsOption(FederatedOperationHandler.OPT_SHORT_GRAPH_IDS) ||
operation.containsOption(FederatedOperationHandler.OPT_EXCLUDE_GRAPH_IDS) ||
operation.containsOption(FederatedOperationHandler.OPT_USE_DFLT_GRAPH_IDS)) {
LOGGER.debug("Operation has specified graph IDs, it will be handled by sub graphs");
return new FederatedOperationHandler<>().doOperation(operation, context, store);
}

// No sub graphs involved just run the handler for this operations on the federated store
return standardHandler.doOperation(operation, context, store);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,16 @@
import uk.gov.gchq.gaffer.federated.simple.access.GraphAccess;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.operation.Operation;
import uk.gov.gchq.gaffer.operation.OperationChain;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.operation.io.Output;
import uk.gov.gchq.gaffer.store.Context;
import uk.gov.gchq.gaffer.store.Store;
import uk.gov.gchq.gaffer.store.operation.handler.OperationChainHandler;
import uk.gov.gchq.gaffer.store.operation.handler.OperationHandler;

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -68,51 +65,41 @@ public class FederatedOperationHandler<P extends Operation> implements Operation
public static final String OPT_EXCLUDE_GRAPH_IDS = "federated.excludeGraphIds";

/**
* The boolean operation option to specify if element merging should be applied or not.
* A boolean option to specify to use the default graph IDs. The option is
* not specifically required as default graph IDs will be used as a
* fallback, but if set the whole chain will be forwarded rather than each
* individual operation so can speed things up.
*/
public static final String OPT_AGGREGATE_ELEMENTS = "federated.aggregateElements";
public static final String OPT_USE_DFLT_GRAPH_IDS = "federated.useDefaultGraphIds";

/**
* A boolean option to specify if to forward the whole operation chain to the sub graph or not.
* The boolean operation option to specify if element merging should be applied or not.
*/
public static final String OPT_FORWARD_CHAIN = "federated.forwardChain";
public static final String OPT_AGGREGATE_ELEMENTS = "federated.aggregateElements";

/**
* A boolean option to specify if a graph should be skipped if execution
* fails on it e.g. continue executing on the rest of the graphs
*/
public static final String OPT_SKIP_FAILED_EXECUTE = "federated.skipGraphOnFail";

/**
* A boolean option to specify if the results from each graph should be kept
* separate. If set this will return a map where each key value is the graph
* ID and its respective result.
*/
public static final String OPT_SEPARATE_RESULTS = "federated.separateResults";

@Override
public Object doOperation(final P operation, final Context context, final Store store) throws OperationException {
LOGGER.debug("Running operation: {}", operation);
// Check inside operation chains in case there are operations that don't require running on sub graphs
if (operation instanceof OperationChain) {
Set<Class<? extends Operation>> storeSpecificOps = ((FederatedStore) store).getStoreSpecificOperations();
List<Class<? extends Operation>> chainOps = ((OperationChain<?>) operation).flatten().stream()
.map(Operation::getClass)
.collect(Collectors.toList());

// If all the operations in the chain can be handled by the store then execute them.
// Or if told not to forward the whole chain process each operation individually.
if (storeSpecificOps.containsAll(chainOps) ||
(!Boolean.parseBoolean(operation.getOption(OPT_FORWARD_CHAIN, "true")))) {
// Use default handler
return new OperationChainHandler<>(store.getOperationChainValidator(), store.getOperationChainOptimisers())
.doOperation((OperationChain<Object>) operation, context, store);
}

// Check if we have a mix as that is an issue
// It's better to keep federated and non federated separate so error and report back
if (!Collections.disjoint(storeSpecificOps, chainOps)) {
throw new OperationException(
"Chain contains standard Operations alongside federated store specific Operations."
+ " Please submit each type separately or set: '" + OPT_FORWARD_CHAIN + "' to: 'false'.");
}
}

// If the operation has output wrap and return using sub class handler
if (operation instanceof Output) {
// Should we keep the results separate
if (Boolean.parseBoolean(operation.getOption(OPT_SEPARATE_RESULTS, "false"))) {
return new SeparateOutputHandler<>().doOperation((Output) operation, context, store);
}
return new FederatedOutputHandler<>().doOperation((Output) operation, context, store);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public O doOperation(final P operation, final Context context, final Store store
}

// Not expecting any output so exit since we've executed
if (operation.getOutputClass().isAssignableFrom(Void.class) || graphResults.isEmpty()) {
if (operation.getOutputClass() == Void.class || graphResults.isEmpty()) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package uk.gov.gchq.gaffer.federated.simple.operation.handler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import uk.gov.gchq.gaffer.federated.simple.FederatedStore;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.operation.io.Output;
import uk.gov.gchq.gaffer.store.Context;
import uk.gov.gchq.gaffer.store.Store;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Handler for running federated operations but keeping the results separate
* under a key of the graph ID the results come from.
*/
public class SeparateOutputHandler<P extends Output<O>, O> extends FederatedOperationHandler<P> {
private static final Logger LOGGER = LoggerFactory.getLogger(SeparateOutputHandler.class);

@Override
public Map<String, O> doOperation(final P operation, final Context context, final Store store) throws OperationException {
List<GraphSerialisable> graphsToExecute = this.getGraphsToExecuteOn(operation, context, (FederatedStore) store);

if (graphsToExecute.isEmpty()) {
return new HashMap<>();
}

// Execute the operation chain on each graph
LOGGER.debug("Returning separated graph results");
Map<String, O> results = new HashMap<>();
for (final GraphSerialisable gs : graphsToExecute) {
try {
results.put(gs.getGraphId(), gs.getGraph().execute(operation, context.getUser()));
} catch (final OperationException | UnsupportedOperationException e) {
// Optionally skip this error if user has specified to do so
LOGGER.error("Operation failed on graph: {}", gs.getGraphId());
if (!Boolean.parseBoolean(operation.getOption(OPT_SKIP_FAILED_EXECUTE, "false"))) {
throw e;
}
}
}

return results;
}
}
Loading

0 comments on commit 5e2b804

Please sign in to comment.