Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StartWorkflow with VersioningOverride #6891

Merged
merged 11 commits into from
Nov 27, 2024
Merged
4,784 changes: 2,401 additions & 2,383 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ func CreateHistoryStartWorkflowRequest(
ContinuedFailure: startRequest.ContinuedFailure,
LastCompletionResult: startRequest.LastCompletionResult,
RootExecutionInfo: rootExecutionInfo,
VersioningOverride: startRequest.GetVersioningOverride(),
}
startRequest.ContinuedFailure = nil
startRequest.LastCompletionResult = nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ message StartWorkflowExecutionRequest {
temporal.server.api.workflow.v1.RootExecutionInfo root_execution_info = 11;
// inherited build ID from parent/previous execution
string inherited_build_id = 12;
// If set, takes precedence over the Versioning Behavior sent by the SDK on Workflow Task completion.
// To unset the override after the workflow is running, use UpdateWorkflowExecutionOptions.
temporal.api.workflow.v1.VersioningOverride versioning_override = 13;
}

message StartWorkflowExecutionResponse {
Expand Down
4 changes: 2 additions & 2 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func (wh *WorkflowHandler) DeprecateNamespace(ctx context.Context, request *work
return resp, err
}

// StartWorkflowExecution starts a new long running workflow instance. It will create the instance with
// StartWorkflowExecution starts a new workflow instance (a "workflow execution"). It will create the instance with
// 'WorkflowExecutionStarted' event in history and also schedule the first WorkflowTask for the worker to make the
// first workflow task for this instance. It will return 'WorkflowExecutionAlreadyStartedError', if an instance already
// exists with same workflowId.
Expand Down Expand Up @@ -5331,7 +5331,7 @@ func (wh *WorkflowHandler) UpdateWorkflowExecutionOptions(
}
if opts.GetVersioningOverride().GetBehavior() == enumspb.VERSIONING_BEHAVIOR_PINNED &&
opts.GetVersioningOverride().GetDeployment() == nil {
return nil, serviceerror.NewInvalidArgument("Deployment must be set if behavior override is PINNED")
return nil, serviceerror.NewInvalidArgument("Deployment override must be set if behavior override is PINNED")
}

namespaceId, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace()))
Expand Down
5 changes: 5 additions & 0 deletions service/history/api/create_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"context"

commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
Expand Down Expand Up @@ -323,6 +324,10 @@ func ValidateStartWorkflowExecutionRequest(
if len(request.WorkflowType.GetName()) > maxIDLengthLimit {
return serviceerror.NewInvalidArgument("WorkflowType exceeds length limit.")
}
if request.GetVersioningOverride().GetBehavior() == enumspb.VERSIONING_BEHAVIOR_PINNED &&
request.GetVersioningOverride().GetDeployment() == nil {
return serviceerror.NewInvalidArgument("Deployment override must be set if behavior override is PINNED")
}
if err := retrypolicy.Validate(request.RetryPolicy); err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions service/history/api/signalwithstartworkflow/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func ConvertToStartRequest(
WorkflowStartDelay: request.GetWorkflowStartDelay(),
UserMetadata: request.UserMetadata,
Links: request.GetLinks(),
VersioningOverride: request.GetVersioningOverride(),
}

return common.CreateHistoryStartWorkflowRequest(namespaceID.String(), req, nil, nil, now)
Expand Down
1 change: 1 addition & 0 deletions service/history/api/updateworkflowoptions/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package updateworkflowoptions
import (
"context"
"fmt"

"google.golang.org/protobuf/types/known/fieldmaskpb"

"go.temporal.io/api/serviceerror"
Expand Down
1 change: 1 addition & 0 deletions service/history/historybuilder/event_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (b *EventFactory) CreateWorkflowExecutionStartedEvent(
CompletionCallbacks: req.CompletionCallbacks,
RootWorkflowExecution: request.RootExecutionInfo.GetExecution(),
InheritedBuildId: request.InheritedBuildId,
VersioningOverride: request.VersioningOverride,
}

parentInfo := request.ParentExecutionInfo
Expand Down
6 changes: 5 additions & 1 deletion service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2343,12 +2343,16 @@ func (ms *MutableStateImpl) ApplyWorkflowExecutionStartedEvent(
ms.executionInfo.SearchAttributes = event.SearchAttributes.GetIndexedFields()
}

if event.GetVersioningOverride() != nil {
ms.executionInfo.VersioningInfo = &workflowpb.WorkflowExecutionVersioningInfo{VersioningOverride: event.GetVersioningOverride()}
}
if inheritedBuildId := event.InheritedBuildId; inheritedBuildId != "" {
ms.executionInfo.InheritedBuildId = inheritedBuildId
if err := ms.UpdateBuildIdAssignment(inheritedBuildId); err != nil {
return err
}
} else if event.SourceVersionStamp.GetUseVersioning() && event.SourceVersionStamp.GetBuildId() != "" {
} else if event.SourceVersionStamp.GetUseVersioning() && event.SourceVersionStamp.GetBuildId() != "" ||
ms.GetEffectiveVersioningBehavior() == enumspb.VERSIONING_BEHAVIOR_PINNED {
// TODO: [cleanup-old-wv]
limit := ms.config.SearchAttributesSizeOfValueLimit(string(ms.namespaceEntry.Name()))
if _, err := ms.addBuildIdToSearchAttributesWithNoVisibilityTask(event.SourceVersionStamp, limit); err != nil {
Expand Down
7 changes: 7 additions & 0 deletions service/worker/deployment/deployment_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ package deployment

import (
"context"
"errors"
"time"

"go.temporal.io/api/serviceerror"

"github.com/pborman/uuid"
commonpb "go.temporal.io/api/common/v1"
deploymentpb "go.temporal.io/api/deployment/v1"
Expand Down Expand Up @@ -214,6 +217,10 @@ func (d *DeploymentClientImpl) DescribeDeployment(ctx context.Context, namespace

res, err := d.HistoryClient.QueryWorkflow(ctx, req)
if err != nil {
var notFound *serviceerror.NotFound
if errors.As(err, &notFound) {
return nil, serviceerror.NewNotFound("Deployment not found")
}
return nil, err
}

Expand Down
Loading
Loading