Skip to content

Commit

Permalink
Integrate multitenancy into workflow provisioning (#1012)
Browse files Browse the repository at this point in the history
* Adding tenantId in ProcessNode to be surfaced to the client

Signed-off-by: Siddhartha Bingi <sidbingi@amazon.com>

* Passing tenantId in Transport Actions

Signed-off-by: Siddhartha Bingi <sidbingi@amazon.com>

* Updating tests to have tenantId as null for consistency

Signed-off-by: Siddhartha Bingi <sidbingi@amazon.com>

* Minor code refactoring

Signed-off-by: Siddhartha Bingi <sidbingi@amazon.com>

---------

Signed-off-by: Siddhartha Bingi <sidbingi@amazon.com>
Co-authored-by: Siddhartha Bingi <sidbingi@amazon.com>
(cherry picked from commit 055f1ea)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] and Siddhartha Bingi committed Jan 23, 2025
1 parent 123ec5d commit dd7dace
Show file tree
Hide file tree
Showing 57 changed files with 312 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ void checkMaxWorkflows(TimeValue requestTimeOut, Integer maxWorkflow, ActionList

private void validateWorkflows(Template template) throws Exception {
for (Workflow workflow : template.workflows().values()) {
List<ProcessNode> sortedNodes = workflowProcessSorter.sortProcessNodes(workflow, null, Collections.emptyMap());
List<ProcessNode> sortedNodes = workflowProcessSorter.sortProcessNodes(workflow, null, Collections.emptyMap(), "fakeTenantId");
workflowProcessSorter.validate(sortedNodes, pluginsService);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ private void executeDeprovisionSequence(
Collections.emptyList(),
this.threadPool,
DEPROVISION_WORKFLOW_THREAD_POOL,
flowFrameworkSettings.getRequestTimeout()
flowFrameworkSettings.getRequestTimeout(),
"fakeTenantId"
)
);
}
Expand Down Expand Up @@ -274,7 +275,8 @@ private void executeDeprovisionSequence(
pn.predecessors(),
this.threadPool,
DEPROVISION_WORKFLOW_THREAD_POOL,
pn.nodeTimeout()
pn.nodeTimeout(),

Check warning on line 278 in src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java#L278

Added line #L278 was not covered by tests
"fakeTenantId"
);
}).collect(Collectors.toList());
// Pause briefly before next loop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ private void executeProvisionRequest(
List<ProcessNode> provisionProcessSequence = workflowProcessSorter.sortProcessNodes(
provisionWorkflow,
workflowId,
request.getParams()
request.getParams(),
"fakeTenantId"
);
workflowProcessSorter.validate(provisionProcessSequence, pluginsService);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ private void executeReprovisionRequest(
List<ProcessNode> updatedProcessSequence = workflowProcessSorter.sortProcessNodes(
provisionWorkflow,
request.getWorkflowId(),
Collections.emptyMap() // TODO : Add suport to reprovision substitution templates
Collections.emptyMap(), // TODO : Add suport to reprovision substitution templates
"fakeTenantId"
);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public PlainActionFuture<WorkflowData> execute(
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
Map<String, String> params,
String tenantId
) {

PlainActionFuture<WorkflowData> createPipelineFuture = PlainActionFuture.newFuture();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ public PlainActionFuture<WorkflowData> execute(
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
Map<String, String> params,
String tenantId
) {

PlainActionFuture<WorkflowData> registerLocalModelFuture = PlainActionFuture.newFuture();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public PlainActionFuture<WorkflowData> execute(
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
Map<String, String> params,
String tenantId
) {
PlainActionFuture<WorkflowData> createPipelineFuture = PlainActionFuture.newFuture();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public PlainActionFuture<WorkflowData> execute(
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
Map<String, String> params,
String tenantId
) {
PlainActionFuture<WorkflowData> createConnectorFuture = PlainActionFuture.newFuture();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public PlainActionFuture<WorkflowData> execute(
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
Map<String, String> params,
String tenantId
) {
PlainActionFuture<WorkflowData> createIndexFuture = PlainActionFuture.newFuture();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public PlainActionFuture<WorkflowData> execute(
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
Map<String, String> params,
String tenantId
) {
PlainActionFuture<WorkflowData> deleteAgentFuture = PlainActionFuture.newFuture();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public PlainActionFuture<WorkflowData> execute(
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
Map<String, String> params,
String tenantId
) {
PlainActionFuture<WorkflowData> deleteConnectorFuture = PlainActionFuture.newFuture();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public PlainActionFuture<WorkflowData> execute(
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
Map<String, String> params,
String tenantId
) {
PlainActionFuture<WorkflowData> deleteIndexFuture = PlainActionFuture.newFuture();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public PlainActionFuture<WorkflowData> execute(
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
Map<String, String> params,
String tenantId
) {
PlainActionFuture<WorkflowData> deletePipelineFuture = PlainActionFuture.newFuture();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public PlainActionFuture<WorkflowData> execute(
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
Map<String, String> params,
String tenantId
) {
PlainActionFuture<WorkflowData> deleteModelFuture = PlainActionFuture.newFuture();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public PlainActionFuture<WorkflowData> execute(
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
Map<String, String> params,
String tenantId
) {
PlainActionFuture<WorkflowData> deleteSearchPipelineFuture = PlainActionFuture.newFuture();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ public PlainActionFuture<WorkflowData> execute(
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
Map<String, String> params,
String tenantId
) {

PlainActionFuture<WorkflowData> deployModelFuture = PlainActionFuture.newFuture();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public PlainActionFuture<WorkflowData> execute(
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
Map<String, String> params,
String tenantId
) {
PlainActionFuture<WorkflowData> future = PlainActionFuture.newFuture();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class ProcessNode {
private final ThreadPool threadPool;
private final String threadPoolName;
private final TimeValue nodeTimeout;
private final String tenantId;

private final PlainActionFuture<WorkflowData> future = PlainActionFuture.newFuture();

Expand All @@ -51,6 +52,7 @@ public class ProcessNode {
* @param threadPool The OpenSearch thread pool
* @param threadPoolName The thread pool to use
* @param nodeTimeout The timeout value for executing on this node
* @param tenantId The tenantId
*/
public ProcessNode(
String id,
Expand All @@ -61,7 +63,8 @@ public ProcessNode(
List<ProcessNode> predecessors,
ThreadPool threadPool,
String threadPoolName,
TimeValue nodeTimeout
TimeValue nodeTimeout,
String tenantId
) {
this.id = id;
this.workflowStep = workflowStep;
Expand All @@ -72,6 +75,7 @@ public ProcessNode(
this.threadPool = threadPool;
this.threadPoolName = threadPoolName;
this.nodeTimeout = nodeTimeout;
this.tenantId = tenantId;
}

/**
Expand Down Expand Up @@ -144,6 +148,14 @@ public TimeValue nodeTimeout() {
return nodeTimeout;
}

/**
* Returns the tenantId value for this node in the workflow.
* @return The node's tenantId value
*/
public String tenantId() {
return tenantId;

Check warning on line 156 in src/main/java/org/opensearch/flowframework/workflow/ProcessNode.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/ProcessNode.java#L156

Added line #L156 was not covered by tests
}

/**
* Execute this node in the sequence.
* Initializes the node's {@link CompletableFuture} and completes it when the process completes.
Expand Down Expand Up @@ -172,7 +184,8 @@ public PlainActionFuture<WorkflowData> execute() {
this.input,
inputMap,
this.previousNodeInputs,
this.params
this.params,
this.tenantId
);
// If completed exceptionally, this is a no-op
future.onResponse(stepFuture.actionGet(this.nodeTimeout));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public PlainActionFuture<WorkflowData> execute(
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
Map<String, String> params,
String tenantId
) {

String workflowId = currentNodeInputs.getWorkflowId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public PlainActionFuture<WorkflowData> execute(
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
Map<String, String> params,
String tenantId
) {
PlainActionFuture<WorkflowData> registerModelGroupFuture = PlainActionFuture.newFuture();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public PlainActionFuture<WorkflowData> execute(
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
Map<String, String> params,
String tenantId
) {

PlainActionFuture<WorkflowData> registerRemoteModelFuture = PlainActionFuture.newFuture();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public PlainActionFuture<WorkflowData> execute(
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
Map<String, String> params,
String tenantId
) {

PlainActionFuture<WorkflowData> reIndexFuture = PlainActionFuture.newFuture();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public PlainActionFuture<WorkflowData> execute(
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
Map<String, String> params,
String tenantId
) {
try {
Map<String, Object> inputs = ParseUtils.getInputsFromPreviousSteps(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public PlainActionFuture<WorkflowData> execute(
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
Map<String, String> params,
String tenantId
) {
PlainActionFuture<WorkflowData> undeployModelFuture = PlainActionFuture.newFuture();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public PlainActionFuture<WorkflowData> execute(
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
Map<String, String> params,
String tenantId
) {
PlainActionFuture<WorkflowData> updateIndexFuture = PlainActionFuture.newFuture();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public PlainActionFuture<WorkflowData> execute(
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
Map<String, String> params,
String tenantId
) {
PlainActionFuture<WorkflowData> workflowDataFuture = PlainActionFuture.newFuture();
workflowDataFuture.onResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,10 @@ public WorkflowProcessSorter(
* @param workflow A workflow with (unsorted) nodes and edges which define predecessors and successors
* @param workflowId The workflowId associated with the step
* @param params Parameters passed on the REST path
* @param tenantId The tenantId associated with the step
* @return A list of Process Nodes sorted topologically. All predecessors of any node will occur prior to it in the list.
*/
public List<ProcessNode> sortProcessNodes(Workflow workflow, String workflowId, Map<String, String> params) {
public List<ProcessNode> sortProcessNodes(Workflow workflow, String workflowId, Map<String, String> params, String tenantId) {
if (workflow.nodes().size() > this.maxWorkflowSteps) {
throw new FlowFrameworkException(
"Workflow "
Expand Down Expand Up @@ -140,7 +141,8 @@ public List<ProcessNode> sortProcessNodes(Workflow workflow, String workflowId,
predecessorNodes,
threadPool,
PROVISION_WORKFLOW_THREAD_POOL,
nodeTimeout
nodeTimeout,
tenantId
);
idToNodeMap.put(processNode.id(), processNode);
nodes.add(processNode);
Expand Down Expand Up @@ -319,7 +321,8 @@ private ProcessNode createNewProcessNode(
predecessorNodes,
threadPool,
PROVISION_WORKFLOW_THREAD_POOL,
nodeTimeout
nodeTimeout,
"fakeTenantId"
);
}

Expand Down Expand Up @@ -350,7 +353,8 @@ private ProcessNode createUpdateProcessNode(
predecessorNodes,
threadPool,
PROVISION_WORKFLOW_THREAD_POOL,
nodeTimeout
nodeTimeout,
"fakeTenantId"
);
} else {
// Case 3 : Cannot update step (not supported)
Expand Down Expand Up @@ -392,7 +396,8 @@ private ProcessNode createWorkflowDataStepNode(
predecessorNodes,
threadPool,
PROVISION_WORKFLOW_THREAD_POOL,
nodeTimeout
nodeTimeout,
"fakeTenantId"
);
} else {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ public interface WorkflowStep {
* @param outputs WorkflowData content of previous steps.
* @param previousNodeInputs Input params for this node that come from previous steps
* @param params Params passed on the REST path
* @param tenantId The tenantId
* @return A CompletableFuture of the building block. This block should return immediately, but not be completed until the step executes, containing either the step's output data or {@link WorkflowData#EMPTY} which may be passed to follow-on steps.
*/
PlainActionFuture<WorkflowData> execute(
String currentNodeId,
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
Map<String, String> params,
String tenantId
);

/**
Expand Down
Loading

0 comments on commit dd7dace

Please sign in to comment.