Skip to content

Commit

Permalink
Merge branch 'main' into version-catalog-1
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <widdis@gmail.com>
  • Loading branch information
dbwiddis authored Jan 16, 2025
2 parents 974f5f6 + 33579a3 commit 09bf146
Show file tree
Hide file tree
Showing 20 changed files with 1,029 additions and 43 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.18...2.x)
### Features
- Add synchronous execution option to workflow provisioning ([#990](https://github.com/opensearch-project/flow-framework/pull/990))

### Enhancements
### Bug Fixes
- Remove useCase and defaultParams field in WorkflowRequest ([#758](https://github.com/opensearch-project/flow-framework/pull/758))
Expand Down
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionSha256Sum=f397b287023acdba1e9f6fc5ea72d22dd63669d59ed4a289a29b1a76eee151c6
distributionUrl=https\://services.gradle.org/distributions/gradle-8.11.1-bin.zip
distributionSha256Sum=7a00d51fb93147819aab76024feece20b6b84e420694101f276be952e08bef03
distributionUrl=https\://services.gradle.org/distributions/gradle-8.12-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
Expand Down
3 changes: 1 addition & 2 deletions gradlew
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ done
# shellcheck disable=SC2034
APP_BASE_NAME=${0##*/}
# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036)
APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s
' "$PWD" ) || exit
APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s\n' "$PWD" ) || exit

# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD=maximum
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ private CommonValue() {}
public static final String PROVISION_WORKFLOW = "provision";
/** The param name for update workflow field in create API */
public static final String UPDATE_WORKFLOW_FIELDS = "update_fields";
/** The param name for specifying the timeout duration in seconds to wait for workflow completion */
public static final String WAIT_FOR_COMPLETION_TIMEOUT = "wait_for_completion_timeout";
/** The field name for workflow steps. This field represents the name of the workflow steps to be fetched. */
public static final String WORKFLOW_STEP = "workflow_step";
/** The param name for default use case, used by the create workflow API */
Expand Down Expand Up @@ -186,6 +188,8 @@ private CommonValue() {}
public static final String SOURCE_INDEX = "source_index";
/** The destination index field for reindex */
public static final String DESTINATION_INDEX = "destination_index";
/** Provision Timeout field */
public static final String PROVISION_TIMEOUT_FIELD = "provision.timeout";
/*
* Constants associated with resource provisioning / state
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
Expand Down Expand Up @@ -43,6 +44,7 @@
import static org.opensearch.flowframework.common.CommonValue.UPDATE_WORKFLOW_FIELDS;
import static org.opensearch.flowframework.common.CommonValue.USE_CASE;
import static org.opensearch.flowframework.common.CommonValue.VALIDATION;
import static org.opensearch.flowframework.common.CommonValue.WAIT_FOR_COMPLETION_TIMEOUT;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
Expand Down Expand Up @@ -88,6 +90,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
boolean reprovision = request.paramAsBoolean(REPROVISION_WORKFLOW, false);
boolean updateFields = request.paramAsBoolean(UPDATE_WORKFLOW_FIELDS, false);
String useCase = request.param(USE_CASE);
TimeValue waitForCompletionTimeout = request.paramAsTime(WAIT_FOR_COMPLETION_TIMEOUT, TimeValue.MINUS_ONE);

// If provisioning, consume all other params and pass to provision transport action
Map<String, String> params = provision
Expand Down Expand Up @@ -145,6 +148,15 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
);
return processError(ffe, params, request);
}
// Ensure wait_for_completion is not set unless reprovision or provision is true
if (waitForCompletionTimeout != TimeValue.MINUS_ONE && !(reprovision || provision)) {
FlowFrameworkException ffe = new FlowFrameworkException(
"Request parameters 'wait_for_completion_timeout' are not allowed unless the 'provision' or 'reprovision' parameter is set to true.",
RestStatus.BAD_REQUEST
);
return processError(ffe, params, request);
}

try {
Template template;
Map<String, String> useCaseDefaultsMap = Collections.emptyMap();
Expand Down Expand Up @@ -219,7 +231,9 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
if (updateFields) {
params = Map.of(UPDATE_WORKFLOW_FIELDS, "true");
}

if (waitForCompletionTimeout != TimeValue.MINUS_ONE) {
params = Map.of(WAIT_FOR_COMPLETION_TIMEOUT, waitForCompletionTimeout.toString());
}
WorkflowRequest workflowRequest = new WorkflowRequest(
workflowId,
template,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
Expand All @@ -33,6 +34,7 @@
import java.util.stream.Collectors;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.WAIT_FOR_COMPLETION_TIMEOUT;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
Expand Down Expand Up @@ -73,6 +75,7 @@ public List<Route> routes() {
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
String workflowId = request.param(WORKFLOW_ID);
TimeValue waitForCompletionTimeout = request.paramAsTime(WAIT_FOR_COMPLETION_TIMEOUT, TimeValue.MINUS_ONE);
try {
Map<String, String> params = parseParamsAndContent(request);
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
Expand All @@ -86,7 +89,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST);
}
// Create request and provision
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null, params);
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null, params, waitForCompletionTimeout);
return channel -> client.execute(ProvisionWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD;
import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD;
import static org.opensearch.flowframework.common.CommonValue.WAIT_FOR_COMPLETION_TIMEOUT;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES;
import static org.opensearch.flowframework.util.ParseUtils.checkFilterByBackendRoles;
import static org.opensearch.flowframework.util.ParseUtils.getUserContext;
Expand Down Expand Up @@ -214,6 +215,16 @@ private void createExecute(WorkflowRequest request, User user, ActionListener<Wo
}
}
String workflowId = request.getWorkflowId();
TimeValue waitForTimeCompletion;
if (request.getParams().containsKey(WAIT_FOR_COMPLETION_TIMEOUT)) {
waitForTimeCompletion = TimeValue.parseTimeValue(
request.getParams().get(WAIT_FOR_COMPLETION_TIMEOUT),
WAIT_FOR_COMPLETION_TIMEOUT
);
} else {
// default to minus one indicate async execution
waitForTimeCompletion = TimeValue.MINUS_ONE;
}
if (workflowId == null) {
// This is a new workflow (POST)
// Throttle incoming requests
Expand Down Expand Up @@ -251,7 +262,8 @@ private void createExecute(WorkflowRequest request, User user, ActionListener<Wo
WorkflowRequest workflowRequest = new WorkflowRequest(
globalContextResponse.getId(),
null,
request.getParams()
request.getParams(),
waitForTimeCompletion
);
logger.info(
"Provisioning parameter is set, continuing to provision workflow {}",
Expand All @@ -261,7 +273,14 @@ private void createExecute(WorkflowRequest request, User user, ActionListener<Wo
ProvisionWorkflowAction.INSTANCE,
workflowRequest,
ActionListener.wrap(provisionResponse -> {
listener.onResponse(new WorkflowResponse(provisionResponse.getWorkflowId()));
listener.onResponse(
(workflowRequest.getWaitForCompletionTimeout() == TimeValue.MINUS_ONE)
? new WorkflowResponse(provisionResponse.getWorkflowId())
: new WorkflowResponse(
provisionResponse.getWorkflowId(),
provisionResponse.getWorkflowState()
)
);
}, exception -> {
String errorMessage = "Provisioning failed.";
logger.error(errorMessage, exception);
Expand Down Expand Up @@ -346,19 +365,26 @@ private void createExecute(WorkflowRequest request, User user, ActionListener<Wo
.build();

if (request.isReprovision()) {

// Reprovision request
ReprovisionWorkflowRequest reprovisionRequest = new ReprovisionWorkflowRequest(
getResponse.getId(),
existingTemplate,
template
template,
waitForTimeCompletion
);
logger.info("Reprovisioning parameter is set, continuing to reprovision workflow {}", getResponse.getId());
client.execute(
ReprovisionWorkflowAction.INSTANCE,
reprovisionRequest,
ActionListener.wrap(reprovisionResponse -> {
listener.onResponse(new WorkflowResponse(reprovisionResponse.getWorkflowId()));
listener.onResponse(
reprovisionRequest.getWaitForCompletionTimeout() == TimeValue.MINUS_ONE
? new WorkflowResponse(reprovisionResponse.getWorkflowId())
: new WorkflowResponse(
reprovisionResponse.getWorkflowId(),
reprovisionResponse.getWorkflowState()
)
);
}, exception -> {
String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage(
"Reprovisioning failed for workflow {}",
Expand Down
Loading

0 comments on commit 09bf146

Please sign in to comment.