Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support wait for state execution completion #345

Merged
merged 14 commits into from
Sep 18, 2023
Merged
20 changes: 11 additions & 9 deletions cmd/server/iwf/iwf.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,16 @@ package iwf

import (
"fmt"
rawLog "log"
"strings"
"sync"
"time"

isvc "github.com/indeedeng/iwf/service"
"github.com/indeedeng/iwf/service/api"
cadenceapi "github.com/indeedeng/iwf/service/api/cadence"
temporalapi "github.com/indeedeng/iwf/service/api/temporal"
uclient "github.com/indeedeng/iwf/service/client"
"github.com/indeedeng/iwf/service/common/config"
"github.com/indeedeng/iwf/service/common/log"
"github.com/indeedeng/iwf/service/common/log/loggerimpl"
Expand All @@ -46,10 +52,6 @@ import (
"go.uber.org/cadence/encoded"
"go.uber.org/yarpc"
"go.uber.org/yarpc/transport/grpc"
rawLog "log"
"strings"
"sync"
"time"
)

const serviceAPI = "api"
Expand Down Expand Up @@ -106,7 +108,7 @@ func start(c *cli.Context) {
services := getServices(c)

// The client is a heavyweight object that should be created once per process.
var unifiedClient api.UnifiedClient
var unifiedClient uclient.UnifiedClient
if config.Interpreter.Temporal != nil {
var metricHandler client.MetricsHandler
if config.Interpreter.Temporal.Prometheus != nil {
Expand Down Expand Up @@ -159,13 +161,13 @@ func start(c *cli.Context) {
wg.Wait()
}

func launchTemporalService(svcName string, config config.Config, unifiedClient api.UnifiedClient, temporalClient client.Client, logger log.Logger) {
func launchTemporalService(svcName string, config config.Config, unifiedClient uclient.UnifiedClient, temporalClient client.Client, logger log.Logger) {
switch svcName {
case serviceAPI:
svc := api.NewService(config, unifiedClient, logger.WithTags(tag.Service(svcName)))
rawLog.Fatal(svc.Run(fmt.Sprintf(":%v", config.Api.Port)))
case serviceInterpreter:
interpreter := temporal.NewInterpreterWorker(config, temporalClient, isvc.TaskQueue, false, nil)
interpreter := temporal.NewInterpreterWorker(config, temporalClient, isvc.TaskQueue, false, nil, unifiedClient)
interpreter.Start()
default:
rawLog.Fatalf("Invalid service: %v", svcName)
Expand All @@ -175,7 +177,7 @@ func launchTemporalService(svcName string, config config.Config, unifiedClient a
func launchCadenceService(
svcName string,
config config.Config,
unifiedClient api.UnifiedClient,
unifiedClient uclient.UnifiedClient,
service workflowserviceclient.Interface,
domain string,
closeFunc func(),
Expand All @@ -185,7 +187,7 @@ func launchCadenceService(
svc := api.NewService(config, unifiedClient, logger.WithTags(tag.Service(svcName)))
rawLog.Fatal(svc.Run(fmt.Sprintf(":%v", config.Api.Port)))
case serviceInterpreter:
interpreter := cadence.NewInterpreterWorker(config, service, domain, isvc.TaskQueue, closeFunc)
interpreter := cadence.NewInterpreterWorker(config, service, domain, isvc.TaskQueue, closeFunc, unifiedClient)
interpreter.Start()
default:
rawLog.Fatalf("Invalid service: %v", svcName)
Expand Down
2 changes: 1 addition & 1 deletion docker-compose/integ-dependencies.yml
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ services:
cadence-web:
image: ubercadence/web:latest
environment:
- "CADENCE_TCHANNEL_PEERS=cadence:7933"
- "CADENCE_TCHANNEL_PEERS=cadence:7833"
ports:
- "8088:8088"
depends_on:
Expand Down
4 changes: 4 additions & 0 deletions gen/iwfidl/.openapi-generator/FILES
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ docs/WorkflowStateStartResponse.md
docs/WorkflowStatus.md
docs/WorkflowStopRequest.md
docs/WorkflowStopType.md
docs/WorkflowWaitForStateCompletionRequest.md
docs/WorkflowWaitForStateCompletionResponse.md
docs/WorkflowWorkerRpcRequest.md
docs/WorkflowWorkerRpcResponse.md
git_push.sh
Expand Down Expand Up @@ -152,6 +154,8 @@ model_workflow_state_start_response.go
model_workflow_status.go
model_workflow_stop_request.go
model_workflow_stop_type.go
model_workflow_wait_for_state_completion_request.go
model_workflow_wait_for_state_completion_response.go
model_workflow_worker_rpc_request.go
model_workflow_worker_rpc_response.go
response.go
Expand Down
3 changes: 3 additions & 0 deletions gen/iwfidl/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ Class | Method | HTTP request | Description
*DefaultApi* | [**ApiV1WorkflowStateStartPost**](docs/DefaultApi.md#apiv1workflowstatestartpost) | **Post** /api/v1/workflowState/start | for invoking WorkflowState.start API
*DefaultApi* | [**ApiV1WorkflowStopPost**](docs/DefaultApi.md#apiv1workflowstoppost) | **Post** /api/v1/workflow/stop | stop a workflow
*DefaultApi* | [**ApiV1WorkflowTimerSkipPost**](docs/DefaultApi.md#apiv1workflowtimerskippost) | **Post** /api/v1/workflow/timer/skip | skip the timer of a workflow
*DefaultApi* | [**ApiV1WorkflowWaitForStateCompletionPost**](docs/DefaultApi.md#apiv1workflowwaitforstatecompletionpost) | **Post** /api/v1/workflow/waitForStateCompletion |
*DefaultApi* | [**ApiV1WorkflowWorkerRpcPost**](docs/DefaultApi.md#apiv1workflowworkerrpcpost) | **Post** /api/v1/workflowWorker/rpc | for invoking workflow RPC API in the worker
*DefaultApi* | [**InfoHealthcheckGet**](docs/DefaultApi.md#infohealthcheckget) | **Get** /info/healthcheck | return health info of the server

Expand Down Expand Up @@ -168,6 +169,8 @@ Class | Method | HTTP request | Description
- [WorkflowStatus](docs/WorkflowStatus.md)
- [WorkflowStopRequest](docs/WorkflowStopRequest.md)
- [WorkflowStopType](docs/WorkflowStopType.md)
- [WorkflowWaitForStateCompletionRequest](docs/WorkflowWaitForStateCompletionRequest.md)
- [WorkflowWaitForStateCompletionResponse](docs/WorkflowWaitForStateCompletionResponse.md)
- [WorkflowWorkerRpcRequest](docs/WorkflowWorkerRpcRequest.md)
- [WorkflowWorkerRpcResponse](docs/WorkflowWorkerRpcResponse.md)

Expand Down
61 changes: 61 additions & 0 deletions gen/iwfidl/api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,32 @@ paths:
$ref: '#/components/schemas/HealthInfo'
description: successful operation
summary: return health info of the server
/api/v1/workflow/waitForStateCompletion:
post:
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/WorkflowWaitForStateCompletionRequest'
responses:
"200":
content:
application/json:
schema:
$ref: '#/components/schemas/WorkflowWaitForStateCompletionResponse'
description: successful operation
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
description: Invalid input
"420":
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
description: wait has exceeded timeout
/api/v1/workflowState/start:
post:
requestBody:
Expand Down Expand Up @@ -727,6 +753,9 @@ components:
example:
startStateId: startStateId
workflowTimeoutSeconds: 0
waitForCompletionStateExecutionIds:
- waitForCompletionStateExecutionIds
- waitForCompletionStateExecutionIds
stateOptions:
searchAttributesLoadingPolicy:
lockingKeys:
Expand Down Expand Up @@ -837,6 +866,10 @@ components:
type: string
startStateId:
type: string
waitForCompletionStateExecutionIds:
items:
type: string
type: array
stateInput:
$ref: '#/components/schemas/EncodedObject'
stateOptions:
Expand Down Expand Up @@ -2102,6 +2135,34 @@ components:
$ref: '#/components/schemas/InterStateChannelPublishing'
type: array
type: object
WorkflowWaitForStateCompletionRequest:
example:
waitTimeSeconds: 0
workflowId: workflowId
stateExecutionId: stateExecutionId
properties:
workflowId:
type: string
stateExecutionId:
type: string
waitTimeSeconds:
type: integer
required:
- stateExecutionId
- workflowId
type: object
WorkflowWaitForStateCompletionResponse:
example:
stateCompletionOutput:
completedStateOutput:
data: data
encoding: encoding
completedStateExecutionId: completedStateExecutionId
completedStateId: completedStateId
properties:
stateCompletionOutput:
$ref: '#/components/schemas/StateCompletionOutput'
type: object
StateDecision:
example:
nextStates:
Expand Down
Loading