Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement batch update options #6894

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions client/frontend/client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 42 additions & 0 deletions client/frontend/metric_client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 45 additions & 0 deletions client/frontend/retryable_client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions common/rpc/interceptor/logtags/workflow_service_server_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ require (
go.opentelemetry.io/otel/sdk v1.27.0
go.opentelemetry.io/otel/sdk/metric v1.27.0
go.opentelemetry.io/otel/trace v1.27.0
go.temporal.io/api v1.39.1-0.20241121195644-de125cd2868b
go.temporal.io/api v1.42.1-0.20241127002502-4bef51a251c3
go.temporal.io/sdk v1.29.1
go.temporal.io/version v0.3.0
go.uber.org/atomic v1.11.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,8 @@ go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5
go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4=
go.opentelemetry.io/proto/otlp v1.2.0 h1:pVeZGk7nXDC9O2hncA6nHldxEjm6LByfA2aN8IOkz94=
go.opentelemetry.io/proto/otlp v1.2.0/go.mod h1:gGpR8txAl5M03pDhMC79G6SdqNV26naRm/KDsgaHD8A=
go.temporal.io/api v1.39.1-0.20241121195644-de125cd2868b h1:yyC9IlAA9dvluyaECfsKNJbUluB9r1W6q4GO4pj6K/g=
go.temporal.io/api v1.39.1-0.20241121195644-de125cd2868b/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/api v1.42.1-0.20241127002502-4bef51a251c3 h1:mEFYZlGzw0i4sEKdf5mrFl4QpsO+UGlKkqIcF/Tu5lY=
go.temporal.io/api v1.42.1-0.20241127002502-4bef51a251c3/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/sdk v1.29.1 h1:y+sUMbUhTU9rj50mwIZAPmcXCtgUdOWS9xHDYRYSgZ0=
go.temporal.io/sdk v1.29.1/go.mod h1:kp//DRvn3CqQVBCtjL51Oicp9wrZYB2s6row1UgzcKQ=
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=
Expand Down
43 changes: 32 additions & 11 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2537,6 +2537,18 @@ func (wh *WorkflowHandler) GetSearchAttributes(ctx context.Context, _ *workflows
return resp, nil
}

func (wh *WorkflowHandler) PauseActivityById(ctx context.Context, _ *workflowservice.PauseActivityByIdRequest) (*workflowservice.PauseActivityByIdResponse, error) {
return nil, serviceerror.NewUnimplemented("pause activity by id not implemented")
}

func (wh *WorkflowHandler) ResetActivityById(ctx context.Context, _ *workflowservice.ResetActivityByIdRequest) (*workflowservice.ResetActivityByIdResponse, error) {
return nil, serviceerror.NewUnimplemented("reset activity by id not implemented")
}

func (wh *WorkflowHandler) UnpauseActivityById(ctx context.Context, _ *workflowservice.UnpauseActivityByIdRequest) (*workflowservice.UnpauseActivityByIdResponse, error) {
return nil, serviceerror.NewUnimplemented("unpause activity by id not implemented")
}

// RespondQueryTaskCompleted is called by application worker to complete a QueryTask (which is a WorkflowTask for query)
// as a result of 'PollWorkflowTaskQueue' API call. Completing a QueryTask will unblock the client call to 'QueryWorkflow'
// API and return the query result to client as a response to 'QueryWorkflow' API call.
Expand Down Expand Up @@ -4234,6 +4246,7 @@ func (wh *WorkflowHandler) StartBatchOperation(
var operationType string
var signalParams batcher.SignalParams
var resetParams batcher.ResetParams
var updateOptionsParams batcher.UpdateOptionsParams
switch op := request.Operation.(type) {
case *workflowservice.StartBatchOperationRequest_TerminationOperation:
identity = op.TerminationOperation.GetIdentity()
Expand Down Expand Up @@ -4270,23 +4283,29 @@ func (wh *WorkflowHandler) StartBatchOperation(
resetParams.ResetType = resetType
resetParams.ResetReapplyType = op.ResetOperation.GetResetReapplyType()
}
case *workflowservice.StartBatchOperationRequest_UpdateWorkflowOptionsOperation:
identity = op.UpdateWorkflowOptionsOperation.GetIdentity()
operationType = batcher.BatchTypeUpdateOptions
updateOptionsParams.WorkflowExecutionOptions = op.UpdateWorkflowOptionsOperation.GetWorkflowExecutionOptions()
updateOptionsParams.UpdateMask = op.UpdateWorkflowOptionsOperation.GetUpdateMask()

default:
return nil, serviceerror.NewInvalidArgument(fmt.Sprintf("The operation type %T is not supported", op))
}

input := &batcher.BatchParams{
Namespace: request.GetNamespace(),
Query: request.GetVisibilityQuery(),
Executions: request.GetExecutions(),
Reason: request.GetReason(),
BatchType: operationType,
RPS: float64(request.GetMaxOperationsPerSecond()),
TerminateParams: batcher.TerminateParams{},
CancelParams: batcher.CancelParams{},
SignalParams: signalParams,
DeleteParams: batcher.DeleteParams{},
ResetParams: resetParams,
Namespace: request.GetNamespace(),
Query: request.GetVisibilityQuery(),
Executions: request.GetExecutions(),
Reason: request.GetReason(),
BatchType: operationType,
RPS: float64(request.GetMaxOperationsPerSecond()),
TerminateParams: batcher.TerminateParams{},
CancelParams: batcher.CancelParams{},
SignalParams: signalParams,
DeleteParams: batcher.DeleteParams{},
ResetParams: resetParams,
UpdateOptionsParams: updateOptionsParams,
}
inputPayload, err := sdk.PreferProtoDataConverter.ToPayloads(input)
if err != nil {
Expand Down Expand Up @@ -4449,6 +4468,8 @@ func (wh *WorkflowHandler) DescribeBatchOperation(
operationType = enumspb.BATCH_OPERATION_TYPE_DELETE
case batcher.BatchTypeReset:
operationType = enumspb.BATCH_OPERATION_TYPE_RESET
case batcher.BatchTypeUpdateOptions:
operationType = enumspb.BATCH_OPERATION_TYPE_UPDATE_EXECUTION_OPTIONS
default:
operationType = enumspb.BATCH_OPERATION_TYPE_UNSPECIFIED
wh.throttledLogger.Warn("Unknown batch operation type", tag.NewStringTag("batch-operation-type", operationTypeString))
Expand Down
Loading
Loading