Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IWF-136: Support initial data attributes when starting workflow #435

Merged
merged 6 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Or maybe both just for testing to ensure the code works for both Cadence and Tem

### Option 1: Run with our docker-compose file (Recommended)

Simply run `docker compose -f docker-compose/integ-dependencies.yml up -` will:
Simply run `docker compose -f docker-compose/integ-dependencies.yml up` will:

* Start both Cadence & Temporal as dependencies
* Set up required system search attributes
Expand Down
24 changes: 23 additions & 1 deletion gen/iwfidl/api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,15 @@ components:
workflowIDReusePolicy: null
useMemoForDataAttributes: true
cronSchedule: cronSchedule
dataAttributes:
- value:
data: data
encoding: encoding
key: key
- value:
data: data
encoding: encoding
key: key
properties:
workflowIDReusePolicy:
$ref: '#/components/schemas/WorkflowIDReusePolicy'
Expand All @@ -686,6 +695,10 @@ components:
items:
$ref: '#/components/schemas/SearchAttribute'
type: array
dataAttributes:
items:
$ref: '#/components/schemas/KeyValue'
type: array
workflowConfigOverride:
$ref: '#/components/schemas/WorkflowConfig'
idReusePolicy:
Expand All @@ -702,7 +715,7 @@ components:
type: string
IDReusePolicy:
enum:
- ALLOW_IF_PREVIOUS_EXISTS_ABNORMALLY # Keeping typo enum for backwards compatibility
- ALLOW_IF_PREVIOUS_EXISTS_ABNORMALLY
- ALLOW_IF_PREVIOUS_EXITS_ABNORMALLY
- ALLOW_IF_NO_RUNNING
- DISALLOW_REUSE
Expand Down Expand Up @@ -891,6 +904,15 @@ components:
workflowIDReusePolicy: null
useMemoForDataAttributes: true
cronSchedule: cronSchedule
dataAttributes:
- value:
data: data
encoding: encoding
key: key
- value:
data: data
encoding: encoding
key: key
iwfWorkerUrl: iwfWorkerUrl
workflowId: workflowId
stateInput:
Expand Down
2 changes: 1 addition & 1 deletion gen/iwfidl/docs/IDReusePolicy.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## Enum

# Keeping typo enum for backwards compatibility

* `ALLOW_IF_PREVIOUS_EXISTS_ABNORMALLY` (value: `"ALLOW_IF_PREVIOUS_EXISTS_ABNORMALLY"`)

* `ALLOW_IF_PREVIOUS_EXITS_ABNORMALLY` (value: `"ALLOW_IF_PREVIOUS_EXITS_ABNORMALLY"`)
Expand Down
26 changes: 26 additions & 0 deletions gen/iwfidl/docs/WorkflowStartOptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Name | Type | Description | Notes
**WorkflowStartDelaySeconds** | Pointer to **int32** | | [optional]
**RetryPolicy** | Pointer to [**WorkflowRetryPolicy**](WorkflowRetryPolicy.md) | | [optional]
**SearchAttributes** | Pointer to [**[]SearchAttribute**](SearchAttribute.md) | | [optional]
**DataAttributes** | Pointer to [**[]KeyValue**](KeyValue.md) | | [optional]
**WorkflowConfigOverride** | Pointer to [**WorkflowConfig**](WorkflowConfig.md) | | [optional]
**IdReusePolicy** | Pointer to [**IDReusePolicy**](IDReusePolicy.md) | | [optional]
**UseMemoForDataAttributes** | Pointer to **bool** | | [optional]
Expand Down Expand Up @@ -157,6 +158,31 @@ SetSearchAttributes sets SearchAttributes field to given value.

HasSearchAttributes returns a boolean if a field has been set.

### GetDataAttributes

`func (o *WorkflowStartOptions) GetDataAttributes() []KeyValue`

GetDataAttributes returns the DataAttributes field if non-nil, zero value otherwise.

### GetDataAttributesOk

`func (o *WorkflowStartOptions) GetDataAttributesOk() (*[]KeyValue, bool)`

GetDataAttributesOk returns a tuple with the DataAttributes field if it's non-nil, zero value otherwise
and a boolean to check if the value has been set.

### SetDataAttributes

`func (o *WorkflowStartOptions) SetDataAttributes(v []KeyValue)`

SetDataAttributes sets DataAttributes field to given value.

### HasDataAttributes

`func (o *WorkflowStartOptions) HasDataAttributes() bool`

HasDataAttributes returns a boolean if a field has been set.

### GetWorkflowConfigOverride

