From 50c31312cb83af0c2d6e7f90e08294a08bdd866f Mon Sep 17 00:00:00 2001 From: d4x1 <1507509064@qq.com> Date: Wed, 7 Aug 2024 17:28:13 +0800 Subject: [PATCH 1/3] refactor(helpers): update "NewSubtaskStateManager" --- .../pluginhelper/api/subtask_state_manager.go | 103 ++++++++++++------ 1 file changed, 70 insertions(+), 33 deletions(-) diff --git a/backend/helpers/pluginhelper/api/subtask_state_manager.go b/backend/helpers/pluginhelper/api/subtask_state_manager.go index 22f71453d4b..fb3246afeef 100644 --- a/backend/helpers/pluginhelper/api/subtask_state_manager.go +++ b/backend/helpers/pluginhelper/api/subtask_state_manager.go @@ -25,7 +25,7 @@ import ( "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models" - plugin "github.com/apache/incubator-devlake/core/plugin" + "github.com/apache/incubator-devlake/core/plugin" "github.com/apache/incubator-devlake/core/utils" ) @@ -75,57 +75,94 @@ type SubtaskStateManager struct { // NewSubtaskStateManager create a new SubtaskStateManager func NewSubtaskStateManager(args *SubtaskCommonArgs) (stateManager *SubtaskStateManager, err errors.Error) { db := args.GetDal() - syncPolicy := args.SubTaskContext.TaskContext().SyncPolicy() - plugin := args.SubTaskContext.TaskContext().GetName() - subtask := args.SubTaskContext.GetName() // load sync policy and make sure it is not nil + syncPolicy := args.SubTaskContext.TaskContext().SyncPolicy() if syncPolicy == nil { syncPolicy = &models.SyncPolicy{} } + + plugin := args.SubTaskContext.TaskContext().GetName() + subtask := args.SubTaskContext.GetName() params := args.GetRawDataParams() - // load the previous state from the database - state := &models.SubtaskState{} - err = db.First(state, dal.Where(`plugin = ? AND subtask =? AND params = ?`, plugin, subtask, params)) + preState, err := loadPreviousState(db, plugin, subtask, params) if err != nil { - if db.IsErrorNotFound(err) { - state = &models.SubtaskState{ - Plugin: plugin, - Subtask: subtask, - Params: params, - } - err = nil - } else { - err = errors.Default.Wrap(err, "failed to load the previous subtask state") - return - } + return } - // fullsync by default + + isIncremental, since := calculateStateManagerMode(syncPolicy, preState, utils.ToJsonString(args.SubtaskConfig)) + now := time.Now() stateManager = &SubtaskStateManager{ db: db, - state: state, + state: preState, syncPolicy: syncPolicy, - isIncremental: false, - since: syncPolicy.TimeAfter, + isIncremental: isIncremental, + since: since, until: &now, config: utils.ToJsonString(args.SubtaskConfig), } // fallback to the previous timeAfter if no new value if stateManager.since == nil { - stateManager.since = state.TimeAfter + stateManager.since = preState.TimeAfter } - // if fullsync is set or no previous success start time, we are in the full sync mode - if syncPolicy.FullSync || state.PrevStartedAt == nil { - return + return +} + +func loadPreviousState(db dal.Dal, plugin, subtask, params string) (*models.SubtaskState, errors.Error) { + // load the previous state from the database + preState := &models.SubtaskState{} + err := db.First(preState, dal.Where(`plugin = ? AND subtask =? AND params = ?`, plugin, subtask, params)) + if err != nil { + if db.IsErrorNotFound(err) { + preState = &models.SubtaskState{ + Plugin: plugin, + Subtask: subtask, + Params: params, + } + } else { + return nil, errors.Default.Wrap(err, "failed to load the previous subtask state") + } } - // if timeAfter is not set or NOT before the previous vaule, we are in the incremental mode - if (syncPolicy.TimeAfter == nil || state.TimeAfter == nil || !syncPolicy.TimeAfter.Before(*state.TimeAfter)) && - // and the previous config is the same as the current config - (state.PrevConfig == "" || state.PrevConfig == stateManager.config) { - stateManager.isIncremental = true - stateManager.since = state.PrevStartedAt + return preState, nil +} + +func calculateStateManagerMode(syncPolicy *models.SyncPolicy, preState *models.SubtaskState, newSubtaskConfig string) (bool, *time.Time) { + if preState == nil || syncPolicy == nil { + panic("preState or syncPolicy is nil") } - return + + // User click 'Collect Data in Full Refresh Mode' + // No matter whether there is a successful pipeline. + if syncPolicy.FullSync { + return false, syncPolicy.TimeAfter + } + // No previous success state means this pipeline has never been executed. + if preState.PrevStartedAt == nil { + return false, syncPolicy.TimeAfter + } + // When subtask config has changed, state manager should NOT in incremental mode. + if subTaskConfigHasChanged(preState, newSubtaskConfig) { + return false, syncPolicy.TimeAfter + } + // There is a sync policy and sync policy is earlier than latest successful pipeline's timeAfter + if syncPolicy.TimeAfter != nil && preState.TimeAfter != nil || syncPolicy.TimeAfter.Before(*preState.TimeAfter) { + return false, syncPolicy.TimeAfter + } + + // No need to do a full refresh, run task incrementally. + // New state manager's start time is previous state's finished time. + // But there is no such field, so use previous state's PrevStartedAt time. + return true, preState.PrevStartedAt +} + +// subTaskConfigHasChanged checks whether the previous sub-task config is the same as the current sub-task config +// When plugin's scope config changes, Subtask's config may change. +func subTaskConfigHasChanged(preState *models.SubtaskState, newSubtaskConfig string) bool { + if preState == nil { + return true + } + preConfig := preState.PrevConfig + return preConfig != "" && preConfig != newSubtaskConfig } func (c *SubtaskStateManager) IsIncremental() bool { From 8bf11fb61f7297fef6745f80fb462090f2bfbad8 Mon Sep 17 00:00:00 2001 From: d4x1 <1507509064@qq.com> Date: Wed, 7 Aug 2024 18:51:14 +0800 Subject: [PATCH 2/3] fix(helper): fix unit test --- backend/helpers/pluginhelper/api/subtask_state_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/helpers/pluginhelper/api/subtask_state_manager.go b/backend/helpers/pluginhelper/api/subtask_state_manager.go index fb3246afeef..338791c26dc 100644 --- a/backend/helpers/pluginhelper/api/subtask_state_manager.go +++ b/backend/helpers/pluginhelper/api/subtask_state_manager.go @@ -145,7 +145,7 @@ func calculateStateManagerMode(syncPolicy *models.SyncPolicy, preState *models.S return false, syncPolicy.TimeAfter } // There is a sync policy and sync policy is earlier than latest successful pipeline's timeAfter - if syncPolicy.TimeAfter != nil && preState.TimeAfter != nil || syncPolicy.TimeAfter.Before(*preState.TimeAfter) { + if syncPolicy.TimeAfter != nil && preState.TimeAfter != nil && syncPolicy.TimeAfter.Before(*preState.TimeAfter) { return false, syncPolicy.TimeAfter } From 67b26005de2812ea4cd2ff3e2e0566f388c7cb56 Mon Sep 17 00:00:00 2001 From: d4x1 <1507509064@qq.com> Date: Wed, 7 Aug 2024 18:59:43 +0800 Subject: [PATCH 3/3] refactor(helpers): rename calculateStateManagerIncrementalMode --- backend/helpers/pluginhelper/api/subtask_state_manager.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/backend/helpers/pluginhelper/api/subtask_state_manager.go b/backend/helpers/pluginhelper/api/subtask_state_manager.go index 338791c26dc..7e1f316724e 100644 --- a/backend/helpers/pluginhelper/api/subtask_state_manager.go +++ b/backend/helpers/pluginhelper/api/subtask_state_manager.go @@ -89,7 +89,7 @@ func NewSubtaskStateManager(args *SubtaskCommonArgs) (stateManager *SubtaskState return } - isIncremental, since := calculateStateManagerMode(syncPolicy, preState, utils.ToJsonString(args.SubtaskConfig)) + isIncremental, since := calculateStateManagerIncrementalMode(syncPolicy, preState, utils.ToJsonString(args.SubtaskConfig)) now := time.Now() stateManager = &SubtaskStateManager{ @@ -126,7 +126,8 @@ func loadPreviousState(db dal.Dal, plugin, subtask, params string) (*models.Subt return preState, nil } -func calculateStateManagerMode(syncPolicy *models.SyncPolicy, preState *models.SubtaskState, newSubtaskConfig string) (bool, *time.Time) { +// calculateStateManagerIncrementalMode tries to calculate whether state manager should run in incremental mode and returns the state manager's 'since' time. +func calculateStateManagerIncrementalMode(syncPolicy *models.SyncPolicy, preState *models.SubtaskState, newSubtaskConfig string) (bool, *time.Time) { if preState == nil || syncPolicy == nil { panic("preState or syncPolicy is nil") }