diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 3cc7ec2af4d..d5c02de2bd1 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -484,7 +484,9 @@ Deleted Redirect Rules will be kept in the DB (with DeleteTimestamp). After this "matching.wv.ReachabilityBuildIdVisibilityGracePeriod", 3*time.Minute, `ReachabilityBuildIdVisibilityGracePeriod is the time period for which deleted versioning rules are still considered active -to account for the delay in updating the build id field in visibility.`, +to account for the delay in updating the build id field in visibility. Not yet supported for GetDeploymentReachability. We recommend waiting +at least 2 minutes between changing the current deployment and calling GetDeployment, so that newly started workflow executions using the +recently-current deployment can arrive in visibility.`, ) ReachabilityTaskQueueScanLimit = NewGlobalIntSetting( "limit.reachabilityTaskQueueScan", diff --git a/common/rpc/interceptor/redirection.go b/common/rpc/interceptor/redirection.go index 4bace1a7c9c..06ebf75576b 100644 --- a/common/rpc/interceptor/redirection.go +++ b/common/rpc/interceptor/redirection.go @@ -128,6 +128,12 @@ var ( "DescribeBatchOperation": func() any { return &workflowservice.DescribeBatchOperationResponse{} }, "ListBatchOperations": func() any { return &workflowservice.ListBatchOperationsResponse{} }, "UpdateActivityOptionsById": func() any { return &workflowservice.UpdateActivityOptionsByIdResponse{} }, + + "DescribeDeployment": func() any { return &workflowservice.DescribeDeploymentResponse{} }, + "ListDeployments": func() any { return &workflowservice.ListDeploymentsResponse{} }, + "GetDeploymentReachability": func() any { return &workflowservice.GetDeploymentReachabilityResponse{} }, + "GetCurrentDeployment": func() any { return &workflowservice.GetCurrentDeploymentResponse{} }, + "SetCurrentDeployment": func() any { return &workflowservice.SetCurrentDeploymentResponse{} }, } ) diff --git a/common/worker_versioning/worker_versioning.go b/common/worker_versioning/worker_versioning.go index 0f60118c17a..8696b23449f 100644 --- a/common/worker_versioning/worker_versioning.go +++ b/common/worker_versioning/worker_versioning.go @@ -51,7 +51,6 @@ const ( UnversionedSearchAttribute = buildIdSearchAttributePrefixUnversioned ) -// TODO (carly): fix delimiter // escapeBuildIdSearchAttributeDelimiter is a helper which escapes the BuildIdSearchAttributeDelimiter character in the input string func escapeBuildIdSearchAttributeDelimiter(s string) string { s = strings.Replace(s, BuildIdSearchAttributeDelimiter, `|`+BuildIdSearchAttributeDelimiter, -1) @@ -63,16 +62,13 @@ func escapeBuildIdSearchAttributeDelimiter(s string) string { // BuildIds KeywordList in this format. If the workflow becomes unpinned or unversioned, this entry will be removed from // that list. func PinnedBuildIdSearchAttribute(deployment *deploymentpb.Deployment) string { - escapedDeployment := fmt.Sprintf("%s%s%s", + return fmt.Sprintf("%s%s%s%s%s", + BuildIdSearchAttributePrefixPinned, + BuildIdSearchAttributeDelimiter, escapeBuildIdSearchAttributeDelimiter(deployment.GetSeriesName()), BuildIdSearchAttributeDelimiter, escapeBuildIdSearchAttributeDelimiter(deployment.GetBuildId()), ) - return sqlparser.String(sqlparser.NewStrVal([]byte(fmt.Sprintf("%s%s%s", - BuildIdSearchAttributePrefixPinned, - BuildIdSearchAttributeDelimiter, - escapedDeployment, - )))) } // AssignedBuildIdSearchAttribute returns the search attribute value for the currently assigned build ID diff --git a/service/history/api/updateworkflowoptions/api.go b/service/history/api/updateworkflowoptions/api.go index 6d5caa16397..95f3ee30bf4 100644 --- a/service/history/api/updateworkflowoptions/api.go +++ b/service/history/api/updateworkflowoptions/api.go @@ -27,7 +27,6 @@ package updateworkflowoptions import ( "context" "fmt" - "google.golang.org/protobuf/types/known/fieldmaskpb" "go.temporal.io/api/serviceerror" @@ -64,10 +63,8 @@ func Invoke( req.GetWorkflowExecution().GetWorkflowId(), req.GetWorkflowExecution().GetRunId(), ), - func(workflowLease api.WorkflowLease) (_ *api.UpdateWorkflowAction, updateError error) { + func(workflowLease api.WorkflowLease) (*api.UpdateWorkflowAction, error) { mutableState := workflowLease.GetMutableState() - defer func() { workflowLease.GetReleaseFn()(updateError) }() - if !mutableState.IsWorkflowExecutionRunning() { // in-memory mutable state is still clean, let updateError=nil in the defer func() // to prevent clearing and reloading mutable state while releasing the lock @@ -75,13 +72,13 @@ func Invoke( } // Merge the requested options mentioned in the field mask with the current options in the mutable state - mergedOpts, updateError := applyWorkflowExecutionOptions( + mergedOpts, err := applyWorkflowExecutionOptions( getOptionsFromMutableState(mutableState), req.GetWorkflowExecutionOptions(), req.GetUpdateMask(), ) - if updateError != nil { - return nil, serviceerror.NewInvalidArgument(fmt.Sprintf("error parsing update_options: %v", updateError)) + if err != nil { + return nil, serviceerror.NewInvalidArgument(fmt.Sprintf("error applying update_options: %v", err)) } // Set options for gRPC response @@ -95,9 +92,9 @@ func Invoke( }, nil } - _, updateError = mutableState.AddWorkflowExecutionOptionsUpdatedEvent(req.GetWorkflowExecutionOptions().GetVersioningOverride()) - if updateError != nil { - return nil, updateError + _, err = mutableState.AddWorkflowExecutionOptionsUpdatedEvent(mergedOpts.GetVersioningOverride()) + if err != nil { + return nil, err } // TODO (carly) part 2: handle safe deployment change --> CreateWorkflowTask=true diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 7fda1451de2..8c6330175b3 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -2740,7 +2740,10 @@ func (ms *MutableStateImpl) saveBuildIds(buildIds []string, maxSearchAttributeVa ms.executionInfo.SearchAttributes = searchAttributes } - hasUnversionedOrAssigned := worker_versioning.IsUnversionedOrAssignedBuildIdSearchAttribute(buildIds[0]) + hasUnversionedOrAssigned := false + if len(buildIds) > 0 { // len is 0 if we are removing the pinned search attribute and the workflow was never unversioned or assigned + hasUnversionedOrAssigned = worker_versioning.IsUnversionedOrAssignedBuildIdSearchAttribute(buildIds[0]) + } for { saPayload, err := searchattribute.EncodeValue(buildIds, enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST) if err != nil { diff --git a/service/worker/deployment/deployment_client.go b/service/worker/deployment/deployment_client.go index 2ed030ea584..c18e12db5c2 100644 --- a/service/worker/deployment/deployment_client.go +++ b/service/worker/deployment/deployment_client.go @@ -93,8 +93,7 @@ type DeploymentClientImpl struct { VisibilityManager manager.VisibilityManager MaxIDLengthLimit dynamicconfig.IntPropertyFn VisibilityMaxPageSize dynamicconfig.IntPropertyFnWithNamespaceFilter - - reachabilityCache reachabilityCache + reachabilityCache reachabilityCache } var _ DeploymentStoreClient = (*DeploymentClientImpl)(nil) diff --git a/service/worker/deployment/deployment_reachability.go b/service/worker/deployment/deployment_reachability.go index 34bad8faec4..ab5a18abe09 100644 --- a/service/worker/deployment/deployment_reachability.go +++ b/service/worker/deployment/deployment_reachability.go @@ -59,7 +59,7 @@ func getDeploymentReachability( ) (enumspb.DeploymentReachability, time.Time, error) { // 1a. Reachable by new unpinned workflows if isCurrent { - // TODO (carly): still return reachable if the deployment just became not current, but workflows started on it are not yet in reachability + // TODO (carly) part 2: still return reachable if the deployment just became not current, but workflows started on it are not yet in reachability return enumspb.DEPLOYMENT_REACHABILITY_REACHABLE, time.Now(), nil } @@ -99,10 +99,10 @@ func makeCountRequest( func makeDeploymentQuery(seriesName, buildID string, open bool) string { var statusFilter string - deploymentFilter := "= " + worker_versioning.PinnedBuildIdSearchAttribute(&deploymentpb.Deployment{ + deploymentFilter := fmt.Sprintf("= '%s'", worker_versioning.PinnedBuildIdSearchAttribute(&deploymentpb.Deployment{ SeriesName: seriesName, BuildId: buildID, - }) + })) if open { statusFilter = "= 'Running'" } else { diff --git a/service/worker/deployment/deployment_reachability_test.go b/service/worker/deployment/deployment_reachability_test.go index 6fc17323bce..da25dedeca4 100644 --- a/service/worker/deployment/deployment_reachability_test.go +++ b/service/worker/deployment/deployment_reachability_test.go @@ -47,11 +47,11 @@ func TestMakeDeploymentQuery(t *testing.T) { buildId := "A" query := makeDeploymentQuery(seriesName, buildId, true) - expectedQuery := "BuildIds = 'reachability:Pinned:test-deployment:A' AND ExecutionStatus = 'Running'" + expectedQuery := "BuildIds = 'pinned:test-deployment:A' AND ExecutionStatus = 'Running'" assert.Equal(t, expectedQuery, query) query = makeDeploymentQuery(seriesName, buildId, false) - expectedQuery = "BuildIds = 'reachability:Pinned:test-deployment:A' AND ExecutionStatus != 'Running'" + expectedQuery = "BuildIds = 'pinned:test-deployment:A' AND ExecutionStatus != 'Running'" assert.Equal(t, expectedQuery, query) } @@ -63,9 +63,8 @@ func TestReachable_CurrentDeployment(t *testing.T) { vm := manager.NewMockVisibilityManager(gomock.NewController(t)) // won't receive any calls testCache := newReachabilityCache(metrics.NoopMetricsHandler, vm, testReachabilityCacheOpenWFsTTL, testReachabilityCacheClosedWFsTTL) - reach, reachValidTime, err := getDeploymentReachability(ctx, "", "", seriesName, buildId, true, testCache) + reach, _, err := getDeploymentReachability(ctx, "", "", seriesName, buildId, true, testCache) assert.Nil(t, err) - assert.Greater(t, time.Now(), reachValidTime) assert.Equal(t, enumspb.DEPLOYMENT_REACHABILITY_REACHABLE, reach) } diff --git a/service/worker/deployment/fx.go b/service/worker/deployment/fx.go index 3e75a4ea3f8..13e1708c9a7 100644 --- a/service/worker/deployment/fx.go +++ b/service/worker/deployment/fx.go @@ -95,8 +95,8 @@ func DeploymentStoreClientProvider(historyClient resource.HistoryClient, visibil reachabilityCache: newReachabilityCache( metrics.NoopMetricsHandler, visibilityManager, - reachabilityCacheOpenWFsTTL, // TODO (carly) use dc (ie. config.ReachabilityCacheOpenWFsTTL) - reachabilityCacheClosedWFsTTL, // TODO (carly) use dc (ie. config.ReachabilityCacheClosedWFsTTL) + dynamicconfig.ReachabilityCacheOpenWFsTTL.Get(dc)(), + dynamicconfig.ReachabilityCacheClosedWFsTTL.Get(dc)(), ), } } diff --git a/tests/deployment_test.go b/tests/deployment_test.go index 660a4728806..fadfd82d227 100644 --- a/tests/deployment_test.go +++ b/tests/deployment_test.go @@ -27,18 +27,24 @@ package tests import ( "context" "fmt" + workflowpb "go.temporal.io/api/workflow/v1" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/fieldmaskpb" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + commonpb "go.temporal.io/api/common/v1" deploymentpb "go.temporal.io/api/deployment/v1" enumspb "go.temporal.io/api/enums/v1" taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" + sdkclient "go.temporal.io/sdk/client" "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/log/tag" "go.temporal.io/server/service/worker/deployment" "go.temporal.io/server/tests/testcore" "google.golang.org/protobuf/types/known/timestamppb" @@ -56,6 +62,7 @@ type ( DeploymentSuite struct { testcore.FunctionalTestBase *require.Assertions + sdkClient sdkclient.Client } ) @@ -78,6 +85,15 @@ func (d *DeploymentSuite) SetupSuite() { dynamicconfig.FrontendEnableExecuteMultiOperation.Key(): true, dynamicconfig.MatchingEnableDeployments.Key(): true, dynamicconfig.WorkerEnableDeployment.Key(): true, + + // Reachability + dynamicconfig.ReachabilityCacheOpenWFsTTL.Key(): testReachabilityCacheOpenWFsTTL, + dynamicconfig.ReachabilityCacheClosedWFsTTL.Key(): testReachabilityCacheClosedWFsTTL, + + // Make sure we don't hit the rate limiter in tests + dynamicconfig.FrontendGlobalNamespaceNamespaceReplicationInducingAPIsRPS.Key(): 1000, + dynamicconfig.FrontendMaxNamespaceNamespaceReplicationInducingAPIsBurstRatioPerInstance.Key(): 1, + dynamicconfig.FrontendNamespaceReplicationInducingAPIsRPS.Key(): 1000, } d.SetDynamicConfigOverrides(dynamicConfigOverrides) d.FunctionalTestBase.SetupSuite("testdata/es_cluster.yaml") @@ -91,6 +107,14 @@ func (d *DeploymentSuite) TearDownSuite() { func (d *DeploymentSuite) SetupTest() { d.FunctionalTestBase.SetupTest() d.setAssertions() + sdkClient, err := sdkclient.Dial(sdkclient.Options{ + HostPort: d.FrontendGRPCAddress(), + Namespace: d.Namespace(), + }) + if err != nil { + d.Logger.Fatal("Error when creating SDK client", tag.Error(err)) + } + d.sdkClient = sdkClient } func (d *DeploymentSuite) TearDownTest() { @@ -235,6 +259,7 @@ func (d *DeploymentSuite) TestGetCurrentDeployment_NoCurrentDeployment() { } } +// addDeploymentsAndVerifyListDeployments does the following: // verifyDeploymentListInfo checks the equality between two DeploymentListInfo objects func (d *DeploymentSuite) verifyDeploymentListInfo(expectedDeploymentListInfo *deploymentpb.DeploymentListInfo, receivedDeploymentListInfo *deploymentpb.DeploymentListInfo) bool { maxDurationBetweenTimeStamps := 1 * time.Millisecond @@ -371,3 +396,514 @@ func (d *DeploymentSuite) TestListDeployments_MultipleDeployments_WithSeriesFilt seriesFilter := deploymentInfo[0].Deployment.SeriesName d.startlistAndValidateDeployments(deploymentInfo, seriesFilter) } + +// TODO Shivam - refactor the above test cases TestListDeployments_WithSeriesNameFilter + TestListDeployments_WithoutSeriesNameFilter +// Refactoring should be done in a way where we are validating the exact deployment (based on how many we create) - right now, +// the tests do validate the read API logic but are not the most assertive + +// TODO Shivam - Add more getCurrentDeployment tests when SetCurrentDefaultBuildID API has been defined + +func (d *DeploymentSuite) TestGetDeploymentReachability_OverrideUnversioned() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + // presence of internally used delimiters (:) or escape + // characters shouldn't break functionality + seriesName := testcore.RandomizeStr("my-series|:|:") + buildID := testcore.RandomizeStr("bgt:|") + taskQueue := &taskqueuepb.TaskQueue{Name: "deployment-test", Kind: enumspb.TASK_QUEUE_KIND_NORMAL} + workerDeployment := &deploymentpb.Deployment{ + SeriesName: seriesName, + BuildId: buildID, + } + errChan := make(chan error) + defer close(errChan) + + // Start a deployment workflow + go func() { + d.startDeploymentWorkflows(ctx, taskQueue, workerDeployment, errChan) + }() + + // Wait for the deployment to exist + d.EventuallyWithT(func(t *assert.CollectT) { + a := assert.New(t) + + resp, err := d.FrontendClient().DescribeDeployment(ctx, &workflowservice.DescribeDeploymentRequest{ + Namespace: d.Namespace(), + Deployment: workerDeployment, + }) + a.NoError(err) + a.NotNil(resp.DeploymentInfo) + a.NotNil(resp.DeploymentInfo.Deployment) + }, time.Second*5, time.Millisecond*200) + <-ctx.Done() + select { + case err := <-errChan: + d.Fail("Expected error channel to be empty but got error %w", err) + default: + } + + // non-current deployment is unreachable + ctx = context.Background() + resp, err := d.FrontendClient().GetDeploymentReachability(ctx, &workflowservice.GetDeploymentReachabilityRequest{ + Namespace: d.Namespace(), + Deployment: workerDeployment, + }) + d.NoError(err) + d.Assert().Equal(enumspb.DEPLOYMENT_REACHABILITY_UNREACHABLE, resp.GetReachability()) + + // start an unversioned workflow, set pinned deployment override --> deployment should be reachable + unversionedTQ := "unversioned-test-tq" + run, err := d.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{TaskQueue: unversionedTQ}, "wf") + d.NoError(err) + unversionedWFExec := &commonpb.WorkflowExecution{ + WorkflowId: run.GetID(), + RunId: run.GetRunID(), + } + + // set override on our new unversioned workflow + updateOpts := &workflowpb.WorkflowExecutionOptions{ + VersioningOverride: &workflowpb.VersioningOverride{ + Behavior: enumspb.VERSIONING_BEHAVIOR_PINNED, + Deployment: workerDeployment, + }, + } + updateResp, err := d.FrontendClient().UpdateWorkflowExecutionOptions(ctx, &workflowservice.UpdateWorkflowExecutionOptionsRequest{ + Namespace: d.Namespace(), + WorkflowExecution: unversionedWFExec, + WorkflowExecutionOptions: updateOpts, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override"}}, + }) + d.NoError(err) + d.True(proto.Equal(updateResp.GetWorkflowExecutionOptions(), updateOpts)) + + // describe workflow and check that the versioning info has the override + descResp, err := d.FrontendClient().DescribeWorkflowExecution(ctx, &workflowservice.DescribeWorkflowExecutionRequest{ + Namespace: d.Namespace(), + Execution: unversionedWFExec, + }) + d.NoError(err) + versioningInfo := descResp.GetWorkflowExecutionInfo().GetVersioningInfo() + d.True(proto.Equal(updateOpts.GetVersioningOverride(), versioningInfo.GetVersioningOverride())) + + d.Eventually(func() bool { + resp, err = d.FrontendClient().GetDeploymentReachability(ctx, &workflowservice.GetDeploymentReachabilityRequest{ + Namespace: d.Namespace(), + Deployment: workerDeployment, + }) + return resp.GetReachability() == enumspb.DEPLOYMENT_REACHABILITY_REACHABLE + }, 5*time.Second, 50*time.Millisecond) + + // TODO (carly): once sdk allows starting a deployment worker, start worker, complete workflow, and check for CLOSED_ONLY + // TODO (carly): once SetCurrentDeployment is ready, check that a current deployment is reachable even with no workflows + // TODO (carly): test starting a workflow execution on a current deployment, then getting reachability with no override + // TODO (carly): check cache times (do I need to do this in functional when I have cache time tests in unit?) +} + +func (d *DeploymentSuite) TestUpdateWorkflowExecutionOptions_SetUnpinnedThenUnset() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + // start an unversioned workflow + unversionedTQ := "unversioned-test-tq" + run, err := d.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{TaskQueue: unversionedTQ}, "wf") + d.NoError(err) + unversionedWFExec := &commonpb.WorkflowExecution{ + WorkflowId: run.GetID(), + RunId: run.GetRunID(), + } + unpinnedOpts := &workflowpb.WorkflowExecutionOptions{ + VersioningOverride: &workflowpb.VersioningOverride{ + Behavior: enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE, + Deployment: nil, + }, + } + + // 1. Set unpinned override --> describe workflow shows the override + updateResp, err := d.FrontendClient().UpdateWorkflowExecutionOptions(ctx, &workflowservice.UpdateWorkflowExecutionOptionsRequest{ + Namespace: d.Namespace(), + WorkflowExecution: unversionedWFExec, + WorkflowExecutionOptions: unpinnedOpts, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override"}}, + }) + d.NoError(err) + d.True(proto.Equal(updateResp.GetWorkflowExecutionOptions(), unpinnedOpts)) + descResp, err := d.FrontendClient().DescribeWorkflowExecution(ctx, &workflowservice.DescribeWorkflowExecutionRequest{ + Namespace: d.Namespace(), + Execution: unversionedWFExec, + }) + d.NoError(err) + versioningInfo := descResp.GetWorkflowExecutionInfo().GetVersioningInfo() + d.True(proto.Equal(unpinnedOpts.GetVersioningOverride(), versioningInfo.GetVersioningOverride())) + + // 2. Unset using empty update opts with mutation mask --> describe workflow shows no more override + updateResp, err = d.FrontendClient().UpdateWorkflowExecutionOptions(ctx, &workflowservice.UpdateWorkflowExecutionOptionsRequest{ + Namespace: d.Namespace(), + WorkflowExecution: unversionedWFExec, + WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{}, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override"}}, + }) + d.NoError(err) + d.True(proto.Equal(updateResp.GetWorkflowExecutionOptions(), &workflowpb.WorkflowExecutionOptions{})) + descResp, err = d.FrontendClient().DescribeWorkflowExecution(ctx, &workflowservice.DescribeWorkflowExecutionRequest{ + Namespace: d.Namespace(), + Execution: unversionedWFExec, + }) + d.NoError(err) + d.Nil(descResp.GetWorkflowExecutionInfo().GetVersioningInfo().GetVersioningOverride()) +} + +func (d *DeploymentSuite) TestUpdateWorkflowExecutionOptions_SetPinnedThenUnset() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + // presence of internally used delimiters (:) or escape + // characters shouldn't break functionality + seriesName := testcore.RandomizeStr("my-series|:|:") + buildID := testcore.RandomizeStr("bgt:|") + workerDeployment := &deploymentpb.Deployment{ + SeriesName: seriesName, + BuildId: buildID, + } + + // start an unversioned workflow + unversionedTQ := "unversioned-test-tq" + run, err := d.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{TaskQueue: unversionedTQ}, "wf") + d.NoError(err) + unversionedWFExec := &commonpb.WorkflowExecution{ + WorkflowId: run.GetID(), + RunId: run.GetRunID(), + } + pinnedOpts := &workflowpb.WorkflowExecutionOptions{ + VersioningOverride: &workflowpb.VersioningOverride{ + Behavior: enumspb.VERSIONING_BEHAVIOR_PINNED, + Deployment: workerDeployment, + }, + } + + // 1. Set pinned override on our new unversioned workflow --> describe workflow shows the override + updateResp, err := d.FrontendClient().UpdateWorkflowExecutionOptions(ctx, &workflowservice.UpdateWorkflowExecutionOptionsRequest{ + Namespace: d.Namespace(), + WorkflowExecution: unversionedWFExec, + WorkflowExecutionOptions: pinnedOpts, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override"}}, + }) + d.NoError(err) + d.True(proto.Equal(updateResp.GetWorkflowExecutionOptions(), pinnedOpts)) + descResp, err := d.FrontendClient().DescribeWorkflowExecution(ctx, &workflowservice.DescribeWorkflowExecutionRequest{ + Namespace: d.Namespace(), + Execution: unversionedWFExec, + }) + d.NoError(err) + versioningInfo := descResp.GetWorkflowExecutionInfo().GetVersioningInfo() + d.True(proto.Equal(pinnedOpts.GetVersioningOverride(), versioningInfo.GetVersioningOverride())) + + // 2. Unset with empty update opts with mutation mask --> describe workflow shows no more override + updateResp, err = d.FrontendClient().UpdateWorkflowExecutionOptions(ctx, &workflowservice.UpdateWorkflowExecutionOptionsRequest{ + Namespace: d.Namespace(), + WorkflowExecution: unversionedWFExec, + WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{}, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override"}}, + }) + d.NoError(err) + d.True(proto.Equal(updateResp.GetWorkflowExecutionOptions(), &workflowpb.WorkflowExecutionOptions{})) + descResp, err = d.FrontendClient().DescribeWorkflowExecution(ctx, &workflowservice.DescribeWorkflowExecutionRequest{ + Namespace: d.Namespace(), + Execution: unversionedWFExec, + }) + d.NoError(err) + d.Nil(descResp.GetWorkflowExecutionInfo().GetVersioningInfo().GetVersioningOverride()) +} + +func (d *DeploymentSuite) TestUpdateWorkflowExecutionOptions_EmptyFields() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + // presence of internally used delimiters (:) or escape + // characters shouldn't break functionality + seriesName := testcore.RandomizeStr("my-series|:|:") + buildID := testcore.RandomizeStr("bgt:|") + workerDeployment := &deploymentpb.Deployment{ + SeriesName: seriesName, + BuildId: buildID, + } + + // start an unversioned workflow + unversionedTQ := "unversioned-test-tq" + run, err := d.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{TaskQueue: unversionedTQ}, "wf") + d.NoError(err) + unversionedWFExec := &commonpb.WorkflowExecution{ + WorkflowId: run.GetID(), + RunId: run.GetRunID(), + } + pinnedOpts := &workflowpb.WorkflowExecutionOptions{ + VersioningOverride: &workflowpb.VersioningOverride{ + Behavior: enumspb.VERSIONING_BEHAVIOR_PINNED, + Deployment: workerDeployment, + }, + } + + // 1. Pinned update with empty mask --> describe workflow shows no change + updateResp, err := d.FrontendClient().UpdateWorkflowExecutionOptions(ctx, &workflowservice.UpdateWorkflowExecutionOptionsRequest{ + Namespace: d.Namespace(), + WorkflowExecution: unversionedWFExec, + WorkflowExecutionOptions: pinnedOpts, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{}}, + }) + d.NoError(err) + d.True(proto.Equal(updateResp.GetWorkflowExecutionOptions(), &workflowpb.WorkflowExecutionOptions{})) + descResp, err := d.FrontendClient().DescribeWorkflowExecution(ctx, &workflowservice.DescribeWorkflowExecutionRequest{ + Namespace: d.Namespace(), + Execution: unversionedWFExec, + }) + d.NoError(err) + versioningInfo := descResp.GetWorkflowExecutionInfo().GetVersioningInfo() + d.Nil(versioningInfo.GetVersioningOverride()) +} + +func (d *DeploymentSuite) TestUpdateWorkflowExecutionOptions_SetPinnedSetPinned() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + // start an unversioned workflow + unversionedTQ := "unversioned-test-tq" + run, err := d.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{TaskQueue: unversionedTQ}, "wf") + d.NoError(err) + unversionedWFExec := &commonpb.WorkflowExecution{ + WorkflowId: run.GetID(), + RunId: run.GetRunID(), + } + pinnedOptsA := &workflowpb.WorkflowExecutionOptions{ + VersioningOverride: &workflowpb.VersioningOverride{ + Behavior: enumspb.VERSIONING_BEHAVIOR_PINNED, + Deployment: &deploymentpb.Deployment{ + SeriesName: "seriesName", + BuildId: "A", + }, + }, + } + pinnedOptsB := &workflowpb.WorkflowExecutionOptions{ + VersioningOverride: &workflowpb.VersioningOverride{ + Behavior: enumspb.VERSIONING_BEHAVIOR_PINNED, + Deployment: &deploymentpb.Deployment{ + SeriesName: "seriesName", + BuildId: "B", + }, + }, + } + + // 1. Set pinned override A --> describe workflow shows the override + updateResp, err := d.FrontendClient().UpdateWorkflowExecutionOptions(ctx, &workflowservice.UpdateWorkflowExecutionOptionsRequest{ + Namespace: d.Namespace(), + WorkflowExecution: unversionedWFExec, + WorkflowExecutionOptions: pinnedOptsA, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override"}}, + }) + d.NoError(err) + d.True(proto.Equal(updateResp.GetWorkflowExecutionOptions(), pinnedOptsA)) + descResp, err := d.FrontendClient().DescribeWorkflowExecution(ctx, &workflowservice.DescribeWorkflowExecutionRequest{ + Namespace: d.Namespace(), + Execution: unversionedWFExec, + }) + d.NoError(err) + versioningInfo := descResp.GetWorkflowExecutionInfo().GetVersioningInfo() + d.True(proto.Equal(pinnedOptsA.GetVersioningOverride(), versioningInfo.GetVersioningOverride())) + + // 3. Set pinned override B --> describe workflow shows the override + updateResp, err = d.FrontendClient().UpdateWorkflowExecutionOptions(ctx, &workflowservice.UpdateWorkflowExecutionOptionsRequest{ + Namespace: d.Namespace(), + WorkflowExecution: unversionedWFExec, + WorkflowExecutionOptions: pinnedOptsB, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override"}}, + }) + d.NoError(err) + d.True(proto.Equal(updateResp.GetWorkflowExecutionOptions(), pinnedOptsB)) + descResp, err = d.FrontendClient().DescribeWorkflowExecution(ctx, &workflowservice.DescribeWorkflowExecutionRequest{ + Namespace: d.Namespace(), + Execution: unversionedWFExec, + }) + d.NoError(err) + versioningInfo = descResp.GetWorkflowExecutionInfo().GetVersioningInfo() + d.True(proto.Equal(pinnedOptsB.GetVersioningOverride(), versioningInfo.GetVersioningOverride())) +} + +func (d *DeploymentSuite) TestUpdateWorkflowExecutionOptions_SetUnpinnedSetUnpinned() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + // start an unversioned workflow + unversionedTQ := "unversioned-test-tq" + run, err := d.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{TaskQueue: unversionedTQ}, "wf") + d.NoError(err) + unversionedWFExec := &commonpb.WorkflowExecution{ + WorkflowId: run.GetID(), + RunId: run.GetRunID(), + } + unpinnedOpts := &workflowpb.WorkflowExecutionOptions{ + VersioningOverride: &workflowpb.VersioningOverride{ + Behavior: enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE, + Deployment: nil, + }, + } + + // 1. Set unpinned override --> describe workflow shows the override + updateResp, err := d.FrontendClient().UpdateWorkflowExecutionOptions(ctx, &workflowservice.UpdateWorkflowExecutionOptionsRequest{ + Namespace: d.Namespace(), + WorkflowExecution: unversionedWFExec, + WorkflowExecutionOptions: unpinnedOpts, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override"}}, + }) + d.NoError(err) + d.True(proto.Equal(updateResp.GetWorkflowExecutionOptions(), unpinnedOpts)) + descResp, err := d.FrontendClient().DescribeWorkflowExecution(ctx, &workflowservice.DescribeWorkflowExecutionRequest{ + Namespace: d.Namespace(), + Execution: unversionedWFExec, + }) + d.NoError(err) + versioningInfo := descResp.GetWorkflowExecutionInfo().GetVersioningInfo() + d.True(proto.Equal(unpinnedOpts.GetVersioningOverride(), versioningInfo.GetVersioningOverride())) + + // 1. Set unpinned override --> describe workflow shows the override + updateResp, err = d.FrontendClient().UpdateWorkflowExecutionOptions(ctx, &workflowservice.UpdateWorkflowExecutionOptionsRequest{ + Namespace: d.Namespace(), + WorkflowExecution: unversionedWFExec, + WorkflowExecutionOptions: unpinnedOpts, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override"}}, + }) + d.NoError(err) + d.True(proto.Equal(updateResp.GetWorkflowExecutionOptions(), unpinnedOpts)) + descResp, err = d.FrontendClient().DescribeWorkflowExecution(ctx, &workflowservice.DescribeWorkflowExecutionRequest{ + Namespace: d.Namespace(), + Execution: unversionedWFExec, + }) + d.NoError(err) + versioningInfo = descResp.GetWorkflowExecutionInfo().GetVersioningInfo() + d.True(proto.Equal(unpinnedOpts.GetVersioningOverride(), versioningInfo.GetVersioningOverride())) +} + +func (d *DeploymentSuite) TestUpdateWorkflowExecutionOptions_SetUnpinnedSetPinned() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + // start an unversioned workflow + unversionedTQ := "unversioned-test-tq" + run, err := d.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{TaskQueue: unversionedTQ}, "wf") + d.NoError(err) + unversionedWFExec := &commonpb.WorkflowExecution{ + WorkflowId: run.GetID(), + RunId: run.GetRunID(), + } + unpinnedOpts := &workflowpb.WorkflowExecutionOptions{ + VersioningOverride: &workflowpb.VersioningOverride{ + Behavior: enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE, + Deployment: nil, + }, + } + + // 1. Set unpinned override --> describe workflow shows the override + updateResp, err := d.FrontendClient().UpdateWorkflowExecutionOptions(ctx, &workflowservice.UpdateWorkflowExecutionOptionsRequest{ + Namespace: d.Namespace(), + WorkflowExecution: unversionedWFExec, + WorkflowExecutionOptions: unpinnedOpts, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override"}}, + }) + d.NoError(err) + d.True(proto.Equal(updateResp.GetWorkflowExecutionOptions(), unpinnedOpts)) + descResp, err := d.FrontendClient().DescribeWorkflowExecution(ctx, &workflowservice.DescribeWorkflowExecutionRequest{ + Namespace: d.Namespace(), + Execution: unversionedWFExec, + }) + d.NoError(err) + versioningInfo := descResp.GetWorkflowExecutionInfo().GetVersioningInfo() + d.True(proto.Equal(unpinnedOpts.GetVersioningOverride(), versioningInfo.GetVersioningOverride())) + + pinnedOptsA := &workflowpb.WorkflowExecutionOptions{ + VersioningOverride: &workflowpb.VersioningOverride{ + Behavior: enumspb.VERSIONING_BEHAVIOR_PINNED, + Deployment: &deploymentpb.Deployment{ + SeriesName: "seriesName", + BuildId: "A", + }, + }, + } + + // 1. Set pinned override A --> describe workflow shows the override + updateResp, err = d.FrontendClient().UpdateWorkflowExecutionOptions(ctx, &workflowservice.UpdateWorkflowExecutionOptionsRequest{ + Namespace: d.Namespace(), + WorkflowExecution: unversionedWFExec, + WorkflowExecutionOptions: pinnedOptsA, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override"}}, + }) + d.NoError(err) + d.True(proto.Equal(updateResp.GetWorkflowExecutionOptions(), pinnedOptsA)) + descResp, err = d.FrontendClient().DescribeWorkflowExecution(ctx, &workflowservice.DescribeWorkflowExecutionRequest{ + Namespace: d.Namespace(), + Execution: unversionedWFExec, + }) + d.NoError(err) + versioningInfo = descResp.GetWorkflowExecutionInfo().GetVersioningInfo() + d.True(proto.Equal(pinnedOptsA.GetVersioningOverride(), versioningInfo.GetVersioningOverride())) +} + +func (d *DeploymentSuite) TestUpdateWorkflowExecutionOptions_SetPinnedSetUnpinned() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + // start an unversioned workflow + unversionedTQ := "unversioned-test-tq" + run, err := d.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{TaskQueue: unversionedTQ}, "wf") + d.NoError(err) + unversionedWFExec := &commonpb.WorkflowExecution{ + WorkflowId: run.GetID(), + RunId: run.GetRunID(), + } + unpinnedOpts := &workflowpb.WorkflowExecutionOptions{ + VersioningOverride: &workflowpb.VersioningOverride{ + Behavior: enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE, + Deployment: nil, + }, + } + pinnedOptsA := &workflowpb.WorkflowExecutionOptions{ + VersioningOverride: &workflowpb.VersioningOverride{ + Behavior: enumspb.VERSIONING_BEHAVIOR_PINNED, + Deployment: &deploymentpb.Deployment{ + SeriesName: "seriesName", + BuildId: "A", + }, + }, + } + + // 1. Set pinned override A --> describe workflow shows the override + updateResp, err := d.FrontendClient().UpdateWorkflowExecutionOptions(ctx, &workflowservice.UpdateWorkflowExecutionOptionsRequest{ + Namespace: d.Namespace(), + WorkflowExecution: unversionedWFExec, + WorkflowExecutionOptions: pinnedOptsA, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override"}}, + }) + d.NoError(err) + d.True(proto.Equal(updateResp.GetWorkflowExecutionOptions(), pinnedOptsA)) + descResp, err := d.FrontendClient().DescribeWorkflowExecution(ctx, &workflowservice.DescribeWorkflowExecutionRequest{ + Namespace: d.Namespace(), + Execution: unversionedWFExec, + }) + d.NoError(err) + versioningInfo := descResp.GetWorkflowExecutionInfo().GetVersioningInfo() + d.True(proto.Equal(pinnedOptsA.GetVersioningOverride(), versioningInfo.GetVersioningOverride())) + + // 1. Set unpinned override --> describe workflow shows the override + updateResp, err = d.FrontendClient().UpdateWorkflowExecutionOptions(ctx, &workflowservice.UpdateWorkflowExecutionOptionsRequest{ + Namespace: d.Namespace(), + WorkflowExecution: unversionedWFExec, + WorkflowExecutionOptions: unpinnedOpts, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override"}}, + }) + d.NoError(err) + d.True(proto.Equal(updateResp.GetWorkflowExecutionOptions(), unpinnedOpts)) + descResp, err = d.FrontendClient().DescribeWorkflowExecution(ctx, &workflowservice.DescribeWorkflowExecutionRequest{ + Namespace: d.Namespace(), + Execution: unversionedWFExec, + }) + d.NoError(err) + versioningInfo = descResp.GetWorkflowExecutionInfo().GetVersioningInfo() + d.True(proto.Equal(unpinnedOpts.GetVersioningOverride(), versioningInfo.GetVersioningOverride())) +}