Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement system search attributes(global version, executingStateIds, IwfWorkflowType) #68

Merged
merged 5 commits into from
Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,20 +137,40 @@ Any contribution is welcome.

### Run with local Temporalite
1. Run a local Temporalite following the [instruction](https://github.com/temporalio/temporalite). If you see error `error setting up schema`, try use command `temporalite start --namespace default -f my_test.db` instead to start.
2. Go to http://localhost:8233/ for Temporal WebUI
2. Register a default namespace
```shell
tctl --ns default n re
```
3. Go to http://localhost:8233/ for Temporal WebUI

NOTE: alternatively, go to [Temporal-dockercompose](https://github.com/temporalio/docker-compose) to run with docker

3. For `attribute_test.go` integTests, you need to register search attributes:
```bash
3. Register system search attributes required by iWF server
```shell
tctl adm cl asa -n IwfWorkflowType -t Keyword
tctl adm cl asa -n GlobalWorkflowVersion -t Int
tctl adm cl asa -n ExecutingStateIds -t Keyword

```
4 For `attribute_test.go` integTests, you need to register search attributes:
```shell
tctl adm cl asa -n CustomKeywordField -t Keyword
tctl adm cl asa -n CustomIntField -t Int
```

### Run with local Cadence
1. Run a local Cadence server following the [instructions](https://github.com/uber/cadence/tree/master/docker)
```
docker-compose -f docker-compose-es-v7.yml up
```
2. Register a new domain if not haven `cadence --do default domain register`
3. Go to Cadence http://localhost:8088/domains/default/workflows?range=last-30-days
3. Register system search attributes required by iWF server
```
cadence adm cl asa --search_attr_key GlobalWorkflowVersion --search_attr_type 2
cadence adm cl asa --search_attr_key ExecutingStateIds --search_attr_type 0
cadence adm cl asa --search_attr_key IwfWorkflowType --search_attr_type 0
```
4. Go to Cadence http://localhost:8088/domains/default/workflows?range=last-30-days

## Development Plan
### 1.0
Expand Down
2 changes: 1 addition & 1 deletion integ/timer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func doTestTimerWorkflow(t *testing.T, backendType service.BackendType) {
closeFunc1 := startWorkflowWorker(wfHandler)
defer closeFunc1()

closeFunc2 := startIwfService(service.BackendTypeTemporal)
closeFunc2 := startIwfService(backendType)
defer closeFunc2()

// start a workflow
Expand Down
4 changes: 4 additions & 0 deletions service/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ const (
WorkflowStatusTerminated = "TERMINATED"
WorkflowStatusCanceled = "CANCELED"
WorkflowStatusContinueAsNew = "CONTINUED_AS_NEW"

SearchAttributeGlobalVersion = "GlobalWorkflowVersion"
SearchAttributeExecutingStateIds = "ExecutingStateIds"
SearchAttributeIwfWorkflowType = "IwfWorkflowType"
)

type BackendType string
Expand Down
10 changes: 10 additions & 0 deletions service/interpreter/cadence/workflowProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,16 @@ func (w *workflowProvider) Sleep(ctx interpreter.UnifiedContext, d time.Duration
return workflow.Sleep(wfCtx, d)
}

func (w *workflowProvider) GetVersion(ctx interpreter.UnifiedContext, changeID string, minSupported, maxSupported int) int {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
panic("cannot convert to temporal workflow context")
}

version := workflow.GetVersion(wfCtx, changeID, workflow.Version(minSupported), workflow.Version(maxSupported))
return int(version)
}

type cadenceReceiveChannel struct {
channel workflow.Channel
}
Expand Down
29 changes: 29 additions & 0 deletions service/interpreter/globalVersionProvider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package interpreter

import "github.com/indeedeng/iwf/service"

const globalChangeId = "global"
const startingVersionUsingGlobalVersioning = 1
const maxOfAllVersions = startingVersionUsingGlobalVersioning

// see https://stackoverflow.com/questions/73941723/what-is-a-good-way-pattern-to-use-temporal-cadence-versioning-api
type globalVersioner struct {
workflowProvider WorkflowProvider
}

func newGlobalVersionProvider(workflowProvider WorkflowProvider) *globalVersioner {
return &globalVersioner{
workflowProvider: workflowProvider,
}
}

func (p *globalVersioner) isAfterVersionOfUsingGlobalVersioning(ctx UnifiedContext) bool {
version := p.workflowProvider.GetVersion(ctx, globalChangeId, 0, maxOfAllVersions)
return version >= startingVersionUsingGlobalVersioning
}

func (p *globalVersioner) upsertGlobalVersionSearchAttribute(ctx UnifiedContext) error {
return p.workflowProvider.UpsertSearchAttributes(ctx, map[string]interface{}{
service.SearchAttributeGlobalVersion: maxOfAllVersions,
})
}
1 change: 1 addition & 0 deletions service/interpreter/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type WorkflowProvider interface {
Sleep(ctx UnifiedContext, d time.Duration) (err error)
GetSignalChannel(ctx UnifiedContext, signalName string) (receiveChannel ReceiveChannel)
GetContextValue(ctx UnifiedContext, key string) interface{}
GetVersion(ctx UnifiedContext, changeID string, minSupported, maxSupported int) int
GetBackendType() service.BackendType
}

Expand Down
63 changes: 63 additions & 0 deletions service/interpreter/stateExecutingManager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package interpreter

import (
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/service"
)

type stateExecutingManager struct {
ctx UnifiedContext
provider WorkflowProvider

stateIdCount map[string]int
totalExecutingCount int
}

func newStateExecutingManager(ctx UnifiedContext, provider WorkflowProvider) *stateExecutingManager {
return &stateExecutingManager{
ctx: ctx,
provider: provider,
stateIdCount: map[string]int{},
totalExecutingCount: 0,
}
}

func (e *stateExecutingManager) startStates(states []iwfidl.StateMovement) error {
needsUpdate := false
for _, s := range states {
e.stateIdCount[s.StateId]++
if e.stateIdCount[s.StateId] == 1 {
// first time the stateId show up
needsUpdate = true
}
}
e.totalExecutingCount += len(states)
if needsUpdate {
return e.updateSearchAttribute()
}
return nil
}

func (e *stateExecutingManager) completeStates(state iwfidl.StateMovement) error {
e.stateIdCount[state.StateId]--
e.totalExecutingCount -= 1
if e.stateIdCount[state.StateId] == 0 {
delete(e.stateIdCount, state.StateId)
return e.updateSearchAttribute()
}
return nil
}

func (e *stateExecutingManager) getTotalExecutingStates() int {
return e.totalExecutingCount
}

func (e *stateExecutingManager) updateSearchAttribute() error {
var executingStateIds []string
for sid := range e.stateIdCount {
executingStateIds = append(executingStateIds, sid)
}
return e.provider.UpsertSearchAttributes(e.ctx, map[string]interface{}{
service.SearchAttributeExecutingStateIds: executingStateIds,
})
}
10 changes: 10 additions & 0 deletions service/interpreter/temporal/workflowProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ func (w *workflowProvider) Sleep(ctx interpreter.UnifiedContext, d time.Duration
return workflow.Sleep(wfCtx, d)
}

func (w *workflowProvider) GetVersion(ctx interpreter.UnifiedContext, changeID string, minSupported, maxSupported int) int {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
panic("cannot convert to temporal workflow context")
}

version := workflow.GetVersion(wfCtx, changeID, workflow.Version(minSupported), workflow.Version(maxSupported))
return int(version)
}

type temporalReceiveChannel struct {
channel workflow.ReceiveChannel
}
Expand Down
37 changes: 29 additions & 8 deletions service/interpreter/workflowImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,21 @@ import (
)

func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input service.InterpreterWorkflowInput) (*service.InterpreterWorkflowOutput, error) {
globalVersionProvider := newGlobalVersionProvider(provider)
if globalVersionProvider.isAfterVersionOfUsingGlobalVersioning(ctx) {
err := globalVersionProvider.upsertGlobalVersionSearchAttribute(ctx)
if err != nil {
return nil, err
}
}

err := provider.UpsertSearchAttributes(ctx, map[string]interface{}{
service.SearchAttributeIwfWorkflowType: input.IwfWorkflowType,
})
if err != nil {
return nil, err
}

execution := service.IwfWorkflowExecution{
IwfWorkerUrl: input.IwfWorkerUrl,
WorkflowType: input.IwfWorkflowType,
Expand All @@ -28,7 +43,7 @@ func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input servic
return provider.UpsertSearchAttributes(ctx, attributes)
})

err := provider.SetQueryHandler(ctx, service.AttributeQueryType, func(req service.QueryAttributeRequest) (service.QueryAttributeResponse, error) {
err = provider.SetQueryHandler(ctx, service.AttributeQueryType, func(req service.QueryAttributeRequest) (service.QueryAttributeResponse, error) {
return attrMgr.GetQueryAttributesByKey(req), nil
})
if err != nil {
Expand All @@ -38,12 +53,16 @@ func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input servic
var errToFailWf error // TODO Note that today different errors could overwrite each other, we only support last one wins. we may use multiError to improve.
var outputsToReturnWf []iwfidl.StateCompletionOutput
var forceCompleteWf bool
inFlightExecutingStateCount := 0
stateExecutingMgr := newStateExecutingManager(ctx, provider)
//inFlightExecutingStateCount := 0

for len(currentStates) > 0 {
// copy the whole slice(pointer)
inFlightExecutingStateCount += len(currentStates)
statesToExecute := currentStates
err := stateExecutingMgr.startStates(currentStates)
if err != nil {
return nil, err
}
//reset to empty slice since each iteration will process all current states in the queue
currentStates = nil

Expand All @@ -52,10 +71,6 @@ func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input servic
// state must be passed via parameter https://stackoverflow.com/questions/67263092
stateCtx := provider.ExtendContextWithValue(ctx, "state", stateToExecute)
provider.GoNamed(stateCtx, stateToExecute.GetStateId(), func(ctx UnifiedContext) {
defer func() {
inFlightExecutingStateCount--
}()

state, ok := provider.GetContextValue(ctx, "state").(iwfidl.StateMovement)
if !ok {
errToFailWf = provider.NewApplicationError(
Expand All @@ -64,6 +79,12 @@ func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input servic
)
return
}
defer func() {
err := stateExecutingMgr.completeStates(state)
if err != nil {
errToFailWf = err
}
}()

stateExeId := stateExeIdMgr.IncAndGetNextExecutionId(state.GetStateId())
decision, err := executeState(ctx, provider, state, execution, stateExeId, attrMgr, interStateChannel)
Expand Down Expand Up @@ -94,7 +115,7 @@ func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input servic
}

awaitError := provider.Await(ctx, func() bool {
return len(currentStates) > 0 || errToFailWf != nil || forceCompleteWf || inFlightExecutingStateCount == 0
return len(currentStates) > 0 || errToFailWf != nil || forceCompleteWf || stateExecutingMgr.getTotalExecutingStates() == 0
})
if errToFailWf != nil || forceCompleteWf {
return &service.InterpreterWorkflowOutput{
Expand Down