Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
First implementation of dynamic query support
Browse files Browse the repository at this point in the history
  • Loading branch information
IzabellaRaulin committed Mar 24, 2016
1 parent 89e29ab commit cc5c244
Show file tree
Hide file tree
Showing 10 changed files with 1,084 additions and 115 deletions.
158 changes: 107 additions & 51 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
http://www.apache.org/licenses/LICENSE-2.0.txt
Copyright 2015 Intel Corporation
Copyright 2015-2016 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -60,6 +60,7 @@ var (

// ErrLoadedPluginNotFound - error message when a loaded plugin is not found
ErrLoadedPluginNotFound = errors.New("Loaded plugin not found")

// ErrControllerNotStarted - error message when the Controller was not started
ErrControllerNotStarted = errors.New("Must start Controller before calling Load()")
)
Expand Down Expand Up @@ -108,7 +109,9 @@ type managesPlugins interface {
}

type catalogsMetrics interface {
Get([]string, int) (*metricType, error)
GetMetric([]string, int) (*metricType, error)
GetMetrics([]string, int) ([]*metricType, error)
GetMatchedMetrics([]string) ([][]string, error)
Add(*metricType)
AddLoadedMetricType(*loadedPlugin, core.Metric) error
RmUnloadedPluginMetrics(lp *loadedPlugin)
Expand All @@ -125,6 +128,31 @@ type managesSigning interface {
ValidateSignature([]string, string, []byte) error
}

type metric struct {
namespace []string
version int
config *cdata.ConfigDataNode
}

func (m *metric) Namespace() []string {
return m.namespace
}

func (m *metric) Config() *cdata.ConfigDataNode {
return m.config
}

func (m *metric) Version() int {
return m.version
}

func (m *metric) Data() interface{} { return nil }
func (m *metric) Tags() map[string]string { return nil }
func (m *metric) Labels() []core.Label { return nil }
func (m *metric) LastAdvertisedTime() time.Time { return time.Unix(0, 0) }
func (m *metric) Source() string { return "" }
func (m *metric) Timestamp() time.Time { return time.Unix(0, 0) }

// PluginControlOpt is used to set optional parameters on the pluginControl struct
type PluginControlOpt func(*pluginControl)

Expand Down Expand Up @@ -445,10 +473,16 @@ func (p *pluginControl) SwapPlugins(in *core.RequestedPlugin, out core.Cataloged
return nil
}

// ExpandWildCards expands wildcards in metric namespace `ns' and returns all matched metrics namespaces based on cataloged metrics
func (p *pluginControl) ExpandWildCards(ns []string) [][]string {
nss, _ := p.metricCatalog.GetMatchedMetrics(ns)
return nss
}

func (p *pluginControl) ValidateDeps(mts []core.Metric, plugins []core.SubscribedPlugin) []serror.SnapError {
var serrs []serror.SnapError
for _, mt := range mts {
_, errs := p.validateMetricTypeSubscription(mt, mt.Config())
errs := p.validateMetricTypeSubscription(mt, mt.Config())
if len(errs) > 0 {
serrs = append(serrs, errs...)
}
Expand Down Expand Up @@ -506,62 +540,59 @@ func (p *pluginControl) validatePluginSubscription(pl core.SubscribedPlugin) []s
return serrs
}

func (p *pluginControl) validateMetricTypeSubscription(mt core.RequestedMetric, cd *cdata.ConfigDataNode) (core.Metric, []serror.SnapError) {
func (p *pluginControl) validateMetricTypeSubscription(mt core.RequestedMetric, cd *cdata.ConfigDataNode) []serror.SnapError {
var serrs []serror.SnapError
controlLogger.WithFields(log.Fields{
"_block": "validate-metric-subscription",
"namespace": mt.Namespace(),
"version": mt.Version(),
}).Info("subscription called on metric")

m, err := p.metricCatalog.Get(mt.Namespace(), mt.Version())
ms, err := p.metricCatalog.GetMetrics(mt.Namespace(), mt.Version())
if err != nil {
serrs = append(serrs, serror.New(err, map[string]interface{}{
"name": core.JoinNamespace(mt.Namespace()),
"version": mt.Version(),
}))
return nil, serrs
}

// No metric found return error.
if m == nil {
serrs = append(serrs, serror.New(fmt.Errorf("no metric found cannot subscribe: (%s) version(%d)", mt.Namespace(), mt.Version())))
return nil, serrs
return serrs
}

m.config = cd
for i := range ms {
ms[i].config = cd

typ, serr := core.ToPluginType(m.Plugin.TypeName())
if serr != nil {
return nil, []serror.SnapError{serror.New(err)}
}

// merge global plugin config
if m.config != nil {
m.config.Merge(p.Config.Plugins.getPluginConfigDataNode(typ, m.Plugin.Name(), m.Plugin.Version()))
} else {
m.config = p.Config.Plugins.getPluginConfigDataNode(typ, m.Plugin.Name(), m.Plugin.Version())
}
typ, err := core.ToPluginType(ms[i].Plugin.TypeName())
if err != nil {
serrs = append(serrs, serror.New(err))
continue
}

// When a metric is added to the MetricCatalog, the policy of rules defined by the plugin is added to the metric's policy.
// If no rules are defined for a metric, we set the metric's policy to an empty ConfigPolicyNode.
// Checking m.policy for nil will not work, we need to check if rules are nil.
if m.policy.HasRules() {
if m.Config() == nil {
serrs = append(serrs, serror.New(fmt.Errorf("Policy defined for metric, (%s) version (%d), but no config defined in manifest", mt.Namespace(), mt.Version())))
return nil, serrs
// merge global plugin config
if ms[i].config != nil {
ms[i].config.Merge(p.Config.Plugins.getPluginConfigDataNode(typ, ms[i].Plugin.Name(), ms[i].Plugin.Version()))
} else {
ms[i].config = p.Config.Plugins.getPluginConfigDataNode(typ, ms[i].Plugin.Name(), ms[i].Plugin.Version())
}
ncdTable, errs := m.policy.Process(m.Config().Table())
if errs != nil && errs.HasErrors() {
for _, e := range errs.Errors() {
serrs = append(serrs, serror.New(e))

// When a metric is added to the MetricCatalog, the policy of rules defined by the plugin is added to the metric's policy.
// If no rules are defined for a metric, we set the metric's policy to an empty ConfigPolicyNode.
// Checking m.policy for nil will not work, we need to check if rules are nil.
if ms[i].policy.HasRules() {
if ms[i].Config() == nil {
serrs = append(serrs, serror.New(fmt.Errorf("Policy defined for metric, (%s) version (%d), but no config defined in manifest", mt.Namespace(), mt.Version())))
continue
}
ncdTable, errs := ms[i].policy.Process(ms[i].Config().Table())
if errs != nil && errs.HasErrors() {
for _, e := range errs.Errors() {
serrs = append(serrs, serror.New(e))
}
} else {
ms[i].config = cdata.FromTable(*ncdTable)
}
return nil, serrs
}
m.config = cdata.FromTable(*ncdTable)
}

return m, serrs
return serrs
}

func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]core.Plugin, []serror.SnapError) {
Expand All @@ -577,23 +608,26 @@ func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]core.Plugin, []se
// types.
colPlugins := make(map[string]*loadedPlugin)
for _, mt := range mts {
m, err := p.metricCatalog.Get(mt.Namespace(), mt.Version())
ms, err := p.metricCatalog.GetMetrics(mt.Namespace(), mt.Version())
if err != nil {
serrs = append(serrs, serror.New(err, map[string]interface{}{
"name": core.JoinNamespace(mt.Namespace()),
"version": mt.Version(),
}))
continue
}
// if the metric subscription is to version -1, we need to carry
// that forward in the subscription.
if mt.Version() < 1 {
// make a copy of the loadedPlugin and overwrite the version.
npl := *m.Plugin
npl.Meta.Version = -1
colPlugins[npl.Key()] = &npl
} else {
colPlugins[m.Plugin.Key()] = m.Plugin

for _, m := range ms {
// if the metric subscription is to version -1, we need to carry
// that forward in the subscription.
if mt.Version() < 1 {
// make a copy of the loadedPlugin and overwrite the version.
npl := *m.Plugin
npl.Meta.Version = -1
colPlugins[npl.Key()] = &npl
} else {
colPlugins[m.Plugin.Key()] = m.Plugin
}
}
}
if len(serrs) > 0 {
Expand Down Expand Up @@ -814,7 +848,7 @@ func (p *pluginControl) FetchMetrics(ns []string, version int) ([]core.Cataloged
}

func (p *pluginControl) GetMetric(ns []string, ver int) (core.CatalogedMetric, error) {
return p.metricCatalog.Get(ns, ver)
return p.metricCatalog.GetMetric(ns, ver)
}

func (p *pluginControl) GetMetricVersions(ns []string) ([]core.CatalogedMetric, error) {
Expand All @@ -831,7 +865,7 @@ func (p *pluginControl) GetMetricVersions(ns []string) ([]core.CatalogedMetric,
}

func (p *pluginControl) MetricExists(mns []string, ver int) bool {
_, err := p.metricCatalog.Get(mns, ver)
_, err := p.metricCatalog.GetMetric(mns, ver)
if err == nil {
return true
}
Expand All @@ -842,7 +876,29 @@ func (p *pluginControl) MetricExists(mns []string, ver int) bool {
// of metrics and errors. If an error is encountered no metrics will be
// returned.
func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time.Time, taskID string) (metrics []core.Metric, errs []error) {
pluginToMetricMap, err := groupMetricTypesByPlugin(p.metricCatalog, metricTypes)
// expanded metric based on wildcard/tuple rules
expandedMts := []core.Metric{}
for _, m := range metricTypes {
// getting matched metrics based on metric namespace
nss, err := p.metricCatalog.GetMatchedMetrics(m.Namespace())

if err != nil {
errs = append(errs, err)
continue
}

// create metric with appropriate namespace, inherited version and config, and append it to expandedMts
for _, ns := range nss {
expandedMt := &metric{
namespace: ns,
version: m.Version(),
config: m.Config(),
}
expandedMts = append(expandedMts, expandedMt)
}
}

pluginToMetricMap, err := groupMetricTypesByPlugin(p.metricCatalog, expandedMts)
if err != nil {
errs = append(errs, err)
return
Expand Down
Loading

0 comments on commit cc5c244

Please sign in to comment.