Skip to content

Commit

Permalink
Implement AnyCommandCompleted decider trigger type (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Oct 30, 2022
1 parent 6a5ddf5 commit 59aa576
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 18 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,14 @@ tctl adm cl asa -n CustomIntField -t Int
## 1.1
- [x] Reset workflow API (Cadence only, TODO for Temporal)
- [x] Command type(s) for inter-state communications (e.g. internal channel)
- [x] AnyCommandCompleted Decider trigger type
- [ ] More workflow start options: IdReusePolicy, initial earch attributes, cron schedule, retry, etc
- [ ] StateOption: Start/Decide API timeout and retry
- [ ] Reset workflow by stateId

## 1.2
- [ ] AnyCommandCompleted Decider trigger type and WaitForMoreResults in StateDecision
- [ ] Decider trigger types: AnyCommandClosed
- [ ] Decider trigger type: AnyCommandClosed
- [ ] WaitForMoreResults in StateDecision
- [ ] Skip timer API for testing/operation
- [ ] LongRunningActivityCommand
- [ ] Failing workflow details
Expand Down
1 change: 1 addition & 0 deletions gen/iwfidl/api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,7 @@ components:
deciderTriggerType:
enum:
- ALL_COMMAND_COMPLETED
- ANY_COMMAND_COMPLETED
type: string
timerCommands:
items:
Expand Down
87 changes: 87 additions & 0 deletions integ/any_command_close_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package integ

import (
"context"
"github.com/indeedeng/iwf/gen/iwfidl"
anycommandclose "github.com/indeedeng/iwf/integ/workflow/any_command_close"
"github.com/indeedeng/iwf/service"
"github.com/stretchr/testify/assert"
"strconv"
"testing"
"time"
)

func TestAnyCommandCloseWorkflowTemporal(t *testing.T) {
doTestAnyCommandCloseWorkflow(t, service.BackendTypeTemporal)
}

func TestAnyCommandCloseWorkflowCadence(t *testing.T) {
doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence)
}

func doTestAnyCommandCloseWorkflow(t *testing.T, backendType service.BackendType) {
// start test workflow server
wfHandler := anycommandclose.NewHandler()
closeFunc1 := startWorkflowWorker(wfHandler)
defer closeFunc1()

closeFunc2 := startIwfService(backendType)
defer closeFunc2()

// start a workflow
apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{
Servers: []iwfidl.ServerConfiguration{
{
URL: "http://localhost:" + testIwfServerPort,
},
},
})
wfId := anycommandclose.WorkflowType + strconv.Itoa(int(time.Now().Unix()))
req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background())
_, httpResp, err := req.WorkflowStartRequest(iwfidl.WorkflowStartRequest{
WorkflowId: wfId,
IwfWorkflowType: anycommandclose.WorkflowType,
WorkflowTimeoutSeconds: 10,
IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort,
StartStateId: anycommandclose.State1,
}).Execute()
panicAtHttpError(err, httpResp)

signalValue := iwfidl.EncodedObject{
Encoding: iwfidl.PtrString("json"),
Data: iwfidl.PtrString("test-data-1"),
}

req2 := apiClient.DefaultApi.ApiV1WorkflowSignalPost(context.Background())
httpResp, err = req2.WorkflowSignalRequest(iwfidl.WorkflowSignalRequest{
WorkflowId: wfId,
SignalChannelName: anycommandclose.SignalName2,
SignalValue: &signalValue,
}).Execute()
panicAtHttpError(err, httpResp)

// wait for the workflow
reqWait := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background())
_, httpResp, err = reqWait.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
WorkflowId: wfId,
}).Execute()
panicAtHttpError(err, httpResp)

history, data := wfHandler.GetTestResult()
assertions := assert.New(t)
assertions.Equalf(map[string]int64{
"S1_start": 1,
"S1_decide": 1,
"S2_start": 1,
"S2_decide": 1,
}, history, "anycommandclose test fail, %v", history)

assertions.Equal(anycommandclose.SignalName2, data["signalChannelName1"])
assertions.Equal("signal-cmd-id2", data["signalCommandId1"])
assertions.Equal(signalValue, data["signalValue1"])
assertions.Equal(service.SignalStatusReceived, data["signalStatus1"])

