Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add any command combination completed decider trigger type #135

Merged
merged 6 commits into from
Jan 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,19 @@ requested `Commands` are completed, `decide` API will be triggered. The number o
Application can start a workflow instance with a `workflowId` for any workflow definition. A workflow instance is called `WorkflowExecution`.
iWF server returns `runId` of UUID as the identifier of the WorkflowExecution. The runId is globally unique.

WorkflowId uniqueness: At anytime, there must be at most one WorkflowExecution running with the same workflowId. However,
after a previous WorkflowExecution finished running (in any closed status),
application may start a new WorkflowExecutions with the same workflowId using appropriate `IdReusePolicy`.
:warning: Note:
> Depends on the context, the only word `workflow` may mean WorkflowExecution(most commonly), WorkflowDefinition or both.

There must be at least one WorkflowState being executed for a running WorkflowExecution. The instance of WorkflowState is called `StateExecution`.
For a running WorkflowExecution, there must be at least one `WorkflowState` being executed, otherwise the workflow execution will complete.
An execution instance of WorkflowState is called `StateExecution`, which by identified `StateExecutionId`. A `StateExecutionId` is formatted
as `<StateId>-<Number>`. `StateId` is defined by workflow state definition, while `Number` is how many times this `StateId` has been executed.
StateExecutionId is only unique within the workflow execution.


WorkflowId uniqueness and reuse: For the same workflowId, there must be at most one WorkflowExecution running at anytime. However,
after a previous WorkflowExecution finished running (in any closed status),
application may start a new WorkflowExecution with the same workflowId using appropriate `IdReusePolicy`.

:warning: Note:
> Depends on the context, the only word `workflow` may mean WorkflowExecution(most commonly), WorkflowDefinition or both.

### Commands
These are the three command types:
Expand All @@ -122,7 +127,7 @@ These are the three command types:
Note that `start` API can return multiple commands, and choose different DeciderTriggerType for triggering decide API:
* `AllCommandCompleted`: this will wait for all command completed
* `AnyCommandCompleted`: this will wait for any command completed
* `AnyCommandCombinationCompleted`: (WIP) this will wait for a list of command combinations on any combination completed
* `AnyCommandCombinationCompleted`: this will wait for a list of command combinations on any combination completed

### Persistence
iWF provides super simple persistence abstraction. Developers don't need to touch any database system to register/maintain the schemas.
Expand Down Expand Up @@ -398,7 +403,7 @@ When something goes wrong in your applications, here are the tips:

### 1.2
- [x] Skip timer API for testing/operation
- [ ] Decider trigger type: any command combination
- [x] Decider trigger type: any command combination

### Future
- [ ] Auto continueAsNew([WIP](https://github.com/indeedeng/iwf/issues/107))
Expand Down
150 changes: 150 additions & 0 deletions integ/any_command_combination_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package integ

import (
"context"
"encoding/json"
"github.com/indeedeng/iwf/gen/iwfidl"
anycommandconbination "github.com/indeedeng/iwf/integ/workflow/any_command_combination"
"github.com/indeedeng/iwf/service"
"github.com/stretchr/testify/assert"
"strconv"
"testing"
"time"
)

func TestAnyCommandCombinationWorkflowTemporal(t *testing.T) {
if !*temporalIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyCommandCombinationWorkflow(t, service.BackendTypeTemporal)
time.Sleep(time.Millisecond * time.Duration(*repeatInterval))
}
}

func TestAnyCommandCombinationWorkflowCadence(t *testing.T) {
if !*cadenceIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence)
time.Sleep(time.Millisecond * time.Duration(*repeatInterval))
}
}

func doTestAnyCommandCombinationWorkflow(t *testing.T, backendType service.BackendType) {
assertions := assert.New(t)
// start test workflow server
wfHandler := anycommandconbination.NewHandler()
closeFunc1 := startWorkflowWorker(wfHandler)
defer closeFunc1()

closeFunc2 := startIwfService(backendType)
defer closeFunc2()

// start a workflow
apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{
Servers: []iwfidl.ServerConfiguration{
{
URL: "http://localhost:" + testIwfServerPort,
},
},
})
wfId := anycommandconbination.WorkflowType + strconv.Itoa(int(time.Now().UnixNano()))
req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background())
_, httpResp, err := req.WorkflowStartRequest(iwfidl.WorkflowStartRequest{
WorkflowId: wfId,
IwfWorkflowType: anycommandconbination.WorkflowType,
WorkflowTimeoutSeconds: 10,
IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort,
StartStateId: anycommandconbination.State1,
}).Execute()
panicAtHttpError(err, httpResp)

signalValue := iwfidl.EncodedObject{
Encoding: iwfidl.PtrString("json"),
Data: iwfidl.PtrString("test-data-1"),
}

