Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Consolidate multiple conversions of SubscribedPlugin to CorePlugin in…
Browse files Browse the repository at this point in the history
… one function call
  • Loading branch information
geauxvirtual committed Oct 8, 2015
1 parent ebbff15 commit dcf9716
Showing 1 changed file with 11 additions and 12 deletions.
23 changes: 11 additions & 12 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,7 @@ func (s *scheduler) CreateTask(sch schedule.Schedule, wfMap *wmap.WorkflowMap, s
"task-id": task.ID(),
}).Info("starting task on creation")

cps := make([]core.Plugin, len(plugins))
for i, plugin := range plugins {
cps[i] = plugin
}
cps := returnCorePlugin(plugins)
errs := s.metricManager.SubscribeDeps(task.ID(), mts, cps)
if len(errs) > 0 {
te.errs = append(te.errs, errs...)
Expand Down Expand Up @@ -246,10 +243,7 @@ func (s *scheduler) StartTask(id string) []perror.PulseError {
}

mts, plugins := s.gatherMetricsAndPlugins(t.workflow)
cps := make([]core.Plugin, len(plugins))
for i, plugin := range plugins {
cps[i] = plugin
}
cps := returnCorePlugin(plugins)
errs := s.metricManager.SubscribeDeps(id, mts, cps)
if len(errs) > 0 {
return errs
Expand Down Expand Up @@ -282,10 +276,7 @@ func (s *scheduler) StopTask(id string) []perror.PulseError {
}

mts, plugins := s.gatherMetricsAndPlugins(t.workflow)
cps := make([]core.Plugin, len(plugins))
for i, plugin := range plugins {
cps[i] = plugin
}
cps := returnCorePlugin(plugins)
errs := s.metricManager.UnsubscribeDeps(t.ID(), mts, cps)
if len(errs) > 0 {
return errs
Expand Down Expand Up @@ -439,6 +430,14 @@ func (s *scheduler) walkWorkflow(prnodes []*processNode, pbnodes []*publishNode,
}
}

func returnCorePlugin(plugins []core.SubscribedPlugin) []core.Plugin {
cps := make([]core.Plugin, len(plugins))
for i, plugin := range plugins {
cps[i] = plugin
}
return cps
}

func buildErrorsLog(errs []perror.PulseError, logger *log.Entry) *log.Entry {
for i, e := range errs {
logger = logger.WithField(fmt.Sprintf("%s[%d]", "error", i), e.Error())
Expand Down

0 comments on commit dcf9716

Please sign in to comment.