`func (o *WorkflowStartOptions) GetWorkflowConfigOverride() WorkflowConfig`
Expand Down
4 changes: 2 additions & 2 deletions gen/iwfidl/model_id_reuse_policy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 36 additions & 0 deletions gen/iwfidl/model_workflow_start_options.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 47 additions & 1 deletion integ/persistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/indeedeng/iwf/service/common/ptr"
"github.com/indeedeng/iwf/service/common/timeparser"
"log"
"net/http"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -123,6 +124,13 @@ func doTestPersistenceWorkflow(
nowTime := time.Now()
notTimeNanoStr := fmt.Sprintf("%v", nowTime.UnixNano())
nowTimeStr := nowTime.Format(timeparser.DateTimeFormat)
expectedDataAttribute := iwfidl.KeyValue{
Key: ptr.Any("TestKey"),
Value: &iwfidl.EncodedObject{
Encoding: ptr.Any("TestEncoding"),
Data: ptr.Any("TestValue"),
},
}
expectedDatetimeSearchAttribute := iwfidl.SearchAttribute{
Key: iwfidl.PtrString("CustomDatetimeField"),
ValueType: ptr.Any(iwfidl.DATETIME),
Expand All @@ -148,13 +156,39 @@ func doTestPersistenceWorkflow(
SearchAttributes: []iwfidl.SearchAttribute{
expectedDatetimeSearchAttribute,
},
DataAttributes: []iwfidl.KeyValue{
expectedDataAttribute,
},
WorkflowConfigOverride: config,
UseMemoForDataAttributes: ptr.Any(useMemo),
},
}
_, httpResp, err := reqStart.WorkflowStartRequest(wfReq).Execute()
panicAtHttpError(err, httpResp)

initReqQry := apiClient.DefaultApi.ApiV1WorkflowDataobjectsGetPost(context.Background())

queryResult, httpResp, err := getDataAttributes(initReqQry, wfId, expectedDataAttribute, useMemo)

retryCount := 0

// Config is only present for continueAsNew tests
if config != nil {
for {
if err == nil || retryCount >= 5 {
break
}
// Loading data to a continuedAsNew workflow might take a few seconds thus retry mechanism is needed
time.Sleep(time.Millisecond * 1000)
retryCount += 1
queryResult, httpResp, err = getDataAttributes(initReqQry, wfId, expectedDataAttribute, useMemo)
}
}
Comment on lines +175 to +186
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@longquanzheng what are your thoughts about this?

Copy link
Contributor

@longquanzheng longquanzheng Sep 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's good. I was thinking about the same way to workaround.

However, ideally we can do this in api service for everyone (that will saves all the sleeping in the tests).

But it's better to do in a separate PR (need to figure out checking error types, and backoff strategy and refactoring all the tests to remove the sleep).

Could you also raise another ticket for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do!


panicAtHttpError(err, httpResp)

assert.Contains(t, queryResult.GetObjects(), expectedDataAttribute)

reqWait := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background())
wfResponse, httpResp, err := reqWait.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
WorkflowId: wfId,
Expand All @@ -165,7 +199,7 @@ func doTestPersistenceWorkflow(
queryResult1, httpResp, err := reqQry.WorkflowGetDataObjectsRequest(iwfidl.WorkflowGetDataObjectsRequest{
WorkflowId: wfId,
Keys: []string{
persistence.TestDataObjectKey,
persistence.TestDataObjectKey, expectedDataAttribute.GetKey(),
},
UseMemoForDataAttributes: ptr.Any(useMemo),
}).Execute()
Expand Down Expand Up @@ -232,6 +266,7 @@ func doTestPersistenceWorkflow(
Key: iwfidl.PtrString(persistence.TestDataObjectKey),
Value: &persistence.TestDataObjectVal2,
},
expectedDataAttribute,
}
expected2 := []iwfidl.KeyValue{
{
Expand All @@ -242,6 +277,7 @@ func doTestPersistenceWorkflow(
Key: iwfidl.PtrString(persistence.TestDataObjectKey2),
Value: &persistence.TestDataObjectVal1,
},
expectedDataAttribute,
}
assertions.ElementsMatch(expected1, queryResult1.GetObjects())
assertions.ElementsMatch(expected2, queryResult2.GetObjects())
Expand Down Expand Up @@ -366,6 +402,16 @@ func doTestPersistenceWorkflow(
}
}

func getDataAttributes(initReqQry iwfidl.ApiApiV1WorkflowDataobjectsGetPostRequest, wfId string, expectedDataAttribute iwfidl.KeyValue, useMemo bool) (*iwfidl.WorkflowGetDataObjectsResponse, *http.Response, error) {
return initReqQry.WorkflowGetDataObjectsRequest(iwfidl.WorkflowGetDataObjectsRequest{
WorkflowId: wfId,
Keys: []string{
persistence.TestDataObjectKey, expectedDataAttribute.GetKey(),
},
UseMemoForDataAttributes: ptr.Any(useMemo),
}).Execute()
}