// send the signal
req2 := apiClient.DefaultApi.ApiV1WorkflowSignalPost(context.Background())
httpResp, err = req2.WorkflowSignalRequest(iwfidl.WorkflowSignalRequest{
WorkflowId: wfId,
SignalChannelName: anycommandconbination.SignalNameAndId1,
SignalValue: &signalValue,
}).Execute()
panicAtHttpError(err, httpResp)

// skip the timer
time.Sleep(time.Second * 2) // wait for a second so that timer is ready to be skipped
req3 := apiClient.DefaultApi.ApiV1WorkflowTimerSkipPost(context.Background())
httpResp, err = req3.WorkflowSkipTimerRequest(iwfidl.WorkflowSkipTimerRequest{
WorkflowId: wfId,
WorkflowStateExecutionId: "S1-1",
TimerCommandId: iwfidl.PtrString(anycommandconbination.TimerId1),
}).Execute()
panicAtHttpError(err, httpResp)

// now it should be running at S2
// Future: we can check it is already done S1

// send first signal for s2
httpResp, err = req2.WorkflowSignalRequest(iwfidl.WorkflowSignalRequest{
WorkflowId: wfId,
SignalChannelName: anycommandconbination.SignalNameAndId1,
SignalValue: &signalValue,
}).Execute()
panicAtHttpError(err, httpResp)

// wait and check the workflow, it should be still running
time.Sleep(time.Second)
reqDesc := apiClient.DefaultApi.ApiV1WorkflowGetPost(context.Background())
descResp, httpResp, err := reqDesc.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
WorkflowId: wfId,
}).Execute()
panicAtHttpError(err, httpResp)
assertions.Equal(iwfidl.RUNNING, descResp.GetWorkflowStatus())

// send 2nd signal for s2
httpResp, err = req2.WorkflowSignalRequest(iwfidl.WorkflowSignalRequest{
WorkflowId: wfId,
SignalChannelName: anycommandconbination.SignalNameAndId2,
SignalValue: &signalValue,
}).Execute()
panicAtHttpError(err, httpResp)

// workflow should be completed now
time.Sleep(time.Second)
descResp, httpResp, err = reqDesc.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
WorkflowId: wfId,
}).Execute()
panicAtHttpError(err, httpResp)
assertions.Equal(iwfidl.COMPLETED, descResp.GetWorkflowStatus())

history, data := wfHandler.GetTestResult()

assertions.Equalf(map[string]int64{
"S1_start": 2,
"S1_decide": 1,
"S2_start": 2,
"S2_decide": 1,
}, history, "anycommandconbination test fail, %v", history)

var s1CommandResults iwfidl.CommandResults
var s2CommandResults iwfidl.CommandResults
s1ResultJsonStr := "{\"signalResults\":[{\"commandId\":\"test-signal-name1\",\"signalChannelName\":\"test-signal-name1\",\"signalRequestStatus\":\"RECEIVED\",\"signalValue\":{\"data\":\"test-data-1\",\"encoding\":\"json\"}}, {\"commandId\":\"test-signal-name2\",\"signalChannelName\":\"test-signal-name2\",\"signalRequestStatus\":\"WAITING\"}],\"timerResults\":[{\"commandId\":\"test-timer-1\",\"timerStatus\":\"FIRED\"}]}"
err = json.Unmarshal([]byte(s1ResultJsonStr), &s1CommandResults)
if err != nil {
panic(err)
}
s2ResultsJsonStr := "{\"signalResults\":[{\"commandId\":\"test-signal-name1\",\"signalChannelName\":\"test-signal-name1\",\"signalRequestStatus\":\"RECEIVED\",\"signalValue\":{\"data\":\"test-data-1\",\"encoding\":\"json\"}}, {\"commandId\":\"test-signal-name2\",\"signalChannelName\":\"test-signal-name2\",\"signalRequestStatus\":\"RECEIVED\",\"signalValue\":{\"data\":\"test-data-1\",\"encoding\":\"json\"}}],\"timerResults\":[{\"commandId\":\"test-timer-1\",\"timerStatus\":\"SCHEDULED\"}]}"
err = json.Unmarshal([]byte(s2ResultsJsonStr), &s2CommandResults)
if err != nil {
panic(err)
}
expectedData := map[string]interface{}{
"s1_commandResults": s1CommandResults,
"s2_commandResults": s2CommandResults,
}
assertions.Equal(expectedData, data)
}
204 changes: 204 additions & 0 deletions integ/workflow/any_command_combination/routers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package anycommandcombination

import (
"github.com/gin-gonic/gin"
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/integ/workflow/common"
"github.com/indeedeng/iwf/service"
"log"
"net/http"
"time"
)

const (
WorkflowType = "any_command_combination"
State1 = "S1"
State2 = "S2"
TimerId1 = "test-timer-1"
SignalNameAndId1 = "test-signal-name1"
SignalNameAndId2 = "test-signal-name2"
)

