Skip to content

Commit

Permalink
Follow ReusePolicy AllowDuplicateFailedOnly with TerminateExisting
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanos committed Jan 17, 2025
1 parent 46236b0 commit 1feda6c
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 43 deletions.
65 changes: 32 additions & 33 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,39 +278,38 @@ func NewConfig(
InternalFEGlobalNamespaceVisibilityRPS: dynamicconfig.InternalFrontendGlobalNamespaceVisibilityRPS.Get(dc),
// Overshoot since these low rate limits don't work well in an uncoordinated global limiter.
GlobalNamespaceNamespaceReplicationInducingAPIsRPS: dynamicconfig.FrontendGlobalNamespaceNamespaceReplicationInducingAPIsRPS.Get(dc),
MaxIDLengthLimit: dynamicconfig.MaxIDLengthLimit.Get(dc),
WorkerBuildIdSizeLimit: dynamicconfig.WorkerBuildIdSizeLimit.Get(dc),
ReachabilityTaskQueueScanLimit: dynamicconfig.ReachabilityTaskQueueScanLimit.Get(dc),
ReachabilityQueryBuildIdLimit: dynamicconfig.ReachabilityQueryBuildIdLimit.Get(dc),
ReachabilityCacheOpenWFsTTL: dynamicconfig.ReachabilityCacheOpenWFsTTL.Get(dc),
ReachabilityCacheClosedWFsTTL: dynamicconfig.ReachabilityCacheClosedWFsTTL.Get(dc),
ReachabilityQuerySetDurationSinceDefault: dynamicconfig.ReachabilityQuerySetDurationSinceDefault.Get(dc),
MaxBadBinaries: dynamicconfig.FrontendMaxBadBinaries.Get(dc),
DisableListVisibilityByFilter: dynamicconfig.DisableListVisibilityByFilter.Get(dc),
BlobSizeLimitError: dynamicconfig.BlobSizeLimitError.Get(dc),
BlobSizeLimitWarn: dynamicconfig.BlobSizeLimitWarn.Get(dc),
ThrottledLogRPS: dynamicconfig.FrontendThrottledLogRPS.Get(dc),
ShutdownDrainDuration: dynamicconfig.FrontendShutdownDrainDuration.Get(dc),
ShutdownFailHealthCheckDuration: dynamicconfig.FrontendShutdownFailHealthCheckDuration.Get(dc),
EnableNamespaceNotActiveAutoForwarding: dynamicconfig.EnableNamespaceNotActiveAutoForwarding.Get(dc),
SearchAttributesNumberOfKeysLimit: dynamicconfig.SearchAttributesNumberOfKeysLimit.Get(dc),
SearchAttributesSizeOfValueLimit: dynamicconfig.SearchAttributesSizeOfValueLimit.Get(dc),
SearchAttributesTotalSizeLimit: dynamicconfig.SearchAttributesTotalSizeLimit.Get(dc),
VisibilityArchivalQueryMaxPageSize: dynamicconfig.VisibilityArchivalQueryMaxPageSize.Get(dc),
DisallowQuery: dynamicconfig.DisallowQuery.Get(dc),
SendRawWorkflowHistory: dynamicconfig.SendRawWorkflowHistory.Get(dc),
DefaultWorkflowRetryPolicy: dynamicconfig.DefaultWorkflowRetryPolicy.Get(dc),
FollowReusePolicyAfterConflictPolicyTerminate: dynamicconfig.FollowReusePolicyAfterConflictPolicyTerminate.Get(dc),
DefaultWorkflowTaskTimeout: dynamicconfig.DefaultWorkflowTaskTimeout.Get(dc),
EnableServerVersionCheck: dynamicconfig.EnableServerVersionCheck.Get(dc),
EnableTokenNamespaceEnforcement: dynamicconfig.EnableTokenNamespaceEnforcement.Get(dc),
KeepAliveMinTime: dynamicconfig.KeepAliveMinTime.Get(dc),
KeepAlivePermitWithoutStream: dynamicconfig.KeepAlivePermitWithoutStream.Get(dc),
KeepAliveMaxConnectionIdle: dynamicconfig.KeepAliveMaxConnectionIdle.Get(dc),
KeepAliveMaxConnectionAge: dynamicconfig.KeepAliveMaxConnectionAge.Get(dc),
KeepAliveMaxConnectionAgeGrace: dynamicconfig.KeepAliveMaxConnectionAgeGrace.Get(dc),
KeepAliveTime: dynamicconfig.KeepAliveTime.Get(dc),
KeepAliveTimeout: dynamicconfig.KeepAliveTimeout.Get(dc),
MaxIDLengthLimit: dynamicconfig.MaxIDLengthLimit.Get(dc),
WorkerBuildIdSizeLimit: dynamicconfig.WorkerBuildIdSizeLimit.Get(dc),
ReachabilityTaskQueueScanLimit: dynamicconfig.ReachabilityTaskQueueScanLimit.Get(dc),
ReachabilityQueryBuildIdLimit: dynamicconfig.ReachabilityQueryBuildIdLimit.Get(dc),
ReachabilityCacheOpenWFsTTL: dynamicconfig.ReachabilityCacheOpenWFsTTL.Get(dc),
ReachabilityCacheClosedWFsTTL: dynamicconfig.ReachabilityCacheClosedWFsTTL.Get(dc),
ReachabilityQuerySetDurationSinceDefault: dynamicconfig.ReachabilityQuerySetDurationSinceDefault.Get(dc),
MaxBadBinaries: dynamicconfig.FrontendMaxBadBinaries.Get(dc),
DisableListVisibilityByFilter: dynamicconfig.DisableListVisibilityByFilter.Get(dc),
BlobSizeLimitError: dynamicconfig.BlobSizeLimitError.Get(dc),
BlobSizeLimitWarn: dynamicconfig.BlobSizeLimitWarn.Get(dc),
ThrottledLogRPS: dynamicconfig.FrontendThrottledLogRPS.Get(dc),
ShutdownDrainDuration: dynamicconfig.FrontendShutdownDrainDuration.Get(dc),
ShutdownFailHealthCheckDuration: dynamicconfig.FrontendShutdownFailHealthCheckDuration.Get(dc),
EnableNamespaceNotActiveAutoForwarding: dynamicconfig.EnableNamespaceNotActiveAutoForwarding.Get(dc),
SearchAttributesNumberOfKeysLimit: dynamicconfig.SearchAttributesNumberOfKeysLimit.Get(dc),
SearchAttributesSizeOfValueLimit: dynamicconfig.SearchAttributesSizeOfValueLimit.Get(dc),
SearchAttributesTotalSizeLimit: dynamicconfig.SearchAttributesTotalSizeLimit.Get(dc),
VisibilityArchivalQueryMaxPageSize: dynamicconfig.VisibilityArchivalQueryMaxPageSize.Get(dc),
DisallowQuery: dynamicconfig.DisallowQuery.Get(dc),
SendRawWorkflowHistory: dynamicconfig.SendRawWorkflowHistory.Get(dc),
DefaultWorkflowRetryPolicy: dynamicconfig.DefaultWorkflowRetryPolicy.Get(dc),
DefaultWorkflowTaskTimeout: dynamicconfig.DefaultWorkflowTaskTimeout.Get(dc),
EnableServerVersionCheck: dynamicconfig.EnableServerVersionCheck.Get(dc),
EnableTokenNamespaceEnforcement: dynamicconfig.EnableTokenNamespaceEnforcement.Get(dc),
KeepAliveMinTime: dynamicconfig.KeepAliveMinTime.Get(dc),
KeepAlivePermitWithoutStream: dynamicconfig.KeepAlivePermitWithoutStream.Get(dc),
KeepAliveMaxConnectionIdle: dynamicconfig.KeepAliveMaxConnectionIdle.Get(dc),
KeepAliveMaxConnectionAge: dynamicconfig.KeepAliveMaxConnectionAge.Get(dc),
KeepAliveMaxConnectionAgeGrace: dynamicconfig.KeepAliveMaxConnectionAgeGrace.Get(dc),
KeepAliveTime: dynamicconfig.KeepAliveTime.Get(dc),
KeepAliveTimeout: dynamicconfig.KeepAliveTimeout.Get(dc),

DeleteNamespaceDeleteActivityRPS: dynamicconfig.DeleteNamespaceDeleteActivityRPS.Get(dc),
DeleteNamespacePageSize: dynamicconfig.DeleteNamespacePageSize.Get(dc),
Expand Down
1 change: 1 addition & 0 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4800,6 +4800,7 @@ func (wh *WorkflowHandler) validateWorkflowIdReusePolicy(
reusePolicy == enumspb.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE &&
wh.followReusePolicyAfterConflictPolicyTerminate(namespaceName.String()) {
return errIncompatibleIDReusePolicyRejectDuplicate
return errIncompatibleIDReusePolicyTerminateIfRunning
}
return nil
}
Expand Down
28 changes: 28 additions & 0 deletions service/frontend/workflow_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,34 @@ func (s *WorkflowHandlerSuite) TestStartWorkflowExecution_InvalidWorkflowIdReuse
s.NoError(err)
}