func assertSearch(query string, expectedCount int, apiClient *iwfidl.APIClient, assertions *assert.Assertions) {
// search through all wfs using search API with pagination
search := apiClient.DefaultApi.ApiV1WorkflowSearchPost(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion iwf-idl
Submodule iwf-idl updated 2 files
+5 −1 iwf-sdk.yaml
+6 −1 iwf.yaml
7 changes: 6 additions & 1 deletion service/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(
} else {
workflowConfig = *s.config.Interpreter.DefaultWorkflowConfig
}

var initCustomSAs []iwfidl.SearchAttribute
var initCustomDAs []iwfidl.KeyValue
// workerUrl is always needed, for optimizing None as persistence loading type
workflowOptions.Memo = map[string]interface{}{
service.WorkerUrlMemoKey: iwfidl.EncodedObject{
Expand All @@ -104,6 +104,7 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(
workflowOptions.SearchAttributes = utils.MergeMap(initialCustomSAInternal, workflowOptions.SearchAttributes)

initCustomSAs = startOptions.SearchAttributes
initCustomDAs = startOptions.DataAttributes
if startOptions.HasWorkflowConfigOverride() {
workflowConfig = startOptions.GetWorkflowConfigOverride()
}
Expand All @@ -113,6 +114,9 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(
// Note: the value is actually not too important, we will check the presence of the key only as today
Data: iwfidl.PtrString("true"),
}
for _, da := range initCustomDAs {
workflowOptions.Memo[da.GetKey()] = da.GetValue()
}
}
if startOptions.WorkflowStartDelaySeconds != nil {
workflowOptions.WorkflowStartDelay =
Expand All @@ -127,6 +131,7 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(
StateInput: req.StateInput,
StateOptions: req.StateOptions,
InitSearchAttributes: initCustomSAs,
InitDataAttributes: initCustomDAs,
Config: workflowConfig,
UseMemoForDataAttributes: useMemo,
WaitForCompletionStateExecutionIds: req.GetWaitForCompletionStateExecutionIds(),
Expand Down
1 change: 1 addition & 0 deletions service/client/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type StartWorkflowOptions struct {
WorkflowIDReusePolicy *iwfidl.WorkflowIDReusePolicy
CronSchedule *string
RetryPolicy *iwfidl.WorkflowRetryPolicy
DataAttributes map[string]interface{}
SearchAttributes map[string]interface{}
Memo map[string]interface{}
WorkflowStartDelay *time.Duration
Expand Down
2 changes: 2 additions & 0 deletions service/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type (

InitSearchAttributes []iwfidl.SearchAttribute `json:"initSearchAttributes,omitempty"`

InitDataAttributes []iwfidl.KeyValue `json:"initDataAttributes,omitempty"`

UseMemoForDataAttributes bool `json:"useMemoForDataAttributes,omitempty"`

Config iwfidl.WorkflowConfig `json:"config,omitempty"`
Expand Down
8 changes: 6 additions & 2 deletions service/interpreter/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@ type PersistenceManager struct {
}

func NewPersistenceManager(
provider WorkflowProvider, initSearchAttributes []iwfidl.SearchAttribute, useMemo bool,
provider WorkflowProvider, initDataAttributes []iwfidl.KeyValue, initSearchAttributes []iwfidl.SearchAttribute, useMemo bool,
) *PersistenceManager {
searchAttributes := make(map[string]iwfidl.SearchAttribute)
for _, sa := range initSearchAttributes {
searchAttributes[sa.GetKey()] = sa
}
dataAttributes := make(map[string]iwfidl.KeyValue)
for _, da := range initDataAttributes {
dataAttributes[da.GetKey()] = da
}
return &PersistenceManager{
dataObjects: make(map[string]iwfidl.KeyValue),
dataObjects: dataAttributes,
searchAttributes: searchAttributes,
provider: provider,

Expand Down
8 changes: 7 additions & 1 deletion service/interpreter/workflowImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func InterpreterImpl(
} else {
interStateChannel = NewInterStateChannel()
stateRequestQueue = NewStateRequestQueue()
persistenceManager = NewPersistenceManager(provider, input.InitSearchAttributes, input.UseMemoForDataAttributes)
persistenceManager = NewPersistenceManager(provider, input.InitDataAttributes, input.InitSearchAttributes, input.UseMemoForDataAttributes)
timerProcessor = NewTimerProcessor(ctx, provider, nil)
continueAsNewCounter = NewContinueAsCounter(workflowConfiger, ctx, provider)
signalReceiver = NewSignalReceiver(ctx, provider, interStateChannel, stateRequestQueue, persistenceManager, timerProcessor, continueAsNewCounter, workflowConfiger, nil)
Expand All @@ -95,6 +95,10 @@ func InterpreterImpl(
if err != nil {
return nil, err
}
// We intentionally set the query handler after the continueAsNew/dumpInternal activity.
// This is to ensure the correctness. If we set the query handler before that,
// the query handler could return empty data (since the loading hasn't completed), which will be incorrect response.
// We would rather return server errors and let the client retry later.
err = SetQueryHandlers(ctx, provider, persistenceManager, continueAsNewer, workflowConfiger, basicInfo)
if err != nil {
return nil, err
Expand Down Expand Up @@ -315,6 +319,8 @@ func InterpreterImpl(
input.StateInput = nil
input.StateOptions = nil
input.StartStateId = nil
input.InitDataAttributes = nil
input.InitSearchAttributes = nil
return nil, provider.NewInterpreterContinueAsNewError(ctx, input)
}
} // end main loop
Expand Down
Loading