Auto ContinueAsNew part 2-1: Implement query handler for interStateChannel (#132)
longquanzheng authored Jan 5, 2023
1 parent fdea7c9 commit 18c562b
Showing 5 changed files with 92 additions and 28 deletions.
43 changes: 37 additions & 6 deletions README.md
@@ -45,6 +45,7 @@ Related projects:
- [Search Attribute](#search-attribute)
- [Versioning and change compatibility](#versioning-and-change-compatibility)
- [Parallel execution with synchronization](#parallel-execution-with-synchronization)
- [ContinueAsNew](#continueasnew)
- [Non-workflow code](#non-workflow-code)
- [Anything else](#anything-else)
- [Monitoring and Operations](#monitoring-and-operations)
@@ -53,6 +54,7 @@ Related projects:
- [Development Plan](#development-plan)
- [Some history](#some-history)
- [Contribution](CONTRIBUTING.md)
- [Posts & Articles](#posts--articles)

# Community & Help
* [Slack Channels](https://iworkflow-slack.work/)
@@ -74,6 +76,8 @@ the application workflow workers. Internally, the two APIs are executed by Caden

![architecture diagram](https://user-images.githubusercontent.com/4523955/207514928-56fea636-c711-4f20-9e90-94ddd1c9844d.png)

* See [Design doc](https://docs.google.com/document/d/1BpJuHf67ibaOWmN_uWw_pbrBVyb6U1PILXyzohxA5Ms/edit) for more details.

## Basic Concepts

### Workflow and WorkflowState definition
@@ -164,8 +168,7 @@ Client APIs are hosted by iWF server for user workflow application to interact w
# Why iWF

## If you are familiar with Cadence/Temporal
* See [Slide deck](https://docs.google.com/presentation/d/1CpsroSf6NeVce_XyUhFTkd9bLHN8UHRtM9NavPCMhj8/edit#slide=id.gfe2f455492_0_56) for what problems it is solving
* See [Design doc](https://docs.google.com/document/d/1BpJuHf67ibaOWmN_uWw_pbrBVyb6U1PILXyzohxA5Ms/edit) for how it works
* See [Slide deck](https://docs.google.com/presentation/d/1CpsroSf6NeVce_XyUhFTkd9bLHN8UHRtM9NavPCMhj8/edit#slide=id.gfe2f455492_0_56) for comparison with Cadence/Temporal.

## If you are not
* Check out this [doc](https://docs.google.com/document/d/1zyCKvy4S2l7XBVJzZuS65OIsqV9CRPPYJY3OBbuWrPE) to understand some history
@@ -281,6 +284,26 @@ Some notes:
2) Because of above, there could be zero, or more than one state completing with data as workflow results.
3) To get multiple state results from a workflow execution, use the special API `getComplexWorkflowResult` of client API.

## ContinueAsNew

There is no ContinueAsNew API exposed to user workflows!
ContinueAsNew in Cadence/Temporal is purely a leaked technical detail: it exists because the replay model conflicts with the underlying storage limits and performance.
As iWF is built on Cadence/Temporal, it will be implemented in a way that is transparent to user workflows.

Internally, the interpreter workflow can continueAsNew without the iWF user workflow knowing. This is called "auto continueAsNew".

After the history threshold (defined by numOfStateExecutionCompleted) is exceeded, auto continueAsNew
will carry over the pending states, along with all the internal states such as DataObjects, interStateChannels, and searchAttributes.

_This feature is WIP._ See [this issue](https://github.com/indeedeng/iwf/issues/107) for progress.

For now, the recommended workaround is to do a manual "continueAsNew" by using the "TerminateIfRunning" IdReusePolicy to start a new
workflow execution with the same workflowId. This is almost the same as Cadence/Temporal's ContinueAsNew API (an atomic operation), except for some minor differences:
* It won't carry over Search Attributes automatically. You have to carry them over as initial search attributes through WorkflowOptions.
* The old execution will be terminated, rather than ending in "ContinuedAsNew" status.

As with Cadence/Temporal's ContinueAsNew API, users must ensure all signals are drained; otherwise signals could be lost.
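
The manual workaround above can be sketched roughly as follows. This is a minimal in-memory simulation, not the iWF SDK: `fakeServer`, `execution`, `WorkflowOptions`, `manualContinueAsNew`, and the `"TERMINATE_IF_RUNNING"` string are all hypothetical names invented for illustration. It only mimics the two behaviours described above: the running execution is terminated, and search attributes reach the new execution only if they are passed explicitly.

```go
package main

import "fmt"

// NOTE: every type and function here is a hypothetical stand-in for illustration.
// None of them is part of the iWF, Cadence, or Temporal APIs.

type IDReusePolicy string

const TerminateIfRunning IDReusePolicy = "TERMINATE_IF_RUNNING"

type WorkflowOptions struct {
	IDReusePolicy           IDReusePolicy
	InitialSearchAttributes map[string]string
}

type execution struct {
	runID            int
	status           string // "RUNNING" or "TERMINATED"
	searchAttributes map[string]string
}

// fakeServer keeps every execution ever started, grouped by workflowId.
type fakeServer struct {
	runs map[string][]*execution
}

// StartWorkflow starts a new execution. If one is already running under the same
// workflowId, it is terminated only when the policy is TerminateIfRunning.
func (s *fakeServer) StartWorkflow(workflowID string, opts WorkflowOptions) (*execution, error) {
	runs := s.runs[workflowID]
	if n := len(runs); n > 0 && runs[n-1].status == "RUNNING" {
		if opts.IDReusePolicy != TerminateIfRunning {
			return nil, fmt.Errorf("workflow %q is already running", workflowID)
		}
		runs[n-1].status = "TERMINATED" // not "ContinuedAsNew"
	}
	// Search attributes are never inherited; copy only what the caller passed in.
	attrs := make(map[string]string, len(opts.InitialSearchAttributes))
	for k, v := range opts.InitialSearchAttributes {
		attrs[k] = v
	}
	e := &execution{runID: len(runs) + 1, status: "RUNNING", searchAttributes: attrs}
	s.runs[workflowID] = append(runs, e)
	return e, nil
}

// manualContinueAsNew restarts the workflow under the same workflowId and carries
// the given search attributes over explicitly.
func manualContinueAsNew(s *fakeServer, workflowID string, carryOver map[string]string) (*execution, error) {
	return s.StartWorkflow(workflowID, WorkflowOptions{
		IDReusePolicy:           TerminateIfRunning,
		InitialSearchAttributes: carryOver,
	})
}

func main() {
	s := &fakeServer{runs: map[string][]*execution{}}
	first, err := s.StartWorkflow("order-42", WorkflowOptions{
		InitialSearchAttributes: map[string]string{"Status": "open"},
	})
	if err != nil {
		panic(err)
	}
	second, err := manualContinueAsNew(s, "order-42", first.searchAttributes)
	if err != nil {
		panic(err)
	}
	fmt.Println(first.status, second.status, second.searchAttributes["Status"])
}
```

The sketch does not model signals. In a real workflow, pending signals would need to be drained before the restart, as noted above.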

## Non-workflow code
Check [Client APIs](#client-apis) for all the APIs that are equivalent to Cadence/Temporal client APIs.

@@ -298,7 +321,6 @@ So what about something else like:
* Timeout and backoff retry: State start/decide APIs have default timeout and infinite backoff retry. You can customize in StateOptions.
* ChildWorkflow can be replaced with regular workflow + signal. See this [StackOverflow](https://stackoverflow.com/questions/74494134/should-i-use-child-workflow-or-use-activity-to-start-new-workflow) for why.
* SignalWithStart: Using the start + signal APIs gives the same result, except that it needs more exception handling work. We have seen that a lot of people don't know how to use it correctly in Cadence/Temporal. We will consider providing it in a better way in the future.
* ContinueAsNew: this is missing in iWF for now. But following the philosophy of hiding internal details, we will implement it in a way that is transparent to user workflows. Internally the interpreter workflow can continueAsNew without the iWF user workflow knowing.
* Long-running activity with stateful recovery (heartbeat details): this is indeed a good one that we want to add. But we don't see this kind of Cadence/Temporal activity being used very commonly yet. Please leave a message if you need it.

If you believe there is something else you really need, open a [ticket](https://github.com/indeedeng/iwf/issues) or join us in the [discussion](https://github.com/indeedeng/iwf/discussions).
@@ -355,12 +377,15 @@ When something goes wrong in your applications, here are the tips:
- [x] More Search attribute types: Datetime, double, bool, keyword array, text
- [x] More workflow start options: initial search attributes

### 1.2
- [ ] Auto continueAsNew(WIP)
- [ ] Skip timer API for testing/operation
- [ ] Decider trigger type: any command combination

### Future
- [ ] Auto ContinueAsNew
- [ ] WaitForMoreResults in StateDecision
- [ ] Skip timer API for testing/operation
- [ ] LongRunningActivityCommand
- [ ] Decider trigger type: AnyCommandClosed
- [ ] More Decider trigger type
- [ ] Failing workflow details
- [ ] StateOption.PersistenceLoadingPolicy: LOAD_ALL_WITH_EXCLUSIVE_LOCK and LOAD_PARTIAL_WITH_EXCLUSIVE_LOCK

@@ -374,3 +399,9 @@ and provide clean and simple API to use.
Read this [doc](https://docs.google.com/document/d/1zyCKvy4S2l7XBVJzZuS65OIsqV9CRPPYJY3OBbuWrPE) for more.

<img width="916" alt="history diagram" src="https://user-images.githubusercontent.com/4523955/201188875-32e1d070-ab53-4ac5-92fd-bb8ed16dd7dc.png">

# Posts & Articles
* [A Letter to Cadence/Temporal, and Workflow Tech Community](https://medium.com/@qlong/a-letter-to-cadence-temporal-and-workflow-tech-community-b32e9fa97a0c)
* [iWF vs Cadence/Temporal](https://medium.com/@qlong/iwf-vs-cadence-temporal-1e11b35960fe)
* [iWF vs other general purposed workflow Engines](https://medium.com/@qlong/iwf-vs-other-general-purposed-workflow-engines-f8f3e3d8993d)
* [Cadence® iWF](https://www.instaclustr.com/blog/cadence-iwf/?utm_content=1669999382&utm_medium=linkedin&utm_source=organicsocial)
5 changes: 3 additions & 2 deletions service/const.go
@@ -13,7 +13,8 @@ const (
StateStartApi = "/api/v1/workflowState/start"
StateDecideApi = "/api/v1/workflowState/decide"

GetDataObjectsWorkflowQueryType = "GetDataObjects"
GetInterStateChannelDataQueryType = "GetInterStateChannelData"

WorkflowErrorTypeUserWorkflowDecision = "UserWorkflowDecision"
WorkflowErrorTypeUserWorkflowError = "UserWorkflowError"
@@ -25,4 +26,4 @@ const (

BackendTypeCadence BackendType = "cadence"
BackendTypeTemporal BackendType = "temporal"
)
11 changes: 11 additions & 0 deletions service/interpreter/InterStateChannel.go
@@ -19,6 +19,17 @@ func RebuildInterStateChannel(refill map[string][]*iwfidl.EncodedObject) *InterS
}
}

func (i *InterStateChannel) ReadData(channelNames []string) map[string][]*iwfidl.EncodedObject {
if len(channelNames) == 0 {
return i.receivedData
}
data := make(map[string][]*iwfidl.EncodedObject)
for _, n := range channelNames {
data[n] = i.receivedData[n]
}
return data
}

func (i *InterStateChannel) HasData(channelName string) bool {
l := i.receivedData[channelName]
return len(l) > 0
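
To make the behaviour of the new `ReadData` method concrete, here is a minimal standalone sketch. `EncodedObject` is a simplified stand-in for `iwfidl.EncodedObject`, and the struct layout of `InterStateChannel` is assumed from the `receivedData` field used in the diff. The `ReadData` body is copied from the change. Two details are worth noting: an empty name list returns the internal map itself rather than a copy, and a requested channel with no data still appears in the result, with a nil slice.

```go
package main

import "fmt"

// EncodedObject is a simplified stand-in for iwfidl.EncodedObject (assumption).
type EncodedObject struct {
	Encoding string
	Data     string
}

// InterStateChannel mirrors only the field that ReadData touches (assumed layout).
type InterStateChannel struct {
	receivedData map[string][]*EncodedObject
}

// ReadData is the method added in this commit, reproduced against the stand-in types.
func (i *InterStateChannel) ReadData(channelNames []string) map[string][]*EncodedObject {
	if len(channelNames) == 0 {
		// No filter: the caller receives the internal map, not a copy.
		return i.receivedData
	}
	data := make(map[string][]*EncodedObject)
	for _, n := range channelNames {
		// Unknown channels get an entry whose value is a nil slice.
		data[n] = i.receivedData[n]
	}
	return data
}

func main() {
	ch := &InterStateChannel{receivedData: map[string][]*EncodedObject{
		"approval": {&EncodedObject{Encoding: "json", Data: `"ok"`}},
		"payment":  {&EncodedObject{Encoding: "json", Data: `"paid"`}},
	}}

	all := ch.ReadData(nil)
	some := ch.ReadData([]string{"approval", "missing"})
	_, present := some["missing"]

	fmt.Println(len(all), len(some), present, len(some["missing"]))
}
```

Because the unfiltered call shares the underlying map, a caller that mutates the result would also mutate the channel state. That is harmless for a read-only query handler, but it matters if the method is reused elsewhere.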
16 changes: 16 additions & 0 deletions service/interpreter/continue_as_new.go
@@ -0,0 +1,16 @@
package interpreter

import (
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/service"
)

func setQueryHandlersForContinueAsNew(ctx UnifiedContext, provider WorkflowProvider, interStateChannel *InterStateChannel) error {
err := provider.SetQueryHandler(ctx, service.GetInterStateChannelDataQueryType, func(channelNames []string) (map[string][]*iwfidl.EncodedObject, error) {
return interStateChannel.ReadData(channelNames), nil
})
if err != nil {
return err
}
return nil
}
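
The registration pattern in `setQueryHandlersForContinueAsNew` can be illustrated with a small standalone sketch. `fakeProvider`, its `Query` method, and the `queryHandler` type are hypothetical stand-ins (the real `WorkflowProvider` wraps the Cadence/Temporal SDKs and takes a context), and channel values are simplified to strings. The point it shows is that the handler closes over the live channel data, so a query sees whatever has been received up to the moment it runs, not a snapshot taken at registration time.

```go
package main

import (
	"errors"
	"fmt"
)

// Same query type name as the constant added in service/const.go.
const GetInterStateChannelDataQueryType = "GetInterStateChannelData"

// queryHandler is a simplified handler signature (assumption: values are plain strings).
type queryHandler func(channelNames []string) (map[string][]string, error)

// fakeProvider is a hypothetical stand-in for WorkflowProvider.
type fakeProvider struct {
	handlers map[string]queryHandler
}

// SetQueryHandler registers a handler under a query type.
func (p *fakeProvider) SetQueryHandler(queryType string, h queryHandler) error {
	if h == nil {
		return errors.New("nil query handler")
	}
	if p.handlers == nil {
		p.handlers = map[string]queryHandler{}
	}
	p.handlers[queryType] = h
	return nil
}

// Query dispatches to the registered handler, as a workflow query would.
func (p *fakeProvider) Query(queryType string, channelNames []string) (map[string][]string, error) {
	h, ok := p.handlers[queryType]
	if !ok {
		return nil, fmt.Errorf("unknown query type %q", queryType)
	}
	return h(channelNames)
}

// setQueryHandlersForContinueAsNew mirrors the function in the diff against the fakes.
func setQueryHandlersForContinueAsNew(p *fakeProvider, received map[string][]string) error {
	return p.SetQueryHandler(GetInterStateChannelDataQueryType,
		func(names []string) (map[string][]string, error) {
			if len(names) == 0 {
				return received, nil
			}
			out := make(map[string][]string, len(names))
			for _, n := range names {
				out[n] = received[n]
			}
			return out, nil
		})
}

func main() {
	received := map[string][]string{}
	p := &fakeProvider{}
	if err := setQueryHandlersForContinueAsNew(p, received); err != nil {
		panic(err)
	}

	// Data published after registration is still visible to the query.
	received["approval"] = append(received["approval"], "ok")

	data, err := p.Query(GetInterStateChannelDataQueryType, []string{"approval"})
	fmt.Println(data["approval"], err)
}
```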
45 changes: 25 additions & 20 deletions service/interpreter/workflowImpl.go
@@ -31,7 +31,7 @@ func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input servic
StartedTimestamp: provider.GetWorkflowInfo(ctx).WorkflowStartTime.Unix(),
}
interStateChannel := NewInterStateChannel()
currentStates := []iwfidl.StateMovement{
statesToExecuteQueue := []iwfidl.StateMovement{
{
StateId: input.StartStateId,
StateOptions: &input.StateOptions,
@@ -48,22 +48,25 @@ func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input servic
if err != nil {
return nil, err
}
err = setQueryHandlersForContinueAsNew(ctx, provider, interStateChannel)
if err != nil {
return nil, err
}

var errToFailWf error // TODO Note that today different errors could overwrite each other, we only support last one wins. we may use multiError to improve.
var errToFailWf error // Note that today different errors could overwrite each other; only the last one wins. We may use multiError to improve this.
var outputsToReturnWf []iwfidl.StateCompletionOutput
var forceCompleteWf bool
stateExecutionMgr := newStateExecutionManager(ctx, provider)
//inFlightExecutingStateCount := 0

for len(currentStates) > 0 {
for len(statesToExecuteQueue) > 0 {
// copy the whole slice(pointer)
statesToExecute := currentStates
err := stateExecutionMgr.markStatesPending(currentStates)
statesToExecute := statesToExecuteQueue
err := stateExecutionMgr.markStatesPending(statesToExecuteQueue)
if err != nil {
return nil, err
}
//reset to empty slice since each iteration will process all current states in the queue
currentStates = nil
statesToExecuteQueue = nil

for _, stateToExecute := range statesToExecute {
// execute in another thread for parallelism
@@ -78,16 +81,6 @@ func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input servic
)
return
}
manualDeferFn := func() {
// using defer will cause https://github.com/uber-go/cadence-client/issues/1198 in Cadence
// so we use manual defer here...
// NOTE: must execute this in every place when return...
// TODO be extremely careful in this piece of code, and remove this hack when the bug is fixed in Cadence client
err := stateExecutionMgr.markStateCompleted(state)
if err != nil {
errToFailWf = err
}
}

stateExeId := stateExecutionMgr.createNextExecutionId(state.GetStateId())
decision, err := executeState(ctx, provider, state, execution, stateExeId, persistenceManager, interStateChannel)
@@ -112,14 +105,26 @@ func InterpreterImpl(ctx UnifiedContext, provider WorkflowProvider, input servic
)
}
if !shouldClose && decision.HasNextStates() {
currentStates = append(currentStates, decision.GetNextStates()...)
statesToExecuteQueue = append(statesToExecuteQueue, decision.GetNextStates()...)
}

// finally, mark state completed and may also update system search attribute(IwfExecutingStateIds)
err = stateExecutionMgr.markStateCompleted(state)
if err != nil {
errToFailWf = err
}
manualDeferFn()
})
}

// The conditions here are quite tricky:
// For len(statesToExecuteQueue) > 0: first of all, we need some condition to wait on here because all the statesToExecute are running in different threads.
// Right after the statesToExecute are popped from the queue, len(...) becomes zero. So when len(...) > 0, it means new states to execute have been pushed into the queue,
// and it's time to wake up the outer loop to go to the next iteration. Alternatively, using totalPendingCount == 0 would work, but not as efficiently:
// it would wait for all currently pending states to complete, which takes much longer.
// For errToFailWf != nil || forceCompleteWf: this means we need to close the workflow immediately.
// For stateExecutionMgr.getTotalPendingStates() == 0: this means all the state executions have reached "dead ends", so the workflow can complete gracefully without output.
awaitError := provider.Await(ctx, func() bool {
return len(currentStates) > 0 || errToFailWf != nil || forceCompleteWf || stateExecutionMgr.getTotalPendingStates() == 0
return len(statesToExecuteQueue) > 0 || errToFailWf != nil || forceCompleteWf || stateExecutionMgr.getTotalPendingStates() == 0
})
if errToFailWf != nil || forceCompleteWf {
return &service.InterpreterWorkflowOutput{
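
The wake-up condition passed to `provider.Await` above can be read as a pure predicate over four values. The sketch below restates it outside the workflow so each case from the comment can be checked in isolation. `loopState`, `shouldWake`, and `errExample` are hypothetical names introduced here; only the boolean expression itself comes from the diff.

```go
package main

import (
	"errors"
	"fmt"
)

// loopState is a hypothetical snapshot of the values the Await predicate reads.
type loopState struct {
	queued          int   // len(statesToExecuteQueue)
	errToFailWf     error // set when a state execution fails the workflow
	forceCompleteWf bool  // set when a decision force-completes the workflow
	totalPending    int   // stateExecutionMgr.getTotalPendingStates()
}

// errExample is a placeholder error used only by the demo cases.
var errExample = errors.New("state API failed")

// shouldWake is the same boolean expression as the Await predicate in the diff.
func shouldWake(s loopState) bool {
	return s.queued > 0 || s.errToFailWf != nil || s.forceCompleteWf || s.totalPending == 0
}

func main() {
	cases := []struct {
		name  string
		state loopState
	}{
		{"states running, nothing new queued", loopState{totalPending: 2}},
		{"a running state queued its next states", loopState{queued: 1, totalPending: 2}},
		{"a state reported a failure", loopState{errToFailWf: errExample, totalPending: 2}},
		{"a decision force-completed the workflow", loopState{forceCompleteWf: true, totalPending: 1}},
		{"every state reached a dead end", loopState{}},
	}
	for _, c := range cases {
		fmt.Printf("%-42s wake=%v\n", c.name, shouldWake(c.state))
	}
}
```

Only the first case keeps the interpreter loop blocked. Every other case either starts the next iteration or lets the workflow close.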