From e0c4caa19a7c6561699618f1c92f1540bc955cb6 Mon Sep 17 00:00:00 2001 From: lwolczynski <54366429+lwolczynski@users.noreply.github.com> Date: Mon, 30 Sep 2024 16:06:48 -0500 Subject: [PATCH] IWF-136: Support initial data attributes when starting workflow (#435) --- CONTRIBUTING.md | 2 +- gen/iwfidl/api/openapi.yaml | 24 ++++++++++- gen/iwfidl/docs/IDReusePolicy.md | 2 +- gen/iwfidl/docs/WorkflowStartOptions.md | 26 ++++++++++++ gen/iwfidl/model_id_reuse_policy.go | 4 +- gen/iwfidl/model_workflow_start_options.go | 36 ++++++++++++++++ integ/persistence_test.go | 48 +++++++++++++++++++++- iwf-idl | 2 +- service/api/service.go | 7 +++- service/client/interfaces.go | 1 + service/interfaces.go | 2 + service/interpreter/persistence.go | 8 +++- service/interpreter/workflowImpl.go | 8 +++- 13 files changed, 159 insertions(+), 11 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 67e503c6..48e0390e 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -64,7 +64,7 @@ Or maybe both just for testing to ensure the code works for both Cadence and Tem ### Option 1: Run with our docker-compose file (Recommended) -Simply run `docker compose -f docker-compose/integ-dependencies.yml up -` will: +Simply run `docker compose -f docker-compose/integ-dependencies.yml up` will: * Start both Cadence & Temporal as dependencies * Set up required system search attributes diff --git a/gen/iwfidl/api/openapi.yaml b/gen/iwfidl/api/openapi.yaml index 9f15a792..f52b178f 100644 --- a/gen/iwfidl/api/openapi.yaml +++ b/gen/iwfidl/api/openapi.yaml @@ -672,6 +672,15 @@ components: workflowIDReusePolicy: null useMemoForDataAttributes: true cronSchedule: cronSchedule + dataAttributes: + - value: + data: data + encoding: encoding + key: key + - value: + data: data + encoding: encoding + key: key properties: workflowIDReusePolicy: $ref: '#/components/schemas/WorkflowIDReusePolicy' @@ -686,6 +695,10 @@ components: items: $ref: '#/components/schemas/SearchAttribute' type: array + dataAttributes: + items: + $ref: '#/components/schemas/KeyValue' + type: array workflowConfigOverride: $ref: '#/components/schemas/WorkflowConfig' idReusePolicy: @@ -702,7 +715,7 @@ components: type: string IDReusePolicy: enum: - - ALLOW_IF_PREVIOUS_EXISTS_ABNORMALLY # Keeping typo enum for backwards compatibility + - ALLOW_IF_PREVIOUS_EXISTS_ABNORMALLY - ALLOW_IF_PREVIOUS_EXITS_ABNORMALLY - ALLOW_IF_NO_RUNNING - DISALLOW_REUSE @@ -891,6 +904,15 @@ components: workflowIDReusePolicy: null useMemoForDataAttributes: true cronSchedule: cronSchedule + dataAttributes: + - value: + data: data + encoding: encoding + key: key + - value: + data: data + encoding: encoding + key: key iwfWorkerUrl: iwfWorkerUrl workflowId: workflowId stateInput: diff --git a/gen/iwfidl/docs/IDReusePolicy.md b/gen/iwfidl/docs/IDReusePolicy.md index 26f4875b..2a88e3c2 100644 --- a/gen/iwfidl/docs/IDReusePolicy.md +++ b/gen/iwfidl/docs/IDReusePolicy.md @@ -2,7 +2,7 @@ ## Enum -# Keeping typo enum for backwards compatibility + * `ALLOW_IF_PREVIOUS_EXISTS_ABNORMALLY` (value: `"ALLOW_IF_PREVIOUS_EXISTS_ABNORMALLY"`) * `ALLOW_IF_PREVIOUS_EXITS_ABNORMALLY` (value: `"ALLOW_IF_PREVIOUS_EXITS_ABNORMALLY"`) diff --git a/gen/iwfidl/docs/WorkflowStartOptions.md b/gen/iwfidl/docs/WorkflowStartOptions.md index 5036fb74..396d179a 100644 --- a/gen/iwfidl/docs/WorkflowStartOptions.md +++ b/gen/iwfidl/docs/WorkflowStartOptions.md @@ -9,6 +9,7 @@ Name | Type | Description | Notes **WorkflowStartDelaySeconds** | Pointer to **int32** | | [optional] **RetryPolicy** | Pointer to [**WorkflowRetryPolicy**](WorkflowRetryPolicy.md) | | [optional] **SearchAttributes** | Pointer to [**[]SearchAttribute**](SearchAttribute.md) | | [optional] +**DataAttributes** | Pointer to [**[]KeyValue**](KeyValue.md) | | [optional] **WorkflowConfigOverride** | Pointer to [**WorkflowConfig**](WorkflowConfig.md) | | [optional] **IdReusePolicy** | Pointer to [**IDReusePolicy**](IDReusePolicy.md) | | [optional] **UseMemoForDataAttributes** | Pointer to **bool** | | [optional] @@ -157,6 +158,31 @@ SetSearchAttributes sets SearchAttributes field to given value. HasSearchAttributes returns a boolean if a field has been set. +### GetDataAttributes + +`func (o *WorkflowStartOptions) GetDataAttributes() []KeyValue` + +GetDataAttributes returns the DataAttributes field if non-nil, zero value otherwise. + +### GetDataAttributesOk + +`func (o *WorkflowStartOptions) GetDataAttributesOk() (*[]KeyValue, bool)` + +GetDataAttributesOk returns a tuple with the DataAttributes field if it's non-nil, zero value otherwise +and a boolean to check if the value has been set. + +### SetDataAttributes + +`func (o *WorkflowStartOptions) SetDataAttributes(v []KeyValue)` + +SetDataAttributes sets DataAttributes field to given value. + +### HasDataAttributes + +`func (o *WorkflowStartOptions) HasDataAttributes() bool` + +HasDataAttributes returns a boolean if a field has been set. + ### GetWorkflowConfigOverride `func (o *WorkflowStartOptions) GetWorkflowConfigOverride() WorkflowConfig` diff --git a/gen/iwfidl/model_id_reuse_policy.go b/gen/iwfidl/model_id_reuse_policy.go index dbb23918..1a526c81 100644 --- a/gen/iwfidl/model_id_reuse_policy.go +++ b/gen/iwfidl/model_id_reuse_policy.go @@ -20,7 +20,7 @@ type IDReusePolicy string // List of IDReusePolicy const ( - ALLOW_IF_PREVIOUS_EXISTS_ABNORMALLY IDReusePolicy = "ALLOW_IF_PREVIOUS_EXISTS_ABNORMALLY" // Keeping typo enum for backwards compatibility + ALLOW_IF_PREVIOUS_EXISTS_ABNORMALLY IDReusePolicy = "ALLOW_IF_PREVIOUS_EXISTS_ABNORMALLY" ALLOW_IF_PREVIOUS_EXITS_ABNORMALLY IDReusePolicy = "ALLOW_IF_PREVIOUS_EXITS_ABNORMALLY" ALLOW_IF_NO_RUNNING IDReusePolicy = "ALLOW_IF_NO_RUNNING" DISALLOW_REUSE IDReusePolicy = "DISALLOW_REUSE" @@ -29,7 +29,7 @@ const ( // All allowed values of IDReusePolicy enum var AllowedIDReusePolicyEnumValues = []IDReusePolicy{ - "ALLOW_IF_PREVIOUS_EXISTS_ABNORMALLY", // Keeping typo enum for backwards compatibility + "ALLOW_IF_PREVIOUS_EXISTS_ABNORMALLY", "ALLOW_IF_PREVIOUS_EXITS_ABNORMALLY", "ALLOW_IF_NO_RUNNING", "DISALLOW_REUSE", diff --git a/gen/iwfidl/model_workflow_start_options.go b/gen/iwfidl/model_workflow_start_options.go index 0f283da3..16cc45ec 100644 --- a/gen/iwfidl/model_workflow_start_options.go +++ b/gen/iwfidl/model_workflow_start_options.go @@ -24,6 +24,7 @@ type WorkflowStartOptions struct { WorkflowStartDelaySeconds *int32 `json:"workflowStartDelaySeconds,omitempty"` RetryPolicy *WorkflowRetryPolicy `json:"retryPolicy,omitempty"` SearchAttributes []SearchAttribute `json:"searchAttributes,omitempty"` + DataAttributes []KeyValue `json:"dataAttributes,omitempty"` WorkflowConfigOverride *WorkflowConfig `json:"workflowConfigOverride,omitempty"` IdReusePolicy *IDReusePolicy `json:"idReusePolicy,omitempty"` UseMemoForDataAttributes *bool `json:"useMemoForDataAttributes,omitempty"` @@ -206,6 +207,38 @@ func (o *WorkflowStartOptions) SetSearchAttributes(v []SearchAttribute) { o.SearchAttributes = v } +// GetDataAttributes returns the DataAttributes field value if set, zero value otherwise. +func (o *WorkflowStartOptions) GetDataAttributes() []KeyValue { + if o == nil || IsNil(o.DataAttributes) { + var ret []KeyValue + return ret + } + return o.DataAttributes +} + +// GetDataAttributesOk returns a tuple with the DataAttributes field value if set, nil otherwise +// and a boolean to check if the value has been set. +func (o *WorkflowStartOptions) GetDataAttributesOk() ([]KeyValue, bool) { + if o == nil || IsNil(o.DataAttributes) { + return nil, false + } + return o.DataAttributes, true +} + +// HasDataAttributes returns a boolean if a field has been set. +func (o *WorkflowStartOptions) HasDataAttributes() bool { + if o != nil && !IsNil(o.DataAttributes) { + return true + } + + return false +} + +// SetDataAttributes gets a reference to the given []KeyValue and assigns it to the DataAttributes field. +func (o *WorkflowStartOptions) SetDataAttributes(v []KeyValue) { + o.DataAttributes = v +} + // GetWorkflowConfigOverride returns the WorkflowConfigOverride field value if set, zero value otherwise. func (o *WorkflowStartOptions) GetWorkflowConfigOverride() WorkflowConfig { if o == nil || IsNil(o.WorkflowConfigOverride) { @@ -327,6 +360,9 @@ func (o WorkflowStartOptions) ToMap() (map[string]interface{}, error) { if !IsNil(o.SearchAttributes) { toSerialize["searchAttributes"] = o.SearchAttributes } + if !IsNil(o.DataAttributes) { + toSerialize["dataAttributes"] = o.DataAttributes + } if !IsNil(o.WorkflowConfigOverride) { toSerialize["workflowConfigOverride"] = o.WorkflowConfigOverride } diff --git a/integ/persistence_test.go b/integ/persistence_test.go index 1d4d3e16..e09e5b33 100644 --- a/integ/persistence_test.go +++ b/integ/persistence_test.go @@ -6,6 +6,7 @@ import ( "github.com/indeedeng/iwf/service/common/ptr" "github.com/indeedeng/iwf/service/common/timeparser" "log" + "net/http" "strconv" "testing" "time" @@ -123,6 +124,13 @@ func doTestPersistenceWorkflow( nowTime := time.Now() notTimeNanoStr := fmt.Sprintf("%v", nowTime.UnixNano()) nowTimeStr := nowTime.Format(timeparser.DateTimeFormat) + expectedDataAttribute := iwfidl.KeyValue{ + Key: ptr.Any("TestKey"), + Value: &iwfidl.EncodedObject{ + Encoding: ptr.Any("TestEncoding"), + Data: ptr.Any("TestValue"), + }, + } expectedDatetimeSearchAttribute := iwfidl.SearchAttribute{ Key: iwfidl.PtrString("CustomDatetimeField"), ValueType: ptr.Any(iwfidl.DATETIME), @@ -148,6 +156,9 @@ func doTestPersistenceWorkflow( SearchAttributes: []iwfidl.SearchAttribute{ expectedDatetimeSearchAttribute, }, + DataAttributes: []iwfidl.KeyValue{ + expectedDataAttribute, + }, WorkflowConfigOverride: config, UseMemoForDataAttributes: ptr.Any(useMemo), }, @@ -155,6 +166,29 @@ func doTestPersistenceWorkflow( _, httpResp, err := reqStart.WorkflowStartRequest(wfReq).Execute() panicAtHttpError(err, httpResp) + initReqQry := apiClient.DefaultApi.ApiV1WorkflowDataobjectsGetPost(context.Background()) + + queryResult, httpResp, err := getDataAttributes(initReqQry, wfId, expectedDataAttribute, useMemo) + + retryCount := 0 + + // Config is only present for continueAsNew tests + if config != nil { + for { + if err == nil || retryCount >= 5 { + break + } + // Loading data to a continuedAsNew workflow might take a few seconds thus retry mechanism is needed + time.Sleep(time.Millisecond * 1000) + retryCount += 1 + queryResult, httpResp, err = getDataAttributes(initReqQry, wfId, expectedDataAttribute, useMemo) + } + } + + panicAtHttpError(err, httpResp) + + assert.Contains(t, queryResult.GetObjects(), expectedDataAttribute) + reqWait := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background()) wfResponse, httpResp, err := reqWait.WorkflowGetRequest(iwfidl.WorkflowGetRequest{ WorkflowId: wfId, @@ -165,7 +199,7 @@ func doTestPersistenceWorkflow( queryResult1, httpResp, err := reqQry.WorkflowGetDataObjectsRequest(iwfidl.WorkflowGetDataObjectsRequest{ WorkflowId: wfId, Keys: []string{ - persistence.TestDataObjectKey, + persistence.TestDataObjectKey, expectedDataAttribute.GetKey(), }, UseMemoForDataAttributes: ptr.Any(useMemo), }).Execute() @@ -232,6 +266,7 @@ func doTestPersistenceWorkflow( Key: iwfidl.PtrString(persistence.TestDataObjectKey), Value: &persistence.TestDataObjectVal2, }, + expectedDataAttribute, } expected2 := []iwfidl.KeyValue{ { @@ -242,6 +277,7 @@ func doTestPersistenceWorkflow( Key: iwfidl.PtrString(persistence.TestDataObjectKey2), Value: &persistence.TestDataObjectVal1, }, + expectedDataAttribute, } assertions.ElementsMatch(expected1, queryResult1.GetObjects()) assertions.ElementsMatch(expected2, queryResult2.GetObjects()) @@ -366,6 +402,16 @@ func doTestPersistenceWorkflow( } } +func getDataAttributes(initReqQry iwfidl.ApiApiV1WorkflowDataobjectsGetPostRequest, wfId string, expectedDataAttribute iwfidl.KeyValue, useMemo bool) (*iwfidl.WorkflowGetDataObjectsResponse, *http.Response, error) { + return initReqQry.WorkflowGetDataObjectsRequest(iwfidl.WorkflowGetDataObjectsRequest{ + WorkflowId: wfId, + Keys: []string{ + persistence.TestDataObjectKey, expectedDataAttribute.GetKey(), + }, + UseMemoForDataAttributes: ptr.Any(useMemo), + }).Execute() +} + func assertSearch(query string, expectedCount int, apiClient *iwfidl.APIClient, assertions *assert.Assertions) { // search through all wfs using search API with pagination search := apiClient.DefaultApi.ApiV1WorkflowSearchPost(context.Background()) diff --git a/iwf-idl b/iwf-idl index 8b95a11d..fdd0d8ea 160000 --- a/iwf-idl +++ b/iwf-idl @@ -1 +1 @@ -Subproject commit 8b95a11db8ba9e3899173d28db3f50ea7fe2f6d7 +Subproject commit fdd0d8eae1326489e567bf0073263a3f14bad04e diff --git a/service/api/service.go b/service/api/service.go index 8ca2b941..1463c7c3 100644 --- a/service/api/service.go +++ b/service/api/service.go @@ -81,8 +81,8 @@ func (s *serviceImpl) ApiV1WorkflowStartPost( } else { workflowConfig = *s.config.Interpreter.DefaultWorkflowConfig } - var initCustomSAs []iwfidl.SearchAttribute + var initCustomDAs []iwfidl.KeyValue // workerUrl is always needed, for optimizing None as persistence loading type workflowOptions.Memo = map[string]interface{}{ service.WorkerUrlMemoKey: iwfidl.EncodedObject{ @@ -104,6 +104,7 @@ func (s *serviceImpl) ApiV1WorkflowStartPost( workflowOptions.SearchAttributes = utils.MergeMap(initialCustomSAInternal, workflowOptions.SearchAttributes) initCustomSAs = startOptions.SearchAttributes + initCustomDAs = startOptions.DataAttributes if startOptions.HasWorkflowConfigOverride() { workflowConfig = startOptions.GetWorkflowConfigOverride() } @@ -113,6 +114,9 @@ func (s *serviceImpl) ApiV1WorkflowStartPost( // Note: the value is actually not too important, we will check the presence of the key only as today Data: iwfidl.PtrString("true"), } + for _, da := range initCustomDAs { + workflowOptions.Memo[da.GetKey()] = da.GetValue() + } } if startOptions.WorkflowStartDelaySeconds != nil { workflowOptions.WorkflowStartDelay = @@ -127,6 +131,7 @@ func (s *serviceImpl) ApiV1WorkflowStartPost( StateInput: req.StateInput, StateOptions: req.StateOptions, InitSearchAttributes: initCustomSAs, + InitDataAttributes: initCustomDAs, Config: workflowConfig, UseMemoForDataAttributes: useMemo, WaitForCompletionStateExecutionIds: req.GetWaitForCompletionStateExecutionIds(), diff --git a/service/client/interfaces.go b/service/client/interfaces.go index 90e84b3d..68743d1e 100644 --- a/service/client/interfaces.go +++ b/service/client/interfaces.go @@ -51,6 +51,7 @@ type StartWorkflowOptions struct { WorkflowIDReusePolicy *iwfidl.WorkflowIDReusePolicy CronSchedule *string RetryPolicy *iwfidl.WorkflowRetryPolicy + DataAttributes map[string]interface{} SearchAttributes map[string]interface{} Memo map[string]interface{} WorkflowStartDelay *time.Duration diff --git a/service/interfaces.go b/service/interfaces.go index 65acdd12..2053ebe6 100644 --- a/service/interfaces.go +++ b/service/interfaces.go @@ -21,6 +21,8 @@ type ( InitSearchAttributes []iwfidl.SearchAttribute `json:"initSearchAttributes,omitempty"` + InitDataAttributes []iwfidl.KeyValue `json:"initDataAttributes,omitempty"` + UseMemoForDataAttributes bool `json:"useMemoForDataAttributes,omitempty"` Config iwfidl.WorkflowConfig `json:"config,omitempty"` diff --git a/service/interpreter/persistence.go b/service/interpreter/persistence.go index db3b9aee..a9b61990 100644 --- a/service/interpreter/persistence.go +++ b/service/interpreter/persistence.go @@ -19,14 +19,18 @@ type PersistenceManager struct { } func NewPersistenceManager( - provider WorkflowProvider, initSearchAttributes []iwfidl.SearchAttribute, useMemo bool, + provider WorkflowProvider, initDataAttributes []iwfidl.KeyValue, initSearchAttributes []iwfidl.SearchAttribute, useMemo bool, ) *PersistenceManager { searchAttributes := make(map[string]iwfidl.SearchAttribute) for _, sa := range initSearchAttributes { searchAttributes[sa.GetKey()] = sa } + dataAttributes := make(map[string]iwfidl.KeyValue) + for _, da := range initDataAttributes { + dataAttributes[da.GetKey()] = da + } return &PersistenceManager{ - dataObjects: make(map[string]iwfidl.KeyValue), + dataObjects: dataAttributes, searchAttributes: searchAttributes, provider: provider, diff --git a/service/interpreter/workflowImpl.go b/service/interpreter/workflowImpl.go index 0b590bbc..2a7323d6 100644 --- a/service/interpreter/workflowImpl.go +++ b/service/interpreter/workflowImpl.go @@ -82,7 +82,7 @@ func InterpreterImpl( } else { interStateChannel = NewInterStateChannel() stateRequestQueue = NewStateRequestQueue() - persistenceManager = NewPersistenceManager(provider, input.InitSearchAttributes, input.UseMemoForDataAttributes) + persistenceManager = NewPersistenceManager(provider, input.InitDataAttributes, input.InitSearchAttributes, input.UseMemoForDataAttributes) timerProcessor = NewTimerProcessor(ctx, provider, nil) continueAsNewCounter = NewContinueAsCounter(workflowConfiger, ctx, provider) signalReceiver = NewSignalReceiver(ctx, provider, interStateChannel, stateRequestQueue, persistenceManager, timerProcessor, continueAsNewCounter, workflowConfiger, nil) @@ -95,6 +95,10 @@ func InterpreterImpl( if err != nil { return nil, err } + // We intentionally set the query handler after the continueAsNew/dumpInternal activity. + // This is to ensure the correctness. If we set the query handler before that, + // the query handler could return empty data (since the loading hasn't completed), which will be incorrect response. + // We would rather return server errors and let the client retry later. err = SetQueryHandlers(ctx, provider, persistenceManager, continueAsNewer, workflowConfiger, basicInfo) if err != nil { return nil, err @@ -315,6 +319,8 @@ func InterpreterImpl( input.StateInput = nil input.StateOptions = nil input.StartStateId = nil + input.InitDataAttributes = nil + input.InitSearchAttributes = nil return nil, provider.NewInterpreterContinueAsNewError(ctx, input) } } // end main loop