Skip to content

Commit

Permalink
Fix intelsdi-x#824: Add proper handling of subscription rollover
Browse files Browse the repository at this point in the history
Added a check in availablePlugins.collectMetrics for a nil strategy that
was causing a panic when metric subscriptions were in the process of
being moved.

Modified the handling of load/unload plugin events to properly handle
moving subscriptions between differing versions of a plugin that
provides the same metrics and added tests for this behavior in
control/control_test.go.
  • Loading branch information
IRCody committed Apr 6, 2016
1 parent 06118a4 commit da95bcb
Show file tree
Hide file tree
Showing 5 changed files with 402 additions and 59 deletions.
6 changes: 6 additions & 0 deletions control/available_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,12 @@ func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core.
if pool == nil {
return nil, serror.New(ErrPoolNotFound, map[string]interface{}{"pool-key": pluginKey})
}
// If the strategy is nil but the pool exists we likely are waiting on the pool to be fully initialized
// because of a plugin load/unload event that is currently being processed. Prevents panic from using nil
// RoutingAndCaching.
if pool.Strategy() == nil {
return nil, errors.New("Plugin strategy not set")
}

metricsToCollect, metricsFromCache := pool.CheckCache(metricTypes, taskID)

Expand Down
60 changes: 50 additions & 10 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,18 +589,22 @@ func (p *pluginControl) validateMetricTypeSubscription(mt core.RequestedMetric,
return serrs
}

func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]core.Plugin, []serror.SnapError) {
type gatheredPlugin struct {
plugin core.Plugin
subscriptionType strategy.SubscriptionType
}

func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]gatheredPlugin, []serror.SnapError) {
var (
plugins []core.Plugin
plugins []gatheredPlugin
serrs []serror.SnapError
)

// here we resolve and retrieve plugins for each metric type.
// if the incoming metric type version is < 1, we treat that as
// latest as with plugins. The following two loops create a set
// of plugins with proper versions needed to discern the subscription
// types.
colPlugins := make(map[string]*loadedPlugin)
colPlugins := make(map[string]gatheredPlugin)
for _, mt := range mts {
// If the version provided is <1 we will get the latest
// plugin for the given metric.
Expand All @@ -612,8 +616,14 @@ func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]core.Plugin, []se
}))
continue
}

colPlugins[m.Plugin.Key()] = m.Plugin
subType := strategy.BoundSubscriptionType
if mt.Version() < 1 {
subType = strategy.UnboundSubscriptionType
}
colPlugins[fmt.Sprintf("%s:%d", m.Plugin.Key(), subType)] = gatheredPlugin{
plugin: m.Plugin,
subscriptionType: subType,
}
}
if len(serrs) > 0 {
return plugins, serrs
Expand All @@ -622,19 +632,47 @@ func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]core.Plugin, []se
for _, lp := range colPlugins {
plugins = append(plugins, lp)
}

if len(plugins) == 0 {
serrs = append(serrs, serror.New(errors.New("No plugins found")))
return nil, serrs
}
return plugins, nil
}

func (p *pluginControl) SubscribeDeps(taskID string, mts []core.Metric, plugins []core.Plugin) []serror.SnapError {
var serrs []serror.SnapError

collectors, errs := p.gatherCollectors(mts)
if len(errs) > 0 {
serrs = append(serrs)
}
plugins = append(plugins, collectors...)

for _, gc := range collectors {
pool, err := p.pluginRunner.AvailablePlugins().getOrCreatePool(fmt.Sprintf("%s:%s:%d", gc.plugin.TypeName(), gc.plugin.Name(), gc.plugin.Version()))
if err != nil {
serrs = append(serrs, serror.New(err))
return serrs
}
pool.Subscribe(taskID, gc.subscriptionType)
if pool.Eligible() {
err = p.verifyPlugin(gc.plugin.(*loadedPlugin))
if err != nil {
serrs = append(serrs, serror.New(err))
return serrs
}
err = p.pluginRunner.runPlugin(gc.plugin.(*loadedPlugin).Details)
if err != nil {
serrs = append(serrs, serror.New(err))
return serrs
}
}
serr := p.sendPluginSubscriptionEvent(taskID, gc.plugin)
if serr != nil {
serrs = append(serrs, serr)
}
}
// plugins = append(plugins, collectors...)
// fmt.Println("\n\n Sub deps len collectors", len(collectors), "len plugins", len(plugins))
// fmt.Println()
for _, sub := range plugins {
// pools are created statically, not with keys like "publisher:foo:-1"
// here we check to see if the version of the incoming plugin is -1, and
Expand Down Expand Up @@ -741,7 +779,9 @@ func (p *pluginControl) UnsubscribeDeps(taskID string, mts []core.Metric, plugin
if len(errs) > 0 {
serrs = append(serrs, errs...)
}
plugins = append(plugins, collectors...)
for _, gc := range collectors {
plugins = append(plugins, gc.plugin)
}

for _, sub := range plugins {
pool, err := p.pluginRunner.AvailablePlugins().getPool(fmt.Sprintf("%s:%s:%d", sub.TypeName(), sub.Name(), sub.Version()))
Expand Down
Loading

0 comments on commit da95bcb

Please sign in to comment.