diff --git a/CHANGELOG.md b/CHANGELOG.md index c8f99f0b..952f7bb7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ### Enhancements ### Bug Fixes - Remove useCase and defaultParams field in WorkflowRequest ([#758](https://github.com/opensearch-project/flow-framework/pull/758)) +- Fix RBAC fetching from workflow state when template is not present ([#998](https://github.com/opensearch-project/flow-framework/pull/998)) ### Infrastructure ### Documentation diff --git a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java index 813613a3..4e0967ca 100644 --- a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java @@ -156,7 +156,17 @@ private void resolveUserAndExecute( boolean filterByBackendRole = requestedUser == null ? false : filterByEnabled; // Update workflow request, check if user has permissions to update the workflow // Get workflow and verify backend roles - getWorkflow(requestedUser, workflowId, filterByBackendRole, listener, function, client, clusterService, xContentRegistry); + getWorkflow( + requestedUser, + workflowId, + filterByBackendRole, + false, + listener, + function, + client, + clusterService, + xContentRegistry + ); } else { // Create Workflow. No need to get current workflow. function.run(); diff --git a/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java index 2974f522..41e61c01 100644 --- a/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java @@ -83,12 +83,15 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener executeDeleteRequest(request, listener, context), client, diff --git a/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java index 9699de5a..af0eec66 100644 --- a/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java @@ -129,8 +129,9 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener executeDeprovisionRequest(request, listener, context), + () -> executeDeprovisionRequest(request, listener, context, user), client, clusterService, xContentRegistry @@ -146,7 +147,8 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener listener, - ThreadContext.StoredContext context + ThreadContext.StoredContext context, + User user ) { String workflowId = request.getWorkflowId(); String allowDelete = request.getParams().get(ALLOW_DELETE); @@ -163,7 +165,8 @@ private void executeDeprovisionRequest( workflowId, response.getWorkflowState().resourcesCreated(), deleteAllowedResources, - listener + listener, + user ) ); }, exception -> { @@ -180,7 +183,8 @@ private void executeDeprovisionSequence( String workflowId, List resourcesCreated, Set deleteAllowedResources, - ActionListener listener + ActionListener listener, + User user ) { List deleteNotAllowed = new ArrayList<>(); // Create a list of ProcessNodes with the corresponding deprovision workflow steps @@ -294,26 +298,23 @@ private void executeDeprovisionSequence( logger.info("Resources requiring allow_delete: {}.", deleteNotAllowed); } // This is a redundant best-effort backup to the incremental deletion done earlier - updateWorkflowState(workflowId, remainingResources, deleteNotAllowed, listener); + updateWorkflowState(workflowId, remainingResources, deleteNotAllowed, listener, user); } private void updateWorkflowState( String workflowId, List remainingResources, List deleteNotAllowed, - ActionListener listener + ActionListener listener, + User user ) { if (remainingResources.isEmpty() && deleteNotAllowed.isEmpty()) { // Successful deprovision of all resources, reset state to initial flowFrameworkIndicesHandler.doesTemplateExist(workflowId, templateExists -> { if (Boolean.TRUE.equals(templateExists)) { - flowFrameworkIndicesHandler.putInitialStateToWorkflowState( - workflowId, - getUserContext(client), - ActionListener.wrap(indexResponse -> { - logger.info("Reset workflow {} state to NOT_STARTED", workflowId); - }, exception -> { logger.error("Failed to reset to initial workflow state for {}", workflowId, exception); }) - ); + flowFrameworkIndicesHandler.putInitialStateToWorkflowState(workflowId, user, ActionListener.wrap(indexResponse -> { + logger.info("Reset workflow {} state to NOT_STARTED", workflowId); + }, exception -> { logger.error("Failed to reset to initial workflow state for {}", workflowId, exception); })); } else { flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, ActionListener.wrap(deleteResponse -> { logger.info("Deleted workflow {} state", workflowId); diff --git a/src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateTransportAction.java index bf374e05..4106e492 100644 --- a/src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateTransportAction.java @@ -89,6 +89,7 @@ protected void doExecute(Task task, GetWorkflowStateRequest request, ActionListe user, workflowId, filterByEnabled, + true, listener, () -> executeGetWorkflowStateRequest(request, listener, context), client, diff --git a/src/main/java/org/opensearch/flowframework/transport/GetWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/GetWorkflowTransportAction.java index 58d37edf..2462a839 100644 --- a/src/main/java/org/opensearch/flowframework/transport/GetWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/GetWorkflowTransportAction.java @@ -98,6 +98,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener executeGetRequest(request, listener, context), client, diff --git a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java index 45f37416..81db1197 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java @@ -132,6 +132,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener executeProvisionRequest(request, listener, context), client, diff --git a/src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java index 8e501228..fc616343 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java @@ -139,6 +139,7 @@ protected void doExecute(Task task, ReprovisionWorkflowRequest request, ActionLi user, workflowId, filterByEnabled, + false, listener, () -> executeReprovisionRequest(request, listener, context), client, diff --git a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java index 154e343f..7428249f 100644 --- a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java +++ b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java @@ -36,6 +36,7 @@ import org.opensearch.core.xcontent.XContentParser; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.model.Template; +import org.opensearch.flowframework.model.WorkflowState; import org.opensearch.flowframework.transport.WorkflowResponse; import org.opensearch.flowframework.workflow.WorkflowData; import org.opensearch.index.query.BoolQueryBuilder; @@ -67,6 +68,7 @@ import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX; +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX; /** * Utility methods for Template parsing @@ -284,6 +286,7 @@ public static SearchSourceBuilder addUserBackendRolesFilter(User user, SearchSou * @param requestedUser the user to execute the request * @param workflowId workflow id * @param filterByEnabled filter by enabled setting + * @param statePresent state present for the transport action * @param listener action listener * @param function workflow function * @param client node client @@ -294,6 +297,7 @@ public static void resolveUserAndExecute( User requestedUser, String workflowId, Boolean filterByEnabled, + Boolean statePresent, ActionListener listener, Runnable function, Client client, @@ -307,7 +311,17 @@ public static void resolveUserAndExecute( // !filterByEnabled means security is enabled and filterByEnabled is disabled function.run(); } else { - getWorkflow(requestedUser, workflowId, filterByEnabled, listener, function, client, clusterService, xContentRegistry); + getWorkflow( + requestedUser, + workflowId, + filterByEnabled, + statePresent, + listener, + function, + client, + clusterService, + xContentRegistry + ); } } catch (Exception e) { listener.onFailure(e); @@ -368,6 +382,7 @@ public static void checkFilterByBackendRoles(User requestedUser) { * @param requestUser the user to execute the request * @param workflowId workflow id * @param filterByEnabled filter by enabled setting + * @param statePresent state present for the transport action * @param listener action listener * @param function workflow function * @param client node client @@ -378,15 +393,17 @@ public static void getWorkflow( User requestUser, String workflowId, Boolean filterByEnabled, + Boolean statePresent, ActionListener listener, Runnable function, Client client, ClusterService clusterService, NamedXContentRegistry xContentRegistry ) { - if (clusterService.state().metadata().hasIndex(GLOBAL_CONTEXT_INDEX)) { + String index = statePresent ? WORKFLOW_STATE_INDEX : GLOBAL_CONTEXT_INDEX; + if (clusterService.state().metadata().hasIndex(index)) { try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { - GetRequest request = new GetRequest(GLOBAL_CONTEXT_INDEX, workflowId); + GetRequest request = new GetRequest(index, workflowId); client.get( request, ActionListener.wrap( @@ -395,9 +412,11 @@ public static void getWorkflow( requestUser, workflowId, filterByEnabled, + statePresent, listener, function, - xContentRegistry + xContentRegistry, + context ), exception -> { logger.error("Failed to get workflow: {}", workflowId, exception); @@ -407,10 +426,8 @@ public static void getWorkflow( ); } } else { - String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage( - "Failed to retrieve template ({}) from global context.", - workflowId - ).getFormattedMessage(); + String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage("Failed to retrieve template ({}).", workflowId) + .getFormattedMessage(); logger.error(errorMessage); listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.NOT_FOUND)); } @@ -422,26 +439,30 @@ public static void getWorkflow( * @param response get response * @param workflowId workflow id * @param filterByEnabled filter by enabled setting + * @param statePresent state present for the transport action * @param listener action listener * @param function workflow function * @param xContentRegistry contentRegister to parse get response + * @param context thread context */ public static void onGetWorkflowResponse( GetResponse response, User requestUser, String workflowId, Boolean filterByEnabled, + Boolean statePresent, ActionListener listener, Runnable function, - NamedXContentRegistry xContentRegistry + NamedXContentRegistry xContentRegistry, + ThreadContext.StoredContext context ) { if (response.isExists()) { try ( XContentParser parser = RestHandlerUtils.createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef()) ) { + context.restore(); ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); - Template template = Template.parse(parser); - User resourceUser = template.getUser(); + User resourceUser = statePresent ? WorkflowState.parse(parser).getUser() : Template.parse(parser).getUser(); if (!filterByEnabled || checkUserPermissions(requestUser, resourceUser, workflowId) || isAdmin(requestUser)) { function.run(); diff --git a/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java b/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java index 3570dccf..e21e41ba 100644 --- a/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java +++ b/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java @@ -337,6 +337,7 @@ public Response createIndexRole(String role, String index) throws IOException { TestHelpers.toHttpEntity( "{\n" + "\"cluster_permissions\": [\n" + + "\"cluster:admin/ingest/pipeline/put\",\n" + "\"cluster:admin/ingest/pipeline/delete\"\n" + "],\n" + "\"index_permissions\": [\n" @@ -353,6 +354,7 @@ public Response createIndexRole(String role, String index) throws IOException { + "\"crud\",\n" + "\"indices:admin/create\",\n" + "\"indices:admin/aliases\",\n" + + "\"indices:admin/settings/update\",\n" + "\"indices:admin/delete\"\n" + "]\n" + "}\n"