Skip to content

Commit

Permalink
Pass versioning info when adding Matching tasks (#6879)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
Workflow versioning info needs to be passed to tasks sent to Matching so
they can be places in right queues.

## Why?
<!-- Tell your future self why have you made these changes -->

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
  • Loading branch information
ShahabT authored Nov 25, 2024
1 parent 643dbad commit 848d76c
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 40 deletions.
4 changes: 4 additions & 0 deletions api/persistence/v1/executions.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 14 additions & 5 deletions common/worker_versioning/worker_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/temporalio/sqlparser"
commonpb "go.temporal.io/api/common/v1"
deploymentpb "go.temporal.io/api/deployment/v1"
enumspb "go.temporal.io/api/enums/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
taskqueuespb "go.temporal.io/server/api/taskqueue/v1"
"go.temporal.io/server/common/namespace"
Expand Down Expand Up @@ -177,7 +178,19 @@ func DeploymentToString(deployment *deploymentpb.Deployment) string {
// - assignedBuildId: the build ID to which the WF is currently assigned (i.e. mutable state's AssginedBuildId)
// - stamp: the latest versioning stamp of the execution (only needed for old versioning)
// - hasCompletedWorkflowTask: if the wf has completed any WFT
func MakeDirectiveForWorkflowTask(inheritedBuildId string, assignedBuildId string, stamp *commonpb.WorkerVersionStamp, hasCompletedWorkflowTask bool) *taskqueuespb.TaskVersionDirective {
// - behavior: workflow's effective behavior
// - deployment: workflow's effective deployment
func MakeDirectiveForWorkflowTask(
inheritedBuildId string,
assignedBuildId string,
stamp *commonpb.WorkerVersionStamp,
hasCompletedWorkflowTask bool,
behavior enumspb.VersioningBehavior,
deployment *deploymentpb.Deployment,
) *taskqueuespb.TaskVersionDirective {
if behavior != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED {
return &taskqueuespb.TaskVersionDirective{Behavior: behavior, Deployment: deployment}
}
if id := BuildIdIfUsingVersioning(stamp); id != "" && assignedBuildId == "" {
// TODO: old versioning only [cleanup-old-wv]
return MakeBuildIdDirective(id)
Expand Down Expand Up @@ -213,7 +226,3 @@ func StampFromCapabilities(cap *commonpb.WorkerVersionCapabilities) *commonpb.Wo
func StampFromBuildId(buildId string) *commonpb.WorkerVersionStamp {
return &commonpb.WorkerVersionStamp{UseVersioning: true, BuildId: buildId}
}

func StampFromDeployment(deployment *deploymentpb.Deployment) *commonpb.WorkerVersionStamp {
return &commonpb.WorkerVersionStamp{UseVersioning: true, BuildId: deployment.BuildId}
}
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,9 @@ message WorkflowExecutionInfo {

// When present, it means the workflow execution is versioned, or is transitioning from
// unversioned workers to versioned ones.
// Note: Deployment objects inside versioning info are immutable, never change their fields.
// (-- api-linter: core::0203::immutable=disabled
// aip.dev/not-precedent: field_behavior annotation is not yet used in this repo --)
temporal.api.workflow.v1.WorkflowExecutionVersioningInfo versioning_info = 96;
}

Expand Down
2 changes: 2 additions & 0 deletions service/history/api/queryworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,8 @@ func queryDirectlyThroughMatching(
msResp.GetAssignedBuildId(),
msResp.GetMostRecentWorkerVersionStamp(),
msResp.GetPreviousStartedEventId() != common.EmptyEventID,
workflow.GetEffectiveVersioningBehavior(msResp.GetVersioningInfo()),
workflow.GetEffectiveDeployment(msResp.GetVersioningInfo()),
)

if msResp.GetIsStickyTaskQueueEnabled() &&
Expand Down
27 changes: 11 additions & 16 deletions service/history/api/updateworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ func (u *Updater) ApplyRequest(
ms.GetAssignedBuildId(),
ms.GetMostRecentWorkerVersionStamp(),
ms.HasCompletedAnyWorkflowTask(),
ms.GetEffectiveVersioningBehavior(),
ms.GetEffectiveDeployment(),
)

return &api.UpdateWorkflowAction{
Expand All @@ -231,15 +233,15 @@ func (u *Updater) OnSuccess(
// Speculative WFT was created and needs to be added directly to matching w/o transfer task.
// TODO (alex): This code is copied from transferQueueActiveTaskExecutor.processWorkflowTask.
// Helper function needs to be extracted to avoid code duplication.
err := u.addWorkflowTaskToMatching(ctx, u.wfKey, u.taskQueue, u.scheduledEventID, u.scheduleToStartTimeout, u.directive)
err := u.addWorkflowTaskToMatching(ctx)

if _, isStickyWorkerUnavailable := err.(*serviceerrors.StickyWorkerUnavailable); isStickyWorkerUnavailable {
// If sticky worker is unavailable, switch to original normal task queue.
u.taskQueue = &taskqueuepb.TaskQueue{
Name: u.normalTaskQueueName,
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
}
err = u.addWorkflowTaskToMatching(ctx, u.wfKey, u.taskQueue, u.scheduledEventID, u.scheduleToStartTimeout, u.directive)
err = u.addWorkflowTaskToMatching(ctx)
}

if err != nil {
Expand Down Expand Up @@ -277,14 +279,7 @@ func (u *Updater) OnSuccess(
}

// TODO (alex-update): Consider moving this func to a better place.
func (u *Updater) addWorkflowTaskToMatching(
ctx context.Context,
wfKey definition.WorkflowKey,
tq *taskqueuepb.TaskQueue,
scheduledEventID int64,
wtScheduleToStartTimeout time.Duration,
directive *taskqueuespb.TaskVersionDirective,
) error {
func (u *Updater) addWorkflowTaskToMatching(ctx context.Context) error {
clock, err := u.shardCtx.NewVectorClock()
if err != nil {
return err
Expand All @@ -293,14 +288,14 @@ func (u *Updater) addWorkflowTaskToMatching(
_, err = u.matchingClient.AddWorkflowTask(ctx, &matchingservice.AddWorkflowTaskRequest{
NamespaceId: u.namespaceID.String(),
Execution: &commonpb.WorkflowExecution{
WorkflowId: wfKey.WorkflowID,
RunId: wfKey.RunID,
WorkflowId: u.wfKey.WorkflowID,
RunId: u.wfKey.RunID,
},
TaskQueue: tq,
ScheduledEventId: scheduledEventID,
ScheduleToStartTimeout: durationpb.New(wtScheduleToStartTimeout),
TaskQueue: u.taskQueue,
ScheduledEventId: u.scheduledEventID,
ScheduleToStartTimeout: durationpb.New(u.scheduleToStartTimeout),
Clock: clock,
VersionDirective: directive,
VersionDirective: u.directive,
})
if err != nil {
return err
Expand Down
6 changes: 6 additions & 0 deletions service/history/worker_versioning_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package history
import (
"context"

enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
persistencespb "go.temporal.io/server/api/persistence/v1"
taskqueuespb "go.temporal.io/server/api/taskqueue/v1"
Expand Down Expand Up @@ -225,10 +226,15 @@ func MakeDirectiveForWorkflowTask(ms workflow.MutableState) *taskqueuespb.TaskVe
ms.GetAssignedBuildId(),
ms.GetMostRecentWorkerVersionStamp(),
ms.HasCompletedAnyWorkflowTask(),
ms.GetEffectiveVersioningBehavior(),
ms.GetEffectiveDeployment(),
)
}

func MakeDirectiveForActivityTask(mutableState workflow.MutableState, activityInfo *persistencespb.ActivityInfo) *taskqueuespb.TaskVersionDirective {
if behavior := mutableState.GetEffectiveVersioningBehavior(); behavior != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED {
return &taskqueuespb.TaskVersionDirective{Behavior: behavior, Deployment: mutableState.GetEffectiveDeployment()}
}
if !activityInfo.UseCompatibleVersion && activityInfo.GetUseWorkflowBuildIdInfo() == nil {
return worker_versioning.MakeUseAssignmentRulesDirective()
} else if id := mutableState.GetAssignedBuildId(); id != "" {
Expand Down
1 change: 1 addition & 0 deletions service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ type (
// common case). Deployment is set based on the worker-sent deployment in the latest WFT
// completion. Exception: if Deployment is set but the workflow's effective behavior is
// UNSPECIFIED, it means the workflow is unversioned, so effective deployment will be nil.
// Note: Deployment objects are immutable, never change their fields.
GetEffectiveDeployment() *deploymentpb.Deployment
// GetEffectiveVersioningBehavior returns the effective versioning behavior in the following
// order:
Expand Down
23 changes: 4 additions & 19 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -6851,19 +6851,10 @@ func (ms *MutableStateImpl) disablingTransitionHistory() bool {
// common case). Deployment is set based on the worker-sent deployment in the latest WFT
// completion. Exception: if Deployment is set but the workflow's effective behavior is
// UNSPECIFIED, it means the workflow is unversioned, so effective deployment will be nil.
//
// Note: Deployment objects are immutable, never change their fields.
func (ms *MutableStateImpl) GetEffectiveDeployment() *deploymentpb.Deployment {
versioningInfo := ms.GetExecutionInfo().GetVersioningInfo()
if versioningInfo == nil {
return nil
} else if transition := versioningInfo.GetDeploymentTransition(); transition != nil {
return transition.GetDeployment()
} else if override := versioningInfo.GetVersioningOverride(); override != nil &&
override.GetBehavior() == enumspb.VERSIONING_BEHAVIOR_PINNED {
return override.GetDeployment()
} else if ms.GetEffectiveVersioningBehavior() != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED {
return versioningInfo.GetDeployment()
}
return nil
return GetEffectiveDeployment(ms.GetExecutionInfo().GetVersioningInfo())
}

func (ms *MutableStateImpl) GetDeploymentTransition() *workflowpb.DeploymentTransition {
Expand All @@ -6877,13 +6868,7 @@ func (ms *MutableStateImpl) GetDeploymentTransition() *workflowpb.DeploymentTran
// 2. Behavior: this is returned when there is no override (most common case). Behavior is
// set based on the worker-sent deployment in the latest WFT completion.
func (ms *MutableStateImpl) GetEffectiveVersioningBehavior() enumspb.VersioningBehavior {
versioningInfo := ms.GetExecutionInfo().GetVersioningInfo()
if versioningInfo == nil {
return enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED
} else if override := versioningInfo.GetVersioningOverride(); override != nil {
return override.GetBehavior()
}
return versioningInfo.GetBehavior()
return GetEffectiveVersioningBehavior(ms.GetExecutionInfo().GetVersioningInfo())
}

// StartDeploymentTransition starts a transition to the given deployment. Returns true
Expand Down
41 changes: 41 additions & 0 deletions service/history/workflow/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package workflow

import (
commonpb "go.temporal.io/api/common/v1"
deploymentspb "go.temporal.io/api/deployment/v1"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"
Expand Down Expand Up @@ -195,3 +196,43 @@ func (mse MutableStateWithEffects) CanAddEvent() bool {
// Event can be added to the history if workflow is still running.
return mse.MutableState.IsWorkflowExecutionRunning()
}

// GetEffectiveDeployment returns the effective deployment in the following order:
// 1. DeploymentTransition.Deployment: this is returned when the wf is transitioning to a
// new deployment
// 2. VersioningOverride.Deployment: this is returned when user has set a PINNED override
// at wf start time, or later via UpdateWorkflowExecutionOptions.
// 3. Deployment: this is returned when there is no transition and no override (the most
// common case). Deployment is set based on the worker-sent deployment in the latest WFT
// completion. Exception: if Deployment is set but the workflow's effective behavior is
// UNSPECIFIED, it means the workflow is unversioned, so effective deployment will be nil.
//
// Note: Deployment objects are immutable, never change their fields.
func GetEffectiveDeployment(versioningInfo *workflowpb.WorkflowExecutionVersioningInfo) *deploymentspb.Deployment {
if versioningInfo == nil {
return nil
} else if transition := versioningInfo.GetDeploymentTransition(); transition != nil {
return transition.GetDeployment()
} else if override := versioningInfo.GetVersioningOverride(); override != nil &&
override.GetBehavior() == enumspb.VERSIONING_BEHAVIOR_PINNED {
return override.GetDeployment()
} else if GetEffectiveVersioningBehavior(versioningInfo) != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED {
return versioningInfo.GetDeployment()
}
return nil
}

// GetEffectiveVersioningBehavior returns the effective versioning behavior in the following
// order:
// 1. VersioningOverride.Behavior: this is returned when user has set a behavior override
// at wf start time, or later via UpdateWorkflowExecutionOptions.
// 2. Behavior: this is returned when there is no override (most common case). Behavior is
// set based on the worker-sent deployment in the latest WFT completion.
func GetEffectiveVersioningBehavior(versioningInfo *workflowpb.WorkflowExecutionVersioningInfo) enumspb.VersioningBehavior {
if versioningInfo == nil {
return enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED
} else if override := versioningInfo.GetVersioningOverride(); override != nil {
return override.GetBehavior()
}
return versioningInfo.GetBehavior()
}
2 changes: 2 additions & 0 deletions service/matching/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func (fwdr *Forwarder) ForwardTask(ctx context.Context, task *internalTask) erro
Clock: task.event.Data.GetClock(),
ScheduleToStartTimeout: expirationDuration,
ForwardInfo: fwdr.getForwardInfo(task),
VersionDirective: task.event.Data.GetVersionDirective(),
},
)
case enumspb.TASK_QUEUE_TYPE_ACTIVITY:
Expand All @@ -168,6 +169,7 @@ func (fwdr *Forwarder) ForwardTask(ctx context.Context, task *internalTask) erro
Clock: task.event.Data.GetClock(),
ScheduleToStartTimeout: expirationDuration,
ForwardInfo: fwdr.getForwardInfo(task),
VersionDirective: task.event.Data.GetVersionDirective(),
},
)
default:
Expand Down

0 comments on commit 848d76c

Please sign in to comment.