Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Functional Tests and Fix Bugs #6889

Merged
merged 7 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,9 @@ Deleted Redirect Rules will be kept in the DB (with DeleteTimestamp). After this
"matching.wv.ReachabilityBuildIdVisibilityGracePeriod",
3*time.Minute,
`ReachabilityBuildIdVisibilityGracePeriod is the time period for which deleted versioning rules are still considered active
to account for the delay in updating the build id field in visibility.`,
to account for the delay in updating the build id field in visibility. Not yet supported for GetDeploymentReachability. We recommend waiting
carlydf marked this conversation as resolved.
Show resolved Hide resolved
at least 2 minutes between changing the current deployment and calling GetDeployment, so that newly started workflow executions using the
recently-current deployment can arrive in visibility.`,
)
ReachabilityTaskQueueScanLimit = NewGlobalIntSetting(
"limit.reachabilityTaskQueueScan",
Expand Down
2 changes: 2 additions & 0 deletions common/rpc/interceptor/redirection.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ var (
"DescribeBatchOperation": func() any { return &workflowservice.DescribeBatchOperationResponse{} },
"ListBatchOperations": func() any { return &workflowservice.ListBatchOperationsResponse{} },
"UpdateActivityOptionsById": func() any { return &workflowservice.UpdateActivityOptionsByIdResponse{} },

"GetDeploymentReachability": func() any { return &workflowservice.GetDeploymentReachabilityResponse{} },
}
)

Expand Down
40 changes: 15 additions & 25 deletions common/worker_versioning/worker_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/temporalio/sqlparser"
commonpb "go.temporal.io/api/common/v1"
deploymentpb "go.temporal.io/api/deployment/v1"
enumspb "go.temporal.io/api/enums/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
taskqueuespb "go.temporal.io/server/api/taskqueue/v1"
"go.temporal.io/server/common/namespace"
Expand All @@ -42,42 +41,33 @@ import (
)

const (
buildIdSearchAttributePrefixReachability = "reachability"
buildIdSearchAttributePrefixAssigned = "assigned"
buildIdSearchAttributePrefixVersioned = "versioned"
buildIdSearchAttributePrefixUnversioned = "unversioned"
BuildIdSearchAttributeDelimiter = ":"
BuildIdSearchAttributePrefixPinned = "pinned"
buildIdSearchAttributePrefixAssigned = "assigned"
buildIdSearchAttributePrefixVersioned = "versioned"
buildIdSearchAttributePrefixUnversioned = "unversioned"
BuildIdSearchAttributeDelimiter = ":"
// UnversionedSearchAttribute is the sentinel value used to mark all unversioned workflows
UnversionedSearchAttribute = buildIdSearchAttributePrefixUnversioned
)

// TODO (carly): fix delimiter
// escapeBuildIdSearchAttributeDelimiter is a helper which escapes the BuildIdSearchAttributeDelimiter character in the input string
func escapeBuildIdSearchAttributeDelimiter(s string) string {
s = strings.Replace(s, BuildIdSearchAttributeDelimiter, `|`+BuildIdSearchAttributeDelimiter, -1)
return s
}

// ReachabilityBuildIdSearchAttribute returns the search attribute value for the currently assigned build ID in the form
// 'reachability:<behavior>:<deployment_series_name>:<deployment_build_id>'
func ReachabilityBuildIdSearchAttribute(behavior enumspb.VersioningBehavior, deployment *deploymentpb.Deployment) string {
var escapedDeployment string
if deployment == nil {
escapedDeployment = "UNVERSIONED"
} else {
escapedDeployment = fmt.Sprintf("%s%s%s",
escapeBuildIdSearchAttributeDelimiter(deployment.GetSeriesName()),
BuildIdSearchAttributeDelimiter,
escapeBuildIdSearchAttributeDelimiter(deployment.GetBuildId()),
)
}
return sqlparser.String(sqlparser.NewStrVal([]byte(fmt.Sprintf("%s%s%s%s%s",
buildIdSearchAttributePrefixReachability,
// PinnedBuildIdSearchAttribute returns the search attribute value for the currently assigned pinned build ID in the form
// 'pinned:<deployment_series_name>:<deployment_build_id>'. Each workflow execution will have at most one member of the
// BuildIds KeywordList in this format. If the workflow becomes unpinned or unversioned, this entry will be removed from
// that list.
func PinnedBuildIdSearchAttribute(deployment *deploymentpb.Deployment) string {
return fmt.Sprintf("%s%s%s%s%s",
BuildIdSearchAttributePrefixPinned,
BuildIdSearchAttributeDelimiter,
escapeBuildIdSearchAttributeDelimiter(behavior.String()),
escapeBuildIdSearchAttributeDelimiter(deployment.GetSeriesName()),
BuildIdSearchAttributeDelimiter,
escapedDeployment,
))))
escapeBuildIdSearchAttributeDelimiter(deployment.GetBuildId()),
)
}

// AssignedBuildIdSearchAttribute returns the search attribute value for the currently assigned build ID
Expand Down
17 changes: 7 additions & 10 deletions service/history/api/updateworkflowoptions/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ package updateworkflowoptions
import (
"context"
"fmt"

"google.golang.org/protobuf/types/known/fieldmaskpb"

"go.temporal.io/api/serviceerror"
Expand Down Expand Up @@ -64,24 +63,22 @@ func Invoke(
req.GetWorkflowExecution().GetWorkflowId(),
req.GetWorkflowExecution().GetRunId(),
),
func(workflowLease api.WorkflowLease) (_ *api.UpdateWorkflowAction, updateError error) {
func(workflowLease api.WorkflowLease) (*api.UpdateWorkflowAction, error) {
mutableState := workflowLease.GetMutableState()
defer func() { workflowLease.GetReleaseFn()(updateError) }()

if !mutableState.IsWorkflowExecutionRunning() {
// in-memory mutable state is still clean, let updateError=nil in the defer func()
// to prevent clearing and reloading mutable state while releasing the lock
return nil, consts.ErrWorkflowCompleted
}

// Merge the requested options mentioned in the field mask with the current options in the mutable state
mergedOpts, updateError := applyWorkflowExecutionOptions(
mergedOpts, err := applyWorkflowExecutionOptions(
getOptionsFromMutableState(mutableState),
req.GetWorkflowExecutionOptions(),
req.GetUpdateMask(),
)
if updateError != nil {
return nil, serviceerror.NewInvalidArgument(fmt.Sprintf("error parsing update_options: %v", updateError))
if err != nil {
return nil, serviceerror.NewInvalidArgument(fmt.Sprintf("error applying update_options: %v", err))
}

// Set options for gRPC response
Expand All @@ -95,9 +92,9 @@ func Invoke(
}, nil
}

_, updateError = mutableState.AddWorkflowExecutionOptionsUpdatedEvent(req.GetWorkflowExecutionOptions().GetVersioningOverride())
if updateError != nil {
return nil, updateError
_, err = mutableState.AddWorkflowExecutionOptionsUpdatedEvent(mergedOpts.GetVersioningOverride())
if err != nil {
return nil, err
}

// TODO (carly) part 2: handle safe deployment change --> CreateWorkflowTask=true
Expand Down
42 changes: 24 additions & 18 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"math/rand"
"reflect"
"slices"
"strings"
"time"

"github.com/nexus-rpc/sdk-go/nexus"
Expand Down Expand Up @@ -2678,17 +2679,14 @@ func (ms *MutableStateImpl) loadBuildIds() ([]string, error) {
return searchAttributeValues, nil
}

// getReachabilityDeployment ignores DeploymentTransition.Deployment. If there is a pinned override,
// it returns the override deployment. If there is an unpinned override or no override, it returns
// execution_info.deployment, which is the last deployment that this workflow completed a task on.
func (ms *MutableStateImpl) getReachabilityDeployment() *deploymentpb.Deployment {
versioningInfo := ms.GetExecutionInfo().GetVersioningInfo()
if override := versioningInfo.GetVersioningOverride(); override != nil {
if override.GetBehavior() == enumspb.VERSIONING_BEHAVIOR_PINNED {
return override.GetDeployment()
}
// getPinnedDeployment returns nil if the workflow is not pinned. If there is a pinned override,
// it returns the override deployment. If there is no override, it returns execution_info.deployment,
// which is the last deployment that this workflow completed a task on.
func (ms *MutableStateImpl) getPinnedDeployment() *deploymentpb.Deployment {
if ms.GetEffectiveVersioningBehavior() != enumspb.VERSIONING_BEHAVIOR_PINNED {
return nil
}
return versioningInfo.GetDeployment()
return ms.GetEffectiveDeployment()
}

// Takes a list of loaded build IDs from a search attribute and adds a new build ID to it. Also makes sure that the
Expand All @@ -2701,18 +2699,16 @@ func (ms *MutableStateImpl) addBuildIdToLoadedSearchAttribute(
stamp *commonpb.WorkerVersionStamp,
) []string {
var newValues []string
if !stamp.GetUseVersioning() && ms.GetEffectiveVersioningBehavior() == enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED { // unversioned workflows may still have non-nil deployment, so we don't check deployment
effectiveBehavior := ms.GetEffectiveVersioningBehavior()
if !stamp.GetUseVersioning() && effectiveBehavior == enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED { // unversioned workflows may still have non-nil deployment, so we don't check deployment
newValues = append(newValues, worker_versioning.UnversionedSearchAttribute)
} else if ms.GetEffectiveVersioningBehavior() != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED {
newValues = append(newValues, worker_versioning.ReachabilityBuildIdSearchAttribute(
ms.GetEffectiveVersioningBehavior(),
ms.getReachabilityDeployment(),
))
} else if effectiveBehavior == enumspb.VERSIONING_BEHAVIOR_PINNED {
newValues = append(newValues, worker_versioning.PinnedBuildIdSearchAttribute(ms.getPinnedDeployment()))
} else if ms.GetAssignedBuildId() != "" {
newValues = append(newValues, worker_versioning.AssignedBuildIdSearchAttribute(ms.GetAssignedBuildId()))
}

if ms.GetEffectiveVersioningBehavior() == enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED {
if effectiveBehavior == enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED {
buildId := worker_versioning.VersionStampToBuildIdSearchAttribute(stamp)
found := slices.Contains(newValues, buildId)
for _, existingValue := range existingValues {
Expand All @@ -2727,6 +2723,13 @@ func (ms *MutableStateImpl) addBuildIdToLoadedSearchAttribute(
newValues = append(newValues, buildId)
}
}

// Remove pinned build id search attribute if it exists and we are not pinned
if effectiveBehavior != enumspb.VERSIONING_BEHAVIOR_PINNED {
newValues = slices.DeleteFunc(newValues, func(s string) bool {
return strings.Contains(s, worker_versioning.BuildIdSearchAttributePrefixPinned)
})
}
return newValues
}

Expand All @@ -2737,7 +2740,10 @@ func (ms *MutableStateImpl) saveBuildIds(buildIds []string, maxSearchAttributeVa
ms.executionInfo.SearchAttributes = searchAttributes
}

hasUnversionedOrAssigned := worker_versioning.IsUnversionedOrAssignedBuildIdSearchAttribute(buildIds[0])
hasUnversionedOrAssigned := false
if len(buildIds) > 0 { // len is 0 if we are removing the pinned search attribute and the workflow was never unversioned or assigned
hasUnversionedOrAssigned = worker_versioning.IsUnversionedOrAssignedBuildIdSearchAttribute(buildIds[0])
}
for {
saPayload, err := searchattribute.EncodeValue(buildIds, enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions service/worker/deployment/deployment_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ type DeploymentClientImpl struct {
VisibilityManager manager.VisibilityManager
MaxIDLengthLimit dynamicconfig.IntPropertyFn
VisibilityMaxPageSize dynamicconfig.IntPropertyFnWithNamespaceFilter

reachabilityCache reachabilityCache
reachabilityCache reachabilityCache
}

var _ DeploymentStoreClient = (*DeploymentClientImpl)(nil)
Expand Down
6 changes: 3 additions & 3 deletions service/worker/deployment/deployment_reachability.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func getDeploymentReachability(
) (enumspb.DeploymentReachability, time.Time, error) {
// 1a. Reachable by new unpinned workflows
if isCurrent {
// TODO (carly): still return reachable if the deployment just became not current, but workflows started on it are not yet in reachability
// TODO (carly) part 2: still return reachable if the deployment just became not current, but workflows started on it are not yet in reachability
return enumspb.DEPLOYMENT_REACHABILITY_REACHABLE, time.Now(), nil
}

Expand Down Expand Up @@ -99,10 +99,10 @@ func makeCountRequest(

func makeDeploymentQuery(seriesName, buildID string, open bool) string {
var statusFilter string
deploymentFilter := "= " + worker_versioning.ReachabilityBuildIdSearchAttribute(enumspb.VERSIONING_BEHAVIOR_PINNED, &deploymentpb.Deployment{
deploymentFilter := fmt.Sprintf("= '%s'", worker_versioning.PinnedBuildIdSearchAttribute(&deploymentpb.Deployment{
SeriesName: seriesName,
BuildId: buildID,
})
}))
if open {
statusFilter = "= 'Running'"
} else {
Expand Down
7 changes: 3 additions & 4 deletions service/worker/deployment/deployment_reachability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ func TestMakeDeploymentQuery(t *testing.T) {
buildId := "A"

query := makeDeploymentQuery(seriesName, buildId, true)
expectedQuery := "BuildIds = 'reachability:Pinned:test-deployment:A' AND ExecutionStatus = 'Running'"
expectedQuery := "BuildIds = 'pinned:test-deployment:A' AND ExecutionStatus = 'Running'"
assert.Equal(t, expectedQuery, query)

query = makeDeploymentQuery(seriesName, buildId, false)
expectedQuery = "BuildIds = 'reachability:Pinned:test-deployment:A' AND ExecutionStatus != 'Running'"
expectedQuery = "BuildIds = 'pinned:test-deployment:A' AND ExecutionStatus != 'Running'"
assert.Equal(t, expectedQuery, query)
}

Expand All @@ -63,9 +63,8 @@ func TestReachable_CurrentDeployment(t *testing.T) {
vm := manager.NewMockVisibilityManager(gomock.NewController(t)) // won't receive any calls
testCache := newReachabilityCache(metrics.NoopMetricsHandler, vm, testReachabilityCacheOpenWFsTTL, testReachabilityCacheClosedWFsTTL)

reach, reachValidTime, err := getDeploymentReachability(ctx, "", "", seriesName, buildId, true, testCache)
reach, _, err := getDeploymentReachability(ctx, "", "", seriesName, buildId, true, testCache)
assert.Nil(t, err)
assert.Greater(t, time.Now(), reachValidTime)
assert.Equal(t, enumspb.DEPLOYMENT_REACHABILITY_REACHABLE, reach)
}

Expand Down
4 changes: 2 additions & 2 deletions service/worker/deployment/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func DeploymentStoreClientProvider(historyClient resource.HistoryClient, visibil
reachabilityCache: newReachabilityCache(
metrics.NoopMetricsHandler,
visibilityManager,
reachabilityCacheOpenWFsTTL, // TODO (carly) use dc (ie. config.ReachabilityCacheOpenWFsTTL)
reachabilityCacheClosedWFsTTL, // TODO (carly) use dc (ie. config.ReachabilityCacheClosedWFsTTL)
dynamicconfig.ReachabilityCacheOpenWFsTTL.Get(dc)(),
dynamicconfig.ReachabilityCacheClosedWFsTTL.Get(dc)(),
),
}
}
Expand Down
Loading