-
Notifications
You must be signed in to change notification settings - Fork 888
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pass versioning info when adding Matching tasks #6879
Conversation
…shahab/transfer-versioning-info # Conflicts: # service/history/workflow/mutable_state_impl.go
@@ -364,3 +366,16 @@ func queryDirectlyThroughMatching( | |||
QueryRejected: matchingResp.GetQueryRejected(), | |||
}}, err | |||
} | |||
|
|||
func GetWorkflowVersioningInfoMatchingTask(msResp *historyservice.GetMutableStateResponse) *deploymentpb.WorkflowVersioningInfo { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Everything besides Invoke
in api
dir must be unexported.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed this.
@@ -216,6 +218,7 @@ func (u *Updater) ApplyRequest( | |||
ms.GetMostRecentWorkerVersionStamp(), | |||
ms.HasCompletedAnyWorkflowTask(), | |||
) | |||
u.workflowVersioningInfo = workflow.GetWorkflowVersioningInfoMatchingTask(ms) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
u.workflowVersioningInfo = workflow.GetWorkflowVersioningInfoMatchingTask(ms) | |
u.workflowVersioningInfo = common.CloneProto(workflow.GetWorkflowVersioningInfoMatchingTask(ms)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and u.directive
above too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
workflowVersioningInfo is removed and u.directive does not come from MS, it's created fresh.
@@ -231,15 +234,15 @@ func (u *Updater) OnSuccess( | |||
// Speculative WFT was created and needs to be added directly to matching w/o transfer task. | |||
// TODO (alex): This code is copied from transferQueueActiveTaskExecutor.processWorkflowTask. | |||
// Helper function needs to be extracted to avoid code duplication. | |||
err := u.addWorkflowTaskToMatching(ctx, u.wfKey, u.taskQueue, u.scheduledEventID, u.scheduleToStartTimeout, u.directive) | |||
err := u.addWorkflowTaskToMatching(ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't recall why we pass all these Updater
fields to Updater
method. @stephanos, is there a specific reason? Because this change makes perfect sense to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I don't see one. I must have transferred them from vars to fields and missed that obvious refactoring.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here: e819c3a
directive *taskqueuespb.TaskVersionDirective | ||
workflowVersioningInfo *deploymentpb.WorkflowVersioningInfo |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you move both of this under line 80. They need to be under this WARNING comment below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
workflowVersioningInfo is removed and u.directive does not come from MS, it's created fresh.
@@ -216,13 +216,14 @@ func (t *transferQueueActiveTaskExecutor) processActivityTask( | |||
|
|||
timeout := timestamp.DurationValue(ai.ScheduleToStartTimeout) | |||
directive := MakeDirectiveForActivityTask(mutableState, ai) | |||
wfVersioningInfo := workflow.GetWorkflowVersioningInfoMatchingTask(mutableState) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see this scare NOTE below.
wfVersioningInfo := workflow.GetWorkflowVersioningInfoMatchingTask(mutableState) | |
wfVersioningInfo := common.CloneProto(workflow.GetWorkflowVersioningInfoMatchingTask(mutableState)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed this
@@ -265,6 +266,8 @@ func (t *transferQueueActiveTaskExecutor) processWorkflowTask( | |||
|
|||
directive := MakeDirectiveForWorkflowTask(mutableState) | |||
|
|||
wfVersioningInfo := workflow.GetWorkflowVersioningInfoMatchingTask(mutableState) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh boy, it is everywhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not anymore.
func (ms *MutableStateImpl) GetEffectiveDeployment() *deploymentpb.Deployment { | ||
versioningInfo := ms.GetExecutionInfo().GetVersioningInfo() | ||
if versioningInfo == nil { | ||
return nil | ||
} else if transition := versioningInfo.GetDeploymentTransition(); transition != nil { | ||
return transition.GetDeployment() | ||
} else if override := versioningInfo.GetVersioningOverride(); override != nil && | ||
override.GetBehavior() == enumspb.VERSIONING_BEHAVIOR_PINNED { | ||
return override.GetDeployment() | ||
} else if ms.GetEffectiveVersioningBehavior() != enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED { | ||
return versioningInfo.GetDeployment() | ||
} | ||
return nil | ||
return GetEffectiveDeployment(ms.GetExecutionInfo().GetVersioningInfo()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like these names but I can't come up with something better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can change later if you come up with something better :)
…er-versioning-info # Conflicts: # api/deployment/v1/message.pb.go # api/historyservice/v1/request_response.pb.go # api/persistence/v1/tasks.pb.go # api/taskqueue/v1/message.pb.go # proto/internal/temporal/server/api/deployment/v1/message.proto # proto/internal/temporal/server/api/historyservice/v1/request_response.proto
4b131f6
to
3a2c03c
Compare
@@ -258,6 +258,9 @@ message WorkflowExecutionInfo { | |||
|
|||
// When present, it means the workflow execution is versioned, or is transitioning from | |||
// unversioned workers to versioned ones. | |||
// Note: Deployment objects inside versioning info are immutable, never change their fields. | |||
// (-- api-linter: core::0203::immutable=disabled | |||
// aip.dev/not-precedent: field_behavior annotation not available in our gogo fork --) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where did you copy this from? We are not on gogo fork anymore :-)
## What changed? <!-- Describe what has changed in this PR --> Workflow versioning info needs to be passed to tasks sent to Matching so they can be places in right queues. ## Why? <!-- Tell your future self why have you made these changes --> ## How did you test it? <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> ## Potential risks <!-- Assuming the worst case, what can be broken when deploying this change to production? --> ## Documentation <!-- Have you made sure this change doesn't falsify anything currently stated in `docs/`? If significant new behavior is added, have you described that in `docs/`? --> ## Is hotfix candidate? <!-- Is this PR a hotfix candidate or does it require a notification to be sent to the broader community? (Yes/No) -->
## What changed? <!-- Describe what has changed in this PR --> Workflow versioning info needs to be passed to tasks sent to Matching so they can be places in right queues. ## Why? <!-- Tell your future self why have you made these changes --> ## How did you test it? <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> ## Potential risks <!-- Assuming the worst case, what can be broken when deploying this change to production? --> ## Documentation <!-- Have you made sure this change doesn't falsify anything currently stated in `docs/`? If significant new behavior is added, have you described that in `docs/`? --> ## Is hotfix candidate? <!-- Is this PR a hotfix candidate or does it require a notification to be sent to the broader community? (Yes/No) -->
What changed?
Workflow versioning info needs to be passed to tasks sent to Matching so they can be places in right queues.
Why?
How did you test it?
Potential risks
Documentation
Is hotfix candidate?