diff --git a/gen/iwfidl/api/openapi.yaml b/gen/iwfidl/api/openapi.yaml index 613fea62..c18e8b48 100644 --- a/gen/iwfidl/api/openapi.yaml +++ b/gen/iwfidl/api/openapi.yaml @@ -3342,6 +3342,7 @@ components: - WORKFLOW_COMPLETE_EVENT - WORKFLOW_FAIL_EVENT - WORKFLOW_START_EVENT + - RPC_EXECUTION_EVENT type: string IwfEvent: properties: @@ -3357,6 +3358,8 @@ components: type: string stateExecutionId: type: string + rpcName: + type: string startTimestampInMs: format: int64 type: integer diff --git a/gen/iwfidl/docs/EventType.md b/gen/iwfidl/docs/EventType.md index 3da20187..83df3a50 100644 --- a/gen/iwfidl/docs/EventType.md +++ b/gen/iwfidl/docs/EventType.md @@ -29,6 +29,8 @@ * `WORKFLOW_START_EVENT` (value: `"WORKFLOW_START_EVENT"`) +* `RPC_EXECUTION_EVENT` (value: `"RPC_EXECUTION_EVENT"`) + [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/gen/iwfidl/docs/IwfEvent.md b/gen/iwfidl/docs/IwfEvent.md index 7877ba45..f9e58b48 100644 --- a/gen/iwfidl/docs/IwfEvent.md +++ b/gen/iwfidl/docs/IwfEvent.md @@ -10,6 +10,7 @@ Name | Type | Description | Notes **WorkflowRunId** | **string** | | **StateId** | Pointer to **string** | | [optional] **StateExecutionId** | Pointer to **string** | | [optional] +**RpcName** | Pointer to **string** | | [optional] **StartTimestampInMs** | Pointer to **int64** | | [optional] **EndTimestampInMs** | Pointer to **int64** | | [optional] @@ -162,6 +163,31 @@ SetStateExecutionId sets StateExecutionId field to given value. HasStateExecutionId returns a boolean if a field has been set. +### GetRpcName + +`func (o *IwfEvent) GetRpcName() string` + +GetRpcName returns the RpcName field if non-nil, zero value otherwise. + +### GetRpcNameOk + +`func (o *IwfEvent) GetRpcNameOk() (*string, bool)` + +GetRpcNameOk returns a tuple with the RpcName field if it's non-nil, zero value otherwise +and a boolean to check if the value has been set. + +### SetRpcName + +`func (o *IwfEvent) SetRpcName(v string)` + +SetRpcName sets RpcName field to given value. + +### HasRpcName + +`func (o *IwfEvent) HasRpcName() bool` + +HasRpcName returns a boolean if a field has been set. + ### GetStartTimestampInMs `func (o *IwfEvent) GetStartTimestampInMs() int64` diff --git a/gen/iwfidl/model_event_type.go b/gen/iwfidl/model_event_type.go index 35684a6f..845b2d76 100644 --- a/gen/iwfidl/model_event_type.go +++ b/gen/iwfidl/model_event_type.go @@ -33,6 +33,7 @@ const ( WORKFLOW_COMPLETE_EVENT EventType = "WORKFLOW_COMPLETE_EVENT" WORKFLOW_FAIL_EVENT EventType = "WORKFLOW_FAIL_EVENT" WORKFLOW_START_EVENT EventType = "WORKFLOW_START_EVENT" + RPC_EXECUTION_EVENT EventType = "RPC_EXECUTION_EVENT" ) // All allowed values of EventType enum @@ -50,6 +51,7 @@ var AllowedEventTypeEnumValues = []EventType{ "WORKFLOW_COMPLETE_EVENT", "WORKFLOW_FAIL_EVENT", "WORKFLOW_START_EVENT", + "RPC_EXECUTION_EVENT", } func (v *EventType) UnmarshalJSON(src []byte) error { diff --git a/gen/iwfidl/model_iwf_event.go b/gen/iwfidl/model_iwf_event.go index ceac2424..62d5f90f 100644 --- a/gen/iwfidl/model_iwf_event.go +++ b/gen/iwfidl/model_iwf_event.go @@ -25,6 +25,7 @@ type IwfEvent struct { WorkflowRunId string `json:"workflowRunId"` StateId *string `json:"stateId,omitempty"` StateExecutionId *string `json:"stateExecutionId,omitempty"` + RpcName *string `json:"rpcName,omitempty"` StartTimestampInMs *int64 `json:"startTimestampInMs,omitempty"` EndTimestampInMs *int64 `json:"endTimestampInMs,omitempty"` } @@ -210,6 +211,38 @@ func (o *IwfEvent) SetStateExecutionId(v string) { o.StateExecutionId = &v } +// GetRpcName returns the RpcName field value if set, zero value otherwise. +func (o *IwfEvent) GetRpcName() string { + if o == nil || IsNil(o.RpcName) { + var ret string + return ret + } + return *o.RpcName +} + +// GetRpcNameOk returns a tuple with the RpcName field value if set, nil otherwise +// and a boolean to check if the value has been set. +func (o *IwfEvent) GetRpcNameOk() (*string, bool) { + if o == nil || IsNil(o.RpcName) { + return nil, false + } + return o.RpcName, true +} + +// HasRpcName returns a boolean if a field has been set. +func (o *IwfEvent) HasRpcName() bool { + if o != nil && !IsNil(o.RpcName) { + return true + } + + return false +} + +// SetRpcName gets a reference to the given string and assigns it to the RpcName field. +func (o *IwfEvent) SetRpcName(v string) { + o.RpcName = &v +} + // GetStartTimestampInMs returns the StartTimestampInMs field value if set, zero value otherwise. func (o *IwfEvent) GetStartTimestampInMs() int64 { if o == nil || IsNil(o.StartTimestampInMs) { @@ -294,6 +327,9 @@ func (o IwfEvent) ToMap() (map[string]interface{}, error) { if !IsNil(o.StateExecutionId) { toSerialize["stateExecutionId"] = o.StateExecutionId } + if !IsNil(o.RpcName) { + toSerialize["rpcName"] = o.RpcName + } if !IsNil(o.StartTimestampInMs) { toSerialize["startTimestampInMs"] = o.StartTimestampInMs } diff --git a/iwf-idl b/iwf-idl index e5bc6236..a7fb5559 160000 --- a/iwf-idl +++ b/iwf-idl @@ -1 +1 @@ -Subproject commit e5bc6236d36b977c2fec85c2bf5e0742f859e04e +Subproject commit a7fb55597de5591fffa406b6b5f07d85fda4dfee diff --git a/service/api/service.go b/service/api/service.go index 49994124..fdf53e7a 100644 --- a/service/api/service.go +++ b/service/api/service.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/indeedeng/iwf/config" + "github.com/indeedeng/iwf/service/common/event" "github.com/indeedeng/iwf/service/interpreter/env" "net/http" "os" @@ -612,7 +613,9 @@ func (s *serviceImpl) ApiV1WorkflowSearchPost( func (s *serviceImpl) ApiV1WorkflowRpcPost( ctx context.Context, req iwfidl.WorkflowRpcRequest, ) (wresp *iwfidl.WorkflowRpcResponse, retError *errors.ErrorAndStatus) { - defer func() { log.CapturePanic(recover(), s.logger, &retError) }() + defer func() { + log.CapturePanic(recover(), s.logger, &retError) + }() if needLocking(req) { return s.handleRpcBySynchronousUpdate(ctx, req) @@ -628,6 +631,15 @@ func (s *serviceImpl) ApiV1WorkflowRpcPost( return nil, s.handleError(err, WorkflowRpcApiPath, req.GetWorkflowId()) } + defer func() { + event.Handle(iwfidl.IwfEvent{ + EventType: iwfidl.RPC_EXECUTION_EVENT, + RpcName: &req.RpcName, + WorkflowType: rpcPrep.IwfWorkflowType, + WorkflowId: req.GetWorkflowId(), + }) + }() + resp, retError := rpc.InvokeWorkerRpc(ctx, rpcPrep, req, s.config.Api.MaxWaitSeconds) if retError != nil { return nil, retError diff --git a/service/interpreter/workflowUpdater.go b/service/interpreter/workflowUpdater.go index ac4fb321..6846ee65 100644 --- a/service/interpreter/workflowUpdater.go +++ b/service/interpreter/workflowUpdater.go @@ -3,6 +3,7 @@ package interpreter import ( "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" + "github.com/indeedeng/iwf/service/common/event" "time" ) @@ -52,11 +53,22 @@ func NewWorkflowUpdater( func (u *WorkflowUpdater) handler( ctx UnifiedContext, input iwfidl.WorkflowRpcRequest, ) (output *HandlerOutput, err error) { - u.continueAsNewer.IncreaseInflightOperation() defer u.continueAsNewer.DecreaseInflightOperation() info := u.provider.GetWorkflowInfo(ctx) + + defer func() { + if !u.provider.IsReplaying(ctx) { + event.Handle(iwfidl.IwfEvent{ + EventType: iwfidl.RPC_EXECUTION_EVENT, + RpcName: &input.RpcName, + WorkflowType: u.basicInfo.IwfWorkflowType, + WorkflowId: info.WorkflowExecution.ID, + }) + } + }() + rpcPrep := service.PrepareRpcQueryResponse{ DataObjects: u.persistenceManager.LoadDataObjects(ctx, input.DataAttributesLoadingPolicy), SearchAttributes: u.persistenceManager.LoadSearchAttributes(ctx, input.SearchAttributesLoadingPolicy), @@ -88,6 +100,7 @@ func (u *WorkflowUpdater) handler( handlerOutput := &HandlerOutput{ StatusError: activityOutput.StatusError, } + rpcOutput := activityOutput.RpcOutput if rpcOutput != nil { handlerOutput.RpcOutput = &iwfidl.WorkflowRpcResponse{