Skip to content

Commit

Permalink
IWF-163: Add retry policy to QueryWorkflow calls
Browse files Browse the repository at this point in the history
  • Loading branch information
lwolczynski committed Nov 22, 2024
1 parent cd87220 commit 7feffc2
Show file tree
Hide file tree
Showing 10 changed files with 149 additions and 37 deletions.
14 changes: 12 additions & 2 deletions cmd/server/iwf/iwf.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,11 @@ func start(c *cli.Context) {
if err != nil {
rawLog.Fatalf("Unable to connect to Temporal because of error %v", err)
}
unifiedClient = temporalapi.NewTemporalClient(temporalClient, config.Interpreter.Temporal.Namespace, converter.GetDefaultDataConverter(), false)
retryPolicy := temporalapi.QueryWorkflowFailedRetryPolicy{
InitialIntervalSeconds: config.GetInitialIntervalSecondsWithDefault(),
MaximumAttempts: config.GetMaximumAttemptsWithDefault(),
}
unifiedClient = temporalapi.NewTemporalClient(temporalClient, config.Interpreter.Temporal.Namespace, converter.GetDefaultDataConverter(), false, retryPolicy)

for _, svcName := range services {
go launchTemporalService(svcName, *config, unifiedClient, temporalClient, logger)
Expand All @@ -146,7 +150,13 @@ func start(c *cli.Context) {
if err != nil {
rawLog.Fatalf("Unable to connect to Cadence because of error %v", err)
}
unifiedClient = cadenceapi.NewCadenceClient(domain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc)

retryPolicy := cadenceapi.QueryWorkflowFailedRetryPolicy{
InitialIntervalSeconds: config.GetInitialIntervalSecondsWithDefault(),
MaximumAttempts: config.GetMaximumAttemptsWithDefault(),
}

unifiedClient = cadenceapi.NewCadenceClient(domain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc, retryPolicy)

for _, svcName := range services {
go launchCadenceService(svcName, *config, unifiedClient, serviceClient, domain, closeFunc, logger)
Expand Down
22 changes: 22 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ type (
OmitRpcInputOutputInHistory *bool `yaml:"omitRpcInputOutputInHistory"`
// WaitForStateCompletionMigration is used to control workflowId of the WaitForStateCompletion system/internal workflows
WaitForStateCompletionMigration WaitForStateCompletionMigration `yaml:"waitForStateCompletionMigration"`
QueryWorkflowFailedRetryPolicy QueryWorkflowFailedRetryPolicy `yaml:"queryWorkflowFailedRetryPolicy"`
}

// QueryWorkflowFailedRetryPolicy configures how QueryWorkflow calls are retried
// when the backend reports a query-failed error. Read the values through
// GetInitialIntervalSecondsWithDefault and GetMaximumAttemptsWithDefault so
// that unset fields pick up their defaults.
QueryWorkflowFailedRetryPolicy struct {
// InitialIntervalSeconds is the wait, in seconds, between two query attempts;
// defaults to 1 when left unset (0).
InitialIntervalSeconds int `yaml:"initialIntervalSeconds"`
// MaximumAttempts is the total number of query attempts; defaults to 5 when
// left unset (0).
MaximumAttempts int `yaml:"maximumAttempts"`
}

WaitForStateCompletionMigration struct {
Expand Down Expand Up @@ -147,3 +155,17 @@ func (c Config) GetWaitForOnWithDefault() string {
}
return "old"
}

// GetInitialIntervalSecondsWithDefault returns the configured wait, in seconds,
// between QueryWorkflow retry attempts.
//
// An unset (zero) or negative value is not a usable interval, so it falls back
// to the default of 1 second instead of being handed to the retry loop.
func (c Config) GetInitialIntervalSecondsWithDefault() int {
	if interval := c.Api.QueryWorkflowFailedRetryPolicy.InitialIntervalSeconds; interval > 0 {
		return interval
	}
	return 1
}

// GetMaximumAttemptsWithDefault returns the configured total number of
// QueryWorkflow attempts.
//
// An unset (zero) or negative value falls back to the default of 5: a
// non-positive attempt count would mean the query is never issued at all.
func (c Config) GetMaximumAttemptsWithDefault() int {
	if attempts := c.Api.QueryWorkflowFailedRetryPolicy.MaximumAttempts; attempts > 0 {
		return attempts
	}
	return 5
}
3 changes: 3 additions & 0 deletions config/config_template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ api:
waitForStateCompletionMigration:
signalWithStartOn: old
waitForOn: old
queryWorkflowFailedRetryPolicy:
initialIntervalSeconds: 1
maximumAttempts: 5
interpreter:
defaultWorkflowConfig:
continueAsNewThreshold: 100
Expand Down
3 changes: 3 additions & 0 deletions config/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ api:
waitForStateCompletionMigration:
signalWithStartOn: old
waitForOn: old
queryWorkflowFailedRetryPolicy:
initialIntervalSeconds: 1
maximumAttempts: 5
interpreter:
temporal:
hostPort: localhost:7233
Expand Down
3 changes: 3 additions & 0 deletions config/development_cadence.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ api:
waitForStateCompletionMigration:
signalWithStartOn: old
waitForOn: old
queryWorkflowFailedRetryPolicy:
initialIntervalSeconds: 1
maximumAttempts: 5
interpreter:
# interpreterActivityConfig:
# disableSystemSearchAttributes: true # (deprecated) set to true if you don't have advanced visibility in Cadence, see more https://github.com/uber/cadence/issues/5085
Expand Down
4 changes: 4 additions & 0 deletions integ/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ func createTestConfig(testCfg IwfServiceTestConfig) config.Config {
SignalWithStartOn: "old",
WaitForOn: "old",
},
QueryWorkflowFailedRetryPolicy: config.QueryWorkflowFailedRetryPolicy{
InitialIntervalSeconds: 1,
MaximumAttempts: 5,
},
},
Interpreter: config.Interpreter{
VerboseDebug: false,
Expand Down
6 changes: 0 additions & 6 deletions integ/timer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config *
}).Execute()
panicAtHttpError(err, httpResp)

time.Sleep(time.Second * 1)
timerInfos = service.GetCurrentTimerInfosQueryResponse{}
err = uclient.QueryWorkflow(context.Background(), &timerInfos, wfId, "", service.GetCurrentTimerInfosQueryType)
if err != nil {
Expand All @@ -139,15 +138,13 @@ func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config *
timer2.Status = service.TimerSkipped
assertTimerQueryResponseEqual(assertions, expectedTimerInfos, timerInfos)

time.Sleep(time.Second * 1)
httpResp, err = req3.WorkflowSkipTimerRequest(iwfidl.WorkflowSkipTimerRequest{
WorkflowId: wfId,
WorkflowStateExecutionId: "S1-1",
TimerCommandIndex: iwfidl.PtrInt32(2),
}).Execute()
panicAtHttpError(err, httpResp)

time.Sleep(time.Second * 1)
timerInfos = service.GetCurrentTimerInfosQueryResponse{}
err = uclient.QueryWorkflow(context.Background(), &timerInfos, wfId, "", service.GetCurrentTimerInfosQueryType)
if err != nil {
Expand Down Expand Up @@ -183,9 +180,6 @@ func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config *
}).Execute()
panicAtHttpError(err, httpResp)

if config != nil {
time.Sleep(time.Second * 2)
}
err = uclient.QueryWorkflow(context.Background(), &timerInfos, wfId, "", service.GetCurrentTimerInfosQueryType)
if err != nil {
log.Fatalf("Fail to invoke query %v", err)
Expand Down
22 changes: 19 additions & 3 deletions integ/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,16 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
if err != nil {
panic(err)
}
uclient = temporalapi.NewTemporalClient(temporalClient, testNamespace, dataConverter, config.MemoEncryption)
iwfService := api.NewService(createTestConfig(config), uclient, logger)

testCfg := createTestConfig(config)

retryPolicy := temporalapi.QueryWorkflowFailedRetryPolicy{
InitialIntervalSeconds: testCfg.GetInitialIntervalSecondsWithDefault(),
MaximumAttempts: testCfg.GetMaximumAttemptsWithDefault(),
}

uclient = temporalapi.NewTemporalClient(temporalClient, testNamespace, dataConverter, config.MemoEncryption, retryPolicy)
iwfService := api.NewService(testCfg, uclient, logger)
iwfServer := &http.Server{
Addr: ":" + testIwfServerPort,
Handler: iwfService,
Expand Down Expand Up @@ -144,7 +152,15 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
if err != nil {
panic(err)
}
uclient = cadenceapi.NewCadenceClient(iwf.DefaultCadenceDomain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc)

testCfg := createTestConfig(config)

retryPolicy := cadenceapi.QueryWorkflowFailedRetryPolicy{
InitialIntervalSeconds: testCfg.GetInitialIntervalSecondsWithDefault(),
MaximumAttempts: testCfg.GetMaximumAttemptsWithDefault(),
}

uclient = cadenceapi.NewCadenceClient(iwf.DefaultCadenceDomain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc, retryPolicy)
iwfService := api.NewService(createTestConfig(config), uclient, logger)
iwfServer := &http.Server{
Addr: ":" + testIwfServerPort,
Expand Down
56 changes: 42 additions & 14 deletions service/client/cadence/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,18 @@ import (
"go.uber.org/cadence/encoded"
)

// QueryWorkflowFailedRetryPolicy holds the retry settings that QueryWorkflow
// applies when a query attempt fails with a query-failed error.
type QueryWorkflowFailedRetryPolicy struct {
// InitialIntervalSeconds is the number of seconds QueryWorkflow waits before
// the next attempt.
InitialIntervalSeconds int
// MaximumAttempts is the upper bound on query attempts made by QueryWorkflow.
MaximumAttempts int
}

// cadenceClient is the Cadence-backed implementation returned by
// NewCadenceClient as a uclient.UnifiedClient.
type cadenceClient struct {
	domain                         string
	cClient                        client.Client
	closeFunc                      func()
	serviceClient                  workflowserviceclient.Interface
	converter                      encoded.DataConverter
	queryWorkflowFailedRetryPolicy QueryWorkflowFailedRetryPolicy // retry settings used by QueryWorkflow
}

func (t *cadenceClient) IsWorkflowAlreadyStartedError(err error) bool {
Expand All @@ -52,6 +58,12 @@ func (t *cadenceClient) IsNotFoundError(err error) bool {
return ok
}

// isQueryFailedError reports whether err, or any error in its chain, is a
// Cadence *shared.QueryFailedError.
func (t *cadenceClient) isQueryFailedError(err error) bool {
	var queryFailed *shared.QueryFailedError
	return errors.As(err, &queryFailed)
}

func (t *cadenceClient) IsWorkflowTimeoutError(err error) bool {
return realcadence.IsTimeoutError(err)
}
Expand Down Expand Up @@ -83,14 +95,15 @@ func (t *cadenceClient) GetApplicationErrorDetails(err error, detailsPtr interfa

// NewCadenceClient wraps the given Cadence clients in a uclient.UnifiedClient.
//
// retryPolicy is stored on the client and consulted by QueryWorkflow when a
// query attempt fails with a query-failed error.
func NewCadenceClient(
	domain string, cClient client.Client, serviceClient workflowserviceclient.Interface,
	converter encoded.DataConverter, closeFunc func(), retryPolicy QueryWorkflowFailedRetryPolicy,
) uclient.UnifiedClient {
	return &cadenceClient{
		domain:                         domain,
		cClient:                        cClient,
		closeFunc:                      closeFunc,
		serviceClient:                  serviceClient,
		converter:                      converter,
		queryWorkflowFailedRetryPolicy: retryPolicy,
	}
}

Expand Down Expand Up @@ -227,9 +240,24 @@ func (t *cadenceClient) ListWorkflow(
func (t *cadenceClient) QueryWorkflow(
ctx context.Context, valuePtr interface{}, workflowID string, runID string, queryType string, args ...interface{},
) error {
qres, err := queryWorkflowWithStrongConsistency(t, ctx, workflowID, runID, queryType, args)
if err != nil {
return err
var qres encoded.Value
var err error

attempt := 1
for attempt <= t.queryWorkflowFailedRetryPolicy.MaximumAttempts {
qres, err = queryWorkflowWithStrongConsistency(t, ctx, workflowID, runID, queryType, args)
if err != nil {
if t.isQueryFailedError(err) {
if attempt == t.queryWorkflowFailedRetryPolicy.MaximumAttempts {
return err
} else {
time.Sleep(time.Duration(t.queryWorkflowFailedRetryPolicy.InitialIntervalSeconds) * time.Second)
attempt++
continue
}
}
return err
}
}
return qres.Get(valuePtr)
}
Expand Down
53 changes: 41 additions & 12 deletions service/client/temporal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,31 @@ import (
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/converter"
realtemporal "go.temporal.io/sdk/temporal"
"time"
)

// QueryWorkflowFailedRetryPolicy holds the retry settings that QueryWorkflow
// applies when a query attempt fails with a query-failed error.
type QueryWorkflowFailedRetryPolicy struct {
// InitialIntervalSeconds is the number of seconds QueryWorkflow waits before
// the next attempt.
InitialIntervalSeconds int
// MaximumAttempts is the upper bound on query attempts made by QueryWorkflow.
MaximumAttempts int
}

// temporalClient is the Temporal-backed implementation returned by
// NewTemporalClient as a uclient.UnifiedClient.
type temporalClient struct {
	tClient                        client.Client
	namespace                      string
	dataConverter                  converter.DataConverter
	memoEncryption                 bool                           // this is a workaround for https://github.com/temporalio/sdk-go/issues/1045
	queryWorkflowFailedRetryPolicy QueryWorkflowFailedRetryPolicy // retry settings used by QueryWorkflow
}

// NewTemporalClient wraps the given Temporal client in a uclient.UnifiedClient.
//
// retryPolicy is stored on the client and consulted by QueryWorkflow when a
// query attempt fails with a query-failed error.
func NewTemporalClient(
	tClient client.Client, namespace string, dataConverter converter.DataConverter, memoEncryption bool, retryPolicy QueryWorkflowFailedRetryPolicy,
) uclient.UnifiedClient {
	return &temporalClient{
		tClient:                        tClient,
		namespace:                      namespace,
		dataConverter:                  dataConverter,
		memoEncryption:                 memoEncryption,
		queryWorkflowFailedRetryPolicy: retryPolicy,
	}
}

Expand Down Expand Up @@ -71,6 +79,12 @@ func (t *temporalClient) IsNotFoundError(err error) bool {
return ok
}

// isQueryFailedError reports whether err, or any error in its chain, is a
// Temporal *serviceerror.QueryFailed.
func (t *temporalClient) isQueryFailedError(err error) bool {
	var queryFailed *serviceerror.QueryFailed
	return errors.As(err, &queryFailed)
}

func (t *temporalClient) IsRequestTimeoutError(err error) bool {
var deadlineExceeded *serviceerror.DeadlineExceeded
ok := errors.As(err, &deadlineExceeded)
Expand Down Expand Up @@ -257,9 +271,24 @@ func (t *temporalClient) ListWorkflow(
func (t *temporalClient) QueryWorkflow(
ctx context.Context, valuePtr interface{}, workflowID string, runID string, queryType string, args ...interface{},
) error {
qres, err := t.tClient.QueryWorkflow(ctx, workflowID, runID, queryType, args...)
if err != nil {
return err
var qres converter.EncodedValue
var err error

attempt := 1
for attempt <= t.queryWorkflowFailedRetryPolicy.MaximumAttempts {
qres, err = t.tClient.QueryWorkflow(ctx, workflowID, runID, queryType, args...)
if err != nil {
if t.isQueryFailedError(err) {
if attempt == t.queryWorkflowFailedRetryPolicy.MaximumAttempts {
return err
} else {
time.Sleep(time.Duration(t.queryWorkflowFailedRetryPolicy.InitialIntervalSeconds) * time.Second)
attempt++
continue
}
}
return err
}
}
return qres.Get(valuePtr)
}
Expand Down

0 comments on commit 7feffc2

Please sign in to comment.