type handler struct {
invokeHistory map[string]int64
invokeData map[string]interface{}
//we want to confirm that the interpreter workflow activity will fail when the commandId is empty with ANY_COMMAND_COMBINATION_COMPLETED
hasS1RetriedForInvalidCommandId bool
hasS2RetriedForInvalidCommandId bool
}

func NewHandler() common.WorkflowHandler {
return &handler{
invokeHistory: make(map[string]int64),
invokeData: make(map[string]interface{}),
hasS1RetriedForInvalidCommandId: false,
hasS2RetriedForInvalidCommandId: false,
}
}

// ApiV1WorkflowStartPost - for a workflow
func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {
var req iwfidl.WorkflowStateStartRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
log.Println("received state start request, ", req)

invalidTimerCommands := []iwfidl.TimerCommand{
{
CommandId: "",
FiringUnixTimestampSeconds: time.Now().Unix() + 86400*365, // one year later
},
}
validTimerCommands := []iwfidl.TimerCommand{
{
CommandId: TimerId1,
FiringUnixTimestampSeconds: time.Now().Unix() + 86400*365, // one year later
},
}
invalidSignalCommands := []iwfidl.SignalCommand{
{
CommandId: "",
SignalChannelName: SignalNameAndId1,
},
{
CommandId: SignalNameAndId2,
SignalChannelName: SignalNameAndId2,
},
}
validSignalCommands := []iwfidl.SignalCommand{
{
CommandId: SignalNameAndId1,
SignalChannelName: SignalNameAndId1,
},
{
CommandId: SignalNameAndId2,
SignalChannelName: SignalNameAndId2,
},
}

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_start"]++
if req.GetWorkflowStateId() == State1 {
if h.hasS1RetriedForInvalidCommandId {
startResp := iwfidl.WorkflowStateStartResponse{
CommandRequest: &iwfidl.CommandRequest{
SignalCommands: validSignalCommands,
TimerCommands: validTimerCommands,
DeciderTriggerType: iwfidl.ANY_COMMAND_COMBINATION_COMPLETED,
CommandCombinations: []iwfidl.CommandCombination{
{
CommandIds: []string{
TimerId1, SignalNameAndId1,
},
},
{
CommandIds: []string{
TimerId1, SignalNameAndId1, SignalNameAndId2,
},
},
},
},
}

c.JSON(http.StatusOK, startResp)
} else {
startResp := iwfidl.WorkflowStateStartResponse{
CommandRequest: &iwfidl.CommandRequest{
SignalCommands: validSignalCommands,
TimerCommands: invalidTimerCommands,
DeciderTriggerType: iwfidl.ANY_COMMAND_COMBINATION_COMPLETED,
},
}
h.hasS1RetriedForInvalidCommandId = true
c.JSON(http.StatusOK, startResp)
}
return
}
if req.GetWorkflowStateId() == State2 {
if h.hasS2RetriedForInvalidCommandId {
startResp := iwfidl.WorkflowStateStartResponse{
CommandRequest: &iwfidl.CommandRequest{
SignalCommands: validSignalCommands,
TimerCommands: validTimerCommands,
DeciderTriggerType: iwfidl.ANY_COMMAND_COMBINATION_COMPLETED,
CommandCombinations: []iwfidl.CommandCombination{
{
CommandIds: []string{
SignalNameAndId2, SignalNameAndId1,
},
},
{
CommandIds: []string{
TimerId1, SignalNameAndId1, SignalNameAndId2,
},
},
},
},
}

c.JSON(http.StatusOK, startResp)
} else {
startResp := iwfidl.WorkflowStateStartResponse{
CommandRequest: &iwfidl.CommandRequest{
SignalCommands: invalidSignalCommands,
TimerCommands: validTimerCommands,
DeciderTriggerType: iwfidl.ANY_COMMAND_COMBINATION_COMPLETED,
},
}
h.hasS2RetriedForInvalidCommandId = true
c.JSON(http.StatusOK, startResp)
}
return
}
}

panic("invalid workflow type")
}

func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
var req iwfidl.WorkflowStateDecideRequest
if err := c.ShouldBindJSON(&req); err != nil {
panic(err)
}
log.Println("received state decide request, ", req)

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++
if req.GetWorkflowStateId() == State1 {
h.invokeData["s1_commandResults"] = req.GetCommandResults()

c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
{
StateId: State2,
},
},
},
})
return
} else if req.GetWorkflowStateId() == State2 {
h.invokeData["s2_commandResults"] = req.GetCommandResults()

// go to complete
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
{
StateId: service.GracefulCompletingWorkflowStateId,
},
},
},
})
return
}
}

panic("invalid workflow type")
}

func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) {
return h.invokeHistory, h.invokeData
}
Loading