diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index b51e2673c0b..39919b516cc 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -2789,7 +2789,7 @@ func (ms *MutableStateImpl) AddCompletedWorkflowEvent( } // TODO merge active & passive task generation if err := ms.taskGenerator.GenerateWorkflowCloseTasks( - event, + event.GetEventTime(), false, ); err != nil { return nil, err @@ -2834,7 +2834,7 @@ func (ms *MutableStateImpl) AddFailWorkflowEvent( } // TODO merge active & passive task generation if err := ms.taskGenerator.GenerateWorkflowCloseTasks( - event, + event.GetEventTime(), false, ); err != nil { return nil, err @@ -2878,7 +2878,7 @@ func (ms *MutableStateImpl) AddTimeoutWorkflowEvent( } // TODO merge active & passive task generation if err := ms.taskGenerator.GenerateWorkflowCloseTasks( - event, + event.GetEventTime(), false, ); err != nil { return nil, err @@ -2959,7 +2959,7 @@ func (ms *MutableStateImpl) AddWorkflowExecutionCanceledEvent( } // TODO merge active & passive task generation if err := ms.taskGenerator.GenerateWorkflowCloseTasks( - event, + event.GetEventTime(), false, ); err != nil { return nil, err @@ -3509,7 +3509,7 @@ func (ms *MutableStateImpl) AddWorkflowExecutionTerminatedEvent( } // TODO merge active & passive task generation if err := ms.taskGenerator.GenerateWorkflowCloseTasks( - event, + event.GetEventTime(), deleteAfterTerminate, ); err != nil { return nil, err @@ -3752,7 +3752,7 @@ func (ms *MutableStateImpl) AddContinueAsNewEvent( } // TODO merge active & passive task generation if err := ms.taskGenerator.GenerateWorkflowCloseTasks( - continueAsNewEvent, + continueAsNewEvent.GetEventTime(), false, ); err != nil { return nil, nil, err diff --git a/service/history/workflow/mutable_state_rebuilder.go b/service/history/workflow/mutable_state_rebuilder.go index 98ffd14af6a..6d3081042d8 100644 --- a/service/history/workflow/mutable_state_rebuilder.go +++ b/service/history/workflow/mutable_state_rebuilder.go @@ -560,7 +560,7 @@ func (b *MutableStateRebuilderImpl) applyEvents( } if err := taskGenerator.GenerateWorkflowCloseTasks( - event, + event.GetEventTime(), false, ); err != nil { return nil, err @@ -575,7 +575,7 @@ func (b *MutableStateRebuilderImpl) applyEvents( } if err := taskGenerator.GenerateWorkflowCloseTasks( - event, + event.GetEventTime(), false, ); err != nil { return nil, err @@ -590,7 +590,7 @@ func (b *MutableStateRebuilderImpl) applyEvents( } if err := taskGenerator.GenerateWorkflowCloseTasks( - event, + event.GetEventTime(), false, ); err != nil { return nil, err @@ -605,7 +605,7 @@ func (b *MutableStateRebuilderImpl) applyEvents( } if err := taskGenerator.GenerateWorkflowCloseTasks( - event, + event.GetEventTime(), false, ); err != nil { return nil, err @@ -620,7 +620,7 @@ func (b *MutableStateRebuilderImpl) applyEvents( } if err := taskGenerator.GenerateWorkflowCloseTasks( - event, + event.GetEventTime(), false, ); err != nil { return nil, err @@ -667,7 +667,7 @@ func (b *MutableStateRebuilderImpl) applyEvents( } if err := taskGenerator.GenerateWorkflowCloseTasks( - event, + event.GetEventTime(), false, ); err != nil { return nil, err diff --git a/service/history/workflow/mutable_state_rebuilder_test.go b/service/history/workflow/mutable_state_rebuilder_test.go index c58db7edd7d..abdaf9e7640 100644 --- a/service/history/workflow/mutable_state_rebuilder_test.go +++ b/service/history/workflow/mutable_state_rebuilder_test.go @@ -278,7 +278,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionTimedOut() s.mockMutableState.EXPECT().ReplicateWorkflowExecutionTimedoutEvent(event.GetEventId(), event).Return(nil) s.mockUpdateVersion(event) s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks( - event, + &now, false, ).Return(nil) s.mockMutableState.EXPECT().ClearStickyTaskQueue() @@ -310,7 +310,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionTerminated s.mockMutableState.EXPECT().ReplicateWorkflowExecutionTerminatedEvent(event.GetEventId(), event).Return(nil) s.mockUpdateVersion(event) s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks( - event, + &now, false, ).Return(nil) s.mockMutableState.EXPECT().ClearStickyTaskQueue() @@ -341,7 +341,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionFailed() { s.mockMutableState.EXPECT().ReplicateWorkflowExecutionFailedEvent(event.GetEventId(), event).Return(nil) s.mockUpdateVersion(event) s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks( - event, + &now, false, ).Return(nil) s.mockMutableState.EXPECT().ClearStickyTaskQueue() @@ -373,7 +373,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionCompleted( s.mockMutableState.EXPECT().ReplicateWorkflowExecutionCompletedEvent(event.GetEventId(), event).Return(nil) s.mockUpdateVersion(event) s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks( - event, + &now, false, ).Return(nil) s.mockMutableState.EXPECT().ClearStickyTaskQueue() @@ -405,7 +405,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionCanceled() s.mockMutableState.EXPECT().ReplicateWorkflowExecutionCanceledEvent(event.GetEventId(), event).Return(nil) s.mockUpdateVersion(event) s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks( - event, + &now, false, ).Return(nil) s.mockMutableState.EXPECT().ClearStickyTaskQueue() @@ -503,7 +503,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedA s.mockMutableState.EXPECT().GetNamespaceEntry().Return(tests.GlobalNamespaceEntry).AnyTimes() s.mockUpdateVersion(continueAsNewEvent) s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks( - continueAsNewEvent, + &now, false, ).Return(nil) s.mockMutableState.EXPECT().ClearStickyTaskQueue() @@ -561,7 +561,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedA s.mockMutableState.EXPECT().GetNamespaceEntry().Return(tests.GlobalNamespaceEntry).AnyTimes() s.mockUpdateVersion(continueAsNewEvent) s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks( - continueAsNewEvent, + &now, false, ).Return(nil) s.mockMutableState.EXPECT().ClearStickyTaskQueue() diff --git a/service/history/workflow/task_generator.go b/service/history/workflow/task_generator.go index e160fdc54bb..cd773a3fcbf 100644 --- a/service/history/workflow/task_generator.go +++ b/service/history/workflow/task_generator.go @@ -50,10 +50,7 @@ type ( startEvent *historypb.HistoryEvent, ) error GenerateWorkflowCloseTasks( - // TODO: remove closeEvent parameter - // when deprecating the backward compatible logic - // for getting close time from close event. - closeEvent *historypb.HistoryEvent, + closedTime *time.Time, deleteAfterClose bool, ) error // GenerateDeleteHistoryEventTask adds a tasks.DeleteHistoryEventTask to the mutable state. @@ -154,7 +151,7 @@ func (r *TaskGeneratorImpl) GenerateWorkflowStartTasks( } func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks( - closeEvent *historypb.HistoryEvent, + closedTime *time.Time, deleteAfterClose bool, ) error { @@ -206,7 +203,7 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks( delay = retention } // archiveTime is the time when the archival queue recognizes the ArchiveExecutionTask as ready-to-process - archiveTime := closeEvent.GetEventTime().Add(delay) + archiveTime := timestamp.TimeValue(closedTime).Add(delay) // This flag is only untrue for old server versions which were using the archival workflow instead of the // archival queue. @@ -219,7 +216,7 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks( } closeTasks = append(closeTasks, task) } else { - closeTime := timestamp.TimeValue(closeEvent.GetEventTime()) + closeTime := timestamp.TimeValue(closedTime) if err := r.GenerateDeleteHistoryEventTask(closeTime, false); err != nil { return err } diff --git a/service/history/workflow/task_generator_mock.go b/service/history/workflow/task_generator_mock.go index bf31a7e292f..5881e9560ec 100644 --- a/service/history/workflow/task_generator_mock.go +++ b/service/history/workflow/task_generator_mock.go @@ -288,17 +288,17 @@ func (mr *MockTaskGeneratorMockRecorder) GenerateUserTimerTasks() *gomock.Call { } // GenerateWorkflowCloseTasks mocks base method. -func (m *MockTaskGenerator) GenerateWorkflowCloseTasks(closeEvent *history.HistoryEvent, deleteAfterClose bool) error { +func (m *MockTaskGenerator) GenerateWorkflowCloseTasks(closedTime *time.Time, deleteAfterClose bool) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GenerateWorkflowCloseTasks", closeEvent, deleteAfterClose) + ret := m.ctrl.Call(m, "GenerateWorkflowCloseTasks", closedTime, deleteAfterClose) ret0, _ := ret[0].(error) return ret0 } // GenerateWorkflowCloseTasks indicates an expected call of GenerateWorkflowCloseTasks. -func (mr *MockTaskGeneratorMockRecorder) GenerateWorkflowCloseTasks(closeEvent, deleteAfterClose interface{}) *gomock.Call { +func (mr *MockTaskGeneratorMockRecorder) GenerateWorkflowCloseTasks(closedTime, deleteAfterClose interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateWorkflowCloseTasks", reflect.TypeOf((*MockTaskGenerator)(nil).GenerateWorkflowCloseTasks), closeEvent, deleteAfterClose) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateWorkflowCloseTasks", reflect.TypeOf((*MockTaskGenerator)(nil).GenerateWorkflowCloseTasks), closedTime, deleteAfterClose) } // GenerateWorkflowResetTasks mocks base method. diff --git a/service/history/workflow/task_generator_test.go b/service/history/workflow/task_generator_test.go index d7eb5a0ac7d..de3dff70f3e 100644 --- a/service/history/workflow/task_generator_test.go +++ b/service/history/workflow/task_generator_test.go @@ -56,7 +56,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.temporal.io/api/enums/v1" - historypb "go.temporal.io/api/history/v1" "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common/archiver" @@ -277,12 +276,7 @@ func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) { }).AnyTimes() taskGenerator := NewTaskGenerator(namespaceRegistry, mutableState, cfg, archivalMetadata) - err := taskGenerator.GenerateWorkflowCloseTasks(&historypb.HistoryEvent{ - Attributes: &historypb.HistoryEvent_WorkflowExecutionCompletedEventAttributes{ - WorkflowExecutionCompletedEventAttributes: &historypb.WorkflowExecutionCompletedEventAttributes{}, - }, - EventTime: timestamp.TimePtr(p.CloseEventTime), - }, p.DeleteAfterClose) + err := taskGenerator.GenerateWorkflowCloseTasks(timestamp.TimePtr(p.CloseEventTime), p.DeleteAfterClose) require.NoError(t, err) var ( diff --git a/service/history/workflow/task_refresher.go b/service/history/workflow/task_refresher.go index f48228541bd..5797b35758b 100644 --- a/service/history/workflow/task_refresher.go +++ b/service/history/workflow/task_refresher.go @@ -188,13 +188,13 @@ func (r *TaskRefresherImpl) refreshTasksForWorkflowClose( executionState := mutableState.GetExecutionState() if executionState.Status != enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING { - closeEvent, err := mutableState.GetCompletionEvent(ctx) + closeEventTime, err := mutableState.GetWorkflowCloseTime(ctx) if err != nil { return err } return taskGenerator.GenerateWorkflowCloseTasks( - closeEvent, + closeEventTime, false, ) }