Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deny ReusePolicy RejectDuplicate for ConflictPolicy TerminateExisting #7099

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2077,6 +2077,13 @@ the user has not specified an explicit RetryPolicy`,
retrypolicy.DefaultDefaultRetrySettings,
`DefaultWorkflowRetryPolicy represents the out-of-box retry policy for unset fields
where the user has set an explicit RetryPolicy, but not specified all the fields`,
)
FollowReusePolicyRejectAfterConflictPolicyTerminate = NewNamespaceTypedSetting(
"history.followReusePolicyRejectAfterConflictPolicyTerminate",
true,
`Follows WorkflowIdReusePolicy RejectDuplicate and RejectDuplicateFailedOnly after WorkflowIdReusePolicy TerminateExisting was applied.
If true (the default), RejectDuplicate is disallowed and RejectDuplicateFailedOnly will be honored after TerminateExisting is applied.
This configuration will be become the default behavior in the next release and removed subsequently.`,
)
AllowResetWithPendingChildren = NewNamespaceBoolSetting(
"history.allowResetWithPendingChildren",
Expand Down
3 changes: 2 additions & 1 deletion service/frontend/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ var (
errUseVersioningWithoutBuildId = serviceerror.NewInvalidArgument("WorkerVersionStamp must be present if UseVersioning is true.")
errUseVersioningWithoutNormalName = serviceerror.NewInvalidArgument("NormalName must be set on sticky queue if UseVersioning is true.")
errBuildIdTooLong = serviceerror.NewInvalidArgument("Build ID exceeds configured limit.workerBuildIdSize, use a shorter build ID.")
errIncompatibleIDReusePolicy = serviceerror.NewInvalidArgument("Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy.")
errIncompatibleIDReusePolicyTerminateIfRunning = serviceerror.NewInvalidArgument("Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy")
errIncompatibleIDReusePolicyRejectDuplicate = serviceerror.NewInvalidArgument("Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE cannot be used together with WorkflowIdConflictPolicy WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING")
errUseEnhancedDescribeOnStickyQueue = serviceerror.NewInvalidArgument("Enhanced DescribeTaskQueue is not valid for a sticky queue, use api_mode=UNSPECIFIED or a normal queue.")
errUseEnhancedDescribeOnNonRootQueue = serviceerror.NewInvalidArgument("Enhanced DescribeTaskQueue is not valid for non-root queue partitions, use api_mode=UNSPECIFIED or a normal queue root name.")
errTaskQueuePartitionInvalid = serviceerror.NewInvalidArgument("Task Queue Partition invalid, use a different Task Queue or Task Queue Type")
Expand Down
67 changes: 35 additions & 32 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ type Config struct {
// specified RetryPolicy
DefaultWorkflowRetryPolicy dynamicconfig.TypedPropertyFnWithNamespaceFilter[retrypolicy.DefaultRetrySettings]

FollowReusePolicyRejectAfterConflictPolicyTerminate dynamicconfig.TypedPropertyFnWithNamespaceFilter[bool]

// VisibilityArchival system protection
VisibilityArchivalQueryMaxPageSize dynamicconfig.IntPropertyFn

Expand Down Expand Up @@ -276,38 +278,39 @@ func NewConfig(
InternalFEGlobalNamespaceVisibilityRPS: dynamicconfig.InternalFrontendGlobalNamespaceVisibilityRPS.Get(dc),
// Overshoot since these low rate limits don't work well in an uncoordinated global limiter.
GlobalNamespaceNamespaceReplicationInducingAPIsRPS: dynamicconfig.FrontendGlobalNamespaceNamespaceReplicationInducingAPIsRPS.Get(dc),
MaxIDLengthLimit: dynamicconfig.MaxIDLengthLimit.Get(dc),
WorkerBuildIdSizeLimit: dynamicconfig.WorkerBuildIdSizeLimit.Get(dc),
ReachabilityTaskQueueScanLimit: dynamicconfig.ReachabilityTaskQueueScanLimit.Get(dc),
ReachabilityQueryBuildIdLimit: dynamicconfig.ReachabilityQueryBuildIdLimit.Get(dc),
ReachabilityCacheOpenWFsTTL: dynamicconfig.ReachabilityCacheOpenWFsTTL.Get(dc),
ReachabilityCacheClosedWFsTTL: dynamicconfig.ReachabilityCacheClosedWFsTTL.Get(dc),
ReachabilityQuerySetDurationSinceDefault: dynamicconfig.ReachabilityQuerySetDurationSinceDefault.Get(dc),
MaxBadBinaries: dynamicconfig.FrontendMaxBadBinaries.Get(dc),
DisableListVisibilityByFilter: dynamicconfig.DisableListVisibilityByFilter.Get(dc),
BlobSizeLimitError: dynamicconfig.BlobSizeLimitError.Get(dc),
BlobSizeLimitWarn: dynamicconfig.BlobSizeLimitWarn.Get(dc),
ThrottledLogRPS: dynamicconfig.FrontendThrottledLogRPS.Get(dc),
ShutdownDrainDuration: dynamicconfig.FrontendShutdownDrainDuration.Get(dc),
ShutdownFailHealthCheckDuration: dynamicconfig.FrontendShutdownFailHealthCheckDuration.Get(dc),
EnableNamespaceNotActiveAutoForwarding: dynamicconfig.EnableNamespaceNotActiveAutoForwarding.Get(dc),
SearchAttributesNumberOfKeysLimit: dynamicconfig.SearchAttributesNumberOfKeysLimit.Get(dc),
SearchAttributesSizeOfValueLimit: dynamicconfig.SearchAttributesSizeOfValueLimit.Get(dc),
SearchAttributesTotalSizeLimit: dynamicconfig.SearchAttributesTotalSizeLimit.Get(dc),
VisibilityArchivalQueryMaxPageSize: dynamicconfig.VisibilityArchivalQueryMaxPageSize.Get(dc),
DisallowQuery: dynamicconfig.DisallowQuery.Get(dc),
SendRawWorkflowHistory: dynamicconfig.SendRawWorkflowHistory.Get(dc),
DefaultWorkflowRetryPolicy: dynamicconfig.DefaultWorkflowRetryPolicy.Get(dc),
DefaultWorkflowTaskTimeout: dynamicconfig.DefaultWorkflowTaskTimeout.Get(dc),
EnableServerVersionCheck: dynamicconfig.EnableServerVersionCheck.Get(dc),
EnableTokenNamespaceEnforcement: dynamicconfig.EnableTokenNamespaceEnforcement.Get(dc),
KeepAliveMinTime: dynamicconfig.KeepAliveMinTime.Get(dc),
KeepAlivePermitWithoutStream: dynamicconfig.KeepAlivePermitWithoutStream.Get(dc),
KeepAliveMaxConnectionIdle: dynamicconfig.KeepAliveMaxConnectionIdle.Get(dc),
KeepAliveMaxConnectionAge: dynamicconfig.KeepAliveMaxConnectionAge.Get(dc),
KeepAliveMaxConnectionAgeGrace: dynamicconfig.KeepAliveMaxConnectionAgeGrace.Get(dc),
KeepAliveTime: dynamicconfig.KeepAliveTime.Get(dc),
KeepAliveTimeout: dynamicconfig.KeepAliveTimeout.Get(dc),
MaxIDLengthLimit: dynamicconfig.MaxIDLengthLimit.Get(dc),
WorkerBuildIdSizeLimit: dynamicconfig.WorkerBuildIdSizeLimit.Get(dc),
ReachabilityTaskQueueScanLimit: dynamicconfig.ReachabilityTaskQueueScanLimit.Get(dc),
ReachabilityQueryBuildIdLimit: dynamicconfig.ReachabilityQueryBuildIdLimit.Get(dc),
ReachabilityCacheOpenWFsTTL: dynamicconfig.ReachabilityCacheOpenWFsTTL.Get(dc),
ReachabilityCacheClosedWFsTTL: dynamicconfig.ReachabilityCacheClosedWFsTTL.Get(dc),
ReachabilityQuerySetDurationSinceDefault: dynamicconfig.ReachabilityQuerySetDurationSinceDefault.Get(dc),
MaxBadBinaries: dynamicconfig.FrontendMaxBadBinaries.Get(dc),
DisableListVisibilityByFilter: dynamicconfig.DisableListVisibilityByFilter.Get(dc),
BlobSizeLimitError: dynamicconfig.BlobSizeLimitError.Get(dc),
BlobSizeLimitWarn: dynamicconfig.BlobSizeLimitWarn.Get(dc),
ThrottledLogRPS: dynamicconfig.FrontendThrottledLogRPS.Get(dc),
ShutdownDrainDuration: dynamicconfig.FrontendShutdownDrainDuration.Get(dc),
ShutdownFailHealthCheckDuration: dynamicconfig.FrontendShutdownFailHealthCheckDuration.Get(dc),
EnableNamespaceNotActiveAutoForwarding: dynamicconfig.EnableNamespaceNotActiveAutoForwarding.Get(dc),
SearchAttributesNumberOfKeysLimit: dynamicconfig.SearchAttributesNumberOfKeysLimit.Get(dc),
SearchAttributesSizeOfValueLimit: dynamicconfig.SearchAttributesSizeOfValueLimit.Get(dc),
SearchAttributesTotalSizeLimit: dynamicconfig.SearchAttributesTotalSizeLimit.Get(dc),
VisibilityArchivalQueryMaxPageSize: dynamicconfig.VisibilityArchivalQueryMaxPageSize.Get(dc),
DisallowQuery: dynamicconfig.DisallowQuery.Get(dc),
SendRawWorkflowHistory: dynamicconfig.SendRawWorkflowHistory.Get(dc),
DefaultWorkflowRetryPolicy: dynamicconfig.DefaultWorkflowRetryPolicy.Get(dc),
FollowReusePolicyRejectAfterConflictPolicyTerminate: dynamicconfig.FollowReusePolicyRejectAfterConflictPolicyTerminate.Get(dc),
DefaultWorkflowTaskTimeout: dynamicconfig.DefaultWorkflowTaskTimeout.Get(dc),
EnableServerVersionCheck: dynamicconfig.EnableServerVersionCheck.Get(dc),
EnableTokenNamespaceEnforcement: dynamicconfig.EnableTokenNamespaceEnforcement.Get(dc),
KeepAliveMinTime: dynamicconfig.KeepAliveMinTime.Get(dc),
KeepAlivePermitWithoutStream: dynamicconfig.KeepAlivePermitWithoutStream.Get(dc),
KeepAliveMaxConnectionIdle: dynamicconfig.KeepAliveMaxConnectionIdle.Get(dc),
KeepAliveMaxConnectionAge: dynamicconfig.KeepAliveMaxConnectionAge.Get(dc),
KeepAliveMaxConnectionAgeGrace: dynamicconfig.KeepAliveMaxConnectionAgeGrace.Get(dc),
KeepAliveTime: dynamicconfig.KeepAliveTime.Get(dc),
KeepAliveTimeout: dynamicconfig.KeepAliveTimeout.Get(dc),

DeleteNamespaceDeleteActivityRPS: dynamicconfig.DeleteNamespaceDeleteActivityRPS.Get(dc),
DeleteNamespacePageSize: dynamicconfig.DeleteNamespacePageSize.Get(dc),
Expand Down
103 changes: 57 additions & 46 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,34 +119,35 @@ type (
workflowservice.UnsafeWorkflowServiceServer
status int32

tokenSerializer common.TaskTokenSerializer
config *Config
versionChecker headers.VersionChecker
namespaceHandler *namespaceHandler
getDefaultWorkflowRetrySettings dynamicconfig.TypedPropertyFnWithNamespaceFilter[retrypolicy.DefaultRetrySettings]
visibilityMgr manager.VisibilityManager
logger log.Logger
throttledLogger log.Logger
persistenceExecutionName string
clusterMetadataManager persistence.ClusterMetadataManager
clusterMetadata cluster.Metadata
historyClient historyservice.HistoryServiceClient
matchingClient matchingservice.MatchingServiceClient
deploymentStoreClient deployment.DeploymentStoreClient
archiverProvider provider.ArchiverProvider
payloadSerializer serialization.Serializer
namespaceRegistry namespace.Registry
saMapperProvider searchattribute.MapperProvider
saProvider searchattribute.Provider
saValidator *searchattribute.Validator
archivalMetadata archiver.ArchivalMetadata
healthServer *health.Server
overrides *Overrides
membershipMonitor membership.Monitor
healthInterceptor *interceptor.HealthInterceptor
scheduleSpecBuilder *scheduler.SpecBuilder
outstandingPollers collection.SyncMap[string, collection.SyncMap[string, context.CancelFunc]]
httpEnabled bool
tokenSerializer common.TaskTokenSerializer
config *Config
versionChecker headers.VersionChecker
namespaceHandler *namespaceHandler
getDefaultWorkflowRetrySettings dynamicconfig.TypedPropertyFnWithNamespaceFilter[retrypolicy.DefaultRetrySettings]
followReusePolicyRejectAfterConflictPolicyTerminate dynamicconfig.TypedPropertyFnWithNamespaceFilter[bool]
visibilityMgr manager.VisibilityManager
logger log.Logger
throttledLogger log.Logger
persistenceExecutionName string
clusterMetadataManager persistence.ClusterMetadataManager
clusterMetadata cluster.Metadata
historyClient historyservice.HistoryServiceClient
matchingClient matchingservice.MatchingServiceClient
deploymentStoreClient deployment.DeploymentStoreClient
archiverProvider provider.ArchiverProvider
payloadSerializer serialization.Serializer
namespaceRegistry namespace.Registry
saMapperProvider searchattribute.MapperProvider
saProvider searchattribute.Provider
saValidator *searchattribute.Validator
archivalMetadata archiver.ArchivalMetadata
healthServer *health.Server
overrides *Overrides
membershipMonitor membership.Monitor
healthInterceptor *interceptor.HealthInterceptor
scheduleSpecBuilder *scheduler.SpecBuilder
outstandingPollers collection.SyncMap[string, collection.SyncMap[string, context.CancelFunc]]
httpEnabled bool
}
)

Expand Down Expand Up @@ -177,7 +178,6 @@ func NewWorkflowHandler(
scheduleSpecBuilder *scheduler.SpecBuilder,
httpEnabled bool,
) *WorkflowHandler {

handler := &WorkflowHandler{
status: common.DaemonStatusInitialized,
config: config,
Expand All @@ -193,21 +193,22 @@ func NewWorkflowHandler(
timeSource,
config,
),
getDefaultWorkflowRetrySettings: config.DefaultWorkflowRetryPolicy,
visibilityMgr: visibilityMgr,
logger: logger,
throttledLogger: throttledLogger,
persistenceExecutionName: persistenceExecutionName,
clusterMetadataManager: clusterMetadataManager,
clusterMetadata: clusterMetadata,
historyClient: historyClient,
matchingClient: matchingClient,
deploymentStoreClient: deploymentStoreClient,
archiverProvider: archiverProvider,
payloadSerializer: payloadSerializer,
namespaceRegistry: namespaceRegistry,
saProvider: saProvider,
saMapperProvider: saMapperProvider,
getDefaultWorkflowRetrySettings: config.DefaultWorkflowRetryPolicy,
followReusePolicyRejectAfterConflictPolicyTerminate: config.FollowReusePolicyRejectAfterConflictPolicyTerminate,
visibilityMgr: visibilityMgr,
logger: logger,
throttledLogger: throttledLogger,
persistenceExecutionName: persistenceExecutionName,
clusterMetadataManager: clusterMetadataManager,
clusterMetadata: clusterMetadata,
historyClient: historyClient,
matchingClient: matchingClient,
deploymentStoreClient: deploymentStoreClient,
archiverProvider: archiverProvider,
payloadSerializer: payloadSerializer,
namespaceRegistry: namespaceRegistry,
saProvider: saProvider,
saMapperProvider: saMapperProvider,
saValidator: searchattribute.NewValidator(
saProvider,
saMapperProvider,
Expand Down Expand Up @@ -468,7 +469,10 @@ func (wh *WorkflowHandler) prepareStartWorkflowRequest(
return nil, err
}

if err := wh.validateWorkflowIdReusePolicy(request.WorkflowIdReusePolicy, request.WorkflowIdConflictPolicy); err != nil {
if err := wh.validateWorkflowIdReusePolicy(
namespaceName,
request.WorkflowIdReusePolicy,
request.WorkflowIdConflictPolicy); err != nil {
return nil, err
}

Expand Down Expand Up @@ -1976,6 +1980,7 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context,
}

if err := wh.validateWorkflowIdReusePolicy(
namespaceName,
request.WorkflowIdReusePolicy,
request.WorkflowIdConflictPolicy,
); err != nil {
Expand Down Expand Up @@ -4777,12 +4782,18 @@ func (wh *WorkflowHandler) validateVersionRuleBuildId(request *workflowservice.U
}

func (wh *WorkflowHandler) validateWorkflowIdReusePolicy(
namespaceName namespace.Name,
reusePolicy enumspb.WorkflowIdReusePolicy,
conflictPolicy enumspb.WorkflowIdConflictPolicy,
) error {
if conflictPolicy != enumspb.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED &&
reusePolicy == enumspb.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING {
return errIncompatibleIDReusePolicy
return errIncompatibleIDReusePolicyTerminateIfRunning
}
if conflictPolicy == enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING &&
reusePolicy == enumspb.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE &&
wh.followReusePolicyRejectAfterConflictPolicyTerminate(namespaceName.String()) {
return errIncompatibleIDReusePolicyRejectDuplicate
}
return nil
}
Expand Down
Loading
Loading