Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gh-3350: Simple fed store fixes #3358

Merged
merged 4 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Crown Copyright
* Copyright 2020-2025 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,9 +21,11 @@
import uk.gov.gchq.koryphe.Summary;
import uk.gov.gchq.koryphe.predicate.KoryphePredicate;

import java.io.Serializable;

@Since("1.13.1")
@Summary("Predicate which never allows user access")
public class NoAccessUserPredicate extends KoryphePredicate<User> {
public class NoAccessUserPredicate extends KoryphePredicate<User> implements Serializable {
@Override
public boolean test(final User user) {
return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Crown Copyright
* Copyright 2020-2025 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -13,16 +13,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package uk.gov.gchq.gaffer.access.predicate.user;

import uk.gov.gchq.gaffer.user.User;
import uk.gov.gchq.koryphe.Since;
import uk.gov.gchq.koryphe.Summary;
import uk.gov.gchq.koryphe.predicate.KoryphePredicate;

import java.io.Serializable;

@Since("1.13.1")
@Summary("A predicate which always allows a user access")
public class UnrestrictedAccessUserPredicate extends KoryphePredicate<User> {
public class UnrestrictedAccessUserPredicate extends KoryphePredicate<User> implements Serializable {
@Override
public boolean test(final User user) {
return true;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024 Crown Copyright
* Copyright 2024-2025 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -91,10 +91,9 @@ public static OperationChain getValidOperationForGraph(final Operation operation
&& ((OperationView) operation).getView().hasGroups()) {

// Update the view for the graph
((OperationView) operation).setView(
getValidViewForGraph(((OperationView) operation).getView(), graphSerialisable));

updatedOperations.add(operation);
OperationView fixedOp = (OperationView) operation.shallowClone();
fixedOp.setView(getValidViewForGraph(fixedOp.getView(), graphSerialisable));
updatedOperations.add((Operation) fixedOp);

// Recursively go into operation chains to make sure everything is fixed
} else if (operation instanceof OperationChain) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024 Crown Copyright
* Copyright 2024-2025 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,12 +32,10 @@
import uk.gov.gchq.gaffer.store.Store;
import uk.gov.gchq.gaffer.store.operation.handler.OperationHandler;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
* Main default handler for federated operations. Handles delegation to selected
Expand Down Expand Up @@ -97,10 +95,15 @@
*/
public static final String OPT_FIX_OP_LIMIT = "federated.fixOperationLimit";

/**
* Default depth limit for fixing an operation chain.
*/
public static final int DFLT_FIX_OP_LIMIT = 5;

@Override
public Object doOperation(final P operation, final Context context, final Store store) throws OperationException {
LOGGER.debug("Running operation: {}", operation);
final int fixLimit = Integer.parseInt(operation.getOption(OPT_FIX_OP_LIMIT, "5"));
final int fixLimit = Integer.parseInt(operation.getOption(OPT_FIX_OP_LIMIT, String.valueOf(DFLT_FIX_OP_LIMIT)));

// If the operation has output wrap and return using sub class handler
if (operation instanceof Output) {
Expand Down Expand Up @@ -153,40 +156,43 @@
*/
protected List<GraphSerialisable> getGraphsToExecuteOn(final Operation operation, final Context context,
final FederatedStore store) throws OperationException {
// Use default graph IDs as fallback
List<String> graphIds = store.getDefaultGraphIds();
List<GraphSerialisable> graphsToExecute = new LinkedList<>();
// If user specified graph IDs for this chain parse as comma separated list
if (operation.containsOption(OPT_GRAPH_IDS)) {
graphIds = Arrays.asList(operation.getOption(OPT_GRAPH_IDS).split(","));
} else if (operation.containsOption(OPT_SHORT_GRAPH_IDS)) {
graphIds = Arrays.asList(operation.getOption(OPT_SHORT_GRAPH_IDS).split(","));
}
List<String> specifiedGraphIds = new ArrayList<>();
List<GraphSerialisable> graphsToExecute = new ArrayList<>();

// If user specified graph IDs for this chain parse as comma separated list
if (operation.containsOption(OPT_SHORT_GRAPH_IDS)) {
specifiedGraphIds.addAll(Arrays.asList(operation.getOption(OPT_SHORT_GRAPH_IDS).split(",")));
// Check legacy option
} else if (operation.containsOption(OPT_GRAPH_IDS)) {
specifiedGraphIds.addAll(Arrays.asList(operation.getOption(OPT_GRAPH_IDS).split(",")));
// If a user has specified to just exclude some graphs then run all but them
if (operation.containsOption(OPT_EXCLUDE_GRAPH_IDS)) {
// Get all the IDs
graphIds = StreamSupport.stream(store.getAllGraphsAndAccess().spliterator(), false)
.map(Pair::getLeft)
.map(GraphSerialisable::getGraphId)
.collect(Collectors.toList());

} else if (operation.containsOption(OPT_EXCLUDE_GRAPH_IDS)) {
store.getAllGraphsAndAccess().forEach(pair -> specifiedGraphIds.add(pair.getLeft().getGraphId()));
// Exclude the ones the user has specified
Arrays.asList(operation.getOption(OPT_EXCLUDE_GRAPH_IDS).split(",")).forEach(graphIds::remove);
Arrays.asList(operation.getOption(OPT_EXCLUDE_GRAPH_IDS).split(",")).forEach(specifiedGraphIds::remove);
}

// Use default graph IDs as a fallback
if (specifiedGraphIds.isEmpty()) {
specifiedGraphIds.addAll(store.getDefaultGraphIds());
}

try {
// Get the corresponding graph serialisable
for (final String id : graphIds) {
LOGGER.debug("Will execute on Graph: {}", id);
// Get the corresponding graph serialisables
for (final String id : specifiedGraphIds) {
try {
Pair<GraphSerialisable, GraphAccess> pair = store.getGraphAccessPair(id);

// Check the user has access to the graph
if (pair.getRight().hasReadAccess(context.getUser(), store.getProperties().getAdminAuth())) {
LOGGER.debug("User has access, will execute on Graph: '{}'", id);
// Create a new graph object from the serialised info
graphsToExecute.add(pair.getLeft());
} else {
LOGGER.warn("User does not have access, to Graph: '{}' it will be skipped", id);

Check warning on line 191 in store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/FederatedOperationHandler.java

View check run for this annotation

Codecov / codecov/patch

store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/FederatedOperationHandler.java#L191

Added line #L191 was not covered by tests
}
} catch (final CacheOperationException e) {
throw new OperationException("Failed to get Graph from cache: '" + id + "'", e);

Check warning on line 194 in store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/FederatedOperationHandler.java

View check run for this annotation

Codecov / codecov/patch

store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/FederatedOperationHandler.java#L193-L194

Added lines #L193 - L194 were not covered by tests
}
} catch (final CacheOperationException e) {
throw new OperationException("Failed to get Graphs from cache", e);
}

// Keep graphs sorted so results returned are predictable between runs
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024 Crown Copyright
* Copyright 2024-2025 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,7 +46,7 @@ public class FederatedOutputHandler<P extends Output<O>, O>

@Override
public O doOperation(final P operation, final Context context, final Store store) throws OperationException {
final int fixLimit = Integer.parseInt(operation.getOption(OPT_FIX_OP_LIMIT, "5"));
final int fixLimit = Integer.parseInt(operation.getOption(OPT_FIX_OP_LIMIT, String.valueOf(DFLT_FIX_OP_LIMIT)));
List<GraphSerialisable> graphsToExecute = this.getGraphsToExecuteOn(operation, context, (FederatedStore) store);

// No-op
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024 Crown Copyright
* Copyright 2024-2025 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,7 +20,9 @@
import org.slf4j.LoggerFactory;

import uk.gov.gchq.gaffer.federated.simple.FederatedStore;
import uk.gov.gchq.gaffer.federated.simple.FederatedUtils;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.operation.OperationChain;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.operation.io.Output;
import uk.gov.gchq.gaffer.store.Context;
Expand All @@ -39,6 +41,7 @@ public class SeparateOutputHandler<P extends Output<O>, O> extends FederatedOper

@Override
public Map<String, O> doOperation(final P operation, final Context context, final Store store) throws OperationException {
final int fixLimit = Integer.parseInt(operation.getOption(OPT_FIX_OP_LIMIT, String.valueOf(DFLT_FIX_OP_LIMIT)));
List<GraphSerialisable> graphsToExecute = this.getGraphsToExecuteOn(operation, context, (FederatedStore) store);

if (graphsToExecute.isEmpty()) {
Expand All @@ -50,7 +53,8 @@ public Map<String, O> doOperation(final P operation, final Context context, fina
Map<String, O> results = new HashMap<>();
for (final GraphSerialisable gs : graphsToExecute) {
try {
results.put(gs.getGraphId(), gs.getGraph().execute(operation, context.getUser()));
OperationChain<O> fixedChain = FederatedUtils.getValidOperationForGraph(operation, gs, 0, fixLimit);
results.put(gs.getGraphId(), gs.getGraph().execute(fixedChain, context.getUser()));
} catch (final OperationException | UnsupportedOperationException e) {
// Optionally skip this error if user has specified to do so
LOGGER.error("Operation failed on graph: {}", gs.getGraphId());
Expand Down
Loading