From 7feffc261646f9aeac4edb0ee8f05d712ac844c7 Mon Sep 17 00:00:00 2001 From: lwolczynski Date: Fri, 22 Nov 2024 16:08:48 -0600 Subject: [PATCH] IWF-163: Add retry policy to QueryWorkflow calls --- cmd/server/iwf/iwf.go | 14 ++++++-- config/config.go | 22 ++++++++++++ config/config_template.yaml | 3 ++ config/development.yaml | 3 ++ config/development_cadence.yaml | 3 ++ integ/config.go | 4 +++ integ/timer_test.go | 6 ---- integ/util.go | 22 ++++++++++-- service/client/cadence/client.go | 56 +++++++++++++++++++++++-------- service/client/temporal/client.go | 53 ++++++++++++++++++++++------- 10 files changed, 149 insertions(+), 37 deletions(-) diff --git a/cmd/server/iwf/iwf.go b/cmd/server/iwf/iwf.go index 7a865997..e9f9f6be 100644 --- a/cmd/server/iwf/iwf.go +++ b/cmd/server/iwf/iwf.go @@ -124,7 +124,11 @@ func start(c *cli.Context) { if err != nil { rawLog.Fatalf("Unable to connect to Temporal because of error %v", err) } - unifiedClient = temporalapi.NewTemporalClient(temporalClient, config.Interpreter.Temporal.Namespace, converter.GetDefaultDataConverter(), false) + retryPolicy := temporalapi.QueryWorkflowFailedRetryPolicy{ + InitialIntervalSeconds: config.GetInitialIntervalSecondsWithDefault(), + MaximumAttempts: config.GetMaximumAttemptsWithDefault(), + } + unifiedClient = temporalapi.NewTemporalClient(temporalClient, config.Interpreter.Temporal.Namespace, converter.GetDefaultDataConverter(), false, retryPolicy) for _, svcName := range services { go launchTemporalService(svcName, *config, unifiedClient, temporalClient, logger) @@ -146,7 +150,13 @@ func start(c *cli.Context) { if err != nil { rawLog.Fatalf("Unable to connect to Cadence because of error %v", err) } - unifiedClient = cadenceapi.NewCadenceClient(domain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc) + + retryPolicy := cadenceapi.QueryWorkflowFailedRetryPolicy{ + InitialIntervalSeconds: config.GetInitialIntervalSecondsWithDefault(), + MaximumAttempts: config.GetMaximumAttemptsWithDefault(), + } + + unifiedClient = cadenceapi.NewCadenceClient(domain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc, retryPolicy) for _, svcName := range services { go launchCadenceService(svcName, *config, unifiedClient, serviceClient, domain, closeFunc, logger) diff --git a/config/config.go b/config/config.go index 6bdfacea..d83e8081 100644 --- a/config/config.go +++ b/config/config.go @@ -31,6 +31,14 @@ type ( OmitRpcInputOutputInHistory *bool `yaml:"omitRpcInputOutputInHistory"` // WaitForStateCompletionMigration is used to control workflowId of the WaitForStateCompletion system/internal workflows WaitForStateCompletionMigration WaitForStateCompletionMigration `yaml:"waitForStateCompletionMigration"` + QueryWorkflowFailedRetryPolicy QueryWorkflowFailedRetryPolicy `yaml:"queryWorkflowFailedRetryPolicy"` + } + + QueryWorkflowFailedRetryPolicy struct { + // defaults to 1 + InitialIntervalSeconds int `yaml:"initialIntervalSeconds"` + // defaults to 5 + MaximumAttempts int `yaml:"maximumAttempts"` } WaitForStateCompletionMigration struct { @@ -147,3 +155,17 @@ func (c Config) GetWaitForOnWithDefault() string { } return "old" } + +func (c Config) GetInitialIntervalSecondsWithDefault() int { + if c.Api.QueryWorkflowFailedRetryPolicy.InitialIntervalSeconds != 0 { + return c.Api.QueryWorkflowFailedRetryPolicy.InitialIntervalSeconds + } + return 1 +} + +func (c Config) GetMaximumAttemptsWithDefault() int { + if c.Api.QueryWorkflowFailedRetryPolicy.MaximumAttempts != 0 { + return c.Api.QueryWorkflowFailedRetryPolicy.MaximumAttempts + } + return 5 +} diff --git a/config/config_template.yaml b/config/config_template.yaml index ff8d2a3e..a6ba4a22 100644 --- a/config/config_template.yaml +++ b/config/config_template.yaml @@ -3,6 +3,9 @@ api: waitForStateCompletionMigration: signalWithStartOn: old waitForOn: old + queryWorkflowFailedRetryPolicy: + initialIntervalSeconds: 1 + maximumAttempts: 5 interpreter: defaultWorkflowConfig: continueAsNewThreshold: 100 diff --git a/config/development.yaml b/config/development.yaml index 3dbac3d2..a3b0817f 100644 --- a/config/development.yaml +++ b/config/development.yaml @@ -7,6 +7,9 @@ api: waitForStateCompletionMigration: signalWithStartOn: old waitForOn: old + queryWorkflowFailedRetryPolicy: + initialIntervalSeconds: 1 + maximumAttempts: 5 interpreter: temporal: hostPort: localhost:7233 diff --git a/config/development_cadence.yaml b/config/development_cadence.yaml index 977a9a10..f7858bec 100644 --- a/config/development_cadence.yaml +++ b/config/development_cadence.yaml @@ -3,6 +3,9 @@ api: waitForStateCompletionMigration: signalWithStartOn: old waitForOn: old + queryWorkflowFailedRetryPolicy: + initialIntervalSeconds: 1 + maximumAttempts: 5 interpreter: # interpreterActivityConfig: # disableSystemSearchAttributes: true # (deprecated) set to true if you don't have advanced visibility in Cadence, see more https://github.com/uber/cadence/issues/5085 diff --git a/integ/config.go b/integ/config.go index 91258333..e9afbba8 100644 --- a/integ/config.go +++ b/integ/config.go @@ -16,6 +16,10 @@ func createTestConfig(testCfg IwfServiceTestConfig) config.Config { SignalWithStartOn: "old", WaitForOn: "old", }, + QueryWorkflowFailedRetryPolicy: config.QueryWorkflowFailedRetryPolicy{ + InitialIntervalSeconds: 1, + MaximumAttempts: 5, + }, }, Interpreter: config.Interpreter{ VerboseDebug: false, diff --git a/integ/timer_test.go b/integ/timer_test.go index e62dd843..3c7ab159 100644 --- a/integ/timer_test.go +++ b/integ/timer_test.go @@ -130,7 +130,6 @@ func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config * }).Execute() panicAtHttpError(err, httpResp) - time.Sleep(time.Second * 1) timerInfos = service.GetCurrentTimerInfosQueryResponse{} err = uclient.QueryWorkflow(context.Background(), &timerInfos, wfId, "", service.GetCurrentTimerInfosQueryType) if err != nil { @@ -139,7 +138,6 @@ func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config * timer2.Status = service.TimerSkipped assertTimerQueryResponseEqual(assertions, expectedTimerInfos, timerInfos) - time.Sleep(time.Second * 1) httpResp, err = req3.WorkflowSkipTimerRequest(iwfidl.WorkflowSkipTimerRequest{ WorkflowId: wfId, WorkflowStateExecutionId: "S1-1", @@ -147,7 +145,6 @@ func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config * }).Execute() panicAtHttpError(err, httpResp) - time.Sleep(time.Second * 1) timerInfos = service.GetCurrentTimerInfosQueryResponse{} err = uclient.QueryWorkflow(context.Background(), &timerInfos, wfId, "", service.GetCurrentTimerInfosQueryType) if err != nil { @@ -183,9 +180,6 @@ func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config * }).Execute() panicAtHttpError(err, httpResp) - if config != nil { - time.Sleep(time.Second * 2) - } err = uclient.QueryWorkflow(context.Background(), &timerInfos, wfId, "", service.GetCurrentTimerInfosQueryType) if err != nil { log.Fatalf("Fail to invoke query %v", err) diff --git a/integ/util.go b/integ/util.go index 8affc720..7bd64bdc 100644 --- a/integ/util.go +++ b/integ/util.go @@ -109,8 +109,16 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U if err != nil { panic(err) } - uclient = temporalapi.NewTemporalClient(temporalClient, testNamespace, dataConverter, config.MemoEncryption) - iwfService := api.NewService(createTestConfig(config), uclient, logger) + + testCfg := createTestConfig(config) + + retryPolicy := temporalapi.QueryWorkflowFailedRetryPolicy{ + InitialIntervalSeconds: testCfg.GetInitialIntervalSecondsWithDefault(), + MaximumAttempts: testCfg.GetMaximumAttemptsWithDefault(), + } + + uclient = temporalapi.NewTemporalClient(temporalClient, testNamespace, dataConverter, config.MemoEncryption, retryPolicy) + iwfService := api.NewService(testCfg, uclient, logger) iwfServer := &http.Server{ Addr: ":" + testIwfServerPort, Handler: iwfService, @@ -144,7 +152,15 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U if err != nil { panic(err) } - uclient = cadenceapi.NewCadenceClient(iwf.DefaultCadenceDomain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc) + + testCfg := createTestConfig(config) + + retryPolicy := cadenceapi.QueryWorkflowFailedRetryPolicy{ + InitialIntervalSeconds: testCfg.GetInitialIntervalSecondsWithDefault(), + MaximumAttempts: testCfg.GetMaximumAttemptsWithDefault(), + } + + uclient = cadenceapi.NewCadenceClient(iwf.DefaultCadenceDomain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc, retryPolicy) iwfService := api.NewService(createTestConfig(config), uclient, logger) iwfServer := &http.Server{ Addr: ":" + testIwfServerPort, diff --git a/service/client/cadence/client.go b/service/client/cadence/client.go index 2b2ca693..559573ad 100644 --- a/service/client/cadence/client.go +++ b/service/client/cadence/client.go @@ -22,12 +22,18 @@ import ( "go.uber.org/cadence/encoded" ) +type QueryWorkflowFailedRetryPolicy struct { + InitialIntervalSeconds int + MaximumAttempts int +} + type cadenceClient struct { - domain string - cClient client.Client - closeFunc func() - serviceClient workflowserviceclient.Interface - converter encoded.DataConverter + domain string + cClient client.Client + closeFunc func() + serviceClient workflowserviceclient.Interface + converter encoded.DataConverter + queryWorkflowFailedRetryPolicy QueryWorkflowFailedRetryPolicy } func (t *cadenceClient) IsWorkflowAlreadyStartedError(err error) bool { @@ -52,6 +58,12 @@ func (t *cadenceClient) IsNotFoundError(err error) bool { return ok } +func (t *cadenceClient) isQueryFailedError(err error) bool { + var serviceError *shared.QueryFailedError + ok := errors.As(err, &serviceError) + return ok +} + func (t *cadenceClient) IsWorkflowTimeoutError(err error) bool { return realcadence.IsTimeoutError(err) } @@ -83,14 +95,15 @@ func (t *cadenceClient) GetApplicationErrorDetails(err error, detailsPtr interfa func NewCadenceClient( domain string, cClient client.Client, serviceClient workflowserviceclient.Interface, - converter encoded.DataConverter, closeFunc func(), + converter encoded.DataConverter, closeFunc func(), retryPolicy QueryWorkflowFailedRetryPolicy, ) uclient.UnifiedClient { return &cadenceClient{ - domain: domain, - cClient: cClient, - closeFunc: closeFunc, - serviceClient: serviceClient, - converter: converter, + domain: domain, + cClient: cClient, + closeFunc: closeFunc, + serviceClient: serviceClient, + converter: converter, + queryWorkflowFailedRetryPolicy: retryPolicy, } } @@ -227,9 +240,24 @@ func (t *cadenceClient) ListWorkflow( func (t *cadenceClient) QueryWorkflow( ctx context.Context, valuePtr interface{}, workflowID string, runID string, queryType string, args ...interface{}, ) error { - qres, err := queryWorkflowWithStrongConsistency(t, ctx, workflowID, runID, queryType, args) - if err != nil { - return err + var qres encoded.Value + var err error + + attempt := 1 + for attempt <= t.queryWorkflowFailedRetryPolicy.MaximumAttempts { + qres, err = queryWorkflowWithStrongConsistency(t, ctx, workflowID, runID, queryType, args) + if err != nil { + if t.isQueryFailedError(err) { + if attempt == t.queryWorkflowFailedRetryPolicy.MaximumAttempts { + return err + } else { + time.Sleep(time.Duration(t.queryWorkflowFailedRetryPolicy.InitialIntervalSeconds) * time.Second) + attempt++ + continue + } + } + return err + } } return qres.Get(valuePtr) } diff --git a/service/client/temporal/client.go b/service/client/temporal/client.go index 9559638b..c96737cf 100644 --- a/service/client/temporal/client.go +++ b/service/client/temporal/client.go @@ -20,23 +20,31 @@ import ( "go.temporal.io/sdk/client" "go.temporal.io/sdk/converter" realtemporal "go.temporal.io/sdk/temporal" + "time" ) +type QueryWorkflowFailedRetryPolicy struct { + InitialIntervalSeconds int + MaximumAttempts int +} + type temporalClient struct { - tClient client.Client - namespace string - dataConverter converter.DataConverter - memoEncryption bool // this is a workaround for https://github.com/temporalio/sdk-go/issues/1045 + tClient client.Client + namespace string + dataConverter converter.DataConverter + memoEncryption bool // this is a workaround for https://github.com/temporalio/sdk-go/issues/1045 + queryWorkflowFailedRetryPolicy QueryWorkflowFailedRetryPolicy } func NewTemporalClient( - tClient client.Client, namespace string, dataConverter converter.DataConverter, memoEncryption bool, + tClient client.Client, namespace string, dataConverter converter.DataConverter, memoEncryption bool, retryPolicy QueryWorkflowFailedRetryPolicy, ) uclient.UnifiedClient { return &temporalClient{ - tClient: tClient, - namespace: namespace, - dataConverter: dataConverter, - memoEncryption: memoEncryption, + tClient: tClient, + namespace: namespace, + dataConverter: dataConverter, + memoEncryption: memoEncryption, + queryWorkflowFailedRetryPolicy: retryPolicy, } } @@ -71,6 +79,12 @@ func (t *temporalClient) IsNotFoundError(err error) bool { return ok } +func (t *temporalClient) isQueryFailedError(err error) bool { + var serviceError *serviceerror.QueryFailed + ok := errors.As(err, &serviceError) + return ok +} + func (t *temporalClient) IsRequestTimeoutError(err error) bool { var deadlineExceeded *serviceerror.DeadlineExceeded ok := errors.As(err, &deadlineExceeded) @@ -257,9 +271,24 @@ func (t *temporalClient) ListWorkflow( func (t *temporalClient) QueryWorkflow( ctx context.Context, valuePtr interface{}, workflowID string, runID string, queryType string, args ...interface{}, ) error { - qres, err := t.tClient.QueryWorkflow(ctx, workflowID, runID, queryType, args...) - if err != nil { - return err + var qres converter.EncodedValue + var err error + + attempt := 1 + for attempt <= t.queryWorkflowFailedRetryPolicy.MaximumAttempts { + qres, err = t.tClient.QueryWorkflow(ctx, workflowID, runID, queryType, args...) + if err != nil { + if t.isQueryFailedError(err) { + if attempt == t.queryWorkflowFailedRetryPolicy.MaximumAttempts { + return err + } else { + time.Sleep(time.Duration(t.queryWorkflowFailedRetryPolicy.InitialIntervalSeconds) * time.Second) + attempt++ + continue + } + } + return err + } } return qres.Get(valuePtr) }