Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(helpers): update "NewSubtaskStateManager" #7861

Merged
merged 4 commits into from
Aug 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 72 additions & 33 deletions backend/helpers/pluginhelper/api/subtask_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models"
plugin "github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/core/utils"
)

Expand Down Expand Up @@ -75,57 +75,96 @@ type SubtaskStateManager struct {
// NewSubtaskStateManager create a new SubtaskStateManager
func NewSubtaskStateManager(args *SubtaskCommonArgs) (stateManager *SubtaskStateManager, err errors.Error) {
db := args.GetDal()
syncPolicy := args.SubTaskContext.TaskContext().SyncPolicy()
plugin := args.SubTaskContext.TaskContext().GetName()
subtask := args.SubTaskContext.GetName()
// load sync policy and make sure it is not nil
syncPolicy := args.SubTaskContext.TaskContext().SyncPolicy()
if syncPolicy == nil {
syncPolicy = &models.SyncPolicy{}
}

plugin := args.SubTaskContext.TaskContext().GetName()
subtask := args.SubTaskContext.GetName()
params := args.GetRawDataParams()
// load the previous state from the database
state := &models.SubtaskState{}
err = db.First(state, dal.Where(`plugin = ? AND subtask =? AND params = ?`, plugin, subtask, params))
preState, err := loadPreviousState(db, plugin, subtask, params)
if err != nil {
if db.IsErrorNotFound(err) {
state = &models.SubtaskState{
Plugin: plugin,
Subtask: subtask,
Params: params,
}
err = nil
} else {
err = errors.Default.Wrap(err, "failed to load the previous subtask state")
return
}
return
}
// fullsync by default

isIncremental, since := calculateStateManagerIncrementalMode(syncPolicy, preState, utils.ToJsonString(args.SubtaskConfig))

now := time.Now()
stateManager = &SubtaskStateManager{
db: db,
state: state,
state: preState,
syncPolicy: syncPolicy,
isIncremental: false,
since: syncPolicy.TimeAfter,
isIncremental: isIncremental,
since: since,
until: &now,
config: utils.ToJsonString(args.SubtaskConfig),
}
// fallback to the previous timeAfter if no new value
if stateManager.since == nil {
stateManager.since = state.TimeAfter
stateManager.since = preState.TimeAfter
}
// if fullsync is set or no previous success start time, we are in the full sync mode
if syncPolicy.FullSync || state.PrevStartedAt == nil {
return
return
}

func loadPreviousState(db dal.Dal, plugin, subtask, params string) (*models.SubtaskState, errors.Error) {
// load the previous state from the database
preState := &models.SubtaskState{}
err := db.First(preState, dal.Where(`plugin = ? AND subtask =? AND params = ?`, plugin, subtask, params))
if err != nil {
if db.IsErrorNotFound(err) {
preState = &models.SubtaskState{
Plugin: plugin,
Subtask: subtask,
Params: params,
}
} else {
return nil, errors.Default.Wrap(err, "failed to load the previous subtask state")
}
}

return preState, nil
}

// calculateStateManagerIncrementalMode tries to calculate whether state manager should run in incremental mode and returns the state manager's 'since' time.
func calculateStateManagerIncrementalMode(syncPolicy *models.SyncPolicy, preState *models.SubtaskState, newSubtaskConfig string) (bool, *time.Time) {
if preState == nil || syncPolicy == nil {
panic("preState or syncPolicy is nil")
}

// User click 'Collect Data in Full Refresh Mode'
// No matter whether there is a successful pipeline.
if syncPolicy.FullSync {
return false, syncPolicy.TimeAfter
}
// if timeAfter is not set or NOT before the previous value, we are in the incremental mode
if (syncPolicy.TimeAfter == nil || state.TimeAfter == nil || !syncPolicy.TimeAfter.Before(*state.TimeAfter)) &&
// and the previous config is the same as the current config
(state.PrevConfig == "" || state.PrevConfig == stateManager.config) {
stateManager.isIncremental = true
stateManager.since = state.PrevStartedAt
// No previous success state means this pipeline has never been executed.
if preState.PrevStartedAt == nil {
return false, syncPolicy.TimeAfter
}
return
// When subtask config has changed, state manager should NOT in incremental mode.
if subTaskConfigHasChanged(preState, newSubtaskConfig) {
return false, syncPolicy.TimeAfter
}
// There is a sync policy and sync policy is earlier than latest successful pipeline's timeAfter
if syncPolicy.TimeAfter != nil && preState.TimeAfter != nil && syncPolicy.TimeAfter.Before(*preState.TimeAfter) {
return false, syncPolicy.TimeAfter
}

// No need to do a full refresh, run task incrementally.
// New state manager's start time is previous state's finished time.
// But there is no such field, so use previous state's PrevStartedAt time.
return true, preState.PrevStartedAt
}

// subTaskConfigHasChanged checks whether the previous sub-task config is the same as the current sub-task config
// When plugin's scope config changes, Subtask's config may change.
func subTaskConfigHasChanged(preState *models.SubtaskState, newSubtaskConfig string) bool {
if preState == nil {
return true
}
preConfig := preState.PrevConfig
return preConfig != "" && preConfig != newSubtaskConfig
}

func (c *SubtaskStateManager) IsIncremental() bool {
Expand Down
Loading