From f6578e3817dcc0486bb9295e4e9fd447562eb373 Mon Sep 17 00:00:00 2001 From: duoertai Date: Wed, 13 Sep 2023 16:00:56 -0700 Subject: [PATCH] handle workflow already started error --- service/api/cadence/client.go | 15 ++++++++++++--- service/api/service.go | 2 ++ service/api/temporal/client.go | 2 +- service/interpreter/activityImpl.go | 10 +++++++--- 4 files changed, 22 insertions(+), 7 deletions(-) diff --git a/service/api/cadence/client.go b/service/api/cadence/client.go index 9bfd71d5..f05bbc0d 100644 --- a/service/api/cadence/client.go +++ b/service/api/cadence/client.go @@ -116,12 +116,21 @@ func (t *cadenceClient) StartInterpreterWorkflow(ctx context.Context, options ap func (t *cadenceClient) StartWaitForStateCompletionWorkflow(ctx context.Context, options api.StartWorkflowOptions) (runId string, err error) { workflowOptions := client.StartWorkflowOptions{ - ID: options.ID, - TaskList: options.TaskQueue, - WorkflowIDReusePolicy: client.WorkflowIDReusePolicyRejectDuplicate, + ID: options.ID, + TaskList: options.TaskQueue, + WorkflowIDReusePolicy: client.WorkflowIDReusePolicyRejectDuplicate, + ExecutionStartToCloseTimeout: 600 * time.Second, // TODO, make this configurable } run, err := t.cClient.StartWorkflow(ctx, workflowOptions, cadence.WaitforStateCompletionWorkflow) if err != nil { + if t.IsWorkflowAlreadyStartedError(err) { + describeResponse, error := t.cClient.DescribeWorkflowExecution(ctx, workflowOptions.ID, "") + if error != nil { + return "", error + } + + return *describeResponse.WorkflowExecutionInfo.Execution.RunId, nil + } return "", err } return run.RunID, nil diff --git a/service/api/service.go b/service/api/service.go index d23ec1ed..9b0eefd1 100644 --- a/service/api/service.go +++ b/service/api/service.go @@ -132,6 +132,8 @@ func (s *serviceImpl) ApiV1WorkflowWaitForStateCompletion(ctx context.Context, r return nil, s.handleError(err) } + return nil, nil + subCtx, cancFunc := utils.TrimContextByTimeoutWithCappedDDL(ctx, iwfidl.PtrInt32(60), s.config.Api.MaxWaitSeconds) defer cancFunc() var output service.WaitForStateCompletionWorkflowOutput diff --git a/service/api/temporal/client.go b/service/api/temporal/client.go index d25f968a..cbf2c0c1 100644 --- a/service/api/temporal/client.go +++ b/service/api/temporal/client.go @@ -122,7 +122,7 @@ func (t *temporalClient) StartInterpreterWorkflow(ctx context.Context, options a func (t *temporalClient) StartWaitForStateCompletionWorkflow(ctx context.Context, options api.StartWorkflowOptions) (runId string, err error) { workflowOptions := client.StartWorkflowOptions{ ID: options.ID, - WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, + WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE, TaskQueue: options.TaskQueue, } diff --git a/service/interpreter/activityImpl.go b/service/interpreter/activityImpl.go index 04719ab0..a822f50f 100644 --- a/service/interpreter/activityImpl.go +++ b/service/interpreter/activityImpl.go @@ -3,6 +3,10 @@ package interpreter import ( "context" "fmt" + "io/ioutil" + "net/http" + "os" + "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/common/compatibility" @@ -10,9 +14,6 @@ import ( "github.com/indeedeng/iwf/service/common/rpc" "github.com/indeedeng/iwf/service/common/urlautofix" "github.com/indeedeng/iwf/service/interpreter/env" - "io/ioutil" - "net/http" - "os" ) // StateStart is Deprecated, will be removed in next release @@ -59,6 +60,9 @@ func StateDecide(ctx context.Context, backendType service.BackendType, input ser } func StateApiExecute(ctx context.Context, backendType service.BackendType, input service.StateDecideActivityInput) (*iwfidl.WorkflowStateDecideResponse, error) { + defer func() { + + }() provider := getActivityProviderByType(backendType) logger := provider.GetLogger(ctx) logger.Info("StateDecideActivity", "input", input)