Skip to content

Commit

Permalink
Support wait for state execution completion (#345)
Browse files Browse the repository at this point in the history
  • Loading branch information
duoertai authored Sep 18, 2023
1 parent 1f40c37 commit 9b549fe
Show file tree
Hide file tree
Showing 41 changed files with 1,507 additions and 225 deletions.
20 changes: 11 additions & 9 deletions cmd/server/iwf/iwf.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,16 @@ package iwf

import (
"fmt"
rawLog "log"
"strings"
"sync"
"time"

isvc "github.com/indeedeng/iwf/service"
"github.com/indeedeng/iwf/service/api"
cadenceapi "github.com/indeedeng/iwf/service/api/cadence"
temporalapi "github.com/indeedeng/iwf/service/api/temporal"
uclient "github.com/indeedeng/iwf/service/client"
"github.com/indeedeng/iwf/service/common/config"
"github.com/indeedeng/iwf/service/common/log"
"github.com/indeedeng/iwf/service/common/log/loggerimpl"
Expand All @@ -46,10 +52,6 @@ import (
"go.uber.org/cadence/encoded"
"go.uber.org/yarpc"
"go.uber.org/yarpc/transport/grpc"
rawLog "log"
"strings"
"sync"
"time"
)

const serviceAPI = "api"
Expand Down Expand Up @@ -106,7 +108,7 @@ func start(c *cli.Context) {
services := getServices(c)

// The client is a heavyweight object that should be created once per process.
var unifiedClient api.UnifiedClient
var unifiedClient uclient.UnifiedClient
if config.Interpreter.Temporal != nil {
var metricHandler client.MetricsHandler
if config.Interpreter.Temporal.Prometheus != nil {
Expand Down Expand Up @@ -159,13 +161,13 @@ func start(c *cli.Context) {
wg.Wait()
}

func launchTemporalService(svcName string, config config.Config, unifiedClient api.UnifiedClient, temporalClient client.Client, logger log.Logger) {
func launchTemporalService(svcName string, config config.Config, unifiedClient uclient.UnifiedClient, temporalClient client.Client, logger log.Logger) {
switch svcName {
case serviceAPI:
svc := api.NewService(config, unifiedClient, logger.WithTags(tag.Service(svcName)))
rawLog.Fatal(svc.Run(fmt.Sprintf(":%v", config.Api.Port)))
case serviceInterpreter:
interpreter := temporal.NewInterpreterWorker(config, temporalClient, isvc.TaskQueue, false, nil)
interpreter := temporal.NewInterpreterWorker(config, temporalClient, isvc.TaskQueue, false, nil, unifiedClient)
interpreter.Start()
default:
rawLog.Fatalf("Invalid service: %v", svcName)
Expand All @@ -175,7 +177,7 @@ func launchTemporalService(svcName string, config config.Config, unifiedClient a
func launchCadenceService(
svcName string,
config config.Config,
unifiedClient api.UnifiedClient,
unifiedClient uclient.UnifiedClient,
service workflowserviceclient.Interface,
domain string,
closeFunc func(),
Expand All @@ -185,7 +187,7 @@ func launchCadenceService(
svc := api.NewService(config, unifiedClient, logger.WithTags(tag.Service(svcName)))
rawLog.Fatal(svc.Run(fmt.Sprintf(":%v", config.Api.Port)))
case serviceInterpreter:
interpreter := cadence.NewInterpreterWorker(config, service, domain, isvc.TaskQueue, closeFunc)
interpreter := cadence.NewInterpreterWorker(config, service, domain, isvc.TaskQueue, closeFunc, unifiedClient)
interpreter.Start()
default:
rawLog.Fatalf("Invalid service: %v", svcName)
Expand Down
2 changes: 1 addition & 1 deletion docker-compose/integ-dependencies.yml
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ services:
cadence-web:
image: ubercadence/web:latest
environment:
- "CADENCE_TCHANNEL_PEERS=cadence:7933"
- "CADENCE_TCHANNEL_PEERS=cadence:7833"
ports:
- "8088:8088"
depends_on:
Expand Down
4 changes: 4 additions & 0 deletions gen/iwfidl/.openapi-generator/FILES
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ docs/WorkflowStateStartResponse.md
docs/WorkflowStatus.md
docs/WorkflowStopRequest.md
docs/WorkflowStopType.md
docs/WorkflowWaitForStateCompletionRequest.md
docs/WorkflowWaitForStateCompletionResponse.md
docs/WorkflowWorkerRpcRequest.md
docs/WorkflowWorkerRpcResponse.md
git_push.sh
Expand Down Expand Up @@ -152,6 +154,8 @@ model_workflow_state_start_response.go
model_workflow_status.go
model_workflow_stop_request.go
model_workflow_stop_type.go
model_workflow_wait_for_state_completion_request.go
model_workflow_wait_for_state_completion_response.go
model_workflow_worker_rpc_request.go
model_workflow_worker_rpc_response.go
response.go
Expand Down
3 changes: 3 additions & 0 deletions gen/iwfidl/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ Class | Method | HTTP request | Description
*DefaultApi* | [**ApiV1WorkflowStateStartPost**](docs/DefaultApi.md#apiv1workflowstatestartpost) | **Post** /api/v1/workflowState/start | for invoking WorkflowState.start API
*DefaultApi* | [**ApiV1WorkflowStopPost**](docs/DefaultApi.md#apiv1workflowstoppost) | **Post** /api/v1/workflow/stop | stop a workflow
*DefaultApi* | [**ApiV1WorkflowTimerSkipPost**](docs/DefaultApi.md#apiv1workflowtimerskippost) | **Post** /api/v1/workflow/timer/skip | skip the timer of a workflow
*DefaultApi* | [**ApiV1WorkflowWaitForStateCompletionPost**](docs/DefaultApi.md#apiv1workflowwaitforstatecompletionpost) | **Post** /api/v1/workflow/waitForStateCompletion |
*DefaultApi* | [**ApiV1WorkflowWorkerRpcPost**](docs/DefaultApi.md#apiv1workflowworkerrpcpost) | **Post** /api/v1/workflowWorker/rpc | for invoking workflow RPC API in the worker
*DefaultApi* | [**InfoHealthcheckGet**](docs/DefaultApi.md#infohealthcheckget) | **Get** /info/healthcheck | return health info of the server

Expand Down Expand Up @@ -168,6 +169,8 @@ Class | Method | HTTP request | Description
- [WorkflowStatus](docs/WorkflowStatus.md)
- [WorkflowStopRequest](docs/WorkflowStopRequest.md)
- [WorkflowStopType](docs/WorkflowStopType.md)
- [WorkflowWaitForStateCompletionRequest](docs/WorkflowWaitForStateCompletionRequest.md)
- [WorkflowWaitForStateCompletionResponse](docs/WorkflowWaitForStateCompletionResponse.md)
- [WorkflowWorkerRpcRequest](docs/WorkflowWorkerRpcRequest.md)
- [WorkflowWorkerRpcResponse](docs/WorkflowWorkerRpcResponse.md)

Expand Down
61 changes: 61 additions & 0 deletions gen/iwfidl/api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,32 @@ paths:
$ref: '#/components/schemas/HealthInfo'
description: successful operation
summary: return health info of the server
/api/v1/workflow/waitForStateCompletion:
post:
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/WorkflowWaitForStateCompletionRequest'
responses:
"200":
content:
application/json:
schema:
$ref: '#/components/schemas/WorkflowWaitForStateCompletionResponse'
description: successful operation
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
description: Invalid input
"420":
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
description: wait has exceeded timeout
/api/v1/workflowState/start:
post:
requestBody:
Expand Down Expand Up @@ -727,6 +753,9 @@ components:
example:
startStateId: startStateId
workflowTimeoutSeconds: 0
waitForCompletionStateExecutionIds:
- waitForCompletionStateExecutionIds
- waitForCompletionStateExecutionIds
stateOptions:
searchAttributesLoadingPolicy:
lockingKeys:
Expand Down Expand Up @@ -837,6 +866,10 @@ components:
type: string
startStateId:
type: string
waitForCompletionStateExecutionIds:
items:
type: string
type: array
stateInput:
$ref: '#/components/schemas/EncodedObject'
stateOptions:
Expand Down Expand Up @@ -2102,6 +2135,34 @@ components:
$ref: '#/components/schemas/InterStateChannelPublishing'
type: array
type: object
WorkflowWaitForStateCompletionRequest:
example:
waitTimeSeconds: 0
workflowId: workflowId
stateExecutionId: stateExecutionId
properties:
workflowId:
type: string
stateExecutionId:
type: string
waitTimeSeconds:
type: integer
required:
- stateExecutionId
- workflowId
type: object
WorkflowWaitForStateCompletionResponse:
example:
stateCompletionOutput:
completedStateOutput:
data: data
encoding: encoding
completedStateExecutionId: completedStateExecutionId
completedStateId: completedStateId
properties:
stateCompletionOutput:
$ref: '#/components/schemas/StateCompletionOutput'
type: object
StateDecision:
example:
nextStates:
Expand Down
Loading

0 comments on commit 9b549fe

Please sign in to comment.