diff --git a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java index bbd7c11d5..40fea99e0 100644 --- a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java @@ -517,7 +517,7 @@ void checkMaxWorkflows(TimeValue requestTimeOut, Integer maxWorkflow, ActionList private void validateWorkflows(Template template) throws Exception { for (Workflow workflow : template.workflows().values()) { - List sortedNodes = workflowProcessSorter.sortProcessNodes(workflow, null, Collections.emptyMap()); + List sortedNodes = workflowProcessSorter.sortProcessNodes(workflow, null, Collections.emptyMap(), "fakeTenantId"); workflowProcessSorter.validate(sortedNodes, pluginsService); } } diff --git a/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java index af0eec663..b840e1362 100644 --- a/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java @@ -211,7 +211,8 @@ private void executeDeprovisionSequence( Collections.emptyList(), this.threadPool, DEPROVISION_WORKFLOW_THREAD_POOL, - flowFrameworkSettings.getRequestTimeout() + flowFrameworkSettings.getRequestTimeout(), + "fakeTenantId" ) ); } @@ -274,7 +275,8 @@ private void executeDeprovisionSequence( pn.predecessors(), this.threadPool, DEPROVISION_WORKFLOW_THREAD_POOL, - pn.nodeTimeout() + pn.nodeTimeout(), + "fakeTenantId" ); }).collect(Collectors.toList()); // Pause briefly before next loop diff --git a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java index 5e2b4d2f2..e0d405075 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java @@ -198,7 +198,8 @@ private void executeProvisionRequest( List provisionProcessSequence = workflowProcessSorter.sortProcessNodes( provisionWorkflow, workflowId, - request.getParams() + request.getParams(), + "fakeTenantId" ); workflowProcessSorter.validate(provisionProcessSequence, pluginsService); diff --git a/src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java index b9d437759..ff0dcef62 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java @@ -198,7 +198,8 @@ private void executeReprovisionRequest( List updatedProcessSequence = workflowProcessSorter.sortProcessNodes( provisionWorkflow, request.getWorkflowId(), - Collections.emptyMap() // TODO : Add suport to reprovision substitution templates + Collections.emptyMap(), // TODO : Add suport to reprovision substitution templates + "fakeTenantId" ); try { diff --git a/src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java b/src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java index f16c87ef7..e779ddf4f 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java @@ -63,7 +63,8 @@ public PlainActionFuture execute( WorkflowData currentNodeInputs, Map outputs, Map previousNodeInputs, - Map params + Map params, + String tenantId ) { PlainActionFuture createPipelineFuture = PlainActionFuture.newFuture(); diff --git a/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java index 4317c0087..79adb1e1c 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java @@ -89,7 +89,8 @@ public PlainActionFuture execute( WorkflowData currentNodeInputs, Map outputs, Map previousNodeInputs, - Map params + Map params, + String tenantId ) { PlainActionFuture registerLocalModelFuture = PlainActionFuture.newFuture(); diff --git a/src/main/java/org/opensearch/flowframework/workflow/AbstractUpdatePipelineStep.java b/src/main/java/org/opensearch/flowframework/workflow/AbstractUpdatePipelineStep.java index 5c3e3a52f..e03e048a9 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/AbstractUpdatePipelineStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/AbstractUpdatePipelineStep.java @@ -70,7 +70,8 @@ public PlainActionFuture execute( WorkflowData currentNodeInputs, Map outputs, Map previousNodeInputs, - Map params + Map params, + String tenantId ) { PlainActionFuture createPipelineFuture = PlainActionFuture.newFuture(); diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java index cd9d2e2ac..08aac354a 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java @@ -76,7 +76,8 @@ public PlainActionFuture execute( WorkflowData currentNodeInputs, Map outputs, Map previousNodeInputs, - Map params + Map params, + String tenantId ) { PlainActionFuture createConnectorFuture = PlainActionFuture.newFuture(); diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java index 4d594b0c5..14cbb0736 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java @@ -67,7 +67,8 @@ public PlainActionFuture execute( WorkflowData currentNodeInputs, Map outputs, Map previousNodeInputs, - Map params + Map params, + String tenantId ) { PlainActionFuture createIndexFuture = PlainActionFuture.newFuture(); diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeleteAgentStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeleteAgentStep.java index 41accccf3..25a9cdae1 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/DeleteAgentStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/DeleteAgentStep.java @@ -55,7 +55,8 @@ public PlainActionFuture execute( WorkflowData currentNodeInputs, Map outputs, Map previousNodeInputs, - Map params + Map params, + String tenantId ) { PlainActionFuture deleteAgentFuture = PlainActionFuture.newFuture(); diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java index bfa10be20..c2b5f8e9e 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java @@ -53,7 +53,8 @@ public PlainActionFuture execute( WorkflowData currentNodeInputs, Map outputs, Map previousNodeInputs, - Map params + Map params, + String tenantId ) { PlainActionFuture deleteConnectorFuture = PlainActionFuture.newFuture(); diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeleteIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeleteIndexStep.java index 261d061b4..e76918004 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/DeleteIndexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/DeleteIndexStep.java @@ -58,7 +58,8 @@ public PlainActionFuture execute( WorkflowData currentNodeInputs, Map outputs, Map previousNodeInputs, - Map params + Map params, + String tenantId ) { PlainActionFuture deleteIndexFuture = PlainActionFuture.newFuture(); diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeleteIngestPipelineStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeleteIngestPipelineStep.java index 674a18f70..6f0cd85d5 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/DeleteIngestPipelineStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/DeleteIngestPipelineStep.java @@ -58,7 +58,8 @@ public PlainActionFuture execute( WorkflowData currentNodeInputs, Map outputs, Map previousNodeInputs, - Map params + Map params, + String tenantId ) { PlainActionFuture deletePipelineFuture = PlainActionFuture.newFuture(); diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeleteModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeleteModelStep.java index 15c5b8c0e..918e99081 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/DeleteModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/DeleteModelStep.java @@ -55,7 +55,8 @@ public PlainActionFuture execute( WorkflowData currentNodeInputs, Map outputs, Map previousNodeInputs, - Map params + Map params, + String tenantId ) { PlainActionFuture deleteModelFuture = PlainActionFuture.newFuture(); diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeleteSearchPipelineStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeleteSearchPipelineStep.java index b35cdf525..d4e15f92e 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/DeleteSearchPipelineStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/DeleteSearchPipelineStep.java @@ -58,7 +58,8 @@ public PlainActionFuture execute( WorkflowData currentNodeInputs, Map outputs, Map previousNodeInputs, - Map params + Map params, + String tenantId ) { PlainActionFuture deleteSearchPipelineFuture = PlainActionFuture.newFuture(); diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeployModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeployModelStep.java index 923650c20..46ee3a24b 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/DeployModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/DeployModelStep.java @@ -65,7 +65,8 @@ public PlainActionFuture execute( WorkflowData currentNodeInputs, Map outputs, Map previousNodeInputs, - Map params + Map params, + String tenantId ) { PlainActionFuture deployModelFuture = PlainActionFuture.newFuture(); diff --git a/src/main/java/org/opensearch/flowframework/workflow/NoOpStep.java b/src/main/java/org/opensearch/flowframework/workflow/NoOpStep.java index cf03de4bb..658b09a02 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/NoOpStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/NoOpStep.java @@ -38,7 +38,8 @@ public PlainActionFuture execute( WorkflowData currentNodeInputs, Map outputs, Map previousNodeInputs, - Map params + Map params, + String tenantId ) { PlainActionFuture future = PlainActionFuture.newFuture(); diff --git a/src/main/java/org/opensearch/flowframework/workflow/ProcessNode.java b/src/main/java/org/opensearch/flowframework/workflow/ProcessNode.java index 454bdb2d1..f3c7ae19d 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/ProcessNode.java +++ b/src/main/java/org/opensearch/flowframework/workflow/ProcessNode.java @@ -36,6 +36,7 @@ public class ProcessNode { private final ThreadPool threadPool; private final String threadPoolName; private final TimeValue nodeTimeout; + private final String tenantId; private final PlainActionFuture future = PlainActionFuture.newFuture(); @@ -51,6 +52,7 @@ public class ProcessNode { * @param threadPool The OpenSearch thread pool * @param threadPoolName The thread pool to use * @param nodeTimeout The timeout value for executing on this node + * @param tenantId The tenantId */ public ProcessNode( String id, @@ -61,7 +63,8 @@ public ProcessNode( List predecessors, ThreadPool threadPool, String threadPoolName, - TimeValue nodeTimeout + TimeValue nodeTimeout, + String tenantId ) { this.id = id; this.workflowStep = workflowStep; @@ -72,6 +75,7 @@ public ProcessNode( this.threadPool = threadPool; this.threadPoolName = threadPoolName; this.nodeTimeout = nodeTimeout; + this.tenantId = tenantId; } /** @@ -144,6 +148,14 @@ public TimeValue nodeTimeout() { return nodeTimeout; } + /** + * Returns the tenantId value for this node in the workflow. + * @return The node's tenantId value + */ + public String tenantId() { + return tenantId; + } + /** * Execute this node in the sequence. * Initializes the node's {@link CompletableFuture} and completes it when the process completes. @@ -172,7 +184,8 @@ public PlainActionFuture execute() { this.input, inputMap, this.previousNodeInputs, - this.params + this.params, + this.tenantId ); // If completed exceptionally, this is a no-op future.onResponse(stepFuture.actionGet(this.nodeTimeout)); diff --git a/src/main/java/org/opensearch/flowframework/workflow/RegisterAgentStep.java b/src/main/java/org/opensearch/flowframework/workflow/RegisterAgentStep.java index 5eed7a864..45cc08bdc 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/RegisterAgentStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/RegisterAgentStep.java @@ -84,7 +84,8 @@ public PlainActionFuture execute( WorkflowData currentNodeInputs, Map outputs, Map previousNodeInputs, - Map params + Map params, + String tenantId ) { String workflowId = currentNodeInputs.getWorkflowId(); diff --git a/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java b/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java index 6cc990429..90606c81b 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java @@ -70,7 +70,8 @@ public PlainActionFuture execute( WorkflowData currentNodeInputs, Map outputs, Map previousNodeInputs, - Map params + Map params, + String tenantId ) { PlainActionFuture registerModelGroupFuture = PlainActionFuture.newFuture(); diff --git a/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java index 72b56ab79..b1d39b830 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java @@ -74,7 +74,8 @@ public PlainActionFuture execute( WorkflowData currentNodeInputs, Map outputs, Map previousNodeInputs, - Map params + Map params, + String tenantId ) { PlainActionFuture registerRemoteModelFuture = PlainActionFuture.newFuture(); diff --git a/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java b/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java index a733f63e3..f7b5c0eef 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java @@ -68,7 +68,8 @@ public PlainActionFuture execute( WorkflowData currentNodeInputs, Map outputs, Map previousNodeInputs, - Map params + Map params, + String tenantId ) { PlainActionFuture reIndexFuture = PlainActionFuture.newFuture(); diff --git a/src/main/java/org/opensearch/flowframework/workflow/ToolStep.java b/src/main/java/org/opensearch/flowframework/workflow/ToolStep.java index 9d13c6953..dd865076c 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/ToolStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/ToolStep.java @@ -63,7 +63,8 @@ public PlainActionFuture execute( WorkflowData currentNodeInputs, Map outputs, Map previousNodeInputs, - Map params + Map params, + String tenantId ) { try { Map inputs = ParseUtils.getInputsFromPreviousSteps( diff --git a/src/main/java/org/opensearch/flowframework/workflow/UndeployModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/UndeployModelStep.java index 433315f44..ab2e0a36f 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/UndeployModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/UndeployModelStep.java @@ -58,7 +58,8 @@ public PlainActionFuture execute( WorkflowData currentNodeInputs, Map outputs, Map previousNodeInputs, - Map params + Map params, + String tenantId ) { PlainActionFuture undeployModelFuture = PlainActionFuture.newFuture(); diff --git a/src/main/java/org/opensearch/flowframework/workflow/UpdateIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/UpdateIndexStep.java index 727eb31a6..1965b6149 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/UpdateIndexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/UpdateIndexStep.java @@ -64,7 +64,8 @@ public PlainActionFuture execute( WorkflowData currentNodeInputs, Map outputs, Map previousNodeInputs, - Map params + Map params, + String tenantId ) { PlainActionFuture updateIndexFuture = PlainActionFuture.newFuture(); diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowDataStep.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowDataStep.java index 2d8ed0dcb..bde26ed3d 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowDataStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowDataStep.java @@ -40,7 +40,8 @@ public PlainActionFuture execute( WorkflowData currentNodeInputs, Map outputs, Map previousNodeInputs, - Map params + Map params, + String tenantId ) { PlainActionFuture workflowDataFuture = PlainActionFuture.newFuture(); workflowDataFuture.onResponse( diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java index 88a5d67e5..d0abb8c6e 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java @@ -90,9 +90,10 @@ public WorkflowProcessSorter( * @param workflow A workflow with (unsorted) nodes and edges which define predecessors and successors * @param workflowId The workflowId associated with the step * @param params Parameters passed on the REST path + * @param tenantId The tenantId associated with the step * @return A list of Process Nodes sorted topologically. All predecessors of any node will occur prior to it in the list. */ - public List sortProcessNodes(Workflow workflow, String workflowId, Map params) { + public List sortProcessNodes(Workflow workflow, String workflowId, Map params, String tenantId) { if (workflow.nodes().size() > this.maxWorkflowSteps) { throw new FlowFrameworkException( "Workflow " @@ -140,7 +141,8 @@ public List sortProcessNodes(Workflow workflow, String workflowId, predecessorNodes, threadPool, PROVISION_WORKFLOW_THREAD_POOL, - nodeTimeout + nodeTimeout, + tenantId ); idToNodeMap.put(processNode.id(), processNode); nodes.add(processNode); @@ -319,7 +321,8 @@ private ProcessNode createNewProcessNode( predecessorNodes, threadPool, PROVISION_WORKFLOW_THREAD_POOL, - nodeTimeout + nodeTimeout, + "fakeTenantId" ); } @@ -350,7 +353,8 @@ private ProcessNode createUpdateProcessNode( predecessorNodes, threadPool, PROVISION_WORKFLOW_THREAD_POOL, - nodeTimeout + nodeTimeout, + "fakeTenantId" ); } else { // Case 3 : Cannot update step (not supported) @@ -392,7 +396,8 @@ private ProcessNode createWorkflowDataStepNode( predecessorNodes, threadPool, PROVISION_WORKFLOW_THREAD_POOL, - nodeTimeout + nodeTimeout, + "fakeTenantId" ); } else { return null; diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java index 0456e5bbf..7e759ef54 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java @@ -24,6 +24,7 @@ public interface WorkflowStep { * @param outputs WorkflowData content of previous steps. * @param previousNodeInputs Input params for this node that come from previous steps * @param params Params passed on the REST path + * @param tenantId The tenantId * @return A CompletableFuture of the building block. This block should return immediately, but not be completed until the step executes, containing either the step's output data or {@link WorkflowData#EMPTY} which may be passed to follow-on steps. */ PlainActionFuture execute( @@ -31,7 +32,8 @@ PlainActionFuture execute( WorkflowData currentNodeInputs, Map outputs, Map previousNodeInputs, - Map params + Map params, + String tenantId ); /** diff --git a/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java index 4841871aa..8972bf3c5 100644 --- a/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java @@ -60,6 +60,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -165,7 +166,8 @@ public void testDeprovisionWorkflow() throws Exception { PlainActionFuture future = PlainActionFuture.newFuture(); future.onResponse(WorkflowData.EMPTY); - when(this.deleteConnectorStep.execute(anyString(), any(WorkflowData.class), anyMap(), anyMap(), anyMap())).thenReturn(future); + when(this.deleteConnectorStep.execute(anyString(), any(WorkflowData.class), anyMap(), anyMap(), anyMap(), nullable(String.class))) + .thenReturn(future); CountDownLatch latch = new CountDownLatch(1); LatchedActionListener latchedActionListener = new LatchedActionListener<>(listener, latch); @@ -197,7 +199,8 @@ public void testFailToDeprovision() throws Exception { PlainActionFuture future = PlainActionFuture.newFuture(); future.onFailure(new RuntimeException("rte")); - when(this.undeployModelStep.execute(anyString(), any(WorkflowData.class), anyMap(), anyMap(), anyMap())).thenReturn(future); + when(this.undeployModelStep.execute(anyString(), any(WorkflowData.class), anyMap(), anyMap(), anyMap(), nullable(String.class))) + .thenReturn(future); CountDownLatch latch = new CountDownLatch(1); LatchedActionListener latchedActionListener = new LatchedActionListener<>(listener, latch); @@ -274,7 +277,8 @@ public void testAllowDeleteRequired() throws Exception { PlainActionFuture future = PlainActionFuture.newFuture(); future.onResponse(WorkflowData.EMPTY); - when(this.deleteIndexStep.execute(anyString(), any(WorkflowData.class), anyMap(), anyMap(), anyMap())).thenReturn(future); + when(this.deleteIndexStep.execute(anyString(), any(WorkflowData.class), anyMap(), anyMap(), anyMap(), nullable(String.class))) + .thenReturn(future); latch = new CountDownLatch(1); latchedActionListener = new LatchedActionListener<>(listener, latch); @@ -312,7 +316,8 @@ public void testFailToDeprovisionAndAllowDeleteRequired() throws Exception { PlainActionFuture future = PlainActionFuture.newFuture(); future.onFailure(new RuntimeException("rte")); - when(this.undeployModelStep.execute(anyString(), any(WorkflowData.class), anyMap(), anyMap(), anyMap())).thenReturn(future); + when(this.undeployModelStep.execute(anyString(), any(WorkflowData.class), anyMap(), anyMap(), anyMap(), nullable(String.class))) + .thenReturn(future); CountDownLatch latch = new CountDownLatch(1); LatchedActionListener latchedActionListener = new LatchedActionListener<>(listener, latch); diff --git a/src/test/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportActionTests.java index e13b1f0de..4da9efeb0 100644 --- a/src/test/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportActionTests.java @@ -123,7 +123,7 @@ public void testReprovisionWorkflow() throws Exception { // Stub validations when(mockTemplate.workflows()).thenReturn(mockWorkflows); - when(workflowProcessSorter.sortProcessNodes(any(), any(), any())).thenReturn(List.of()); + when(workflowProcessSorter.sortProcessNodes(any(), any(), any(), any())).thenReturn(List.of()); doNothing().when(workflowProcessSorter).validate(any(), any()); when(encryptorUtils.decryptTemplateCredentials(any())).thenReturn(mockTemplate); @@ -171,7 +171,7 @@ public void testReprovisionProvisioningWorkflow() throws Exception { // Stub validations when(mockTemplate.workflows()).thenReturn(mockWorkflows); - when(workflowProcessSorter.sortProcessNodes(any(), any(), any())).thenReturn(List.of()); + when(workflowProcessSorter.sortProcessNodes(any(), any(), any(), any())).thenReturn(List.of()); doNothing().when(workflowProcessSorter).validate(any(), any()); when(encryptorUtils.decryptTemplateCredentials(any())).thenReturn(mockTemplate); @@ -211,7 +211,7 @@ public void testReprovisionNotStartedWorkflow() throws Exception { // Stub validations when(mockTemplate.workflows()).thenReturn(mockWorkflows); - when(workflowProcessSorter.sortProcessNodes(any(), any(), any())).thenReturn(List.of()); + when(workflowProcessSorter.sortProcessNodes(any(), any(), any(), any())).thenReturn(List.of()); doNothing().when(workflowProcessSorter).validate(any(), any()); when(encryptorUtils.decryptTemplateCredentials(any())).thenReturn(mockTemplate); @@ -251,7 +251,7 @@ public void testFailedStateUpdate() throws Exception { // Stub validations when(mockTemplate.workflows()).thenReturn(mockWorkflows); - when(workflowProcessSorter.sortProcessNodes(any(), any(), any())).thenReturn(List.of()); + when(workflowProcessSorter.sortProcessNodes(any(), any(), any(), any())).thenReturn(List.of()); doNothing().when(workflowProcessSorter).validate(any(), any()); when(encryptorUtils.decryptTemplateCredentials(any())).thenReturn(mockTemplate); @@ -299,7 +299,7 @@ public void testFailedWorkflowStateRetrieval() throws Exception { // Stub validations when(mockTemplate.workflows()).thenReturn(mockWorkflows); - when(workflowProcessSorter.sortProcessNodes(any(), any(), any())).thenReturn(List.of()); + when(workflowProcessSorter.sortProcessNodes(any(), any(), any(), any())).thenReturn(List.of()); doNothing().when(workflowProcessSorter).validate(any(), any()); when(encryptorUtils.decryptTemplateCredentials(any())).thenReturn(mockTemplate); diff --git a/src/test/java/org/opensearch/flowframework/workflow/CreateConnectorStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/CreateConnectorStepTests.java index 3ed8d15ec..28adfaa73 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/CreateConnectorStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/CreateConnectorStepTests.java @@ -104,7 +104,8 @@ public void testCreateConnector() throws IOException, ExecutionException, Interr inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(machineLearningNodeClient).createConnector(any(MLCreateConnectorInput.class), any()); @@ -127,7 +128,8 @@ public void testCreateConnectorFailure() throws IOException { inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(machineLearningNodeClient).createConnector(any(MLCreateConnectorInput.class), any()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/CreateIndexStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/CreateIndexStepTests.java index b9a9dfe88..08cba8267 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/CreateIndexStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/CreateIndexStepTests.java @@ -104,7 +104,8 @@ public void testCreateIndexStep() throws ExecutionException, InterruptedExceptio inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertFalse(future.isDone()); verify(indicesAdminClient, times(1)).create(any(CreateIndexRequest.class), actionListenerCaptor.capture()); @@ -125,7 +126,8 @@ public void testCreateIndexStepFailure() throws ExecutionException, InterruptedE inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertFalse(future.isDone()); verify(indicesAdminClient, times(1)).create(any(CreateIndexRequest.class), actionListenerCaptor.capture()); @@ -146,7 +148,8 @@ public void testCreateIndexStepUnsafeFailure() throws ExecutionException, Interr inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertFalse(future.isDone()); verify(indicesAdminClient, times(1)).create(any(CreateIndexRequest.class), actionListenerCaptor.capture()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStepTests.java index efd9275b5..c1c34fba6 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStepTests.java @@ -86,7 +86,8 @@ public void testCreateIngestPipelineStep() throws InterruptedException, Executio inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertFalse(future.isDone()); @@ -110,7 +111,8 @@ public void testCreateIngestPipelineStepFailure() throws InterruptedException { inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertFalse(future.isDone()); @@ -146,7 +148,8 @@ public void testMissingData() throws InterruptedException { incorrectData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/CreateSearchPipelineStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/CreateSearchPipelineStepTests.java index ac1fda8d8..de030093a 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/CreateSearchPipelineStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/CreateSearchPipelineStepTests.java @@ -86,7 +86,8 @@ public void testCreateSearchPipelineStep() throws InterruptedException, Executio inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertFalse(future.isDone()); @@ -110,7 +111,8 @@ public void testCreateSearchPipelineStepFailure() throws InterruptedException { inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertFalse(future.isDone()); @@ -146,7 +148,8 @@ public void testMissingData() throws InterruptedException { incorrectData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/DeleteAgentStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/DeleteAgentStepTests.java index 240dbe966..c2dbd04cf 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/DeleteAgentStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/DeleteAgentStepTests.java @@ -66,7 +66,8 @@ public void testDeleteAgent() throws IOException, ExecutionException, Interrupte inputData, Map.of("step_1", new WorkflowData(Map.of(AGENT_ID, agentId), "workflowId", "nodeId")), Map.of("step_1", AGENT_ID), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(machineLearningNodeClient).deleteAgent(any(String.class), any()); @@ -90,7 +91,8 @@ public void testDeleteAgentNotFound() throws IOException, ExecutionException, In inputData, Map.of("step_1", new WorkflowData(Map.of(AGENT_ID, agentId), "workflowId", "nodeId")), Map.of("step_1", AGENT_ID), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(machineLearningNodeClient).deleteAgent(any(String.class), any()); @@ -106,7 +108,8 @@ public void testNoAgentIdInOutput() throws IOException { inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); @@ -129,7 +132,8 @@ public void testDeleteAgentFailure() throws IOException { inputData, Map.of("step_1", new WorkflowData(Map.of(AGENT_ID, "test"), "workflowId", "nodeId")), Map.of("step_1", AGENT_ID), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(machineLearningNodeClient).deleteAgent(any(String.class), any()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java index 9aadd060a..a5795fccc 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/DeleteConnectorStepTests.java @@ -66,7 +66,8 @@ public void testDeleteConnector() throws IOException, ExecutionException, Interr inputData, Map.of("step_1", new WorkflowData(Map.of(CONNECTOR_ID, connectorId), "workflowId", "nodeId")), Map.of("step_1", CONNECTOR_ID), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(machineLearningNodeClient).deleteConnector(anyString(), anyActionListener()); @@ -82,7 +83,8 @@ public void testNoConnectorIdInOutput() throws IOException { inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); @@ -105,7 +107,8 @@ public void testDeleteConnectorFailure() throws IOException { inputData, Map.of("step_1", new WorkflowData(Map.of(CONNECTOR_ID, "test"), "workflowId", "nodeId")), Map.of("step_1", CONNECTOR_ID), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(machineLearningNodeClient).deleteConnector(anyString(), anyActionListener()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/DeleteIndexStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/DeleteIndexStepTests.java index 9ef574427..65c16efa0 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/DeleteIndexStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/DeleteIndexStepTests.java @@ -72,7 +72,8 @@ public void testDeleteIndex() throws IOException, ExecutionException, Interrupte inputData, Map.of("step_1", new WorkflowData(Map.of(INDEX_NAME, indexName), "workflowId", "nodeId")), Map.of("step_1", INDEX_NAME), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(indicesAdminClient).delete(any(DeleteIndexRequest.class), any()); @@ -88,7 +89,8 @@ public void testNoIndexNameInOutput() throws IOException { inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); @@ -111,7 +113,8 @@ public void testDeleteIndexFailure() throws IOException { inputData, Map.of("step_1", new WorkflowData(Map.of(INDEX_NAME, "test"), "workflowId", "nodeId")), Map.of("step_1", INDEX_NAME), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(indicesAdminClient).delete(any(DeleteIndexRequest.class), any()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/DeleteIngestPipelineStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/DeleteIngestPipelineStepTests.java index 9b1c070bd..b09402266 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/DeleteIngestPipelineStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/DeleteIngestPipelineStepTests.java @@ -72,7 +72,8 @@ public void testDeletePipeline() throws IOException, ExecutionException, Interru inputData, Map.of("step_1", new WorkflowData(Map.of(PIPELINE_ID, pipelineId), "workflowId", "nodeId")), Map.of("step_1", PIPELINE_ID), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(clusterAdminClient).deletePipeline(any(DeletePipelineRequest.class), any()); @@ -88,7 +89,8 @@ public void testNoPipelineIdInOutput() throws IOException { inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); @@ -111,7 +113,8 @@ public void testDeletePipelineFailure() throws IOException { inputData, Map.of("step_1", new WorkflowData(Map.of(PIPELINE_ID, "test"), "workflowId", "nodeId")), Map.of("step_1", PIPELINE_ID), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(clusterAdminClient).deletePipeline(any(DeletePipelineRequest.class), any()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/DeleteModelStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/DeleteModelStepTests.java index a21793fb1..66349f8d8 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/DeleteModelStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/DeleteModelStepTests.java @@ -66,7 +66,8 @@ public void testDeleteModel() throws IOException, ExecutionException, Interrupte inputData, Map.of("step_1", new WorkflowData(Map.of(MODEL_ID, modelId), "workflowId", "nodeId")), Map.of("step_1", MODEL_ID), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(machineLearningNodeClient).deleteModel(any(String.class), any()); @@ -90,7 +91,8 @@ public void testDeleteModelNotFound() throws IOException, ExecutionException, In inputData, Map.of("step_1", new WorkflowData(Map.of(MODEL_ID, modelId), "workflowId", "nodeId")), Map.of("step_1", MODEL_ID), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(machineLearningNodeClient).deleteModel(any(String.class), any()); @@ -106,7 +108,8 @@ public void testNoModelIdInOutput() throws IOException { inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); @@ -129,7 +132,8 @@ public void testDeleteModelFailure() throws IOException { inputData, Map.of("step_1", new WorkflowData(Map.of(MODEL_ID, "test"), "workflowId", "nodeId")), Map.of("step_1", MODEL_ID), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(machineLearningNodeClient).deleteModel(any(String.class), any()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/DeleteSearchPipelineStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/DeleteSearchPipelineStepTests.java index 4ebc7fd5b..f369558ed 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/DeleteSearchPipelineStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/DeleteSearchPipelineStepTests.java @@ -72,7 +72,8 @@ public void testDeleteSearchPipeline() throws IOException, ExecutionException, I inputData, Map.of("step_1", new WorkflowData(Map.of(PIPELINE_ID, pipelineId), "workflowId", "nodeId")), Map.of("step_1", PIPELINE_ID), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(clusterAdminClient).deleteSearchPipeline(any(DeleteSearchPipelineRequest.class), any()); @@ -88,7 +89,8 @@ public void testNoPipelineIdInOutput() throws IOException { inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); @@ -111,7 +113,8 @@ public void testDeleteSearchPipelineFailure() throws IOException { inputData, Map.of("step_1", new WorkflowData(Map.of(PIPELINE_ID, "test"), "workflowId", "nodeId")), Map.of("step_1", PIPELINE_ID), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(clusterAdminClient).deleteSearchPipeline(any(DeleteSearchPipelineRequest.class), any()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java index 9b945a05c..f4c297187 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java @@ -143,7 +143,8 @@ public void testDeployModel() throws ExecutionException, InterruptedException, I inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); future.actionGet(); @@ -170,7 +171,8 @@ public void testDeployModelFailure() { inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(machineLearningNodeClient).deploy(eq("modelId"), actionListenerCaptor.capture()); @@ -210,7 +212,8 @@ public void testDeployModelTaskFailure() throws IOException, InterruptedExceptio inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); ExecutionException ex = expectThrows(ExecutionException.class, () -> future.get().getClass()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/NoOpStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/NoOpStepTests.java index b53574cfe..56faaebbd 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/NoOpStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/NoOpStepTests.java @@ -31,7 +31,8 @@ public void testNoOpStep() throws IOException { WorkflowData.EMPTY, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); } @@ -46,7 +47,8 @@ public void testNoOpStepDelay() throws IOException, InterruptedException { delayData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); // Sleep isn't exactly accurate so leave 100ms of roundoff @@ -68,7 +70,8 @@ public void testNoOpStepInterrupt() throws IOException, InterruptedException { delayData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); try { future.actionGet(); @@ -102,7 +105,7 @@ public void testNoOpStepParse() throws IOException { Exception ex = assertThrows( WorkflowStepException.class, - () -> noopStep.execute("nodeId", delayData, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()) + () -> noopStep.execute("nodeId", delayData, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), null) ); assertEquals("failed to parse setting [delay] with value [foo] as a time value: unit is missing or unrecognized", ex.getMessage()); } diff --git a/src/test/java/org/opensearch/flowframework/workflow/ProcessNodeTests.java b/src/test/java/org/opensearch/flowframework/workflow/ProcessNodeTests.java index 9e205cb89..492a4e109 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/ProcessNodeTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/ProcessNodeTests.java @@ -75,7 +75,8 @@ public PlainActionFuture execute( WorkflowData currentNodeInputs, Map outputs, Map previousNodeInputs, - Map params + Map params, + String tenantId ) { PlainActionFuture f = PlainActionFuture.newFuture(); f.onResponse(new WorkflowData(Map.of("test", "output"), "test-id", "test-node-id")); @@ -93,7 +94,8 @@ public String getName() { List.of(successfulNode), testThreadPool, PROVISION_WORKFLOW_THREAD_POOL, - TimeValue.timeValueMillis(50) + TimeValue.timeValueMillis(50), + null ); assertEquals("A", nodeA.id()); assertEquals("test", nodeA.workflowStep().getName()); @@ -121,7 +123,8 @@ public PlainActionFuture execute( WorkflowData currentNodeInputs, Map outputs, Map previousNodeInputs, - Map params + Map params, + String tenantId ) { PlainActionFuture future = PlainActionFuture.newFuture(); testThreadPool.schedule( @@ -143,7 +146,8 @@ public String getName() { Collections.emptyList(), testThreadPool, PROVISION_WORKFLOW_THREAD_POOL, - TimeValue.timeValueMillis(500) + TimeValue.timeValueMillis(500), + null ); assertEquals("B", nodeB.id()); assertEquals("test", nodeB.workflowStep().getName()); @@ -165,7 +169,8 @@ public PlainActionFuture execute( WorkflowData currentNodeInputs, Map outputs, Map previousNodeInputs, - Map params + Map params, + String tenantId ) { PlainActionFuture future = PlainActionFuture.newFuture(); testThreadPool.schedule( @@ -187,7 +192,8 @@ public String getName() { Collections.emptyList(), testThreadPool, PROVISION_WORKFLOW_THREAD_POOL, - TimeValue.timeValueMillis(100) + TimeValue.timeValueMillis(100), + null ); assertEquals("Zzz", nodeZ.id()); assertEquals("sleepy", nodeZ.workflowStep().getName()); @@ -210,7 +216,8 @@ public PlainActionFuture execute( WorkflowData currentNodeInputs, Map outputs, Map previousNodeInputs, - Map params + Map params, + String tenantId ) { PlainActionFuture f = PlainActionFuture.newFuture(); f.onResponse(WorkflowData.EMPTY); @@ -228,7 +235,8 @@ public String getName() { List.of(successfulNode, failedNode), testThreadPool, PROVISION_WORKFLOW_THREAD_POOL, - TimeValue.timeValueSeconds(15) + TimeValue.timeValueSeconds(15), + null ); assertEquals("E", nodeE.id()); assertEquals("test", nodeE.workflowStep().getName()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/RegisterAgentTests.java b/src/test/java/org/opensearch/flowframework/workflow/RegisterAgentTests.java index 8def95f58..41967ba0b 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/RegisterAgentTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/RegisterAgentTests.java @@ -117,7 +117,8 @@ public void testRegisterAgent() throws IOException, ExecutionException, Interrup inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(machineLearningNodeClient).registerAgent(any(MLAgent.class), actionListenerCaptor.capture()); @@ -150,7 +151,8 @@ public void testRegisterAgentFailure() throws IOException { inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(machineLearningNodeClient).registerAgent(any(MLAgent.class), actionListenerCaptor.capture()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalCustomModelStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalCustomModelStepTests.java index d42a1ae21..073b738e4 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalCustomModelStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalCustomModelStepTests.java @@ -164,7 +164,8 @@ public void testRegisterLocalCustomModelSuccess() throws Exception { workflowData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); future.actionGet(); @@ -199,7 +200,8 @@ public void testRegisterLocalCustomModelSuccess() throws Exception { boolStringWorkflowData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); future.actionGet(); @@ -270,7 +272,8 @@ public void testRegisterLocalCustomModelDeployStateUpdateFail() throws Exception boolStringWorkflowData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); ExecutionException ex = expectThrows(ExecutionException.class, () -> future.get().getClass()); @@ -291,7 +294,8 @@ public void testRegisterLocalCustomModelFailure() { workflowData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); ExecutionException ex = expectThrows(ExecutionException.class, () -> future.get().getClass()); @@ -332,7 +336,8 @@ public void testRegisterLocalCustomModelTaskFailure() { workflowData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); ExecutionException ex = expectThrows(ExecutionException.class, () -> future.get().getClass()); @@ -346,7 +351,8 @@ public void testMissingInputs() { new WorkflowData(Collections.emptyMap(), "test-id", "test-node-id"), Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent()); @@ -392,7 +398,8 @@ public void testBoolParseFail() throws IOException, ExecutionException, Interrup boolStringWorkflowData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalPretrainedModelStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalPretrainedModelStepTests.java index a0ef430b9..fce42dc31 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalPretrainedModelStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalPretrainedModelStepTests.java @@ -157,7 +157,8 @@ public void testRegisterLocalPretrainedModelSuccess() throws Exception { workflowData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); future.actionGet(); @@ -186,7 +187,8 @@ public void testRegisterLocalPretrainedModelSuccess() throws Exception { boolStringWorkflowData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); future.actionGet(); @@ -211,7 +213,8 @@ public void testRegisterLocalPretrainedModelFailure() { workflowData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); ExecutionException ex = expectThrows(ExecutionException.class, () -> future.get().getClass()); @@ -252,7 +255,8 @@ public void testRegisterLocalPretrainedModelTaskFailure() { workflowData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); ExecutionException ex = expectThrows(ExecutionException.class, () -> future.get().getClass()); @@ -266,7 +270,8 @@ public void testMissingInputs() { new WorkflowData(Collections.emptyMap(), "test-id", "test-node-id"), Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent()); @@ -297,7 +302,8 @@ public void testBoolParseFail() throws IOException, ExecutionException, Interrup boolStringWorkflowData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalSparseEncodingModelStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalSparseEncodingModelStepTests.java index e3157b20b..78f28841e 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalSparseEncodingModelStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/RegisterLocalSparseEncodingModelStepTests.java @@ -160,7 +160,8 @@ public void testRegisterLocalSparseEncodingModelSuccess() throws Exception { workflowData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); future.actionGet(); @@ -191,7 +192,8 @@ public void testRegisterLocalSparseEncodingModelSuccess() throws Exception { boolStringWorkflowData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); future.actionGet(); @@ -216,7 +218,8 @@ public void testRegisterLocalSparseEncodingModelFailure() { workflowData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); ExecutionException ex = expectThrows(ExecutionException.class, () -> future.get().getClass()); @@ -257,7 +260,8 @@ public void testRegisterLocalSparseEncodingModelTaskFailure() { workflowData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); ExecutionException ex = expectThrows(ExecutionException.class, () -> future.get().getClass()); @@ -271,7 +275,8 @@ public void testMissingInputs() { new WorkflowData(Collections.emptyMap(), "test-id", "test-node-id"), Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent()); @@ -304,7 +309,8 @@ public void testBoolParseFail() throws IOException, ExecutionException, Interrup boolStringWorkflowData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/RegisterModelGroupStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/RegisterModelGroupStepTests.java index 05eeb8500..ad96e0e48 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/RegisterModelGroupStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/RegisterModelGroupStepTests.java @@ -121,7 +121,8 @@ public void testRegisterModelGroup() throws ExecutionException, InterruptedExcep inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(machineLearningNodeClient).registerModelGroup(any(MLRegisterModelGroupInput.class), actionListenerCaptor.capture()); @@ -135,7 +136,8 @@ public void testRegisterModelGroup() throws ExecutionException, InterruptedExcep boolStringInputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); @@ -160,7 +162,8 @@ public void testRegisterModelGroupFailure() throws IOException { inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(machineLearningNodeClient).registerModelGroup(any(MLRegisterModelGroupInput.class), actionListenerCaptor.capture()); @@ -180,7 +183,8 @@ public void testRegisterModelGroupWithNoName() throws IOException { inputDataWithNoName, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); @@ -197,7 +201,8 @@ public void testBoolParseFail() throws IOException, ExecutionException, Interrup badBoolInputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStepTests.java index 362601264..14a01225d 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStepTests.java @@ -105,7 +105,8 @@ public void testRegisterRemoteModelSuccess() throws Exception { workflowData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(mlNodeClient, times(1)).register(any(MLRegisterModelInput.class), any()); @@ -158,7 +159,8 @@ public void testRegisterAndDeployRemoteModelSuccess() throws Exception { deployWorkflowData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(mlNodeClient, times(1)).register(any(MLRegisterModelInput.class), any()); @@ -190,7 +192,8 @@ public void testRegisterAndDeployRemoteModelSuccess() throws Exception { deployWorkflowData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(mlNodeClient, times(2)).register(any(MLRegisterModelInput.class), any()); @@ -220,7 +223,8 @@ public void testRegisterRemoteModelFailure() { workflowData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); ExecutionException ex = expectThrows(ExecutionException.class, () -> future.get().getClass()); @@ -263,7 +267,8 @@ public void testRegisterRemoteModelUpdateFailure() { deployWorkflowData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); @@ -313,7 +318,8 @@ public void testRegisterRemoteModelDeployUpdateFailure() { deployWorkflowData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); @@ -346,7 +352,8 @@ public void testReisterRemoteModelInterfaceFailure() { incorrectWorkflowData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); ExecutionException ex = expectThrows(ExecutionException.class, () -> future.get().getClass()); @@ -366,7 +373,8 @@ public void testRegisterRemoteModelUnSafeFailure() { workflowData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); ExecutionException ex = expectThrows(ExecutionException.class, () -> future.get().getClass()); @@ -381,7 +389,8 @@ public void testMissingInputs() { new WorkflowData(Collections.emptyMap(), "test-id", "test-node-id"), Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent()); @@ -410,7 +419,8 @@ public void testBoolParseFail() throws IOException, ExecutionException, Interrup deployWorkflowData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/ReindexStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/ReindexStepTests.java index a195025ea..234881ea2 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/ReindexStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/ReindexStepTests.java @@ -95,7 +95,8 @@ public void testReindexStep() throws ExecutionException, InterruptedException, I inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(client, times(1)).execute(any(), any(ReindexRequest.class), actionListenerCaptor.capture()); @@ -124,7 +125,8 @@ public void testReindexStepFailure() throws ExecutionException, InterruptedExcep inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertFalse(future.isDone()); verify(client, times(1)).execute(any(), any(ReindexRequest.class), actionListenerCaptor.capture()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/ToolStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/ToolStepTests.java index 2b5e5b7fa..e7d91bebc 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/ToolStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/ToolStepTests.java @@ -88,7 +88,8 @@ public void testTool() throws ExecutionException, InterruptedException { inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); assertEquals(MLToolSpec.class, future.get().getContent().get("tools").getClass()); @@ -99,7 +100,8 @@ public void testTool() throws ExecutionException, InterruptedException { boolStringInputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); assertEquals(MLToolSpec.class, future.get().getContent().get("tools").getClass()); @@ -114,7 +116,8 @@ public void testBoolParseFail() { badBoolInputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); @@ -133,7 +136,8 @@ public void testToolWithConnectorId() throws ExecutionException, InterruptedExce inputData, Map.of(createConnectorNodeId, inputDataWithConnectorId), Map.of(createConnectorNodeId, CONNECTOR_ID), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); Object tools = future.get().getContent().get("tools"); @@ -150,7 +154,8 @@ public void testToolWithModelId() throws ExecutionException, InterruptedExceptio inputData, Map.of(createModelNodeId, inputDataWithModelId), Map.of(createModelNodeId, MODEL_ID), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); Object tools = future.get().getContent().get("tools"); @@ -167,7 +172,8 @@ public void testToolWithAgentId() throws ExecutionException, InterruptedExceptio inputData, Map.of(createAgentNodeId, inputDataWithAgentId), Map.of(createAgentNodeId, AGENT_ID), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); Object tools = future.get().getContent().get("tools"); diff --git a/src/test/java/org/opensearch/flowframework/workflow/UndeployModelStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/UndeployModelStepTests.java index ca30929e4..203a9bbb7 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/UndeployModelStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/UndeployModelStepTests.java @@ -73,7 +73,8 @@ public void testUndeployModel() throws IOException, ExecutionException, Interrup inputData, Map.of("step_1", new WorkflowData(Map.of(MODEL_ID, modelId), "workflowId", "nodeId")), Map.of("step_1", MODEL_ID), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(machineLearningNodeClient).undeploy(any(String[].class), any(), any()); @@ -89,7 +90,8 @@ public void testNoModelIdInOutput() throws IOException { inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); @@ -121,7 +123,8 @@ public void testUndeployModelFailure() throws IOException { inputData, Map.of("step_1", new WorkflowData(Map.of(MODEL_ID, "test"), "workflowId", "nodeId")), Map.of("step_1", MODEL_ID), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(machineLearningNodeClient).undeploy(any(String[].class), any(), any()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/UpdateIndexStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/UpdateIndexStepTests.java index e4ea939ea..45c7b4fa4 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/UpdateIndexStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/UpdateIndexStepTests.java @@ -102,7 +102,8 @@ public void testUpdateIndexStepWithUpdatedSettings() throws ExecutionException, data, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(indicesAdminClient, times(1)).getSettings(any(GetSettingsRequest.class), any()); @@ -162,7 +163,8 @@ public void testFailedToUpdateIndexSettings() throws ExecutionException, Interru data, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); @@ -188,7 +190,8 @@ public void testMissingSettings() throws InterruptedException { incorrectData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); @@ -238,7 +241,8 @@ public void testUpdateMixedSettings() throws InterruptedException { data, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); verify(indicesAdminClient, times(1)).getSettings(any(GetSettingsRequest.class), any()); @@ -265,7 +269,8 @@ public void testEmptyConfiguration() throws InterruptedException { incorrectData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); @@ -289,7 +294,8 @@ public void testMissingInputs() throws InterruptedException { incorrectData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); @@ -341,7 +347,8 @@ public void testNoSettingsChanged() throws InterruptedException { data, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); ExecutionException exception = assertThrows(ExecutionException.class, () -> future.get()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/UpdateIngestPipelineStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/UpdateIngestPipelineStepTests.java index 75dc5b584..daa72b3fe 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/UpdateIngestPipelineStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/UpdateIngestPipelineStepTests.java @@ -75,7 +75,8 @@ public void testUpdateIngestPipelineStep() throws InterruptedException, Executio inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertFalse(future.isDone()); @@ -99,7 +100,8 @@ public void testUpdateIngestPipelineStepFailure() throws InterruptedException { inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertFalse(future.isDone()); @@ -135,7 +137,8 @@ public void testMissingData() throws InterruptedException { incorrectData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/UpdateSearchPipelineStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/UpdateSearchPipelineStepTests.java index 214b47547..0a1f91b5f 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/UpdateSearchPipelineStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/UpdateSearchPipelineStepTests.java @@ -75,7 +75,8 @@ public void testUpdateSearchPipelineStep() throws InterruptedException, Executio inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertFalse(future.isDone()); @@ -100,7 +101,8 @@ public void testUpdateSearchPipelineStepFailure() throws InterruptedException { inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertFalse(future.isDone()); @@ -135,7 +137,8 @@ public void testMissingData() throws InterruptedException { incorrectData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/WorkflowDataStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/WorkflowDataStepTests.java index f6e751b6f..1f37549fe 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/WorkflowDataStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/WorkflowDataStepTests.java @@ -47,7 +47,8 @@ public void testExecuteWorkflowDataStep() throws ExecutionException, Interrupted inputData, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + null ); assertTrue(future.isDone()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java index 5d4624b7d..0c17bdcdb 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java @@ -81,7 +81,7 @@ private static Workflow parseToWorkflow(String json) throws IOException { // Wrap parser into node list private static List parseToNodes(String json) throws IOException { - return workflowProcessSorter.sortProcessNodes(parseToWorkflow(json), "123", Collections.emptyMap()); + return workflowProcessSorter.sortProcessNodes(parseToWorkflow(json), "123", Collections.emptyMap(), null); } // Wrap parser into string list @@ -437,7 +437,7 @@ public void testSuccessfulGraphValidation() throws Exception { List.of(edge1, edge2) ); - List sortedProcessNodes = workflowProcessSorter.sortProcessNodes(workflow, "123", Collections.emptyMap()); + List sortedProcessNodes = workflowProcessSorter.sortProcessNodes(workflow, "123", Collections.emptyMap(), null); workflowProcessSorter.validateGraph(sortedProcessNodes); } @@ -459,7 +459,7 @@ public void testFailedGraphValidation() throws IOException { WorkflowEdge edge = new WorkflowEdge(registerModel.id(), deployModel.id()); Workflow workflow = new Workflow(Collections.emptyMap(), List.of(registerModel, deployModel), List.of(edge)); - List sortedProcessNodes = workflowProcessSorter.sortProcessNodes(workflow, "123", Collections.emptyMap()); + List sortedProcessNodes = workflowProcessSorter.sortProcessNodes(workflow, "123", Collections.emptyMap(), null); FlowFrameworkException ex = expectThrows( FlowFrameworkException.class, () -> workflowProcessSorter.validateGraph(sortedProcessNodes) @@ -481,7 +481,7 @@ public void testFailedDenyListValidation() throws IOException { FlowFrameworkException ex = expectThrows( FlowFrameworkException.class, - () -> workflowProcessSorter.sortProcessNodes(workflow, "123", Collections.emptyMap()) + () -> workflowProcessSorter.sortProcessNodes(workflow, "123", Collections.emptyMap(), null) ); assertEquals("The step type [delete_index] for node [workflow_step_1] can not be used in a workflow.", ex.getMessage()); assertEquals(RestStatus.FORBIDDEN, ex.getRestStatus()); @@ -524,7 +524,7 @@ public void testSuccessfulInstalledPluginValidation() throws Exception { List.of(createConnector, registerModel, deployModel), List.of(edge1, edge2) ); - List sortedProcessNodes = workflowProcessSorter.sortProcessNodes(workflow, "123", Collections.emptyMap()); + List sortedProcessNodes = workflowProcessSorter.sortProcessNodes(workflow, "123", Collections.emptyMap(), null); workflowProcessSorter.validatePluginsInstalled(sortedProcessNodes, List.of("opensearch-flow-framework", "opensearch-ml")); } @@ -566,7 +566,7 @@ public void testFailedInstalledPluginValidation() throws Exception { List.of(createConnector, registerModel, deployModel), List.of(edge1, edge2) ); - List sortedProcessNodes = workflowProcessSorter.sortProcessNodes(workflow, "123", Collections.emptyMap()); + List sortedProcessNodes = workflowProcessSorter.sortProcessNodes(workflow, "123", Collections.emptyMap(), null); FlowFrameworkException exception = expectThrows( FlowFrameworkException.class,