Skip to content

Commit

Permalink
Revert "add cron schedule and workflow id reuse policy (#79)" (#81)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Nov 15, 2022
1 parent 9a5c25e commit fe375a9
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 70 deletions.
26 changes: 0 additions & 26 deletions service/api/cadence/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,10 @@ func (t *cadenceClient) Close() {
}

func (t *cadenceClient) StartInterpreterWorkflow(ctx context.Context, options api.StartWorkflowOptions, args ...interface{}) (runId string, err error) {
workflowIdReusePolicy, err := mapToCadenceWorkflowIdReusePolicy(options.WorkflowIDReusePolicy)
if err != nil {
return "", nil
}
workflowOptions := client.StartWorkflowOptions{
ID: options.ID,
TaskList: options.TaskQueue,
ExecutionStartToCloseTimeout: options.WorkflowRunTimeout,
WorkflowIDReusePolicy: *workflowIdReusePolicy,
CronSchedule: options.CronSchedule,
}

run, err := t.cClient.ExecuteWorkflow(ctx, workflowOptions, cadence.Interpreter, args...)
Expand Down Expand Up @@ -149,26 +143,6 @@ func mapToIwfSearchAttributes(searchAttributes *shared.SearchAttributes) (map[st
return result, nil
}

func mapToCadenceWorkflowIdReusePolicy(workflowIdReusePolicy string) (*client.WorkflowIDReusePolicy, error) {
var res client.WorkflowIDReusePolicy
switch workflowIdReusePolicy {
case service.WorkflowIDReusePolicyAllowDuplicate:
res = client.WorkflowIDReusePolicyAllowDuplicate
return &res, nil
case service.WorkflowIDReusePolicyAllowDuplicateFailedOnly:
res = client.WorkflowIDReusePolicyAllowDuplicateFailedOnly
return &res, nil
case service.WorkflowIDReusePolicyRejectDuplicate:
res = client.WorkflowIDReusePolicyRejectDuplicate
return &res, nil
case service.WorkflowIDReusePolicyTerminateIfRunning:
res = client.WorkflowIDReusePolicyTerminateIfRunning
return &res, nil
default:
return nil, fmt.Errorf("unsupported workflow id reuse policy %s", workflowIdReusePolicy)
}
}

func mapToIwfWorkflowStatus(status *shared.WorkflowExecutionCloseStatus) (string, error) {
if status == nil {
return service.WorkflowStatusRunning, nil
Expand Down
8 changes: 3 additions & 5 deletions service/api/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,9 @@ type UnifiedClient interface {
}

type StartWorkflowOptions struct {
ID string
TaskQueue string
WorkflowRunTimeout time.Duration
WorkflowIDReusePolicy string
CronSchedule string
ID string
TaskQueue string
WorkflowRunTimeout time.Duration
}

type ListWorkflowExecutionsRequest struct {
Expand Down
8 changes: 3 additions & 5 deletions service/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,9 @@ func NewApiService(client UnifiedClient, taskQueue string) (ApiService, error) {

func (s *serviceImpl) ApiV1WorkflowStartPost(req iwfidl.WorkflowStartRequest) (*iwfidl.WorkflowStartResponse, *ErrorAndStatus) {
workflowOptions := StartWorkflowOptions{
ID: req.GetWorkflowId(),
TaskQueue: s.taskQueue,
WorkflowRunTimeout: time.Duration(req.WorkflowTimeoutSeconds) * time.Second,
WorkflowIDReusePolicy: *req.WorkflowStartOptions.WorkflowIDReusePolicy,
CronSchedule: *req.WorkflowStartOptions.CronSchedule,
ID: req.GetWorkflowId(),
TaskQueue: s.taskQueue,
WorkflowRunTimeout: time.Duration(req.WorkflowTimeoutSeconds) * time.Second,
}

input := service.InterpreterWorkflowInput{
Expand Down
32 changes: 3 additions & 29 deletions service/api/temporal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,10 @@ func (t *temporalClient) Close() {
}

func (t *temporalClient) StartInterpreterWorkflow(ctx context.Context, options api.StartWorkflowOptions, args ...interface{}) (runId string, err error) {
workflowIdReusePolicy, err := mapToTemporalWorkflowIdReusePolicy(options.WorkflowIDReusePolicy)
if err != nil {
return "", nil
}
workflowOptions := client.StartWorkflowOptions{
ID: options.ID,
TaskQueue: options.TaskQueue,
WorkflowRunTimeout: options.WorkflowRunTimeout,
WorkflowIDReusePolicy: *workflowIdReusePolicy,
CronSchedule: options.CronSchedule,
ID: options.ID,
TaskQueue: options.TaskQueue,
WorkflowRunTimeout: options.WorkflowRunTimeout,
}

run, err := t.tClient.ExecuteWorkflow(ctx, workflowOptions, temporal.Interpreter, args...)
Expand Down Expand Up @@ -142,26 +136,6 @@ func mapToIwfSearchAttributes(searchAttributes *common.SearchAttributes) (map[st
return result, nil
}

func mapToTemporalWorkflowIdReusePolicy(workflowIdReusePolicy string) (*enums.WorkflowIdReusePolicy, error) {
var res enums.WorkflowIdReusePolicy
switch workflowIdReusePolicy {
case service.WorkflowIDReusePolicyAllowDuplicate:
res = enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE
return &res, nil
case service.WorkflowIDReusePolicyAllowDuplicateFailedOnly:
res = enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY
return &res, nil
case service.WorkflowIDReusePolicyRejectDuplicate:
res = enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE
return &res, nil
case service.WorkflowIDReusePolicyTerminateIfRunning:
res = enums.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING
return &res, nil
default:
return nil, fmt.Errorf("unsupported workflow id reuse policy %s", workflowIdReusePolicy)
}
}

func mapToIwfWorkflowStatus(status enums.WorkflowExecutionStatus) (string, error) {
switch status {
case enums.WORKFLOW_EXECUTION_STATUS_CANCELED:
Expand Down
5 changes: 0 additions & 5 deletions service/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,6 @@ const (
SearchAttributeGlobalVersion = "IwfGlobalWorkflowVersion"
SearchAttributeExecutingStateIds = "IwfExecutingStateIds"
SearchAttributeIwfWorkflowType = "IwfWorkflowType"

WorkflowIDReusePolicyAllowDuplicateFailedOnly = "ALLOW_DUPLICATE_FAILED_ONLY"
WorkflowIDReusePolicyAllowDuplicate = "ALLOW_DUPLICATE"
WorkflowIDReusePolicyRejectDuplicate = "REJECT_DUPLICATE"
WorkflowIDReusePolicyTerminateIfRunning = "TERMINATE_IF_RUNNING"
)

type BackendType string
Expand Down

0 comments on commit fe375a9

Please sign in to comment.