assertions.Equal(anycommandclose.SignalName1, data["signalChannelName0"])
assertions.Equal("signal-cmd-id1", data["signalCommandId0"])
assertions.Equal(service.SignalStatusWaiting, data["signalStatus0"])
}
9 changes: 9 additions & 0 deletions integ/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,12 @@ func startIwfService(backendType service.BackendType) (closeFunc func()) {
panic("not supported backend type " + backendType)
}
}

func panicAtHttpError(err error, httpResp *http.Response) {
if err != nil {
panic(err)
}
if httpResp.StatusCode != http.StatusOK {
panic("Status not success" + httpResp.Status)
}
}
127 changes: 127 additions & 0 deletions integ/workflow/any_command_close/routers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package anycommandclose

import (
"github.com/gin-gonic/gin"
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/integ/workflow/common"
"github.com/indeedeng/iwf/service"
"log"
"net/http"
)

const (
WorkflowType = "any_command_close"
State1 = "S1"
State2 = "S2"
SignalName1 = "test-signal-name1"
SignalName2 = "test-signal-name2"
)

type handler struct {
invokeHistory map[string]int64
invokeData map[string]interface{}
}

func NewHandler() common.WorkflowHandler {
return &handler{
invokeHistory: make(map[string]int64),
invokeData: make(map[string]interface{}),
}
}

// ApiV1WorkflowStartPost - for a workflow
func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {
var req iwfidl.WorkflowStateStartRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
log.Println("received state start request, ", req)

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_start"]++
if req.GetWorkflowStateId() == State1 {
c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{
CommandRequest: &iwfidl.CommandRequest{
SignalCommands: []iwfidl.SignalCommand{
{
CommandId: "signal-cmd-id1",
SignalChannelName: SignalName1,
},
{
CommandId: "signal-cmd-id2",
SignalChannelName: SignalName2,
},
},
DeciderTriggerType: service.DeciderTypeAnyCommandCompleted,
},
})
return
}
if req.GetWorkflowStateId() == State2 {
c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{
CommandRequest: &iwfidl.CommandRequest{
DeciderTriggerType: service.DeciderTypeAllCommandCompleted,
},
})
return
}
}

c.JSON(http.StatusBadRequest, struct{}{})
}

func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
var req iwfidl.WorkflowStateDecideRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
log.Println("received state decide request, ", req)

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++
if req.GetWorkflowStateId() == State1 {
signalResults := req.GetCommandResults()
h.invokeData["signalCommandResultsLength"] = len(signalResults.SignalResults)

h.invokeData["signalChannelName0"] = signalResults.SignalResults[0].GetSignalChannelName()
h.invokeData["signalCommandId0"] = signalResults.SignalResults[0].GetCommandId()
h.invokeData["signalStatus0"] = signalResults.SignalResults[0].GetSignalRequestStatus()

h.invokeData["signalChannelName1"] = signalResults.SignalResults[1].GetSignalChannelName()
h.invokeData["signalCommandId1"] = signalResults.SignalResults[1].GetCommandId()
h.invokeData["signalStatus1"] = signalResults.SignalResults[1].GetSignalRequestStatus()
h.invokeData["signalValue1"] = signalResults.SignalResults[1].GetSignalValue()

c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
{
StateId: State2,
},
},
},
})
return
} else if req.GetWorkflowStateId() == State2 {
// go to complete
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
{
StateId: service.GracefulCompletingWorkflowStateId,
},
},
},
})
return
}
}

c.JSON(http.StatusBadRequest, struct{}{})
}

