
Worker Deployment Version Drainage Status #7158

Merged: 18 commits into versioning-3.1 from cdf/drainage, Jan 29, 2025

Conversation

@carlydf (Contributor) commented Jan 24, 2025

What changed?

Add a DrainageStatus child workflow to the worker deployment system.

Why?

To periodically update the version workflow with results from visibility.

How did you test it?

Functional tests. Currently broken (see TODO comment in code).

Potential risks

Documentation

Is hotfix candidate?

@carlydf requested a review from a team as a code owner January 24, 2025 10:06
@carlydf marked this pull request as draft January 24, 2025 10:06
service/worker/workerdeployment/workflow.go (outdated; resolved)
service/worker/workerdeployment/workflow.go (outdated; resolved)
service/worker/workerdeployment/version_workflow.go (outdated; resolved)
Comment on lines 283 to 286
if args.GetCurrentSinceTime().AsTime().Equal(d.GetVersionState().GetCurrentSinceTime().AsTime()) &&
args.GetRampingSinceTime().AsTime().Equal(d.GetVersionState().GetRampingSinceTime().AsTime()) {
res := &deploymentspb.SyncVersionStateResponse{VersionState: d.VersionState}
return temporal.NewApplicationError("no change", errNoChangeType, res)
Member:

thinking out loud: I believe the idea is to use the function handleSyncState when we want to notify changes related to both ramping and current, right?

I am having a hard time convincing myself that this validator is doing the right thing now that we are trying to add ramping - in my head, the two operations are, to a certain extent, mutually exclusive. If a version x is already current and the user calls SetCurrent on it again, shouldn't we be evaluating only whether GetCurrentSinceTime is equal to the local state? Why should we check the ramping time?

I know we are going to set the value of ramping_since_time when SetCurrent is called too, but how are we going to pass in the same value stored inside d.GetVersionState().GetRampingSinceTime() from SetCurrent?

Contributor Author:

I think when SetCurrent is called, ramping_since_time will be set to nil, and we want to sync both values. Similarly, if SetRamp is called, current_since_time will be set to nil, which we want to sync across the workflows/task queues that need to know that.

Contributor Author:

I changed this to make more sense

Member:

I like the change more - no need to evaluate the timing since current was set; just check whether it was current/ramping
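
For illustration only, a minimal sketch of what "just check if it was current/ramping" could look like; the receiver type and helper name are assumptions, not code from this PR:

	// Hypothetical sketch of the revised check: compare whether the version's
	// current/ramping status would change, rather than comparing the raw
	// since-timestamps.
	func (d *VersionWorkflowRunner) syncChangesNothing(args *deploymentspb.SyncVersionStateUpdateArgs) bool {
		state := d.GetVersionState()
		wasCurrent := state.GetCurrentSinceTime() != nil
		wasRamping := state.GetRampingSinceTime() != nil
		isCurrent := args.GetCurrentSinceTime() != nil
		isRamping := args.GetRampingSinceTime() != nil
		return wasCurrent == isCurrent && wasRamping == isRamping
	}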

// (-- api-linter: core::0140::prepositions=disabled
// aip.dev/not-precedent: 'Since' captures the field semantics despite being a preposition. --)
// Nil if not ramping.
google.protobuf.Timestamp ramping_since_time = 3;
Member:

thinking out loud: if we are planning on using version_workflow.handleSyncState to also set a version to ramping, this should have a ramp percentage set, right?

Contributor Author:

yes, I think so! I was thinking of waiting to add that until the ramp PR, but we could pre-emptively add the field now. I'll at least add a TODO.

WorkerDeploymentWorkflowType = "temporal-sys-worker-deployment-workflow"
WorkerDeploymentVersionWorkflowType = "temporal-sys-worker-deployment-version-workflow"
WorkerDeploymentWorkflowType = "temporal-sys-worker-deployment-workflow"
WorkerDeploymentCheckDrainageWorkflowType = "temporal-sys-worker-deployment-check-drainage-workflow"
Member:

temporal-sys-worker-deployment-check-drainage-workflow -> temporal-sys-worker-deployment-drainage-workflow

no need to keep it super verbose; wdyt?

Contributor Author:

nice, I like that

service/worker/workerdeployment/client.go (resolved)
Comment on lines 372 to 375
if wasAcceptingNewWorkflows && !isAcceptingNewWorkflows {
state.DrainageInfo = &deploymentpb.VersionDrainageInfo{}
workflow.ExecuteChildWorkflow(ctx, WorkerDeploymentCheckDrainageWorkflowType, d.a.namespace)
}
Member:

say we have a version workflow that has become draining and kickstarts this workflow - now, say this version workflow continues-as-new (CaN) - I might have missed this, but I don't see a PARENT_CLOSE_POLICY for this child workflow

IMHO, we should:

  1. set the policy to terminate so that the child workflow terminates when we CaN
  2. in the version_workflow.run function, if the drainage state is set to draining, we simply kickstart this child workflow again, which shall continue execution

This will also call for the TODO statement you have inside of drainage_workflow.go to be implemented: if this version workflow were to start accepting executions again, we would have to send a signal to the child workflow to stop carrying out execution

wdyt?

Contributor Author:

yes, I need to add the code handling CaN of the parent and also handling termination of the child workflow if the parent version starts accepting traffic again
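
A rough sketch of that direction using the Go SDK's child workflow options; the drainage-info field access on restart is an assumption for illustration, not the code as merged:

	// Sketch: start the drainage child with a terminate-on-parent-close policy
	// so it is terminated when the version workflow closes or continues-as-new.
	childCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
		ParentClosePolicy: enumspb.PARENT_CLOSE_POLICY_TERMINATE,
	})
	workflow.ExecuteChildWorkflow(childCtx, WorkerDeploymentCheckDrainageWorkflowType, d.a.namespace)

	// Sketch: after continue-as-new, the run function can restart the child
	// if the version is still draining (field access assumed for illustration).
	if d.VersionState.GetDrainageInfo().GetStatus() == enumspb.VERSION_DRAINAGE_STATUS_DRAINING {
		workflow.ExecuteChildWorkflow(childCtx, WorkerDeploymentCheckDrainageWorkflowType, d.a.namespace)
	}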

Comment on lines 404 to 409
DeploymentName: d.VersionState.DeploymentName,
Version: d.VersionState.Version,
CurrentSinceTime: d.VersionState.CurrentSinceTime,
RampingSinceTime: d.VersionState.RampingSinceTime,
CreateTime: d.VersionState.CreateTime,
DrainageInfo: d.VersionState.DrainageInfo,
@Shivs11 (Member) commented Jan 27, 2025:

Is there a point in altering the workflow memo of the version workflow to store this? IMO, the 3.1 APIs don't require querying visibility to receive information from version workflows directly. We only require that these version workflows return their local state during DescribeVersion calls.

I think we should alter the worker-deployment workflow memo to have these additional fields you have added, since the deployment workflow will now be the source of truth for things like "current-version", "ramping-version", etc. (RoutingInfo, basically)

wdyt?

Contributor Author:

After our discussion on what the memo is used for (to give info via DescribeWorkflowExecution and ListWorkflowExecutions), I removed everything here, because DescribeVersion uses a query, not the memo. I have a hunch that we might find it useful to put drainage info in here at some point, but we should only add that when we have a use case for it.
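
For context, a minimal sketch of the query-based path described here; the query name, return type, and handler shape are assumptions, not necessarily what this PR registers:

	// Sketch: serve DescribeVersion from the workflow's local state via a query
	// handler instead of the visibility memo (query name and state type assumed).
	err := workflow.SetQueryHandler(ctx, "describe-version", func() (*deploymentspb.VersionLocalState, error) {
		return d.VersionState, nil
	})
	if err != nil {
		return err
	}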

@carlydf marked this pull request as ready for review January 28, 2025 07:08
google.protobuf.Timestamp routing_update_time = 4;
// If this version is the current version of its deployment.
bool is_current = 5;
// Range: [0, 100]. Must be zero if is_current is true. Must be non-zero if `version.version`
Member:

may have to change this comment since we don't have a version.version (or we may have in the near future based on Shahab's latest API refinements)

Comment on lines 281 to 291
go s.pollFromDeployment(ctx, tv2)
s.EventuallyWithT(func(t *assert.CollectT) {
a := assert.New(t)
resp, err := s.FrontendClient().DescribeWorkerDeploymentVersion(ctx, &workflowservice.DescribeWorkerDeploymentVersionRequest{
Namespace: s.Namespace().String(),
Version: tv2.BuildID(),
})
a.NoError(err)
a.Equal(tv2.DeploymentSeries(), resp.GetWorkerDeploymentVersionInfo().GetDeploymentName())
a.Equal(tv2.BuildID(), resp.GetWorkerDeploymentVersionInfo().GetVersion())
}, time.Second*5, time.Millisecond*200)
Member:

this may not be what you are intending to do - I assume you wanted to start a new version workflow under the same deployment name, right? However, both tv1 and tv2 are using tv.BuildID(), which generates the same BuildID, so only one version workflow gets registered

IMHO, maybe try using .WithBuildIDNumber(number) for both these separate tvs?

Contributor Author:

Since I now initialize them with this, the build IDs are different whenever I call tv.BuildID(). Thanks for catching this!

	tv1 := testvars.New(s).WithBuildIDNumber(1)
	tv2 := testvars.New(s).WithBuildIDNumber(2)

@@ -192,3 +207,173 @@ func (s *DeploymentVersionSuite) Name() string {
farm.Fingerprint32([]byte(fullName)),
)
}

func (s *DeploymentVersionSuite) TestDrainageStatus_SetCurrentVersion_NoOpenWFs() {
Contributor Author:

This tests the new parts of the feature (version stops accepting new executions --> child workflow starts --> child sets the status to DRAINING and sleeps for visibilityGracePeriod --> child queries visibility, counts 0 open workflows, and sets the status to DRAINED).

It does not test the visibility query, since there are no open workflows, but the visibility query code has not changed from the pre-release, so that should all still work. There are TODOs to test it in the comments below.
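
A condensed sketch of that flow, with hypothetical helper names (reportDrainage, countOpenWorkflowsFromVisibility) standing in for the real logic in drainage_workflow.go:

	// Sketch of the drainage child workflow: mark DRAINING, wait out the
	// visibility grace period, then poll visibility until no open workflows
	// remain and mark DRAINED.
	func CheckDrainage(ctx workflow.Context, namespace string) error {
		reportDrainage(ctx, enumspb.VERSION_DRAINAGE_STATUS_DRAINING) // hypothetical: tell the parent we are draining
		if err := workflow.Sleep(ctx, visibilityGracePeriod); err != nil {
			return err
		}
		for {
			open := countOpenWorkflowsFromVisibility(ctx, namespace) // hypothetical visibility activity wrapper
			if open == 0 {
				reportDrainage(ctx, enumspb.VERSION_DRAINAGE_STATUS_DRAINED)
				return nil
			}
			if err := workflow.Sleep(ctx, refreshInterval); err != nil {
				return err
			}
		}
	}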

@Shivs11 (Member) left a comment:

haven't yet peeked at the functional tests, but the code looks nice - thanks for the annoying refactorings required by the latest API refinements too - I shall take a peek at the tests sometime later today, but merging to iterate faster

Comment on lines +156 to +172
message SyncVersionStateUpdateArgs {
// Last time `current_since_time`, `ramping_since_time`, or `ramp_percentage` of this version changed.
google.protobuf.Timestamp routing_update_time = 4;

message SetCurrent {
// If last_became_current_time is present, then set the deployment's
// last_became_current_time to it and set is_current true. If it's missing,
// set is_current false.
google.protobuf.Timestamp last_became_current_time = 1;
}
// (-- api-linter: core::0140::prepositions=disabled
// aip.dev/not-precedent: 'Since' captures the field semantics despite being a preposition. --)
// Nil if not current.
google.protobuf.Timestamp current_since_time = 5;

// (-- api-linter: core::0140::prepositions=disabled
// aip.dev/not-precedent: 'Since' captures the field semantics despite being a preposition. --)
// Nil if not ramping. Updated when the version first starts ramping, not on each ramp change.
google.protobuf.Timestamp ramping_since_time = 6;

// Range: [0, 100]. Must be zero if the version is not ramping (i.e. `ramping_since_time` is nil).
// Can be in the range [0, 100] if the version is ramping.
float ramp_percentage = 7;
Member:

nit: these new fields are nice, but they can be numbered from 1; since we haven't released this feature to customers yet, the change will still be backwards compatible

if info.Status == enumspb.VERSION_DRAINAGE_STATUS_DRAINED {
return nil
}
workflow.Sleep(ctx, refreshInterval)
Member:

this whole new file is really nice and the workflow makes sense - I wonder, in the future (certainly not right now), if we want to implement a backoff strategy when the workflow sleeps

just some food for thought
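
If a backoff were added later, it could be as simple as growing the sleep between refreshes up to a cap; a sketch under assumed names (refreshDrainageInfo, maxRefreshInterval), not part of this PR:

	// Sketch: exponential backoff for the refresh sleep, capped at maxRefreshInterval.
	interval := refreshInterval
	for {
		info := refreshDrainageInfo(ctx) // hypothetical: query visibility and update the parent
		if info.Status == enumspb.VERSION_DRAINAGE_STATUS_DRAINED {
			return nil
		}
		if err := workflow.Sleep(ctx, interval); err != nil {
			return err
		}
		interval *= 2
		if interval > maxRefreshInterval {
			interval = maxRefreshInterval
		}
	}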

},
WorkflowVersioningMode: 0, // todo
CreateTime: now,
RoutingUpdateTime: now,
Member:

hmm, I think I might have spotted a lag in our implementation. Assume the following flow:

RegisterTaskQueueWorker -> updateWithStartWorkerDeploymentVersion -> handleRegisterWorker

In the highlighted code, we set routingUpdateTime to now, which is right. This makes the local state of the version workflow also have routingUpdateTime set to now.

Now, when the update takes place in handleRegisterWorker, we set the routingUpdateTime for the task queue being registered by calling time.Now() again. This may cause a lag between the local state's update time and the time being sent to the task queue.

In handleRegisterWorker:236

	// initial data
	data := &deploymentspb.DeploymentVersionData{
		Version:           d.VersionState.Version,
		RoutingUpdateTime: timestamppb.Now(),
		CurrentSinceTime:  nil, // not current
		RampingSinceTime:  nil, // not ramping
		RampPercentage:    0,   // not ramping
		FirstPollerTime:   args.FirstPollerTime,
	}

again, not a big deal given that routing will still work - but some food for thought on whether we should take the extra step and avoid the lag

Contributor Author:

hm yeah I think it would be better to pass the original routing time along
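
One way to remove the lag, sketched against the field names quoted above; reusing the state's RoutingUpdateTime here is an assumption about how the original time would be passed along:

	// Sketch: reuse the routing update time already recorded in the version's
	// local state (or carried in the update args) instead of calling
	// timestamppb.Now() a second time inside handleRegisterWorker.
	data := &deploymentspb.DeploymentVersionData{
		Version:           d.VersionState.Version,
		RoutingUpdateTime: d.VersionState.RoutingUpdateTime, // same instant the caller stamped
		CurrentSinceTime:  nil, // not current
		RampingSinceTime:  nil, // not ramping
		RampPercentage:    0,   // not ramping
		FirstPollerTime:   args.FirstPollerTime,
	}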

@Shivs11 (Member) commented Jan 29, 2025

merging this to speed up development - more tests to follow
cc - @carlydf

@Shivs11 merged commit 3d40535 into versioning-3.1 on Jan 29, 2025
10 checks passed
@Shivs11 deleted the cdf/drainage branch January 29, 2025 14:55