Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement reset by stateId and stateExecutionId #99

Merged
merged 5 commits into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ cadence adm cl asa --search_attr_key IwfWorkflowType --search_attr_type 0
- [x] AnyCommandCompleted Decider trigger type
- [x] More workflow start options: IdReusePolicy, cron schedule, retry
- [x] StateOption: Start/Decide API timeout and retry policy
- [x] Reset workflow by stateId or stateExecutionId
- [ ] More workflow start options: initial search attributes/memo
- [ ] Reset workflow by stateId

### 1.2
- [ ] Decider trigger type: AnyCommandClosed
Expand Down
6 changes: 4 additions & 2 deletions cmd/server/iwf/iwf.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ import (
apiv1 "github.com/uber/cadence-idl/go/proto/api/v1"
"github.com/urfave/cli"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/converter"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
cclient "go.uber.org/cadence/client"
"go.uber.org/cadence/compatibility"
"go.uber.org/cadence/encoded"
"go.uber.org/yarpc"
"go.uber.org/yarpc/transport/grpc"
"log"
Expand Down Expand Up @@ -99,7 +101,7 @@ func start(c *cli.Context) {
if err != nil {
log.Fatalf("Unable to connect to Temporal because of error %v", err)
}
unifiedClient = temporalapi.NewTemporalClient(temporalClient, config.Backend.Temporal.Namespace)
unifiedClient = temporalapi.NewTemporalClient(temporalClient, config.Backend.Temporal.Namespace, converter.GetDefaultDataConverter())

for _, svcName := range services {
go launchTemporalService(svcName, config, unifiedClient, temporalClient)
Expand All @@ -121,7 +123,7 @@ func start(c *cli.Context) {
if err != nil {
log.Fatalf("Unable to connect to Cadence because of error %v", err)
}
unifiedClient = cadenceapi.NewCadenceClient(domain, cadenceClient, serviceClient, closeFunc)
unifiedClient = cadenceapi.NewCadenceClient(domain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc)

for _, svcName := range services {
go launchCadenceService(svcName, config, unifiedClient, serviceClient, domain, closeFunc)
Expand Down
8 changes: 8 additions & 0 deletions gen/iwfidl/api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -709,10 +709,12 @@ components:
reason: reason
historyEventId: 0
historyEventTime: historyEventTime
stateId: stateId
resetType: HISTORY_EVENT_ID
skipSignalReapply: true
workflowRunId: workflowRunId
workflowId: workflowId
stateExecutionId: stateExecutionId
properties:
workflowId:
type: string
Expand All @@ -723,13 +725,19 @@ components:
- HISTORY_EVENT_ID
- BEGINNING
- HISTORY_EVENT_TIME
- STATE_ID
- STATE_EXECUTION_ID
type: string
historyEventId:
type: integer
reason:
type: string
historyEventTime:
type: string
stateId:
type: string
stateExecutionId:
type: string
skipSignalReapply:
type: boolean
required:
Expand Down
52 changes: 52 additions & 0 deletions gen/iwfidl/docs/WorkflowResetRequest.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Name | Type | Description | Notes
**HistoryEventId** | Pointer to **int32** | | [optional]
**Reason** | Pointer to **string** | | [optional]
**HistoryEventTime** | Pointer to **string** | | [optional]
**StateId** | Pointer to **string** | | [optional]
**StateExecutionId** | Pointer to **string** | | [optional]
**SkipSignalReapply** | Pointer to **bool** | | [optional]

## Methods
Expand Down Expand Up @@ -171,6 +173,56 @@ SetHistoryEventTime sets HistoryEventTime field to given value.

HasHistoryEventTime returns a boolean if a field has been set.

### GetStateId

`func (o *WorkflowResetRequest) GetStateId() string`

GetStateId returns the StateId field if non-nil, zero value otherwise.

### GetStateIdOk

`func (o *WorkflowResetRequest) GetStateIdOk() (*string, bool)`

GetStateIdOk returns a tuple with the StateId field if it's non-nil, zero value otherwise
and a boolean to check if the value has been set.

### SetStateId

`func (o *WorkflowResetRequest) SetStateId(v string)`

SetStateId sets StateId field to given value.

### HasStateId

`func (o *WorkflowResetRequest) HasStateId() bool`

HasStateId returns a boolean if a field has been set.

### GetStateExecutionId

`func (o *WorkflowResetRequest) GetStateExecutionId() string`

GetStateExecutionId returns the StateExecutionId field if non-nil, zero value otherwise.

### GetStateExecutionIdOk

`func (o *WorkflowResetRequest) GetStateExecutionIdOk() (*string, bool)`

GetStateExecutionIdOk returns a tuple with the StateExecutionId field if it's non-nil, zero value otherwise
and a boolean to check if the value has been set.

### SetStateExecutionId

`func (o *WorkflowResetRequest) SetStateExecutionId(v string)`

SetStateExecutionId sets StateExecutionId field to given value.

### HasStateExecutionId

`func (o *WorkflowResetRequest) HasStateExecutionId() bool`

HasStateExecutionId returns a boolean if a field has been set.

### GetSkipSignalReapply

`func (o *WorkflowResetRequest) GetSkipSignalReapply() bool`
Expand Down
72 changes: 72 additions & 0 deletions gen/iwfidl/model_workflow_reset_request.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions integ/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/indeedeng/iwf/service/interpreter/cadence"
"github.com/indeedeng/iwf/service/interpreter/temporal"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/converter"
"go.uber.org/cadence/encoded"
"log"
"net/http"
)
Expand Down Expand Up @@ -47,7 +49,7 @@ func startWorkflowWorker(handler common.WorkflowHandler) (closeFunc func()) {
func startIwfService(backendType service.BackendType) (closeFunc func()) {
if backendType == service.BackendTypeTemporal {
temporalClient := createTemporalClient()
iwfService := api.NewService(temporalapi.NewTemporalClient(temporalClient, testNamespace))
iwfService := api.NewService(temporalapi.NewTemporalClient(temporalClient, testNamespace, converter.GetDefaultDataConverter()))
iwfServer := &http.Server{
Addr: ":" + testIwfServerPort,
Handler: iwfService,
Expand All @@ -73,7 +75,7 @@ func startIwfService(backendType service.BackendType) (closeFunc func()) {

cadenceClient, err := iwf.BuildCadenceClient(serviceClient, iwf.DefaultCadenceDomain)

iwfService := api.NewService(cadenceapi.NewCadenceClient(iwf.DefaultCadenceDomain, cadenceClient, serviceClient, closeFunc))
iwfService := api.NewService(cadenceapi.NewCadenceClient(iwf.DefaultCadenceDomain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc))
iwfServer := &http.Server{
Addr: ":" + testIwfServerPort,
Handler: iwfService,
Expand Down
2 changes: 1 addition & 1 deletion iwf-idl
Submodule iwf-idl updated 1 files
+6 −0 iwf.yaml
8 changes: 5 additions & 3 deletions script/http/local/home.http
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ Content-Type: application/json
"stateOptions": {}
}

### test reset API NOTE: try different resetType: BEGINNING, HISTORY_EVENT_ID, HISTORY_EVENT_TIME
### test reset API NOTE: try different resetType: BEGINNING, HISTORY_EVENT_ID, HISTORY_EVENT_TIME, STATE_ID, STATE_EXECUTION_ID
POST http://localhost:8801/api/v1/workflow/reset
Content-Type: application/json

{
"workflowId": "timer1671219152",
"workflowRunId": "fae51e69-7e41-4658-89aa-663463951dc7",
"resetType": "HISTORY_EVENT_TIME",
"resetType": "STATE_EXECUTION_ID",
"historyEventId":15,
"historyEventTime": "150m"
"historyEventTime": "150m",
"stateId": "S2",
"stateExecutionId": "S2-1"
}
7 changes: 5 additions & 2 deletions service/api/cadence/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,24 @@ import (
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/client"
"go.uber.org/cadence/encoded"
)

type cadenceClient struct {
domain string
cClient client.Client
closeFunc func()
serviceClient workflowserviceclient.Interface
converter encoded.DataConverter
}

func NewCadenceClient(domain string, cClient client.Client, serviceClient workflowserviceclient.Interface, closeFunc func()) api.UnifiedClient {
func NewCadenceClient(domain string, cClient client.Client, serviceClient workflowserviceclient.Interface, converter encoded.DataConverter, closeFunc func()) api.UnifiedClient {
return &cadenceClient{
domain: domain,
cClient: cClient,
closeFunc: closeFunc,
serviceClient: serviceClient,
converter: converter,
}
}

Expand Down Expand Up @@ -222,7 +225,7 @@ func (t *cadenceClient) ResetWorkflow(ctx context.Context, request iwfidl.Workfl

resetType := service.ResetType(request.GetResetType())
resetBaseRunID, decisionFinishID, err := getResetIDsByType(ctx, resetType, t.domain, request.GetWorkflowId(),
reqRunId, t.serviceClient, request.GetHistoryEventId(), request.GetHistoryEventTime())
reqRunId, t.serviceClient, t.converter, request.GetHistoryEventId(), request.GetHistoryEventTime(), request.GetStateId(), request.GetStateExecutionId())

if err != nil {
return "", err
Expand Down
Loading