diff --git a/gen/iwfidl/.openapi-generator/FILES b/gen/iwfidl/.openapi-generator/FILES index b9b3e948..bdb7f6f5 100644 --- a/gen/iwfidl/.openapi-generator/FILES +++ b/gen/iwfidl/.openapi-generator/FILES @@ -6,6 +6,7 @@ api/openapi.yaml api_default.go client.go configuration.go +docs/ChannelInfo.md docs/ChannelRequestStatus.md docs/CommandCombination.md docs/CommandRequest.md @@ -91,6 +92,7 @@ docs/WorkflowWorkerRpcResponse.md git_push.sh go.mod go.sum +model_channel_info.go model_channel_request_status.go model_command_combination.go model_command_request.go diff --git a/gen/iwfidl/README.md b/gen/iwfidl/README.md index 77482109..a5cdee03 100644 --- a/gen/iwfidl/README.md +++ b/gen/iwfidl/README.md @@ -102,6 +102,7 @@ Class | Method | HTTP request | Description ## Documentation For Models + - [ChannelInfo](docs/ChannelInfo.md) - [ChannelRequestStatus](docs/ChannelRequestStatus.md) - [CommandCombination](docs/CommandCombination.md) - [CommandRequest](docs/CommandRequest.md) diff --git a/gen/iwfidl/api/openapi.yaml b/gen/iwfidl/api/openapi.yaml index 78b65e99..613fea62 100644 --- a/gen/iwfidl/api/openapi.yaml +++ b/gen/iwfidl/api/openapi.yaml @@ -1686,9 +1686,15 @@ components: type: object WorkflowWorkerRpcRequest: example: + internalChannelInfos: + key: + size: 0 input: data: data encoding: encoding + signalChannelInfos: + key: + size: 0 context: firstAttemptTimestamp: 6 workflowStartedTimestamp: 0 @@ -1743,11 +1749,26 @@ components: items: $ref: '#/components/schemas/KeyValue' type: array + signalChannelInfos: + additionalProperties: + $ref: '#/components/schemas/ChannelInfo' + type: object + internalChannelInfos: + additionalProperties: + $ref: '#/components/schemas/ChannelInfo' + type: object required: - context - rpcName - workflowType type: object + ChannelInfo: + example: + size: 0 + properties: + size: + type: integer + type: object WorkflowWorkerRpcResponse: example: output: diff --git a/gen/iwfidl/docs/ChannelInfo.md b/gen/iwfidl/docs/ChannelInfo.md new file mode 100644 index 00000000..c85b231c --- /dev/null +++ b/gen/iwfidl/docs/ChannelInfo.md @@ -0,0 +1,56 @@ +# ChannelInfo + +## Properties + +Name | Type | Description | Notes +------------ | ------------- | ------------- | ------------- +**Size** | Pointer to **int32** | | [optional] + +## Methods + +### NewChannelInfo + +`func NewChannelInfo() *ChannelInfo` + +NewChannelInfo instantiates a new ChannelInfo object +This constructor will assign default values to properties that have it defined, +and makes sure properties required by API are set, but the set of arguments +will change when the set of required properties is changed + +### NewChannelInfoWithDefaults + +`func NewChannelInfoWithDefaults() *ChannelInfo` + +NewChannelInfoWithDefaults instantiates a new ChannelInfo object +This constructor will only assign default values to properties that have it defined, +but it doesn't guarantee that properties required by API are set + +### GetSize + +`func (o *ChannelInfo) GetSize() int32` + +GetSize returns the Size field if non-nil, zero value otherwise. + +### GetSizeOk + +`func (o *ChannelInfo) GetSizeOk() (*int32, bool)` + +GetSizeOk returns a tuple with the Size field if it's non-nil, zero value otherwise +and a boolean to check if the value has been set. + +### SetSize + +`func (o *ChannelInfo) SetSize(v int32)` + +SetSize sets Size field to given value. + +### HasSize + +`func (o *ChannelInfo) HasSize() bool` + +HasSize returns a boolean if a field has been set. + + +[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) + + diff --git a/gen/iwfidl/docs/WorkflowWorkerRpcRequest.md b/gen/iwfidl/docs/WorkflowWorkerRpcRequest.md index a239cef1..a72be553 100644 --- a/gen/iwfidl/docs/WorkflowWorkerRpcRequest.md +++ b/gen/iwfidl/docs/WorkflowWorkerRpcRequest.md @@ -10,6 +10,8 @@ Name | Type | Description | Notes **Input** | Pointer to [**EncodedObject**](EncodedObject.md) | | [optional] **SearchAttributes** | Pointer to [**[]SearchAttribute**](SearchAttribute.md) | | [optional] **DataAttributes** | Pointer to [**[]KeyValue**](KeyValue.md) | | [optional] +**SignalChannelInfos** | Pointer to [**map[string]ChannelInfo**](ChannelInfo.md) | | [optional] +**InternalChannelInfos** | Pointer to [**map[string]ChannelInfo**](ChannelInfo.md) | | [optional] ## Methods @@ -165,6 +167,56 @@ SetDataAttributes sets DataAttributes field to given value. HasDataAttributes returns a boolean if a field has been set. +### GetSignalChannelInfos + +`func (o *WorkflowWorkerRpcRequest) GetSignalChannelInfos() map[string]ChannelInfo` + +GetSignalChannelInfos returns the SignalChannelInfos field if non-nil, zero value otherwise. + +### GetSignalChannelInfosOk + +`func (o *WorkflowWorkerRpcRequest) GetSignalChannelInfosOk() (*map[string]ChannelInfo, bool)` + +GetSignalChannelInfosOk returns a tuple with the SignalChannelInfos field if it's non-nil, zero value otherwise +and a boolean to check if the value has been set. + +### SetSignalChannelInfos + +`func (o *WorkflowWorkerRpcRequest) SetSignalChannelInfos(v map[string]ChannelInfo)` + +SetSignalChannelInfos sets SignalChannelInfos field to given value. + +### HasSignalChannelInfos + +`func (o *WorkflowWorkerRpcRequest) HasSignalChannelInfos() bool` + +HasSignalChannelInfos returns a boolean if a field has been set. + +### GetInternalChannelInfos + +`func (o *WorkflowWorkerRpcRequest) GetInternalChannelInfos() map[string]ChannelInfo` + +GetInternalChannelInfos returns the InternalChannelInfos field if non-nil, zero value otherwise. + +### GetInternalChannelInfosOk + +`func (o *WorkflowWorkerRpcRequest) GetInternalChannelInfosOk() (*map[string]ChannelInfo, bool)` + +GetInternalChannelInfosOk returns a tuple with the InternalChannelInfos field if it's non-nil, zero value otherwise +and a boolean to check if the value has been set. + +### SetInternalChannelInfos + +`func (o *WorkflowWorkerRpcRequest) SetInternalChannelInfos(v map[string]ChannelInfo)` + +SetInternalChannelInfos sets InternalChannelInfos field to given value. + +### HasInternalChannelInfos + +`func (o *WorkflowWorkerRpcRequest) HasInternalChannelInfos() bool` + +HasInternalChannelInfos returns a boolean if a field has been set. + [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/gen/iwfidl/model_channel_info.go b/gen/iwfidl/model_channel_info.go new file mode 100644 index 00000000..81f3cfa1 --- /dev/null +++ b/gen/iwfidl/model_channel_info.go @@ -0,0 +1,124 @@ +/* +Workflow APIs + +This APIs for iwf SDKs to operate workflows + +API version: 1.0.0 +*/ + +// Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT. + +package iwfidl + +import ( + "encoding/json" +) + +// checks if the ChannelInfo type satisfies the MappedNullable interface at compile time +var _ MappedNullable = &ChannelInfo{} + +// ChannelInfo struct for ChannelInfo +type ChannelInfo struct { + Size *int32 `json:"size,omitempty"` +} + +// NewChannelInfo instantiates a new ChannelInfo object +// This constructor will assign default values to properties that have it defined, +// and makes sure properties required by API are set, but the set of arguments +// will change when the set of required properties is changed +func NewChannelInfo() *ChannelInfo { + this := ChannelInfo{} + return &this +} + +// NewChannelInfoWithDefaults instantiates a new ChannelInfo object +// This constructor will only assign default values to properties that have it defined, +// but it doesn't guarantee that properties required by API are set +func NewChannelInfoWithDefaults() *ChannelInfo { + this := ChannelInfo{} + return &this +} + +// GetSize returns the Size field value if set, zero value otherwise. +func (o *ChannelInfo) GetSize() int32 { + if o == nil || IsNil(o.Size) { + var ret int32 + return ret + } + return *o.Size +} + +// GetSizeOk returns a tuple with the Size field value if set, nil otherwise +// and a boolean to check if the value has been set. +func (o *ChannelInfo) GetSizeOk() (*int32, bool) { + if o == nil || IsNil(o.Size) { + return nil, false + } + return o.Size, true +} + +// HasSize returns a boolean if a field has been set. +func (o *ChannelInfo) HasSize() bool { + if o != nil && !IsNil(o.Size) { + return true + } + + return false +} + +// SetSize gets a reference to the given int32 and assigns it to the Size field. +func (o *ChannelInfo) SetSize(v int32) { + o.Size = &v +} + +func (o ChannelInfo) MarshalJSON() ([]byte, error) { + toSerialize, err := o.ToMap() + if err != nil { + return []byte{}, err + } + return json.Marshal(toSerialize) +} + +func (o ChannelInfo) ToMap() (map[string]interface{}, error) { + toSerialize := map[string]interface{}{} + if !IsNil(o.Size) { + toSerialize["size"] = o.Size + } + return toSerialize, nil +} + +type NullableChannelInfo struct { + value *ChannelInfo + isSet bool +} + +func (v NullableChannelInfo) Get() *ChannelInfo { + return v.value +} + +func (v *NullableChannelInfo) Set(val *ChannelInfo) { + v.value = val + v.isSet = true +} + +func (v NullableChannelInfo) IsSet() bool { + return v.isSet +} + +func (v *NullableChannelInfo) Unset() { + v.value = nil + v.isSet = false +} + +func NewNullableChannelInfo(val *ChannelInfo) *NullableChannelInfo { + return &NullableChannelInfo{value: val, isSet: true} +} + +func (v NullableChannelInfo) MarshalJSON() ([]byte, error) { + return json.Marshal(v.value) +} + +func (v *NullableChannelInfo) UnmarshalJSON(src []byte) error { + v.isSet = true + return json.Unmarshal(src, &v.value) +} diff --git a/gen/iwfidl/model_workflow_worker_rpc_request.go b/gen/iwfidl/model_workflow_worker_rpc_request.go index cf0ac609..e300397f 100644 --- a/gen/iwfidl/model_workflow_worker_rpc_request.go +++ b/gen/iwfidl/model_workflow_worker_rpc_request.go @@ -19,12 +19,14 @@ var _ MappedNullable = &WorkflowWorkerRpcRequest{} // WorkflowWorkerRpcRequest struct for WorkflowWorkerRpcRequest type WorkflowWorkerRpcRequest struct { - Context Context `json:"context"` - WorkflowType string `json:"workflowType"` - RpcName string `json:"rpcName"` - Input *EncodedObject `json:"input,omitempty"` - SearchAttributes []SearchAttribute `json:"searchAttributes,omitempty"` - DataAttributes []KeyValue `json:"dataAttributes,omitempty"` + Context Context `json:"context"` + WorkflowType string `json:"workflowType"` + RpcName string `json:"rpcName"` + Input *EncodedObject `json:"input,omitempty"` + SearchAttributes []SearchAttribute `json:"searchAttributes,omitempty"` + DataAttributes []KeyValue `json:"dataAttributes,omitempty"` + SignalChannelInfos *map[string]ChannelInfo `json:"signalChannelInfos,omitempty"` + InternalChannelInfos *map[string]ChannelInfo `json:"internalChannelInfos,omitempty"` } // NewWorkflowWorkerRpcRequest instantiates a new WorkflowWorkerRpcRequest object @@ -215,6 +217,70 @@ func (o *WorkflowWorkerRpcRequest) SetDataAttributes(v []KeyValue) { o.DataAttributes = v } +// GetSignalChannelInfos returns the SignalChannelInfos field value if set, zero value otherwise. +func (o *WorkflowWorkerRpcRequest) GetSignalChannelInfos() map[string]ChannelInfo { + if o == nil || IsNil(o.SignalChannelInfos) { + var ret map[string]ChannelInfo + return ret + } + return *o.SignalChannelInfos +} + +// GetSignalChannelInfosOk returns a tuple with the SignalChannelInfos field value if set, nil otherwise +// and a boolean to check if the value has been set. +func (o *WorkflowWorkerRpcRequest) GetSignalChannelInfosOk() (*map[string]ChannelInfo, bool) { + if o == nil || IsNil(o.SignalChannelInfos) { + return nil, false + } + return o.SignalChannelInfos, true +} + +// HasSignalChannelInfos returns a boolean if a field has been set. +func (o *WorkflowWorkerRpcRequest) HasSignalChannelInfos() bool { + if o != nil && !IsNil(o.SignalChannelInfos) { + return true + } + + return false +} + +// SetSignalChannelInfos gets a reference to the given map[string]ChannelInfo and assigns it to the SignalChannelInfos field. +func (o *WorkflowWorkerRpcRequest) SetSignalChannelInfos(v map[string]ChannelInfo) { + o.SignalChannelInfos = &v +} + +// GetInternalChannelInfos returns the InternalChannelInfos field value if set, zero value otherwise. +func (o *WorkflowWorkerRpcRequest) GetInternalChannelInfos() map[string]ChannelInfo { + if o == nil || IsNil(o.InternalChannelInfos) { + var ret map[string]ChannelInfo + return ret + } + return *o.InternalChannelInfos +} + +// GetInternalChannelInfosOk returns a tuple with the InternalChannelInfos field value if set, nil otherwise +// and a boolean to check if the value has been set. +func (o *WorkflowWorkerRpcRequest) GetInternalChannelInfosOk() (*map[string]ChannelInfo, bool) { + if o == nil || IsNil(o.InternalChannelInfos) { + return nil, false + } + return o.InternalChannelInfos, true +} + +// HasInternalChannelInfos returns a boolean if a field has been set. +func (o *WorkflowWorkerRpcRequest) HasInternalChannelInfos() bool { + if o != nil && !IsNil(o.InternalChannelInfos) { + return true + } + + return false +} + +// SetInternalChannelInfos gets a reference to the given map[string]ChannelInfo and assigns it to the InternalChannelInfos field. +func (o *WorkflowWorkerRpcRequest) SetInternalChannelInfos(v map[string]ChannelInfo) { + o.InternalChannelInfos = &v +} + func (o WorkflowWorkerRpcRequest) MarshalJSON() ([]byte, error) { toSerialize, err := o.ToMap() if err != nil { @@ -237,6 +303,12 @@ func (o WorkflowWorkerRpcRequest) ToMap() (map[string]interface{}, error) { if !IsNil(o.DataAttributes) { toSerialize["dataAttributes"] = o.DataAttributes } + if !IsNil(o.SignalChannelInfos) { + toSerialize["signalChannelInfos"] = o.SignalChannelInfos + } + if !IsNil(o.InternalChannelInfos) { + toSerialize["internalChannelInfos"] = o.InternalChannelInfos + } return toSerialize, nil } diff --git a/integ/locking_test.go b/integ/locking_test.go index 72f490e2..aeba50e8 100644 --- a/integ/locking_test.go +++ b/integ/locking_test.go @@ -93,6 +93,17 @@ func doTestLockingWorkflow(t *testing.T, backendType service.BackendType, config _, httpResp, err := req.WorkflowStartRequest(startReq).Execute() panicAtHttpError(err, httpResp) + for i := 0; i < locking.NumUnusedSignals; i++ { + // send 4 unused signals at the beginning to validate the ChannelInfo feature + reqSignal := apiClient.DefaultApi.ApiV1WorkflowSignalPost(context.Background()) + httpResp, err = reqSignal.WorkflowSignalRequest(iwfidl.WorkflowSignalRequest{ + WorkflowId: wfId, + SignalChannelName: locking.UnusedSignalChannelName, + SignalValue: nil, + }).Execute() + panicAtHttpError(err, httpResp) + } + assertions := assert.New(t) if config != nil && backendType == service.BackendTypeTemporal { diff --git a/integ/no_workflow_id_test.go b/integ/no_workflow_id_test.go new file mode 100644 index 00000000..b2c53918 --- /dev/null +++ b/integ/no_workflow_id_test.go @@ -0,0 +1,48 @@ +package integ + +import ( + "context" + "github.com/indeedeng/iwf/gen/iwfidl" + "github.com/indeedeng/iwf/integ/workflow/signal" + "github.com/indeedeng/iwf/service" + "github.com/stretchr/testify/assert" + "log" + "net/http" + "testing" +) + +func TestSignalWorkflowNoWorkflowId(t *testing.T) { + if !*temporalIntegTest { + t.Skip() + } + assertions := assert.New(t) + _, closeFunc2 := startIwfServiceWithClient(service.BackendTypeTemporal) + defer closeFunc2() + + // start a workflow + apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{ + Servers: []iwfidl.ServerConfiguration{ + { + URL: "http://localhost:" + testIwfServerPort, + }, + }, + }) + req := apiClient.DefaultApi.ApiV1WorkflowSignalPost(context.Background()) + httpResp, err := req.WorkflowSignalRequest(iwfidl.WorkflowSignalRequest{ + WorkflowId: "", + SignalChannelName: signal.SignalName, + }).Execute() + + assertions.Equal(httpResp.StatusCode, http.StatusBadRequest) + + apiErr, ok := err.(*iwfidl.GenericOpenAPIError) + if !ok { + log.Fatalf("Should fail to invoke get api %v", err) + } + errResp, ok := apiErr.Model().(iwfidl.ErrorResponse) + if !ok { + log.Fatalf("should be error response") + } + assertions.Equal(iwfidl.WORKFLOW_NOT_EXISTS_SUB_STATUS, errResp.GetSubStatus()) + assertions.Equal("WorkflowId is not set on request.", errResp.GetDetail()) +} diff --git a/integ/signal_test.go b/integ/signal_test.go index d9cb48cf..60f4641b 100644 --- a/integ/signal_test.go +++ b/integ/signal_test.go @@ -2,6 +2,7 @@ package integ import ( "context" + "encoding/json" "fmt" config2 "github.com/indeedeng/iwf/config" "github.com/indeedeng/iwf/gen/iwfidl" @@ -9,7 +10,6 @@ import ( "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/common/ptr" "github.com/stretchr/testify/assert" - "log" "net/http" "strconv" "testing" @@ -23,6 +23,14 @@ func TestSignalWorkflowTemporal(t *testing.T) { for i := 0; i < *repeatIntegTest; i++ { doTestSignalWorkflow(t, service.BackendTypeTemporal, nil) smallWaitForFastTest() + } +} + +func TestSignalWorkflowTemporalContinueAsNew(t *testing.T) { + if !*temporalIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { doTestSignalWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfigV0()) smallWaitForFastTest() } @@ -35,45 +43,17 @@ func TestSignalWorkflowCadence(t *testing.T) { for i := 0; i < *repeatIntegTest; i++ { doTestSignalWorkflow(t, service.BackendTypeCadence, nil) smallWaitForFastTest() - doTestSignalWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfigV0()) - smallWaitForFastTest() } } -func TestSignalWorkflowNoWorkflowId(t *testing.T) { - if !*temporalIntegTest { +func TestSignalWorkflowCadenceContinueAsNew(t *testing.T) { + if !*cadenceIntegTest { t.Skip() } - assertions := assert.New(t) - _, closeFunc2 := startIwfServiceWithClient(service.BackendTypeTemporal) - defer closeFunc2() - - // start a workflow - apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{ - Servers: []iwfidl.ServerConfiguration{ - { - URL: "http://localhost:" + testIwfServerPort, - }, - }, - }) - req := apiClient.DefaultApi.ApiV1WorkflowSignalPost(context.Background()) - httpResp, err := req.WorkflowSignalRequest(iwfidl.WorkflowSignalRequest{ - WorkflowId: "", - SignalChannelName: signal.SignalName, - }).Execute() - - assertions.Equal(httpResp.StatusCode, http.StatusBadRequest) - - apiErr, ok := err.(*iwfidl.GenericOpenAPIError) - if !ok { - log.Fatalf("Should fail to invoke get api %v", err) - } - errResp, ok := apiErr.Model().(iwfidl.ErrorResponse) - if !ok { - log.Fatalf("should be error response") + for i := 0; i < *repeatIntegTest; i++ { + doTestSignalWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfigV0()) + smallWaitForFastTest() } - assertions.Equal(iwfidl.WORKFLOW_NOT_EXISTS_SUB_STATUS, errResp.GetSubStatus()) - assertions.Equal("WorkflowId is not set on request.", errResp.GetDetail()) } func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) { @@ -81,7 +61,7 @@ func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config // start test workflow server wfHandler := signal.NewHandler() - closeFunc1 := startWorkflowWorker(wfHandler) + closeFunc1 := startWorkflowWorkerWithRpc(wfHandler) defer closeFunc1() uclient, closeFunc2 := startIwfServiceWithClient(backendType) @@ -100,7 +80,7 @@ func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config _, httpResp, err := req.WorkflowStartRequest(iwfidl.WorkflowStartRequest{ WorkflowId: wfId, IwfWorkflowType: signal.WorkflowType, - WorkflowTimeoutSeconds: 10, + WorkflowTimeoutSeconds: 20, IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort, StartStateId: ptr.Any(signal.State1), WorkflowStartOptions: &iwfidl.WorkflowStartOptions{ @@ -178,8 +158,31 @@ func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config // see why in https://github.com/temporalio/temporal/issues/4801 unhandledSignalVals = append(unhandledSignalVals, sigVal) } + reqRpc := apiClient.DefaultApi.ApiV1WorkflowRpcPost(context.Background()) + rpcResp, httpResp2, err2 := reqRpc.WorkflowRpcRequest(iwfidl.WorkflowRpcRequest{ + WorkflowId: wfId, + RpcName: signal.RPCNameGetSignalChannelInfo, + }).Execute() + panicAtHttpError(err2, httpResp2) + var infos map[string]iwfidl.ChannelInfo + err = json.Unmarshal([]byte(rpcResp.Output.GetData()), &infos) + panicAtError(err) + assertions.Equal( + map[string]iwfidl.ChannelInfo{signal.UnhandledSignalName: {Size: ptr.Any(int32(i + 1))}}, infos) } + reqRpc := apiClient.DefaultApi.ApiV1WorkflowRpcPost(context.Background()) + rpcResp, httpResp2, err2 := reqRpc.WorkflowRpcRequest(iwfidl.WorkflowRpcRequest{ + WorkflowId: wfId, + RpcName: signal.RPCNameGetInternalChannelInfo, + }).Execute() + panicAtHttpError(err2, httpResp2) + var infos map[string]iwfidl.ChannelInfo + err = json.Unmarshal([]byte(rpcResp.Output.GetData()), &infos) + panicAtError(err) + assertions.Equal( + map[string]iwfidl.ChannelInfo{signal.InternalChannelName: {Size: ptr.Any(int32(10))}}, infos) + // signal the workflow var signalVals []iwfidl.EncodedObject for i := 0; i < 4; i++ { diff --git a/integ/util.go b/integ/util.go index 01d4e997..6fdef79e 100644 --- a/integ/util.go +++ b/integ/util.go @@ -178,6 +178,12 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U } } +func panicAtError(err error) { + if err != nil { + panic(err) + } +} + func panicAtHttpError(err error, httpResp *http.Response) { if err != nil { panic(err) diff --git a/integ/workflow/locking/routers.go b/integ/workflow/locking/routers.go index 11ba3880..16703643 100644 --- a/integ/workflow/locking/routers.go +++ b/integ/workflow/locking/routers.go @@ -28,6 +28,11 @@ const ( ShouldUnblockStateWaiting = "shouldUnblockStateWaiting" InParallelS2 = 10 + + NumUnusedSignals = 4 + + UnusedSignalChannelName = "test-unused-signal-channel" + UnusedInternalChannelName = "test-unused-internal-channel" ) var TestValue = &iwfidl.EncodedObject{ @@ -71,6 +76,7 @@ var state2Movement = iwfidl.StateMovement{ type handler struct { invokeHistory map[string]int64 invokeData map[string]interface{} + rpcInvokes int32 } func NewHandler() common.WorkflowHandlerWithRpc { @@ -107,6 +113,20 @@ func (h *handler) ApiV1WorkflowWorkerRpc(c *gin.Context) { }) return } + + signalChannelInfo := (*req.SignalChannelInfos)[UnusedSignalChannelName] + if signalChannelInfo.GetSize() != NumUnusedSignals { + // the 4 messages are sent from the beginning of "locking_test" + panic("incorrect signal channel size") + } + if h.rpcInvokes > 0 { + internalChannelInfo := (*req.InternalChannelInfos)[UnusedInternalChannelName] + if h.rpcInvokes != internalChannelInfo.GetSize() { + panic("incorrect internal channel size") + } + } + h.rpcInvokes++ + // this RPC will increase both SA and DA time.Sleep(time.Millisecond) @@ -178,6 +198,12 @@ func (h *handler) ApiV1WorkflowWorkerRpc(c *gin.Context) { Value: TestValue, }, }, + PublishToInterStateChannel: []iwfidl.InterStateChannelPublishing{ + { + ChannelName: UnusedInternalChannelName, + Value: TestValue, + }, + }, } c.JSON(http.StatusOK, response) diff --git a/integ/workflow/signal/routers.go b/integ/workflow/signal/routers.go index 9f082ecf..500d323b 100644 --- a/integ/workflow/signal/routers.go +++ b/integ/workflow/signal/routers.go @@ -1,6 +1,7 @@ package signal import ( + "encoding/json" "fmt" "github.com/gin-gonic/gin" "github.com/indeedeng/iwf/gen/iwfidl" @@ -12,11 +13,14 @@ import ( ) const ( - WorkflowType = "signal" - State1 = "S1" - State2 = "S2" - SignalName = "test-signal-name" - UnhandledSignalName = "test-unhandled-signal-name" + WorkflowType = "signal" + State1 = "S1" + State2 = "S2" + SignalName = "test-signal-name" + InternalChannelName = "test-internal-channel-name" + UnhandledSignalName = "test-unhandled-signal-name" + RPCNameGetSignalChannelInfo = "RPCNameGetSignalChannelInfo" + RPCNameGetInternalChannelInfo = "RPCNameGetInternalChannelInfo" ) type handler struct { @@ -24,7 +28,48 @@ type handler struct { invokeData map[string]interface{} } -func NewHandler() common.WorkflowHandler { +func (h *handler) ApiV1WorkflowWorkerRpc(c *gin.Context) { + var req iwfidl.WorkflowWorkerRpcRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + if req.RpcName == RPCNameGetSignalChannelInfo { + signalInfos := req.SignalChannelInfos + data, err := json.Marshal(signalInfos) + if err != nil { + panic(err) + } + c.JSON(http.StatusOK, iwfidl.WorkflowWorkerRpcResponse{ + PublishToInterStateChannel: []iwfidl.InterStateChannelPublishing{ + { + ChannelName: InternalChannelName, + }, + }, + Output: &iwfidl.EncodedObject{ + Data: ptr.Any(string(data)), + }, + }) + return + } + if req.RpcName == RPCNameGetInternalChannelInfo { + icInfos := req.InternalChannelInfos + data, err := json.Marshal(icInfos) + if err != nil { + panic(err) + } + c.JSON(http.StatusOK, iwfidl.WorkflowWorkerRpcResponse{ + Output: &iwfidl.EncodedObject{ + Data: ptr.Any(string(data)), + }, + }) + return + } + c.JSON(http.StatusBadRequest, gin.H{}) + return +} + +func NewHandler() common.WorkflowHandlerWithRpc { return &handler{ invokeHistory: make(map[string]int64), invokeData: make(map[string]interface{}), diff --git a/iwf-idl b/iwf-idl index a6fbd8fe..e5bc6236 160000 --- a/iwf-idl +++ b/iwf-idl @@ -1 +1 @@ -Subproject commit a6fbd8feb427fdf77b7ddc0f7c9d7eb154c385e7 +Subproject commit e5bc6236d36b977c2fec85c2bf5e0742f859e04e diff --git a/service/common/rpc/invoke.go b/service/common/rpc/invoke.go index 526a7b98..e4b15ddd 100644 --- a/service/common/rpc/invoke.go +++ b/service/common/rpc/invoke.go @@ -13,7 +13,9 @@ import ( "net/http" ) -func InvokeWorkerRpc(ctx context.Context, rpcPrep *service.PrepareRpcQueryResponse, req iwfidl.WorkflowRpcRequest, apiMaxSeconds int64) (*iwfidl.WorkflowWorkerRpcResponse, *errors.ErrorAndStatus) { +func InvokeWorkerRpc( + ctx context.Context, rpcPrep *service.PrepareRpcQueryResponse, req iwfidl.WorkflowRpcRequest, apiMaxSeconds int64, +) (*iwfidl.WorkflowWorkerRpcResponse, *errors.ErrorAndStatus) { iwfWorkerBaseUrl := urlautofix.FixWorkerUrl(rpcPrep.IwfWorkerUrl) // invoke worker rpc apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{ @@ -33,11 +35,13 @@ func InvokeWorkerRpc(ctx context.Context, rpcPrep *service.PrepareRpcQueryRespon WorkflowRunId: rpcPrep.WorkflowRunId, WorkflowStartedTimestamp: rpcPrep.WorkflowStartedTimestamp, }, - WorkflowType: rpcPrep.IwfWorkflowType, - RpcName: req.RpcName, - Input: req.Input, - SearchAttributes: rpcPrep.SearchAttributes, - DataAttributes: rpcPrep.DataObjects, + WorkflowType: rpcPrep.IwfWorkflowType, + RpcName: req.RpcName, + Input: req.Input, + SearchAttributes: rpcPrep.SearchAttributes, + DataAttributes: rpcPrep.DataObjects, + SignalChannelInfos: &rpcPrep.SignalChannelInfo, + InternalChannelInfos: &rpcPrep.InternalChannelInfo, } resp, httpResp, err := workerReq.WorkflowWorkerRpcRequest(workerRequest).Execute() if utils.CheckHttpError(err, httpResp) { diff --git a/service/interfaces.go b/service/interfaces.go index 03b4afdc..6a975001 100644 --- a/service/interfaces.go +++ b/service/interfaces.go @@ -83,6 +83,8 @@ type ( WorkflowStartedTimestamp int64 IwfWorkflowType string IwfWorkerUrl string + SignalChannelInfo map[string]iwfidl.ChannelInfo + InternalChannelInfo map[string]iwfidl.ChannelInfo } ExecuteRpcSignalRequest struct { diff --git a/service/interpreter/InternalChannel.go b/service/interpreter/InternalChannel.go index 7737f455..77788d4c 100644 --- a/service/interpreter/InternalChannel.go +++ b/service/interpreter/InternalChannel.go @@ -1,6 +1,9 @@ package interpreter -import "github.com/indeedeng/iwf/gen/iwfidl" +import ( + "github.com/indeedeng/iwf/gen/iwfidl" + "github.com/indeedeng/iwf/service/common/ptr" +) type InternalChannel struct { // key is channel name @@ -23,6 +26,16 @@ func (i *InternalChannel) GetAllReceived() map[string][]*iwfidl.EncodedObject { return i.receivedData } +func (i *InternalChannel) GetInfos() map[string]iwfidl.ChannelInfo { + infos := make(map[string]iwfidl.ChannelInfo, len(i.receivedData)) + for name, l := range i.receivedData { + infos[name] = iwfidl.ChannelInfo{ + Size: ptr.Any(int32(len(l))), + } + } + return infos +} + func (i *InternalChannel) HasData(channelName string) bool { l := i.receivedData[channelName] return len(l) > 0 diff --git a/service/interpreter/queryHandler.go b/service/interpreter/queryHandler.go index 057748cc..6eb224f0 100644 --- a/service/interpreter/queryHandler.go +++ b/service/interpreter/queryHandler.go @@ -7,6 +7,7 @@ import ( func SetQueryHandlers( ctx UnifiedContext, provider WorkflowProvider, persistenceManager *PersistenceManager, + internalChannel *InternalChannel, signalReceiver *SignalReceiver, continueAsNewer *ContinueAsNewer, workflowConfiger *WorkflowConfiger, basicInfo service.BasicInfo, ) error { @@ -45,6 +46,8 @@ func SetQueryHandlers( WorkflowStartedTimestamp: info.WorkflowStartTime.Unix(), IwfWorkflowType: basicInfo.IwfWorkflowType, IwfWorkerUrl: basicInfo.IwfWorkerUrl, + SignalChannelInfo: signalReceiver.GetInfos(), + InternalChannelInfo: internalChannel.GetInfos(), }, nil }) if err != nil { diff --git a/service/interpreter/signalReceiver.go b/service/interpreter/signalReceiver.go index 50e00f7c..06aa17de 100644 --- a/service/interpreter/signalReceiver.go +++ b/service/interpreter/signalReceiver.go @@ -1,6 +1,7 @@ package interpreter import ( + "github.com/indeedeng/iwf/service/common/ptr" "strings" "github.com/indeedeng/iwf/gen/iwfidl" @@ -237,6 +238,16 @@ func (sr *SignalReceiver) GetAllReceived() map[string][]*iwfidl.EncodedObject { return sr.receivedSignals } +func (sr *SignalReceiver) GetInfos() map[string]iwfidl.ChannelInfo { + infos := make(map[string]iwfidl.ChannelInfo, len(sr.receivedSignals)) + for name, l := range sr.receivedSignals { + infos[name] = iwfidl.ChannelInfo{ + Size: ptr.Any(int32(len(l))), + } + } + return infos +} + // DrainAllReceivedButUnprocessedSignals will process all the signals that are received but not processed in the current // workflow task. // There are two cases this is needed: diff --git a/service/interpreter/workflowImpl.go b/service/interpreter/workflowImpl.go index 3ba89d1a..b747f862 100644 --- a/service/interpreter/workflowImpl.go +++ b/service/interpreter/workflowImpl.go @@ -76,7 +76,7 @@ func InterpreterImpl( IwfWorkerUrl: input.IwfWorkerUrl, } - var interStateChannel *InternalChannel + var internalChannel *InternalChannel var stateRequestQueue *StateRequestQueue var persistenceManager *PersistenceManager var timerProcessor *TimerProcessor @@ -96,31 +96,31 @@ func InterpreterImpl( // The below initialization order should be the same as for non-continueAsNew - interStateChannel = RebuildInternalChannel(previous.InterStateChannelReceived) + internalChannel = RebuildInternalChannel(previous.InterStateChannelReceived) stateRequestQueue = NewStateRequestQueueWithResumeRequests(previous.StatesToStartFromBeginning, previous.StateExecutionsToResume) persistenceManager = RebuildPersistenceManager(provider, previous.DataObjects, previous.SearchAttributes, input.UseMemoForDataAttributes) timerProcessor = NewTimerProcessor(ctx, provider, previous.StaleSkipTimerSignals) continueAsNewCounter = NewContinueAsCounter(workflowConfiger, ctx, provider) - signalReceiver = NewSignalReceiver(ctx, provider, interStateChannel, stateRequestQueue, persistenceManager, timerProcessor, continueAsNewCounter, workflowConfiger, previous.SignalsReceived) + signalReceiver = NewSignalReceiver(ctx, provider, internalChannel, stateRequestQueue, persistenceManager, timerProcessor, continueAsNewCounter, workflowConfiger, previous.SignalsReceived) counterInfo := previous.StateExecutionCounterInfo stateExecutionCounter = RebuildStateExecutionCounter(ctx, provider, globalVersioner, counterInfo.StateIdStartedCount, counterInfo.StateIdCurrentlyExecutingCount, counterInfo.TotalCurrentlyExecutingCount, workflowConfiger, continueAsNewCounter) outputCollector = NewOutputCollector(previous.StateOutputs) - continueAsNewer = NewContinueAsNewer(provider, interStateChannel, signalReceiver, stateExecutionCounter, persistenceManager, stateRequestQueue, outputCollector, timerProcessor) + continueAsNewer = NewContinueAsNewer(provider, internalChannel, signalReceiver, stateExecutionCounter, persistenceManager, stateRequestQueue, outputCollector, timerProcessor) } else { - interStateChannel = NewInternalChannel() + internalChannel = NewInternalChannel() stateRequestQueue = NewStateRequestQueue() persistenceManager = NewPersistenceManager(provider, input.InitDataAttributes, input.InitSearchAttributes, input.UseMemoForDataAttributes) timerProcessor = NewTimerProcessor(ctx, provider, nil) continueAsNewCounter = NewContinueAsCounter(workflowConfiger, ctx, provider) - signalReceiver = NewSignalReceiver(ctx, provider, interStateChannel, stateRequestQueue, persistenceManager, timerProcessor, continueAsNewCounter, workflowConfiger, nil) + signalReceiver = NewSignalReceiver(ctx, provider, internalChannel, stateRequestQueue, persistenceManager, timerProcessor, continueAsNewCounter, workflowConfiger, nil) stateExecutionCounter = NewStateExecutionCounter(ctx, provider, globalVersioner, workflowConfiger, continueAsNewCounter) outputCollector = NewOutputCollector(nil) - continueAsNewer = NewContinueAsNewer(provider, interStateChannel, signalReceiver, stateExecutionCounter, persistenceManager, stateRequestQueue, outputCollector, timerProcessor) + continueAsNewer = NewContinueAsNewer(provider, internalChannel, signalReceiver, stateExecutionCounter, persistenceManager, stateRequestQueue, outputCollector, timerProcessor) } - _, err = NewWorkflowUpdater(ctx, provider, persistenceManager, stateRequestQueue, continueAsNewer, continueAsNewCounter, workflowConfiger, interStateChannel, basicInfo, globalVersioner) + _, err = NewWorkflowUpdater(ctx, provider, persistenceManager, stateRequestQueue, continueAsNewer, continueAsNewCounter, workflowConfiger, internalChannel, signalReceiver, basicInfo, globalVersioner) if err != nil { retErr = err return @@ -129,7 +129,7 @@ func InterpreterImpl( // This is to ensure the correctness. If we set the query handler before that, // the query handler could return empty data (since the loading hasn't completed), which will be incorrect response. // We would rather return server errors and let the client retry later. - err = SetQueryHandlers(ctx, provider, persistenceManager, continueAsNewer, workflowConfiger, basicInfo) + err = SetQueryHandlers(ctx, provider, persistenceManager, internalChannel, signalReceiver, continueAsNewer, workflowConfiger, basicInfo) if err != nil { retErr = err return @@ -229,7 +229,7 @@ func InterpreterImpl( slices.Contains(input.WaitForCompletionStateIds, state.GetStateId()) decision, stateExecStatus, err := processStateExecution( - ctx, provider, globalVersioner, basicInfo, stateReq, stateExeId, persistenceManager, interStateChannel, + ctx, provider, globalVersioner, basicInfo, stateReq, stateExeId, persistenceManager, internalChannel, signalReceiver, timerProcessor, continueAsNewer, continueAsNewCounter, workflowConfiger, shouldSendSignalOnCompletion) if err != nil { // this is the case where stateExecStatus == FailureStateExecutionStatus @@ -242,7 +242,7 @@ func InterpreterImpl( // NOTE: decision is only available on this CompletedStateExecutionStatus canGoNext, gracefulComplete, forceComplete, forceFail, output, err := - checkClosingWorkflow(ctx, provider, globalVersioner, decision, state.GetStateId(), stateExeId, interStateChannel, signalReceiver) + checkClosingWorkflow(ctx, provider, globalVersioner, decision, state.GetStateId(), stateExeId, internalChannel, signalReceiver) if err != nil { errToFailWf = err // no return so that it can fall through to call MarkStateExecutionCompleted diff --git a/service/interpreter/workflowUpdater.go b/service/interpreter/workflowUpdater.go index 448bbc19..ac4fb321 100644 --- a/service/interpreter/workflowUpdater.go +++ b/service/interpreter/workflowUpdater.go @@ -11,7 +11,8 @@ type WorkflowUpdater struct { provider WorkflowProvider continueAsNewer *ContinueAsNewer continueAsNewCounter *ContinueAsNewCounter - interStateChannel *InternalChannel + internalChannel *InternalChannel + signalReceiver *SignalReceiver stateRequestQueue *StateRequestQueue configer *WorkflowConfiger logger UnifiedLogger @@ -23,13 +24,15 @@ func NewWorkflowUpdater( ctx UnifiedContext, provider WorkflowProvider, persistenceManager *PersistenceManager, stateRequestQueue *StateRequestQueue, continueAsNewer *ContinueAsNewer, continueAsNewCounter *ContinueAsNewCounter, configer *WorkflowConfiger, - interStateChannel *InternalChannel, basicInfo service.BasicInfo, globalVersioner *GlobalVersioner, + internalChannel *InternalChannel, signalReceiver *SignalReceiver, basicInfo service.BasicInfo, + globalVersioner *GlobalVersioner, ) (*WorkflowUpdater, error) { updater := &WorkflowUpdater{ persistenceManager: persistenceManager, continueAsNewer: continueAsNewer, continueAsNewCounter: continueAsNewCounter, - interStateChannel: interStateChannel, + internalChannel: internalChannel, + signalReceiver: signalReceiver, stateRequestQueue: stateRequestQueue, configer: configer, basicInfo: basicInfo, @@ -61,6 +64,8 @@ func (u *WorkflowUpdater) handler( WorkflowStartedTimestamp: info.WorkflowStartTime.Unix(), IwfWorkflowType: u.basicInfo.IwfWorkflowType, IwfWorkerUrl: u.basicInfo.IwfWorkerUrl, + SignalChannelInfo: u.signalReceiver.GetInfos(), + InternalChannelInfo: u.internalChannel.GetInfos(), } activityOptions := ActivityOptions{ @@ -91,7 +96,7 @@ func (u *WorkflowUpdater) handler( u.continueAsNewCounter.IncSyncUpdateReceived() _ = u.persistenceManager.ProcessUpsertDataObject(ctx, rpcOutput.UpsertDataAttributes) _ = u.persistenceManager.ProcessUpsertSearchAttribute(ctx, rpcOutput.UpsertSearchAttributes) - u.interStateChannel.ProcessPublishing(rpcOutput.PublishToInterStateChannel) + u.internalChannel.ProcessPublishing(rpcOutput.PublishToInterStateChannel) if rpcOutput.StateDecision != nil { u.stateRequestQueue.AddStateStartRequests(rpcOutput.StateDecision.NextStates) }