func (s *WorkflowHandlerSuite) TestStartWorkflowExecution_InvalidWorkflowIdReusePolicy_RejectDuplicate() {

Check failure on line 676 in service/frontend/workflow_handler_test.go

View workflow job for this annotation

GitHub Actions / golangci

method WorkflowHandlerSuite.TestStartWorkflowExecution_InvalidWorkflowIdReusePolicy_RejectDuplicate already declared at service/frontend/workflow_handler_test.go:648:32

Check failure on line 676 in service/frontend/workflow_handler_test.go

View workflow job for this annotation

GitHub Actions / Unit test

method WorkflowHandlerSuite.TestStartWorkflowExecution_InvalidWorkflowIdReusePolicy_RejectDuplicate already declared at service/frontend/workflow_handler_test.go:648:32

Check failure on line 676 in service/frontend/workflow_handler_test.go

View workflow job for this annotation

GitHub Actions / Unit test

method WorkflowHandlerSuite.TestStartWorkflowExecution_InvalidWorkflowIdReusePolicy_RejectDuplicate already declared at service/frontend/workflow_handler_test.go:648:32
req := &workflowservice.StartWorkflowExecutionRequest{
WorkflowId: testWorkflowID,
WorkflowType: &commonpb.WorkflowType{Name: "WORKFLOW"},
TaskQueue: &taskqueuepb.TaskQueue{Name: "TASK_QUEUE", Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
WorkflowIdReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
WorkflowIdConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING,
}

// by default, disallow
config := s.newConfig()
wh := s.getWorkflowHandler(config)
resp, err := wh.StartWorkflowExecution(context.Background(), req)
s.Nil(resp)
s.Equal(err, serviceerror.NewInvalidArgument(
"Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE cannot be used together with WorkflowIdConflictPolicy WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING."))

// allow if explicitly allowed
s.mockSearchAttributesMapperProvider.EXPECT().GetMapper(gomock.Any()).Return(nil, nil)
s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(namespace.NewID(), nil)
s.mockHistoryClient.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any()).Return(&historyservice.StartWorkflowExecutionResponse{Started: true}, nil)

config.AllowReusePolicyRejectWithConflictPolicyTerminate = dc.GetBoolPropertyFnFilteredByNamespace(true)

Check failure on line 698 in service/frontend/workflow_handler_test.go

View workflow job for this annotation

GitHub Actions / golangci

config.AllowReusePolicyRejectWithConflictPolicyTerminate undefined (type *Config has no field or method AllowReusePolicyRejectWithConflictPolicyTerminate) (typecheck)

Check failure on line 698 in service/frontend/workflow_handler_test.go

View workflow job for this annotation

GitHub Actions / Unit test

config.AllowReusePolicyRejectWithConflictPolicyTerminate undefined (type *Config has no field or method AllowReusePolicyRejectWithConflictPolicyTerminate)

Check failure on line 698 in service/frontend/workflow_handler_test.go

View workflow job for this annotation

GitHub Actions / Unit test

config.AllowReusePolicyRejectWithConflictPolicyTerminate undefined (type *Config has no field or method AllowReusePolicyRejectWithConflictPolicyTerminate)
wh = s.getWorkflowHandler(config)
_, err = wh.StartWorkflowExecution(context.Background(), req)
s.NoError(err)
}

