Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed RBAC fetching from workflow state when template is not present #998

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
### Enhancements
### Bug Fixes
- Remove useCase and defaultParams field in WorkflowRequest ([#758](https://github.com/opensearch-project/flow-framework/pull/758))
- Fix RBAC fetching from workflow state when template is not present ([#998](https://github.com/opensearch-project/flow-framework/pull/998))

### Infrastructure
### Documentation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,17 @@ private void resolveUserAndExecute(
boolean filterByBackendRole = requestedUser == null ? false : filterByEnabled;
// Update workflow request, check if user has permissions to update the workflow
// Get workflow and verify backend roles
getWorkflow(requestedUser, workflowId, filterByBackendRole, listener, function, client, clusterService, xContentRegistry);
getWorkflow(
requestedUser,
workflowId,
filterByBackendRole,
false,
listener,
function,
client,
clusterService,
xContentRegistry
);
} else {
// Create Workflow. No need to get current workflow.
function.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,15 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Dele
String workflowId = request.getWorkflowId();
User user = getUserContext(client);

final boolean clearStatus = Booleans.parseBoolean(request.getParams().get(CLEAR_STATUS), false);

ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext();

resolveUserAndExecute(
user,
workflowId,
filterByEnabled,
clearStatus,
listener,
() -> executeDeleteRequest(request, listener, context),
client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,9 @@
user,
workflowId,
filterByEnabled,
true,
listener,
() -> executeDeprovisionRequest(request, listener, context),
() -> executeDeprovisionRequest(request, listener, context, user),
client,
clusterService,
xContentRegistry
Expand All @@ -146,7 +147,8 @@
private void executeDeprovisionRequest(
WorkflowRequest request,
ActionListener<WorkflowResponse> listener,
ThreadContext.StoredContext context
ThreadContext.StoredContext context,
User user
) {
String workflowId = request.getWorkflowId();
String allowDelete = request.getParams().get(ALLOW_DELETE);
Expand All @@ -163,7 +165,8 @@
workflowId,
response.getWorkflowState().resourcesCreated(),
deleteAllowedResources,
listener
listener,
user
)
);
}, exception -> {
Expand All @@ -180,7 +183,8 @@
String workflowId,
List<ResourceCreated> resourcesCreated,
Set<String> deleteAllowedResources,
ActionListener<WorkflowResponse> listener
ActionListener<WorkflowResponse> listener,
User user
) {
List<ResourceCreated> deleteNotAllowed = new ArrayList<>();
// Create a list of ProcessNodes with the corresponding deprovision workflow steps
Expand Down Expand Up @@ -294,26 +298,23 @@
logger.info("Resources requiring allow_delete: {}.", deleteNotAllowed);
}
// This is a redundant best-effort backup to the incremental deletion done earlier
updateWorkflowState(workflowId, remainingResources, deleteNotAllowed, listener);
updateWorkflowState(workflowId, remainingResources, deleteNotAllowed, listener, user);
}

private void updateWorkflowState(
String workflowId,
List<ResourceCreated> remainingResources,
List<ResourceCreated> deleteNotAllowed,
ActionListener<WorkflowResponse> listener
ActionListener<WorkflowResponse> listener,
User user
) {
if (remainingResources.isEmpty() && deleteNotAllowed.isEmpty()) {
// Successful deprovision of all resources, reset state to initial
flowFrameworkIndicesHandler.doesTemplateExist(workflowId, templateExists -> {
if (Boolean.TRUE.equals(templateExists)) {
flowFrameworkIndicesHandler.putInitialStateToWorkflowState(
workflowId,
getUserContext(client),
ActionListener.wrap(indexResponse -> {
logger.info("Reset workflow {} state to NOT_STARTED", workflowId);
}, exception -> { logger.error("Failed to reset to initial workflow state for {}", workflowId, exception); })
);
flowFrameworkIndicesHandler.putInitialStateToWorkflowState(workflowId, user, ActionListener.wrap(indexResponse -> {
logger.info("Reset workflow {} state to NOT_STARTED", workflowId);
}, exception -> { logger.error("Failed to reset to initial workflow state for {}", workflowId, exception); }));

Check warning on line 317 in src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java#L316-L317

Added lines #L316 - L317 were not covered by tests
} else {
flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, ActionListener.wrap(deleteResponse -> {
logger.info("Deleted workflow {} state", workflowId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ protected void doExecute(Task task, GetWorkflowStateRequest request, ActionListe
user,
workflowId,
filterByEnabled,
true,
listener,
() -> executeGetWorkflowStateRequest(request, listener, context),
client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<GetW
user,
workflowId,
filterByEnabled,
false,
listener,
() -> executeGetRequest(request, listener, context),
client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
user,
workflowId,
filterByEnabled,
false,
listener,
() -> executeProvisionRequest(request, listener, context),
client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ protected void doExecute(Task task, ReprovisionWorkflowRequest request, ActionLi
user,
workflowId,
filterByEnabled,
false,
listener,
() -> executeReprovisionRequest(request, listener, context),
client,
Expand Down
43 changes: 32 additions & 11 deletions src/main/java/org/opensearch/flowframework/util/ParseUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.WorkflowState;
import org.opensearch.flowframework.transport.WorkflowResponse;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.index.query.BoolQueryBuilder;
Expand Down Expand Up @@ -67,6 +68,7 @@

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;

/**
* Utility methods for Template parsing
Expand Down Expand Up @@ -284,6 +286,7 @@
* @param requestedUser the user to execute the request
* @param workflowId workflow id
* @param filterByEnabled filter by enabled setting
* @param statePresent state present for the transport action
* @param listener action listener
* @param function workflow function
* @param client node client
Expand All @@ -294,6 +297,7 @@
User requestedUser,
String workflowId,
Boolean filterByEnabled,
Boolean statePresent,
ActionListener<? extends ActionResponse> listener,
Runnable function,
Client client,
Expand All @@ -307,7 +311,17 @@
// !filterByEnabled means security is enabled and filterByEnabled is disabled
function.run();
} else {
getWorkflow(requestedUser, workflowId, filterByEnabled, listener, function, client, clusterService, xContentRegistry);
getWorkflow(

Check warning on line 314 in src/main/java/org/opensearch/flowframework/util/ParseUtils.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/util/ParseUtils.java#L314

Added line #L314 was not covered by tests
requestedUser,
workflowId,
filterByEnabled,
statePresent,
listener,
function,
client,
clusterService,
xContentRegistry
);
}
} catch (Exception e) {
listener.onFailure(e);
Expand Down Expand Up @@ -368,6 +382,7 @@
* @param requestUser the user to execute the request
* @param workflowId workflow id
* @param filterByEnabled filter by enabled setting
* @param statePresent state present for the transport action
* @param listener action listener
* @param function workflow function
* @param client node client
Expand All @@ -378,15 +393,17 @@
User requestUser,
String workflowId,
Boolean filterByEnabled,
Boolean statePresent,
ActionListener listener,
Runnable function,
Client client,
ClusterService clusterService,
NamedXContentRegistry xContentRegistry
) {
if (clusterService.state().metadata().hasIndex(GLOBAL_CONTEXT_INDEX)) {
String index = statePresent ? WORKFLOW_STATE_INDEX : GLOBAL_CONTEXT_INDEX;
if (clusterService.state().metadata().hasIndex(index)) {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
GetRequest request = new GetRequest(GLOBAL_CONTEXT_INDEX, workflowId);
GetRequest request = new GetRequest(index, workflowId);
client.get(
request,
ActionListener.wrap(
Expand All @@ -395,9 +412,11 @@
requestUser,
workflowId,
filterByEnabled,
statePresent,
listener,
function,
xContentRegistry
xContentRegistry,
context
),
exception -> {
logger.error("Failed to get workflow: {}", workflowId, exception);
Expand All @@ -407,10 +426,8 @@
);
}
} else {
String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage(
"Failed to retrieve template ({}) from global context.",
workflowId
).getFormattedMessage();
String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage("Failed to retrieve template ({}).", workflowId)
.getFormattedMessage();

Check warning on line 430 in src/main/java/org/opensearch/flowframework/util/ParseUtils.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/util/ParseUtils.java#L429-L430

Added lines #L429 - L430 were not covered by tests
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.NOT_FOUND));
}
Expand All @@ -422,26 +439,30 @@
* @param response get response
* @param workflowId workflow id
* @param filterByEnabled filter by enabled setting
* @param statePresent state present for the transport action
* @param listener action listener
* @param function workflow function
* @param xContentRegistry contentRegister to parse get response
* @param context thread context
*/
public static void onGetWorkflowResponse(
GetResponse response,
User requestUser,
String workflowId,
Boolean filterByEnabled,
Boolean statePresent,
ActionListener<WorkflowResponse> listener,
Runnable function,
NamedXContentRegistry xContentRegistry
NamedXContentRegistry xContentRegistry,
ThreadContext.StoredContext context
) {
if (response.isExists()) {
try (
XContentParser parser = RestHandlerUtils.createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())
) {
context.restore();
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
Template template = Template.parse(parser);
User resourceUser = template.getUser();
User resourceUser = statePresent ? WorkflowState.parse(parser).getUser() : Template.parse(parser).getUser();

if (!filterByEnabled || checkUserPermissions(requestUser, resourceUser, workflowId) || isAdmin(requestUser)) {
function.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ public Response createIndexRole(String role, String index) throws IOException {
TestHelpers.toHttpEntity(
"{\n"
+ "\"cluster_permissions\": [\n"
+ "\"cluster:admin/ingest/pipeline/put\",\n"
+ "\"cluster:admin/ingest/pipeline/delete\"\n"
+ "],\n"
+ "\"index_permissions\": [\n"
Expand All @@ -353,6 +354,7 @@ public Response createIndexRole(String role, String index) throws IOException {
+ "\"crud\",\n"
+ "\"indices:admin/create\",\n"
+ "\"indices:admin/aliases\",\n"
+ "\"indices:admin/settings/update\",\n"
+ "\"indices:admin/delete\"\n"
+ "]\n"
+ "}\n"
Expand Down
Loading