func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) {
return h.invokeHistory, h.invokeData
}
2 changes: 1 addition & 1 deletion iwf-idl
1 change: 1 addition & 0 deletions service/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const (
StateDecideApi = "/api/v1/workflowState/decide"

DeciderTypeAllCommandCompleted = "ALL_COMMAND_COMPLETED"
DeciderTypeAnyCommandCompleted = "ANY_COMMAND_COMPLETED"

TimerStatusFired = "FIRED"
TimerStatusScheduled = "SCHEDULED"
Expand Down
56 changes: 41 additions & 15 deletions service/interpreter/workflowImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func executeState(

commandReq := startResponse.GetCommandRequest()

completedTimerCmds := 0
completedTimerCmds := map[int]bool{}
if len(commandReq.GetTimerCommands()) > 0 {
for idx, cmd := range commandReq.GetTimerCommands() {
cmdCtx := provider.ExtendContextWithValue(ctx, "cmd", cmd)
Expand All @@ -213,12 +213,16 @@ func executeState(
if !ok {
panic("critical code bug")
}
idx, ok := provider.GetContextValue(ctx, "idx").(int)
if !ok {
panic("critical code bug")
}

now := provider.Now(ctx).Unix()
fireAt := cmd.GetFiringUnixTimestampSeconds()
duration := time.Duration(fireAt-now) * time.Second
_ = provider.Sleep(ctx, duration)
completedTimerCmds++
completedTimerCmds[idx] = true
})
}
}
Expand Down Expand Up @@ -273,26 +277,36 @@ func executeState(
// TODO process long running activity command

triggerType := commandReq.GetDeciderTriggerType()
if triggerType != service.DeciderTypeAllCommandCompleted {
if triggerType == service.DeciderTypeAllCommandCompleted {
err = provider.Await(ctx, func() bool {
return len(completedTimerCmds) == len(commandReq.GetTimerCommands()) &&
len(completedSignalCmds) == len(commandReq.GetSignalCommands()) &&
len(completedInterStateChannelCmds) == len(commandReq.GetInterStateChannelCommands())
})
} else if triggerType == service.DeciderTypeAnyCommandCompleted {
err = provider.Await(ctx, func() bool {
return len(completedTimerCmds)+
len(completedSignalCmds)+
len(completedInterStateChannelCmds) > 0
})
} else {
return nil, provider.NewApplicationError("unsupported decider trigger type", "unsupported", triggerType)
}

err = provider.Await(ctx, func() bool {
return completedTimerCmds == len(commandReq.GetTimerCommands()) &&
len(completedSignalCmds) == len(commandReq.GetSignalCommands()) &&
len(completedInterStateChannelCmds) == len(commandReq.GetInterStateChannelCommands())
})

if err != nil {
return nil, err
}
commandRes := &iwfidl.CommandResults{}
if len(commandReq.GetTimerCommands()) > 0 {
var timerResults []iwfidl.TimerResult
for _, cmd := range commandReq.GetTimerCommands() {
for idx, cmd := range commandReq.GetTimerCommands() {
status := service.TimerStatusFired
if !completedTimerCmds[idx] {
status = service.TimerStatusScheduled
}
timerResults = append(timerResults, iwfidl.TimerResult{
CommandId: cmd.GetCommandId(),
TimerStatus: service.TimerStatusFired,
TimerStatus: status,
})
}
commandRes.SetTimerResults(timerResults)
Expand All @@ -301,11 +315,17 @@ func executeState(
if len(commandReq.GetSignalCommands()) > 0 {
var signalResults []iwfidl.SignalResult
for idx, cmd := range commandReq.GetSignalCommands() {
status := service.SignalStatusReceived
result, completed := completedSignalCmds[idx]
if !completed {
status = service.SignalStatusWaiting
}

signalResults = append(signalResults, iwfidl.SignalResult{
CommandId: cmd.GetCommandId(),
SignalChannelName: cmd.GetSignalChannelName(),
SignalValue: completedSignalCmds[idx],
SignalRequestStatus: service.SignalStatusReceived,
SignalValue: result,
SignalRequestStatus: status,
})
}
commandRes.SetSignalResults(signalResults)
Expand All @@ -314,11 +334,17 @@ func executeState(
if len(commandReq.GetInterStateChannelCommands()) > 0 {
var interStateChannelResults []iwfidl.InterStateChannelResult
for idx, cmd := range commandReq.GetInterStateChannelCommands() {
status := service.InternStateChannelCommandReceived
result, completed := completedInterStateChannelCmds[idx]
if !completed {
status = service.InternStateChannelCommandStatusWaiting
}

interStateChannelResults = append(interStateChannelResults, iwfidl.InterStateChannelResult{
CommandId: cmd.CommandId,
RequestStatus: service.InternStateChannelCommandReceived,
ChannelName: cmd.ChannelName,
Value: completedInterStateChannelCmds[idx],
RequestStatus: status,
Value: result,
})
}
commandRes.SetInterStateChannelResults(interStateChannelResults)
Expand Down

0 comments on commit 59aa576

Please sign in to comment.