Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IWF-163: Add retry policy to QueryWorkflow calls #491

Merged
merged 10 commits into from
Nov 26, 2024
5 changes: 3 additions & 2 deletions cmd/server/iwf/iwf.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func start(c *cli.Context) {
if err != nil {
rawLog.Fatalf("Unable to connect to Temporal because of error %v", err)
}
unifiedClient = temporalapi.NewTemporalClient(temporalClient, config.Interpreter.Temporal.Namespace, converter.GetDefaultDataConverter(), false)
unifiedClient = temporalapi.NewTemporalClient(temporalClient, config.Interpreter.Temporal.Namespace, converter.GetDefaultDataConverter(), false, &config.Api.QueryWorkflowFailedRetryPolicy)

for _, svcName := range services {
go launchTemporalService(svcName, *config, unifiedClient, temporalClient, logger)
Expand All @@ -146,7 +146,8 @@ func start(c *cli.Context) {
if err != nil {
rawLog.Fatalf("Unable to connect to Cadence because of error %v", err)
}
unifiedClient = cadenceapi.NewCadenceClient(domain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc)

unifiedClient = cadenceapi.NewCadenceClient(domain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc, &config.Api.QueryWorkflowFailedRetryPolicy)

for _, svcName := range services {
go launchCadenceService(svcName, *config, unifiedClient, serviceClient, domain, closeFunc, logger)
Expand Down
8 changes: 8 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ type (
OmitRpcInputOutputInHistory *bool `yaml:"omitRpcInputOutputInHistory"`
// WaitForStateCompletionMigration is used to control workflowId of the WaitForStateCompletion system/internal workflows
WaitForStateCompletionMigration WaitForStateCompletionMigration `yaml:"waitForStateCompletionMigration"`
QueryWorkflowFailedRetryPolicy QueryWorkflowFailedRetryPolicy `yaml:"queryWorkflowFailedRetryPolicy"`
}

QueryWorkflowFailedRetryPolicy struct {
// defaults to 1
InitialIntervalSeconds int `yaml:"initialIntervalSeconds"`
// defaults to 5
MaximumAttempts int `yaml:"maximumAttempts"`
}

WaitForStateCompletionMigration struct {
Expand Down
3 changes: 3 additions & 0 deletions config/config_template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ api:
waitForStateCompletionMigration:
signalWithStartOn: old
waitForOn: old
queryWorkflowFailedRetryPolicy:
initialIntervalSeconds: 1
maximumAttempts: 5
interpreter:
defaultWorkflowConfig:
continueAsNewThreshold: 100
Expand Down
3 changes: 3 additions & 0 deletions config/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ api:
waitForStateCompletionMigration:
signalWithStartOn: old
waitForOn: old
queryWorkflowFailedRetryPolicy:
initialIntervalSeconds: 1
maximumAttempts: 5
interpreter:
temporal:
hostPort: localhost:7233
Expand Down
3 changes: 3 additions & 0 deletions config/development_cadence.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ api:
waitForStateCompletionMigration:
signalWithStartOn: old
waitForOn: old
queryWorkflowFailedRetryPolicy:
initialIntervalSeconds: 1
maximumAttempts: 5
interpreter:
# interpreterActivityConfig:
# disableSystemSearchAttributes: true # (deprecated) set to true if you don't have advanced visibility in Cadence, see more https://github.com/uber/cadence/issues/5085
Expand Down
4 changes: 4 additions & 0 deletions integ/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ func createTestConfig(testCfg IwfServiceTestConfig) config.Config {
SignalWithStartOn: "old",
WaitForOn: "old",
},
QueryWorkflowFailedRetryPolicy: config.QueryWorkflowFailedRetryPolicy{
InitialIntervalSeconds: 1,
MaximumAttempts: 5,
},
},
Interpreter: config.Interpreter{
VerboseDebug: false,
Expand Down
2 changes: 0 additions & 2 deletions integ/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ func doTestCreateWithoutStartingState(t *testing.T, backendType service.BackendT
}).Execute()
panicAtHttpError(err, httpResp)

time.Sleep(time.Second * 2)

// workflow shouldn't executed any state
var dump service.ContinueAsNewDumpResponse
err = uclient.QueryWorkflow(context.Background(), &dump, wfId, "", service.ContinueAsNewDumpQueryType)
Expand Down
3 changes: 0 additions & 3 deletions integ/signal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config
panicAtHttpError(err, httpResp)

// test update config
time.Sleep(time.Second)
var debugDump service.DebugDumpResponse
err = uclient.QueryWorkflow(context.Background(), &debugDump, wfId, "", service.DebugDumpQueryType)
if err != nil {
Expand All @@ -134,7 +133,6 @@ func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config
}).Execute()
panicAtHttpError(err, httpResp)

time.Sleep(time.Second)
err = uclient.QueryWorkflow(context.Background(), &debugDump, wfId, "", service.DebugDumpQueryType)
if err != nil {
panic(err)
Expand All @@ -154,7 +152,6 @@ func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config
}).Execute()
panicAtHttpError(err, httpResp)

time.Sleep(time.Second)
err = uclient.QueryWorkflow(context.Background(), &debugDump, wfId, "", service.DebugDumpQueryType)
if err != nil {
panic(err)
Expand Down
6 changes: 0 additions & 6 deletions integ/timer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config *
}).Execute()
panicAtHttpError(err, httpResp)

time.Sleep(time.Second * 1)
timerInfos = service.GetCurrentTimerInfosQueryResponse{}
err = uclient.QueryWorkflow(context.Background(), &timerInfos, wfId, "", service.GetCurrentTimerInfosQueryType)
if err != nil {
Expand All @@ -139,15 +138,13 @@ func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config *
timer2.Status = service.TimerSkipped
assertTimerQueryResponseEqual(assertions, expectedTimerInfos, timerInfos)

time.Sleep(time.Second * 1)
httpResp, err = req3.WorkflowSkipTimerRequest(iwfidl.WorkflowSkipTimerRequest{
WorkflowId: wfId,
WorkflowStateExecutionId: "S1-1",
TimerCommandIndex: iwfidl.PtrInt32(2),
}).Execute()
panicAtHttpError(err, httpResp)

time.Sleep(time.Second * 1)
timerInfos = service.GetCurrentTimerInfosQueryResponse{}
err = uclient.QueryWorkflow(context.Background(), &timerInfos, wfId, "", service.GetCurrentTimerInfosQueryType)
if err != nil {
Expand Down Expand Up @@ -183,9 +180,6 @@ func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config *
}).Execute()
panicAtHttpError(err, httpResp)

if config != nil {
time.Sleep(time.Second * 2)
}
err = uclient.QueryWorkflow(context.Background(), &timerInfos, wfId, "", service.GetCurrentTimerInfosQueryType)
if err != nil {
log.Fatalf("Fail to invoke query %v", err)
Expand Down
18 changes: 12 additions & 6 deletions integ/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,11 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
if err != nil {
panic(err)
}
uclient = temporalapi.NewTemporalClient(temporalClient, testNamespace, dataConverter, config.MemoEncryption)
iwfService := api.NewService(createTestConfig(config), uclient, logger)

testCfg := createTestConfig(config)

uclient = temporalapi.NewTemporalClient(temporalClient, testNamespace, dataConverter, config.MemoEncryption, &testCfg.Api.QueryWorkflowFailedRetryPolicy)
iwfService := api.NewService(testCfg, uclient, logger)
iwfServer := &http.Server{
Addr: ":" + testIwfServerPort,
Handler: iwfService,
Expand All @@ -122,7 +125,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
}()

// start iwf interpreter worker
interpreter := temporal.NewInterpreterWorker(createTestConfig(config), temporalClient, service.TaskQueue, config.MemoEncryption, dataConverter, uclient)
interpreter := temporal.NewInterpreterWorker(testCfg, temporalClient, service.TaskQueue, config.MemoEncryption, dataConverter, uclient)
if *disableStickyCache {
interpreter.StartWithStickyCacheDisabledForTest()
} else {
Expand All @@ -144,8 +147,11 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
if err != nil {
panic(err)
}
uclient = cadenceapi.NewCadenceClient(iwf.DefaultCadenceDomain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc)
iwfService := api.NewService(createTestConfig(config), uclient, logger)

testCfg := createTestConfig(config)

uclient = cadenceapi.NewCadenceClient(iwf.DefaultCadenceDomain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc, &testCfg.Api.QueryWorkflowFailedRetryPolicy)
iwfService := api.NewService(testCfg, uclient, logger)
iwfServer := &http.Server{
Addr: ":" + testIwfServerPort,
Handler: iwfService,
Expand All @@ -157,7 +163,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
}()

// start iwf interpreter worker
interpreter := cadence.NewInterpreterWorker(createTestConfig(config), serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc, uclient)
interpreter := cadence.NewInterpreterWorker(testCfg, serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc, uclient)
if *disableStickyCache {
interpreter.StartWithStickyCacheDisabledForTest()
} else {
Expand Down
69 changes: 57 additions & 12 deletions service/client/cadence/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/indeedeng/iwf/config"
"time"

"github.com/indeedeng/iwf/service"
Expand All @@ -22,12 +23,18 @@ import (
"go.uber.org/cadence/encoded"
)

type QueryWorkflowFailedRetryPolicy struct {
InitialIntervalSeconds int
MaximumAttempts int
}

type cadenceClient struct {
domain string
cClient client.Client
closeFunc func()
serviceClient workflowserviceclient.Interface
converter encoded.DataConverter
domain string
cClient client.Client
closeFunc func()
serviceClient workflowserviceclient.Interface
converter encoded.DataConverter
queryWorkflowFailedRetryPolicy QueryWorkflowFailedRetryPolicy
}

func (t *cadenceClient) IsWorkflowAlreadyStartedError(err error) bool {
Expand All @@ -52,6 +59,12 @@ func (t *cadenceClient) IsNotFoundError(err error) bool {
return ok
}

func (t *cadenceClient) isQueryFailedError(err error) bool {
var serviceError *shared.QueryFailedError
ok := errors.As(err, &serviceError)
return ok
}

func (t *cadenceClient) IsWorkflowTimeoutError(err error) bool {
return realcadence.IsTimeoutError(err)
}
Expand Down Expand Up @@ -83,14 +96,29 @@ func (t *cadenceClient) GetApplicationErrorDetails(err error, detailsPtr interfa

func NewCadenceClient(
domain string, cClient client.Client, serviceClient workflowserviceclient.Interface,
converter encoded.DataConverter, closeFunc func(),
converter encoded.DataConverter, closeFunc func(), retryPolicy *config.QueryWorkflowFailedRetryPolicy,
) uclient.UnifiedClient {
var rp QueryWorkflowFailedRetryPolicy

if retryPolicy.InitialIntervalSeconds == 0 {
rp.InitialIntervalSeconds = 1
} else {
rp.InitialIntervalSeconds = retryPolicy.InitialIntervalSeconds
}

if retryPolicy.MaximumAttempts == 0 {
rp.MaximumAttempts = 5
} else {
rp.MaximumAttempts = retryPolicy.MaximumAttempts
}

return &cadenceClient{
domain: domain,
cClient: cClient,
closeFunc: closeFunc,
serviceClient: serviceClient,
converter: converter,
domain: domain,
cClient: cClient,
closeFunc: closeFunc,
serviceClient: serviceClient,
converter: converter,
queryWorkflowFailedRetryPolicy: rp,
}
}

Expand Down Expand Up @@ -227,7 +255,24 @@ func (t *cadenceClient) ListWorkflow(
func (t *cadenceClient) QueryWorkflow(
ctx context.Context, valuePtr interface{}, workflowID string, runID string, queryType string, args ...interface{},
) error {
qres, err := queryWorkflowWithStrongConsistency(t, ctx, workflowID, runID, queryType, args)
var qres encoded.Value
var err error

attempt := 1
// Only QueryFailed error causes retry; all other errors make the loop to finish immediately
for attempt <= t.queryWorkflowFailedRetryPolicy.MaximumAttempts {
qres, err = t.cClient.QueryWorkflow(ctx, workflowID, runID, queryType, args...)
if err == nil {
break
} else {
if t.isQueryFailedError(err) {
time.Sleep(time.Duration(t.queryWorkflowFailedRetryPolicy.InitialIntervalSeconds) * time.Second)
attempt++
continue
}
return err
}
}
if err != nil {
return err
}
Expand Down
66 changes: 56 additions & 10 deletions service/client/temporal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"github.com/google/uuid"
"github.com/indeedeng/iwf/config"
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/service"
uclient "github.com/indeedeng/iwf/service/client"
Expand All @@ -20,23 +21,45 @@ import (
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/converter"
realtemporal "go.temporal.io/sdk/temporal"
"time"
)

type QueryWorkflowFailedRetryPolicy struct {
InitialIntervalSeconds int
MaximumAttempts int
}

type temporalClient struct {
tClient client.Client
namespace string
dataConverter converter.DataConverter
memoEncryption bool // this is a workaround for https://github.com/temporalio/sdk-go/issues/1045
tClient client.Client
namespace string
dataConverter converter.DataConverter
memoEncryption bool // this is a workaround for https://github.com/temporalio/sdk-go/issues/1045
queryWorkflowFailedRetryPolicy QueryWorkflowFailedRetryPolicy
}

func NewTemporalClient(
tClient client.Client, namespace string, dataConverter converter.DataConverter, memoEncryption bool,
tClient client.Client, namespace string, dataConverter converter.DataConverter, memoEncryption bool, retryPolicy *config.QueryWorkflowFailedRetryPolicy,
) uclient.UnifiedClient {
var rp QueryWorkflowFailedRetryPolicy

if retryPolicy.InitialIntervalSeconds == 0 {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm not mistaken, not providing Config.ApiConfig.QueryWorkflowFailedRetryPolicy in yaml file will make the default int value show (which is 0). In other words, we shouldn't need a nil check here

rp.InitialIntervalSeconds = 1
} else {
rp.InitialIntervalSeconds = retryPolicy.InitialIntervalSeconds
}

if retryPolicy.MaximumAttempts == 0 {
rp.MaximumAttempts = 5
} else {
rp.MaximumAttempts = retryPolicy.MaximumAttempts
}

return &temporalClient{
tClient: tClient,
namespace: namespace,
dataConverter: dataConverter,
memoEncryption: memoEncryption,
tClient: tClient,
namespace: namespace,
dataConverter: dataConverter,
memoEncryption: memoEncryption,
queryWorkflowFailedRetryPolicy: rp,
}
}

Expand Down Expand Up @@ -71,6 +94,12 @@ func (t *temporalClient) IsNotFoundError(err error) bool {
return ok
}

func (t *temporalClient) isQueryFailedError(err error) bool {
var serviceError *serviceerror.QueryFailed
ok := errors.As(err, &serviceError)
return ok
}

func (t *temporalClient) IsRequestTimeoutError(err error) bool {
var deadlineExceeded *serviceerror.DeadlineExceeded
ok := errors.As(err, &deadlineExceeded)
Expand Down Expand Up @@ -257,7 +286,24 @@ func (t *temporalClient) ListWorkflow(
func (t *temporalClient) QueryWorkflow(
ctx context.Context, valuePtr interface{}, workflowID string, runID string, queryType string, args ...interface{},
) error {
qres, err := t.tClient.QueryWorkflow(ctx, workflowID, runID, queryType, args...)
var qres converter.EncodedValue
var err error

attempt := 1
// Only QueryFailed error causes retry; all other errors make the loop to finish immediately
for attempt <= t.queryWorkflowFailedRetryPolicy.MaximumAttempts {
qres, err = t.tClient.QueryWorkflow(ctx, workflowID, runID, queryType, args...)
if err == nil {
break
} else {
if t.isQueryFailedError(err) {
time.Sleep(time.Duration(t.queryWorkflowFailedRetryPolicy.InitialIntervalSeconds) * time.Second)
attempt++
continue
}
return err
}
}
if err != nil {
return err
}
Expand Down
Loading
Loading