Skip to content

Commit

Permalink
handle workflow already started error
Browse files Browse the repository at this point in the history
  • Loading branch information
duoertai committed Sep 13, 2023
1 parent 6a9180d commit f6578e3
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 7 deletions.
15 changes: 12 additions & 3 deletions service/api/cadence/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,21 @@ func (t *cadenceClient) StartInterpreterWorkflow(ctx context.Context, options ap

func (t *cadenceClient) StartWaitForStateCompletionWorkflow(ctx context.Context, options api.StartWorkflowOptions) (runId string, err error) {
workflowOptions := client.StartWorkflowOptions{
ID: options.ID,
TaskList: options.TaskQueue,
WorkflowIDReusePolicy: client.WorkflowIDReusePolicyRejectDuplicate,
ID: options.ID,
TaskList: options.TaskQueue,
WorkflowIDReusePolicy: client.WorkflowIDReusePolicyRejectDuplicate,
ExecutionStartToCloseTimeout: 600 * time.Second, // TODO, make this configurable
}
run, err := t.cClient.StartWorkflow(ctx, workflowOptions, cadence.WaitforStateCompletionWorkflow)
if err != nil {
if t.IsWorkflowAlreadyStartedError(err) {
describeResponse, error := t.cClient.DescribeWorkflowExecution(ctx, workflowOptions.ID, "")
if error != nil {
return "", error
}

return *describeResponse.WorkflowExecutionInfo.Execution.RunId, nil
}
return "", err
}
return run.RunID, nil
Expand Down
2 changes: 2 additions & 0 deletions service/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ func (s *serviceImpl) ApiV1WorkflowWaitForStateCompletion(ctx context.Context, r
return nil, s.handleError(err)
}

return nil, nil

subCtx, cancFunc := utils.TrimContextByTimeoutWithCappedDDL(ctx, iwfidl.PtrInt32(60), s.config.Api.MaxWaitSeconds)
defer cancFunc()
var output service.WaitForStateCompletionWorkflowOutput
Expand Down
2 changes: 1 addition & 1 deletion service/api/temporal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (t *temporalClient) StartInterpreterWorkflow(ctx context.Context, options a
func (t *temporalClient) StartWaitForStateCompletionWorkflow(ctx context.Context, options api.StartWorkflowOptions) (runId string, err error) {
workflowOptions := client.StartWorkflowOptions{
ID: options.ID,
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
TaskQueue: options.TaskQueue,
}

Expand Down
10 changes: 7 additions & 3 deletions service/interpreter/activityImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ package interpreter
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"os"

"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/service"
"github.com/indeedeng/iwf/service/common/compatibility"
"github.com/indeedeng/iwf/service/common/config"
"github.com/indeedeng/iwf/service/common/rpc"
"github.com/indeedeng/iwf/service/common/urlautofix"
"github.com/indeedeng/iwf/service/interpreter/env"
"io/ioutil"
"net/http"
"os"
)

// StateStart is Deprecated, will be removed in next release
Expand Down Expand Up @@ -59,6 +60,9 @@ func StateDecide(ctx context.Context, backendType service.BackendType, input ser
}

func StateApiExecute(ctx context.Context, backendType service.BackendType, input service.StateDecideActivityInput) (*iwfidl.WorkflowStateDecideResponse, error) {
defer func() {

}()
provider := getActivityProviderByType(backendType)
logger := provider.GetLogger(ctx)
logger.Info("StateDecideActivity", "input", input)
Expand Down

0 comments on commit f6578e3

Please sign in to comment.