func (s *WorkflowHandlerSuite) TestStartWorkflowExecution_DefaultWorkflowIdDuplicationPolicies() {
s.mockSearchAttributesMapperProvider.EXPECT().GetMapper(gomock.Any()).Return(nil, nil)
s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(namespace.NewID(), nil)
Expand Down
58 changes: 49 additions & 9 deletions service/history/api/startworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ package startworkflow
import (
"context"
"errors"
"fmt"
"sync"
"time"

commonpb "go.temporal.io/api/common/v1"
Expand All @@ -37,6 +39,7 @@ import (
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/enums"
"go.temporal.io/server/common/locks"
"go.temporal.io/server/common/metrics"
Expand All @@ -53,6 +56,8 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)

// killOnce guards the callback passed to killOnce.Do in this file, so that
// callback runs at most once for the lifetime of the process.
// NOTE(review): the guarded callback terminates the current run with the
// reason/identity "TESTING" and panics on error — this looks like temporary
// test scaffolding; confirm it is removed before merging.
var killOnce sync.Once

type (
eagerStartDeniedReason metrics.ReasonString
BeforeCreateHookFunc func(lease api.WorkflowLease) error
Expand All @@ -74,13 +79,14 @@ const (

// Starter starts a new workflow execution.
type Starter struct {
shardContext shard.Context
workflowConsistencyChecker api.WorkflowConsistencyChecker
tokenSerializer common.TaskTokenSerializer
visibilityManager manager.VisibilityManager
request *historyservice.StartWorkflowExecutionRequest
namespace *namespace.Namespace
createOrUpdateLeaseFn api.CreateOrUpdateLeaseFunc
shardContext shard.Context
workflowConsistencyChecker api.WorkflowConsistencyChecker
tokenSerializer common.TaskTokenSerializer
visibilityManager manager.VisibilityManager
request *historyservice.StartWorkflowExecutionRequest
namespace *namespace.Namespace
createOrUpdateLeaseFn api.CreateOrUpdateLeaseFunc
followReusePolicyRejectFailedOnlyWithConflictPolicyTerminate dynamicconfig.TypedPropertyFnWithNamespaceFilter[bool]
}

// creationParams is a container for all information obtained from creating the uncommitted execution.
Expand Down Expand Up @@ -124,6 +130,7 @@ func NewStarter(
request: request,
namespace: namespaceEntry,
createOrUpdateLeaseFn: createLeaseFn,
followReusePolicyRejectFailedOnlyWithConflictPolicyTerminate: shardContext.GetConfig().FollowReusePolicyRejectFailedOnlyWithConflictPolicyTerminate,
}, nil
}

Expand Down Expand Up @@ -431,6 +438,35 @@ func (s *Starter) resolveDuplicateWorkflowID(
return nil, StartNew, nil
}

killOnce.Do(func() {
err = api.GetAndUpdateWorkflowWithNew(
ctx,
nil,
definition.NewWorkflowKey(
s.namespace.ID().String(),
workflowID,
currentWorkflowConditionFailed.RunID,
),
func(workflowLease api.WorkflowLease) (*api.UpdateWorkflowAction, error) {
mutableState := workflowLease.GetMutableState()
return api.UpdateWorkflowTerminate, workflow.TerminateWorkflow(
mutableState,
"TESTING",
nil,
"TESTING",
false,
nil,
)
},
nil,
s.shardContext,
s.workflowConsistencyChecker,
)
if err != nil {
panic(err)
}
})

var workflowLease api.WorkflowLease
var mutableStateInfo *mutableStateInfo
// Update current execution and create new execution in one transaction.
Expand Down Expand Up @@ -493,8 +529,12 @@ func (s *Starter) resolveDuplicateWorkflowID(
resp, err := s.generateResponse(newRunID, mutableStateInfo.workflowTask, events)
return resp, StartNew, err
case consts.ErrWorkflowCompleted:
// current workflow already closed
// fall through to the logic for only creating the new workflow below
if s.followReusePolicyRejectFailedOnlyWithConflictPolicyTerminate(s.namespace.ID().String()) {
// Exit and retry again from the top.
// By returning an Unavailable service error, the entire Start request will be retried.
return nil, StartErr, serviceerror.NewUnavailable(fmt.Sprintf("Termination failed: %v", err))
}
// Fall through to the logic for only creating the new workflow below.
return nil, StartNew, nil
default:
return nil, StartErr, err
Expand Down
2 changes: 2 additions & 0 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,8 @@ type Config struct {
EnableEagerWorkflowStart dynamicconfig.BoolPropertyFnWithNamespaceFilter
NamespaceCacheRefreshInterval dynamicconfig.DurationPropertyFn

FollowReusePolicyRejectFailedOnlyWithConflictPolicyTerminate dynamicconfig.TypedPropertyFnWithNamespaceFilter[bool]

// ArchivalQueueProcessor settings
ArchivalProcessorSchedulerWorkerCount dynamicconfig.TypedSubscribable[int]
ArchivalProcessorMaxPollHostRPS dynamicconfig.IntPropertyFn
Expand Down
6 changes: 5 additions & 1 deletion tests/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ func (s *WorkflowTestSuite) TestStartWorkflowExecution() {
}

func (s *WorkflowTestSuite) TestStartWorkflowExecution_Terminate() {

// setting this to 0 to be sure we are terminating the old workflow
s.OverrideDynamicConfig(dynamicconfig.WorkflowIdReuseMinimalInterval, 0)

Expand All @@ -147,6 +146,11 @@ func (s *WorkflowTestSuite) TestStartWorkflowExecution_Terminate() {
enumspb.WORKFLOW_ID_REUSE_POLICY_UNSPECIFIED,
enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING,
},
{
"TerminateExisting with AllowDuplicateFailedOnly",
enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY,
enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING,
},
}

for i, tc := range testCases {
Expand Down

0 comments on commit 1feda6c

Please sign in to comment.