From 5e22be50b13e677bcb3de700b963318f80cd733e Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Mon, 7 Nov 2022 19:11:46 -0800 Subject: [PATCH 1/5] README for 3 sys SAs --- README.md | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 7d238a65..b7771955 100644 --- a/README.md +++ b/README.md @@ -137,20 +137,40 @@ Any contribution is welcome. ### Run with local Temporalite 1. Run a local Temporalite following the [instruction](https://github.com/temporalio/temporalite). If you see error `error setting up schema`, try use command `temporalite start --namespace default -f my_test.db` instead to start. -2. Go to http://localhost:8233/ for Temporal WebUI +2. Register a default namespace +```shell +tctl --ns default n re +``` +3. Go to http://localhost:8233/ for Temporal WebUI NOTE: alternatively, go to [Temporal-dockercompose](https://github.com/temporalio/docker-compose) to run with docker -3. For `attribute_test.go` integTests, you need to register search attributes: -```bash +3. Register system search attributes required by iWF server +```shell +tctl adm cl asa -n IWorklowType -t Keyword +tctl adm cl asa -n GlobalWorkflowVersion -t Int +tctl adm cl asa -n StateExecutionStatus -t Keyword + +``` +4 For `attribute_test.go` integTests, you need to register search attributes: +```shell tctl adm cl asa -n CustomKeywordField -t Keyword tctl adm cl asa -n CustomIntField -t Int ``` ### Run with local Cadence 1. Run a local Cadence server following the [instructions](https://github.com/uber/cadence/tree/master/docker) +``` +docker-compose -f docker-compose-es-v7.yml up +``` 2. Register a new domain if not haven `cadence --do default domain register` -3. Go to Cadence http://localhost:8088/domains/default/workflows?range=last-30-days +3. Register system search attributes required by iWF server +``` +cadence adm cl asa --search_attr_key GlobalWorkflowVersion --search_attr_type 2 +cadence adm cl asa --search_attr_key StateExecutionStatus --search_attr_type 0 +cadence adm cl asa --search_attr_key IWorklowType --search_attr_type 0 +``` +4. Go to Cadence http://localhost:8088/domains/default/workflows?range=last-30-days ## Development Plan ### 1.0 From f921dcb0c0c883d96c6eed9bfab648ebe21bb7c5 Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Mon, 7 Nov 2022 19:24:38 -0800 Subject: [PATCH 2/5] prepare --- service/const.go | 4 +++ .../interpreter/cadence/workflowProvider.go | 10 +++++++ service/interpreter/globalVersionProvider.go | 29 +++++++++++++++++++ service/interpreter/interfaces.go | 1 + .../interpreter/temporal/workflowProvider.go | 10 +++++++ 5 files changed, 54 insertions(+) create mode 100644 service/interpreter/globalVersionProvider.go diff --git a/service/const.go b/service/const.go index c13bc37f..20bd921f 100644 --- a/service/const.go +++ b/service/const.go @@ -37,6 +37,10 @@ const ( WorkflowStatusTerminated = "TERMINATED" WorkflowStatusCanceled = "CANCELED" WorkflowStatusContinueAsNew = "CONTINUED_AS_NEW" + + SearchAttributeGlobalVersion = "GlobalWorkflowVersion" + SearchAttributeStateExecutionStatus = "StateExecutionStatus" + SearchAttributeIWorkflowType = "IWorkflowType" ) type BackendType string diff --git a/service/interpreter/cadence/workflowProvider.go b/service/interpreter/cadence/workflowProvider.go index 7c15851f..87c4dedf 100644 --- a/service/interpreter/cadence/workflowProvider.go +++ b/service/interpreter/cadence/workflowProvider.go @@ -142,6 +142,16 @@ func (w *workflowProvider) Sleep(ctx interpreter.UnifiedContext, d time.Duration return workflow.Sleep(wfCtx, d) } +func (w *workflowProvider) GetVersion(ctx interpreter.UnifiedContext, changeID string, minSupported, maxSupported int) int { + wfCtx, ok := ctx.GetContext().(workflow.Context) + if !ok { + panic("cannot convert to temporal workflow context") + } + + version := workflow.GetVersion(wfCtx, changeID, workflow.Version(minSupported), workflow.Version(maxSupported)) + return int(version) +} + type cadenceReceiveChannel struct { channel workflow.Channel } diff --git a/service/interpreter/globalVersionProvider.go b/service/interpreter/globalVersionProvider.go new file mode 100644 index 00000000..5ecf96eb --- /dev/null +++ b/service/interpreter/globalVersionProvider.go @@ -0,0 +1,29 @@ +package interpreter + +import "github.com/indeedeng/iwf/service" + +const globalChangeId = "global" +const startingVersionUsingGlobalVersioning = 1 +const maxOfAllVersions = startingVersionUsingGlobalVersioning + +// see https://stackoverflow.com/questions/73941723/what-is-a-good-way-pattern-to-use-temporal-cadence-versioning-api +type globalVersioner struct { + provider WorkflowProvider +} + +func newGlobalVersionProvider(provider WorkflowProvider) *globalVersioner { + return &globalVersioner{ + provider: provider, + } +} + +func (p *globalVersioner) isAfterVersionOfUsingGlobalVersioning(ctx UnifiedContext) bool { + version := p.provider.GetVersion(ctx, globalChangeId, 0, maxOfAllVersions) + return version >= startingVersionUsingGlobalVersioning +} + +func (p *globalVersioner) upsertGlobalVersionSearchAttribute(ctx UnifiedContext) error { + return p.provider.UpsertSearchAttributes(ctx, map[string]interface{}{ + service.SearchAttributeGlobalVersion: maxOfAllVersions, + }) +} diff --git a/service/interpreter/interfaces.go b/service/interpreter/interfaces.go index b867ee46..b96ffa32 100644 --- a/service/interpreter/interfaces.go +++ b/service/interpreter/interfaces.go @@ -83,6 +83,7 @@ type WorkflowProvider interface { Sleep(ctx UnifiedContext, d time.Duration) (err error) GetSignalChannel(ctx UnifiedContext, signalName string) (receiveChannel ReceiveChannel) GetContextValue(ctx UnifiedContext, key string) interface{} + GetVersion(ctx UnifiedContext, changeID string, minSupported, maxSupported int) int GetBackendType() service.BackendType } diff --git a/service/interpreter/temporal/workflowProvider.go b/service/interpreter/temporal/workflowProvider.go index 9d05aeb7..08e92368 100644 --- a/service/interpreter/temporal/workflowProvider.go +++ b/service/interpreter/temporal/workflowProvider.go @@ -130,6 +130,16 @@ func (w *workflowProvider) Sleep(ctx interpreter.UnifiedContext, d time.Duration return workflow.Sleep(wfCtx, d) } +func (w *workflowProvider) GetVersion(ctx interpreter.UnifiedContext, changeID string, minSupported, maxSupported int) int { + wfCtx, ok := ctx.GetContext().(workflow.Context) + if !ok { + panic("cannot convert to temporal workflow context") + } + + version := workflow.GetVersion(wfCtx, changeID, workflow.Version(minSupported), workflow.Version(maxSupported)) + return int(version) +} + type temporalReceiveChannel struct { channel workflow.ReceiveChannel } From a791049ce6c397b95dfce2cb3207505a1a68cdde Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Mon, 7 Nov 2022 19:26:14 -0800 Subject: [PATCH 3/5] use global versioning --- service/interpreter/globalVersionProvider.go | 10 +++++----- service/interpreter/workflowImpl.go | 8 ++++++++ 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/service/interpreter/globalVersionProvider.go b/service/interpreter/globalVersionProvider.go index 5ecf96eb..5d638ff9 100644 --- a/service/interpreter/globalVersionProvider.go +++ b/service/interpreter/globalVersionProvider.go @@ -8,22 +8,22 @@ const maxOfAllVersions = startingVersionUsingGlobalVersioning // see https://stackoverflow.com/questions/73941723/what-is-a-good-way-pattern-to-use-temporal-cadence-versioning-api type globalVersioner struct { - provider WorkflowProvider + workflowProvider WorkflowProvider } -func newGlobalVersionProvider(provider WorkflowProvider) *globalVersioner { +func newGlobalVersionProvider(workflowProvider WorkflowProvider) *globalVersioner { return &globalVersioner{ - provider: provider, + workflowProvider: workflowProvider, } } func (p *globalVersioner) isAfterVersionOfUsingGlobalVersioning(ctx UnifiedContext) bool { - version := p.provider.GetVersion(ctx, globalChangeId, 0, maxOfAllVersions) + version := p.workflowProvider.GetVersion(ctx, globalChangeId, 0, maxOfAllVersions) return version >= startingVersionUsingGlobalVersioning } func (p *globalVersioner) upsertGlobalVersionSearchAttribute(ctx UnifiedContext) error { - return p.provider.UpsertSearchAttributes(ctx, map[string]interface{}{ + return p.workflowProvider.UpsertSearchAttributes(ctx, map[string]interface{}{ service.SearchAttributeGlobalVersion: maxOfAllVersions, }) } diff --git a/service/interpreter/workflowImpl.go b/service/interpreter/workflowImpl.go index 008179df..dcfe0628 100644 --- a/service/interpreter/workflowImpl.go +++ b/service/interpreter/workflowImpl.go @@ -8,6 +8,14 @@ import ( ) func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input service.InterpreterWorkflowInput) (*service.InterpreterWorkflowOutput, error) { + globalVersionProvider := newGlobalVersionProvider(provider) + if globalVersionProvider.isAfterVersionOfUsingGlobalVersioning(ctx) { + err := globalVersionProvider.upsertGlobalVersionSearchAttribute(ctx) + if err != nil { + return nil, err + } + } + execution := service.IwfWorkflowExecution{ IwfWorkerUrl: input.IwfWorkerUrl, WorkflowType: input.IwfWorkflowType, From 6bcd7b9e6f8339cb837d3e7dc9ad16351210a0c6 Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Mon, 7 Nov 2022 20:31:48 -0800 Subject: [PATCH 4/5] rename iwfWorkflowType --- README.md | 4 ++-- service/const.go | 2 +- service/interpreter/workflowImpl.go | 11 +++++++++-- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index b7771955..a5e96c55 100644 --- a/README.md +++ b/README.md @@ -147,7 +147,7 @@ NOTE: alternatively, go to [Temporal-dockercompose](https://github.com/temporali 3. Register system search attributes required by iWF server ```shell -tctl adm cl asa -n IWorklowType -t Keyword +tctl adm cl asa -n IwfWorkflowType -t Keyword tctl adm cl asa -n GlobalWorkflowVersion -t Int tctl adm cl asa -n StateExecutionStatus -t Keyword @@ -168,7 +168,7 @@ docker-compose -f docker-compose-es-v7.yml up ``` cadence adm cl asa --search_attr_key GlobalWorkflowVersion --search_attr_type 2 cadence adm cl asa --search_attr_key StateExecutionStatus --search_attr_type 0 -cadence adm cl asa --search_attr_key IWorklowType --search_attr_type 0 +cadence adm cl asa --search_attr_key IwfWorkflowType --search_attr_type 0 ``` 4. Go to Cadence http://localhost:8088/domains/default/workflows?range=last-30-days diff --git a/service/const.go b/service/const.go index 20bd921f..2a517d1b 100644 --- a/service/const.go +++ b/service/const.go @@ -40,7 +40,7 @@ const ( SearchAttributeGlobalVersion = "GlobalWorkflowVersion" SearchAttributeStateExecutionStatus = "StateExecutionStatus" - SearchAttributeIWorkflowType = "IWorkflowType" + SearchAttributeIwfWorkflowType = "IwfWorkflowType" ) type BackendType string diff --git a/service/interpreter/workflowImpl.go b/service/interpreter/workflowImpl.go index dcfe0628..abc66d27 100644 --- a/service/interpreter/workflowImpl.go +++ b/service/interpreter/workflowImpl.go @@ -15,7 +15,14 @@ func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input servic return nil, err } } - + + err := provider.UpsertSearchAttributes(ctx, map[string]interface{}{ + service.SearchAttributeIwfWorkflowType: input.IwfWorkflowType, + }) + if err != nil { + return nil, err + } + execution := service.IwfWorkflowExecution{ IwfWorkerUrl: input.IwfWorkerUrl, WorkflowType: input.IwfWorkflowType, @@ -36,7 +43,7 @@ func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input servic return provider.UpsertSearchAttributes(ctx, attributes) }) - err := provider.SetQueryHandler(ctx, service.AttributeQueryType, func(req service.QueryAttributeRequest) (service.QueryAttributeResponse, error) { + err = provider.SetQueryHandler(ctx, service.AttributeQueryType, func(req service.QueryAttributeRequest) (service.QueryAttributeResponse, error) { return attrMgr.GetQueryAttributesByKey(req), nil }) if err != nil { From bdc86e7e3b5bb23abd61f5e4b37aa1eb7e6c06a6 Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Mon, 7 Nov 2022 21:09:43 -0800 Subject: [PATCH 5/5] done all --- README.md | 4 +- integ/timer_test.go | 2 +- service/const.go | 6 +- service/interpreter/stateExecutingManager.go | 63 ++++++++++++++++++++ service/interpreter/workflowImpl.go | 20 ++++--- 5 files changed, 82 insertions(+), 13 deletions(-) create mode 100644 service/interpreter/stateExecutingManager.go diff --git a/README.md b/README.md index a5e96c55..cf57db24 100644 --- a/README.md +++ b/README.md @@ -149,7 +149,7 @@ NOTE: alternatively, go to [Temporal-dockercompose](https://github.com/temporali ```shell tctl adm cl asa -n IwfWorkflowType -t Keyword tctl adm cl asa -n GlobalWorkflowVersion -t Int -tctl adm cl asa -n StateExecutionStatus -t Keyword +tctl adm cl asa -n ExecutingStateIds -t Keyword ``` 4 For `attribute_test.go` integTests, you need to register search attributes: @@ -167,7 +167,7 @@ docker-compose -f docker-compose-es-v7.yml up 3. Register system search attributes required by iWF server ``` cadence adm cl asa --search_attr_key GlobalWorkflowVersion --search_attr_type 2 -cadence adm cl asa --search_attr_key StateExecutionStatus --search_attr_type 0 +cadence adm cl asa --search_attr_key ExecutingStateIds --search_attr_type 0 cadence adm cl asa --search_attr_key IwfWorkflowType --search_attr_type 0 ``` 4. Go to Cadence http://localhost:8088/domains/default/workflows?range=last-30-days diff --git a/integ/timer_test.go b/integ/timer_test.go index 15202b72..eb56015f 100644 --- a/integ/timer_test.go +++ b/integ/timer_test.go @@ -27,7 +27,7 @@ func doTestTimerWorkflow(t *testing.T, backendType service.BackendType) { closeFunc1 := startWorkflowWorker(wfHandler) defer closeFunc1() - closeFunc2 := startIwfService(service.BackendTypeTemporal) + closeFunc2 := startIwfService(backendType) defer closeFunc2() // start a workflow diff --git a/service/const.go b/service/const.go index 2a517d1b..5ce77646 100644 --- a/service/const.go +++ b/service/const.go @@ -38,9 +38,9 @@ const ( WorkflowStatusCanceled = "CANCELED" WorkflowStatusContinueAsNew = "CONTINUED_AS_NEW" - SearchAttributeGlobalVersion = "GlobalWorkflowVersion" - SearchAttributeStateExecutionStatus = "StateExecutionStatus" - SearchAttributeIwfWorkflowType = "IwfWorkflowType" + SearchAttributeGlobalVersion = "GlobalWorkflowVersion" + SearchAttributeExecutingStateIds = "ExecutingStateIds" + SearchAttributeIwfWorkflowType = "IwfWorkflowType" ) type BackendType string diff --git a/service/interpreter/stateExecutingManager.go b/service/interpreter/stateExecutingManager.go new file mode 100644 index 00000000..b11c9e7d --- /dev/null +++ b/service/interpreter/stateExecutingManager.go @@ -0,0 +1,63 @@ +package interpreter + +import ( + "github.com/indeedeng/iwf/gen/iwfidl" + "github.com/indeedeng/iwf/service" +) + +type stateExecutingManager struct { + ctx UnifiedContext + provider WorkflowProvider + + stateIdCount map[string]int + totalExecutingCount int +} + +func newStateExecutingManager(ctx UnifiedContext, provider WorkflowProvider) *stateExecutingManager { + return &stateExecutingManager{ + ctx: ctx, + provider: provider, + stateIdCount: map[string]int{}, + totalExecutingCount: 0, + } +} + +func (e *stateExecutingManager) startStates(states []iwfidl.StateMovement) error { + needsUpdate := false + for _, s := range states { + e.stateIdCount[s.StateId]++ + if e.stateIdCount[s.StateId] == 1 { + // first time the stateId show up + needsUpdate = true + } + } + e.totalExecutingCount += len(states) + if needsUpdate { + return e.updateSearchAttribute() + } + return nil +} + +func (e *stateExecutingManager) completeStates(state iwfidl.StateMovement) error { + e.stateIdCount[state.StateId]-- + e.totalExecutingCount -= 1 + if e.stateIdCount[state.StateId] == 0 { + delete(e.stateIdCount, state.StateId) + return e.updateSearchAttribute() + } + return nil +} + +func (e *stateExecutingManager) getTotalExecutingStates() int { + return e.totalExecutingCount +} + +func (e *stateExecutingManager) updateSearchAttribute() error { + var executingStateIds []string + for sid := range e.stateIdCount { + executingStateIds = append(executingStateIds, sid) + } + return e.provider.UpsertSearchAttributes(e.ctx, map[string]interface{}{ + service.SearchAttributeExecutingStateIds: executingStateIds, + }) +} diff --git a/service/interpreter/workflowImpl.go b/service/interpreter/workflowImpl.go index abc66d27..4362e728 100644 --- a/service/interpreter/workflowImpl.go +++ b/service/interpreter/workflowImpl.go @@ -53,12 +53,16 @@ func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input servic var errToFailWf error // TODO Note that today different errors could overwrite each other, we only support last one wins. we may use multiError to improve. var outputsToReturnWf []iwfidl.StateCompletionOutput var forceCompleteWf bool - inFlightExecutingStateCount := 0 + stateExecutingMgr := newStateExecutingManager(ctx, provider) + //inFlightExecutingStateCount := 0 for len(currentStates) > 0 { // copy the whole slice(pointer) - inFlightExecutingStateCount += len(currentStates) statesToExecute := currentStates + err := stateExecutingMgr.startStates(currentStates) + if err != nil { + return nil, err + } //reset to empty slice since each iteration will process all current states in the queue currentStates = nil @@ -67,10 +71,6 @@ func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input servic // state must be passed via parameter https://stackoverflow.com/questions/67263092 stateCtx := provider.ExtendContextWithValue(ctx, "state", stateToExecute) provider.GoNamed(stateCtx, stateToExecute.GetStateId(), func(ctx UnifiedContext) { - defer func() { - inFlightExecutingStateCount-- - }() - state, ok := provider.GetContextValue(ctx, "state").(iwfidl.StateMovement) if !ok { errToFailWf = provider.NewApplicationError( @@ -79,6 +79,12 @@ func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input servic ) return } + defer func() { + err := stateExecutingMgr.completeStates(state) + if err != nil { + errToFailWf = err + } + }() stateExeId := stateExeIdMgr.IncAndGetNextExecutionId(state.GetStateId()) decision, err := executeState(ctx, provider, state, execution, stateExeId, attrMgr, interStateChannel) @@ -109,7 +115,7 @@ func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input servic } awaitError := provider.Await(ctx, func() bool { - return len(currentStates) > 0 || errToFailWf != nil || forceCompleteWf || inFlightExecutingStateCount == 0 + return len(currentStates) > 0 || errToFailWf != nil || forceCompleteWf || stateExecutingMgr.getTotalExecutingStates() == 0 }) if errToFailWf != nil || forceCompleteWf { return &service.InterpreterWorkflowOutput{