Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve waitForStateCompletion API reliability for signalWithStart order #354

Merged
merged 6 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/static.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,7 @@ jobs:
uses: actions/checkout@v3
- name: golangci-lint by reviewdog
uses: reviewdog/action-golangci-lint@v2
with:
golangci_lint_flags: '--timeout=5m'
- name: Check code formatting using gofmt
uses: Jerome1337/gofmt-action@v1.0.5
uses: Jerome1337/gofmt-action@v1.0.5
30 changes: 8 additions & 22 deletions service/interpreter/activityImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/service"
uclient "github.com/indeedeng/iwf/service/client"
"github.com/indeedeng/iwf/service/common/compatibility"
"github.com/indeedeng/iwf/service/common/config"
"github.com/indeedeng/iwf/service/common/rpc"
Expand All @@ -23,7 +22,9 @@ func StateStart(ctx context.Context, backendType service.BackendType, input serv
return StateApiWaitUntil(ctx, backendType, input)
}

func StateApiWaitUntil(ctx context.Context, backendType service.BackendType, input service.StateStartActivityInput) (*iwfidl.WorkflowStateStartResponse, error) {
func StateApiWaitUntil(
ctx context.Context, backendType service.BackendType, input service.StateStartActivityInput,
) (*iwfidl.WorkflowStateStartResponse, error) {
provider := getActivityProviderByType(backendType)
logger := provider.GetLogger(ctx)
logger.Info("StateStartActivity", "input", input)
Expand Down Expand Up @@ -62,33 +63,18 @@ func StateDecide(
backendType service.BackendType,
input service.StateDecideActivityInput,
shouldSendSignalOnCompletion bool,
timeout time.Duration) (*iwfidl.WorkflowStateDecideResponse, error) {
timeout time.Duration,
) (*iwfidl.WorkflowStateDecideResponse, error) {
return StateApiExecute(ctx, backendType, input, shouldSendSignalOnCompletion, timeout)
}

func StateApiExecute(
ctx context.Context,
backendType service.BackendType,
input service.StateDecideActivityInput,
shouldSendSignalOnCompletion bool,
timeout time.Duration) (*iwfidl.WorkflowStateDecideResponse, error) {
defer func() {
if shouldSendSignalOnCompletion {
unifiedCleint := env.GetUnifiedClient()
err := unifiedCleint.SignalWithStartWaitForStateCompletionWorkflow(
ctx, uclient.StartWorkflowOptions{
ID: service.IwfSystemConstPrefix + input.Request.Context.WorkflowId + "_" + *input.Request.Context.StateExecutionId,
TaskQueue: env.GetTaskQueue(),
WorkflowExecutionTimeout: 600 * time.Second, // TODO: make it configurable
},
iwfidl.StateCompletionOutput{
CompletedStateExecutionId: *input.Request.Context.StateExecutionId,
})
if err != nil {
getActivityProviderByType(backendType).GetLogger(ctx).Error("failed to signal on completion", "err", err)
}
}
}()
_ bool, // no used anymore, keep for compatibility
_ time.Duration, // no used anymore, keep for compatibility
) (*iwfidl.WorkflowStateDecideResponse, error) {
provider := getActivityProviderByType(backendType)
logger := provider.GetLogger(ctx)
logger.Info("StateDecideActivity", "input", input)
Expand Down
12 changes: 11 additions & 1 deletion service/interpreter/cadence/workflowProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ func (w *workflowProvider) SetQueryHandler(ctx interpreter.UnifiedContext, query
return workflow.SetQueryHandler(wfCtx, queryType, handler)
}

func (w *workflowProvider) SetRpcUpdateHandler(ctx interpreter.UnifiedContext, updateType string, validator interpreter.UnifiedRpcValidator, handler interpreter.UnifiedRpcHandler) error {
func (w *workflowProvider) SetRpcUpdateHandler(
ctx interpreter.UnifiedContext, updateType string, validator interpreter.UnifiedRpcValidator, handler interpreter.UnifiedRpcHandler,
) error {
// NOTE: this feature is not available in Cadence
return nil
}
Expand Down Expand Up @@ -192,6 +194,14 @@ func (w *workflowProvider) Now(ctx interpreter.UnifiedContext) time.Time {
return workflow.Now(wfCtx)
}

func (w *workflowProvider) IsReplaying(ctx interpreter.UnifiedContext) bool {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
panic("cannot convert to cadence workflow context")
}
return workflow.IsReplaying(wfCtx)
}

func (w *workflowProvider) Sleep(ctx interpreter.UnifiedContext, d time.Duration) (err error) {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
Expand Down
1 change: 1 addition & 0 deletions service/interpreter/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type WorkflowProvider interface {
WithActivityOptions(ctx UnifiedContext, options ActivityOptions) UnifiedContext
ExecuteActivity(ctx UnifiedContext, activity interface{}, args ...interface{}) (future Future)
Now(ctx UnifiedContext) time.Time
IsReplaying(ctx UnifiedContext) bool
Sleep(ctx UnifiedContext, d time.Duration) (err error)
NewTimer(ctx UnifiedContext, d time.Duration) Future
GetSignalChannel(ctx UnifiedContext, signalName string) (receiveChannel ReceiveChannel)
Expand Down
12 changes: 11 additions & 1 deletion service/interpreter/temporal/workflowProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ func (w *workflowProvider) SetQueryHandler(ctx interpreter.UnifiedContext, query
return workflow.SetQueryHandler(wfCtx, queryType, handler)
}

func (w *workflowProvider) SetRpcUpdateHandler(ctx interpreter.UnifiedContext, updateType string, validator interpreter.UnifiedRpcValidator, handler interpreter.UnifiedRpcHandler) error {
func (w *workflowProvider) SetRpcUpdateHandler(
ctx interpreter.UnifiedContext, updateType string, validator interpreter.UnifiedRpcValidator, handler interpreter.UnifiedRpcHandler,
) error {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
panic("cannot convert to temporal workflow context")
Expand Down Expand Up @@ -241,6 +243,14 @@ func (w *workflowProvider) Sleep(ctx interpreter.UnifiedContext, d time.Duration
return workflow.Sleep(wfCtx, d)
}

func (w *workflowProvider) IsReplaying(ctx interpreter.UnifiedContext) bool {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
panic("cannot convert to temporal workflow context")
}
return workflow.IsReplaying(wfCtx)
}

func (w *workflowProvider) GetVersion(ctx interpreter.UnifiedContext, changeID string, minSupported, maxSupported int) int {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
Expand Down
22 changes: 22 additions & 0 deletions service/interpreter/workflowImpl.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package interpreter

import (
"context"
"fmt"
uclient "github.com/indeedeng/iwf/service/client"
"github.com/indeedeng/iwf/service/interpreter/env"
"time"

"github.com/indeedeng/iwf/service/common/compatibility"
Expand Down Expand Up @@ -732,6 +735,25 @@ func executeStateDecide(
},
}, shouldSendSignalOnCompletion, workflowTimeout).Get(ctx, &decideResponse)
persistenceManager.UnlockPersistence(saLoadingPolicy, doLoadingPolicy)
if err == nil && shouldSendSignalOnCompletion && !provider.IsReplaying(ctx) {
// NOTE: here uses NOT IsReplaying to signalWithStart, to save an activity for this operation
// this is not a problem because the signalWithStart will be very fast and highly available
unifiedClient := env.GetUnifiedClient()
err := unifiedClient.SignalWithStartWaitForStateCompletionWorkflow(
context.Background(),
uclient.StartWorkflowOptions{
ID: service.IwfSystemConstPrefix + executionContext.WorkflowId + "_" + *executionContext.StateExecutionId,
TaskQueue: env.GetTaskQueue(),
WorkflowExecutionTimeout: workflowTimeout,
},
iwfidl.StateCompletionOutput{
CompletedStateExecutionId: *executionContext.StateExecutionId,
})
if err != nil {
// for any reasons this fail, just panic and the workflow task will retry
panic(fmt.Errorf("failed to signal on completion %w", err))
}
}
if err != nil {
if shouldProceedOnExecuteApiError(state) {
return nil, service.ExecuteApiFailedAndProceed, nil
Expand Down