From ebdd1fbc870f8e9f11a40fcf6f19e6df35b15710 Mon Sep 17 00:00:00 2001 From: Cody Roseborough Date: Wed, 18 May 2016 10:21:50 -0700 Subject: [PATCH] Adds control grpc server files Adds control grpc server files to control/ Adds control grpc use on startup along with configuration info (addr, port) needed for this. Moves control flags to control/flags.go Adds grpc configuration options to snapd option parsing (ListenAddr and port for control grpc server). --- control/config.go | 21 ++ control/config_test.go | 18 ++ control/control.go | 85 +++++-- control/control_grpc_server.go | 164 ++++++++++++ control/control_grpc_server_test.go | 373 ++++++++++++++++++++++++++++ control/flags.go | 16 +- snapd.go | 4 +- 7 files changed, 655 insertions(+), 26 deletions(-) create mode 100644 control/control_grpc_server.go create mode 100644 control/control_grpc_server_test.go diff --git a/control/config.go b/control/config.go index 1bef95150..55cc19867 100644 --- a/control/config.go +++ b/control/config.go @@ -37,6 +37,8 @@ import ( // default configuration values const ( + defaultListenAddr string = "127.0.0.1" + defaultListenPort int = 8082 defaultMaxRunningPlugins int = 3 defaultPluginTrust int = 1 defaultAutoDiscoverPath string = "" @@ -73,6 +75,8 @@ type Config struct { KeyringPaths string `json:"keyring_paths"yaml:"keyring_paths"` CacheExpiration jsonutil.Duration `json:"cache_expiration"yaml:"cache_expiration"` Plugins *pluginConfig `json:"plugins"yaml:"plugins"` + ListenAddr string `json:"listen_addr,omitempty"yaml:"listen_addr"` + ListenPort int `json:"listen_port,omitempty"yaml:"listen_port"` } const ( @@ -102,6 +106,12 @@ const ( "type": ["object", "null"], "properties" : {}, "additionalProperties": true + }, + "listen_addr": { + "type": "string" + }, + "listen_port": { + "type": "integer" } }, "additionalProperties": false @@ -112,6 +122,8 @@ const ( // get the default snapd configuration func GetDefaultConfig() *Config { return &Config{ + ListenAddr: defaultListenAddr, + ListenPort: defaultListenPort, MaxRunningPlugins: defaultMaxRunningPlugins, PluginTrust: defaultPluginTrust, AutoDiscoverPath: defaultAutoDiscoverPath, @@ -159,6 +171,15 @@ func (c *Config) UnmarshalJSON(data []byte) error { if err := json.Unmarshal(v, c.Plugins); err != nil { return err } + case "listen_addr": + if err := json.Unmarshal(v, &(c.ListenAddr)); err != nil { + return err + } + case "listen_port": + if err := json.Unmarshal(v, &(c.ListenPort)); err != nil { + return err + } + default: return fmt.Errorf("Unrecognized key '%v' in global config file while parsing 'control'", k) } diff --git a/control/config_test.go b/control/config_test.go index 239546d46..f24323dad 100644 --- a/control/config_test.go +++ b/control/config_test.go @@ -162,6 +162,12 @@ func TestControlConfigJSON(t *testing.T) { Convey("MaxRunningPlugins should be set to 1", func() { So(cfg.MaxRunningPlugins, ShouldEqual, 1) }) + Convey("ListenAddr should be set to 0.0.0.0", func() { + So(cfg.ListenAddr, ShouldEqual, "0.0.0.0") + }) + Convey("ListenPort should be set to 10082", func() { + So(cfg.ListenPort, ShouldEqual, 10082) + }) Convey("KeyringPaths should be set to /some/path/with/keyring/files", func() { So(cfg.KeyringPaths, ShouldEqual, "/some/path/with/keyring/files") }) @@ -225,6 +231,12 @@ func TestControlConfigYaml(t *testing.T) { Convey("MaxRunningPlugins should be set to 1", func() { So(cfg.MaxRunningPlugins, ShouldEqual, 1) }) + Convey("ListenAddr should be set to 0.0.0.0", func() { + So(cfg.ListenAddr, ShouldEqual, "0.0.0.0") + }) + Convey("ListenPort should be set to 10082", func() { + So(cfg.ListenPort, ShouldEqual, 10082) + }) Convey("KeyringPaths should be set to /some/path/with/keyring/files", func() { So(cfg.KeyringPaths, ShouldEqual, "/some/path/with/keyring/files") }) @@ -277,6 +289,12 @@ func TestControlDefaultConfig(t *testing.T) { Convey("MaxRunningPlugins should equal 3", func() { So(cfg.MaxRunningPlugins, ShouldEqual, 3) }) + Convey("ListenAddr should be set to 127.0.0.1", func() { + So(cfg.ListenAddr, ShouldEqual, "127.0.0.1") + }) + Convey("ListenPort should be set to 8082", func() { + So(cfg.ListenPort, ShouldEqual, 8082) + }) Convey("KeyringPaths should be empty", func() { So(cfg.KeyringPaths, ShouldEqual, "") }) diff --git a/control/control.go b/control/control.go index 4bf18e20d..3fb298f31 100644 --- a/control/control.go +++ b/control/control.go @@ -24,6 +24,7 @@ import ( "errors" "fmt" "io/ioutil" + "net" "os" "path" "path/filepath" @@ -31,6 +32,8 @@ import ( "sync" "time" + "google.golang.org/grpc" + log "github.com/Sirupsen/logrus" "github.com/intelsdi-x/gomit" @@ -42,6 +45,7 @@ import ( "github.com/intelsdi-x/snap/core/control_event" "github.com/intelsdi-x/snap/core/ctypes" "github.com/intelsdi-x/snap/core/serror" + "github.com/intelsdi-x/snap/grpc/controlproxy/rpc" "github.com/intelsdi-x/snap/pkg/aci" "github.com/intelsdi-x/snap/pkg/psigning" ) @@ -64,7 +68,7 @@ var ( ErrLoadedPluginNotFound = errors.New("Loaded plugin not found") // ErrControllerNotStarted - error message when the Controller was not started - ErrControllerNotStarted = errors.New("Must start Controller before calling Load()") + ErrControllerNotStarted = errors.New("Must start Controller before use") ) type executablePlugins []plugin.ExecutablePlugin @@ -236,6 +240,7 @@ func (p *pluginControl) Start() error { controlLogger.WithFields(log.Fields{ "_block": "start", }).Info("control started") + //Autodiscover if p.Config.AutoDiscoverPath != "" { controlLogger.WithFields(log.Fields{ @@ -323,6 +328,23 @@ func (p *pluginControl) Start() error { "_block": "start", }).Info("auto discover path is disabled") } + + lis, err := net.Listen("tcp", fmt.Sprintf("%v:%v", p.Config.ListenAddr, p.Config.ListenPort)) + if err != nil { + controlLogger.WithField("error", err.Error()).Error("Failed to start control grpc listener") + return err + } + + opts := []grpc.ServerOption{} + grpcServer := grpc.NewServer(opts...) + rpc.RegisterMetricManagerServer(grpcServer, &ControlGRPCServer{p}) + go func() { + err := grpcServer.Serve(lis) + if err != nil { + controlLogger.Fatal(err) + } + }() + return nil } @@ -731,33 +753,35 @@ func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]gatheredPlugin, [ func (p *pluginControl) SubscribeDeps(taskID string, mts []core.Metric, plugins []core.Plugin) []serror.SnapError { var serrs []serror.SnapError - collectors, errs := p.gatherCollectors(mts) - if len(errs) > 0 { - serrs = append(serrs, errs...) - } - - for _, gc := range collectors { - pool, err := p.pluginRunner.AvailablePlugins().getOrCreatePool(fmt.Sprintf("%s:%s:%d", gc.plugin.TypeName(), gc.plugin.Name(), gc.plugin.Version())) - if err != nil { - serrs = append(serrs, serror.New(err)) - return serrs + if len(mts) != 0 { + collectors, errs := p.gatherCollectors(mts) + if len(errs) > 0 { + serrs = append(serrs, errs...) } - pool.Subscribe(taskID, gc.subscriptionType) - if pool.Eligible() { - err = p.verifyPlugin(gc.plugin.(*loadedPlugin)) + + for _, gc := range collectors { + pool, err := p.pluginRunner.AvailablePlugins().getOrCreatePool(fmt.Sprintf("%s:%s:%d", gc.plugin.TypeName(), gc.plugin.Name(), gc.plugin.Version())) if err != nil { serrs = append(serrs, serror.New(err)) return serrs } - err = p.pluginRunner.runPlugin(gc.plugin.(*loadedPlugin).Details) - if err != nil { - serrs = append(serrs, serror.New(err)) - return serrs + pool.Subscribe(taskID, gc.subscriptionType) + if pool.Eligible() { + err = p.verifyPlugin(gc.plugin.(*loadedPlugin)) + if err != nil { + serrs = append(serrs, serror.New(err)) + return serrs + } + err = p.pluginRunner.runPlugin(gc.plugin.(*loadedPlugin).Details) + if err != nil { + serrs = append(serrs, serror.New(err)) + return serrs + } + } + serr := p.sendPluginSubscriptionEvent(taskID, gc.plugin) + if serr != nil { + serrs = append(serrs, serr) } - } - serr := p.sendPluginSubscriptionEvent(taskID, gc.plugin) - if serr != nil { - serrs = append(serrs, serr) } } for _, sub := range plugins { @@ -819,7 +843,6 @@ func (p *pluginControl) SubscribeDeps(taskID string, mts []core.Metric, plugins serrs = append(serrs, serr) } } - return serrs } @@ -988,6 +1011,12 @@ func (p *pluginControl) MetricExists(mns core.Namespace, ver int) bool { // of metrics and errors. If an error is encountered no metrics will be // returned. func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time.Time, taskID string, allTags map[string]map[string]string) (metrics []core.Metric, errs []error) { + // If control is not started we don't want tasks to be able to + // go through a workflow. + if !p.Started { + return nil, []error{ErrControllerNotStarted} + } + for ns, nsTags := range allTags { for k, v := range nsTags { log.WithFields(log.Fields{ @@ -1064,6 +1093,11 @@ func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time. // PublishMetrics func (p *pluginControl) PublishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) []error { + // If control is not started we don't want tasks to be able to + // go through a workflow. + if !p.Started { + return []error{ErrControllerNotStarted} + } // merge global plugin config into the config for this request // without over-writing the task specific config cfg := p.Config.Plugins.getPluginConfigDataNode(core.PublisherPluginType, pluginName, pluginVersion).Table() @@ -1080,6 +1114,11 @@ func (p *pluginControl) PublishMetrics(contentType string, content []byte, plugi // ProcessMetrics func (p *pluginControl) ProcessMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) (string, []byte, []error) { + // If control is not started we don't want tasks to be able to + // go through a workflow. + if !p.Started { + return "", nil, []error{ErrControllerNotStarted} + } // merge global plugin config into the config for this request // without over-writing the task specific config cfg := p.Config.Plugins.getPluginConfigDataNode(core.ProcessorPluginType, pluginName, pluginVersion).Table() diff --git a/control/control_grpc_server.go b/control/control_grpc_server.go new file mode 100644 index 000000000..5583aef02 --- /dev/null +++ b/control/control_grpc_server.go @@ -0,0 +1,164 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2016 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package control + +import ( + "time" + + "github.com/intelsdi-x/snap/core" + "github.com/intelsdi-x/snap/core/serror" + "github.com/intelsdi-x/snap/grpc/common" + "github.com/intelsdi-x/snap/grpc/controlproxy/rpc" + "golang.org/x/net/context" +) + +type ControlGRPCServer struct { + control *pluginControl +} + +// --------- Scheduler's managesMetrics implementation ---------- + +func (pc *ControlGRPCServer) GetPluginContentTypes(ctx context.Context, r *rpc.GetPluginContentTypesRequest) (*rpc.GetPluginContentTypesReply, error) { + accepted, returned, err := pc.control.GetPluginContentTypes(r.Name, core.PluginType(int(r.PluginType)), int(r.Version)) + reply := &rpc.GetPluginContentTypesReply{ + AcceptedTypes: accepted, + ReturnedTypes: returned, + } + if err != nil { + reply.Error = err.Error() + } + return reply, nil +} + +func (pc *ControlGRPCServer) PublishMetrics(ctx context.Context, r *rpc.PubProcMetricsRequest) (*rpc.ErrorReply, error) { + errs := pc.control.PublishMetrics(r.ContentType, r.Content, r.PluginName, int(r.PluginVersion), common.ParseConfig(r.Config), r.TaskId) + erro := make([]string, len(errs)) + for i, v := range errs { + erro[i] = v.Error() + } + return &rpc.ErrorReply{Errors: erro}, nil +} + +func (pc *ControlGRPCServer) ProcessMetrics(ctx context.Context, r *rpc.PubProcMetricsRequest) (*rpc.ProcessMetricsReply, error) { + contentType, content, errs := pc.control.ProcessMetrics(r.ContentType, r.Content, r.PluginName, int(r.PluginVersion), common.ParseConfig(r.Config), r.TaskId) + erro := make([]string, len(errs)) + for i, v := range errs { + erro[i] = v.Error() + } + reply := &rpc.ProcessMetricsReply{ + ContentType: contentType, + Content: content, + Errors: erro, + } + return reply, nil +} + +func (pc *ControlGRPCServer) CollectMetrics(ctx context.Context, r *rpc.CollectMetricsRequest) (*rpc.CollectMetricsReply, error) { + metrics := common.ToCoreMetrics(r.Metrics) + deadline := time.Unix(r.Deadline.Sec, r.Deadline.Nsec) + var AllTags map[string]map[string]string + for k, v := range r.AllTags { + AllTags[k] = make(map[string]string) + for _, entry := range v.Entries { + AllTags[k][entry.Key] = entry.Value + } + } + mts, errs := pc.control.CollectMetrics(metrics, deadline, r.TaskID, AllTags) + var reply *rpc.CollectMetricsReply + if mts == nil { + reply = &rpc.CollectMetricsReply{ + Errors: errorsToStrings(errs), + } + } else { + reply = &rpc.CollectMetricsReply{ + Metrics: common.NewMetrics(mts), + Errors: errorsToStrings(errs), + } + } + return reply, nil +} + +func (pc *ControlGRPCServer) ExpandWildcards(ctx context.Context, r *rpc.ExpandWildcardsRequest) (*rpc.ExpandWildcardsReply, error) { + nss, serr := pc.control.ExpandWildcards(common.ToCoreNamespace(r.Namespace)) + reply := &rpc.ExpandWildcardsReply{} + if nss != nil { + reply.NSS = convertNSS(nss) + } + if serr != nil { + reply.Error = common.NewErrors([]serror.SnapError{serr})[0] + } + return reply, nil +} + +func (pc *ControlGRPCServer) ValidateDeps(ctx context.Context, r *rpc.ValidateDepsRequest) (*rpc.ValidateDepsReply, error) { + metrics := common.ToCoreMetrics(r.Metrics) + plugins := common.ToSubPlugins(r.Plugins) + serrors := pc.control.ValidateDeps(metrics, plugins) + return &rpc.ValidateDepsReply{Errors: common.NewErrors(serrors)}, nil +} + +func (pc *ControlGRPCServer) SubscribeDeps(ctx context.Context, r *rpc.SubscribeDepsRequest) (*rpc.SubscribeDepsReply, error) { + metrics := common.ToCoreMetrics(r.Metrics) + plugins := common.MsgToCorePlugins(r.Plugins) + serrors := pc.control.SubscribeDeps(r.TaskId, metrics, plugins) + return &rpc.SubscribeDepsReply{Errors: common.NewErrors(serrors)}, nil +} + +func (pc *ControlGRPCServer) UnsubscribeDeps(ctx context.Context, r *rpc.SubscribeDepsRequest) (*rpc.SubscribeDepsReply, error) { + metrics := common.ToCoreMetrics(r.Metrics) + plugins := common.MsgToCorePlugins(r.Plugins) + serrors := pc.control.UnsubscribeDeps(r.TaskId, metrics, plugins) + return &rpc.SubscribeDepsReply{Errors: common.NewErrors(serrors)}, nil +} + +func (pc *ControlGRPCServer) MatchQueryToNamespaces(ctx context.Context, r *rpc.ExpandWildcardsRequest) (*rpc.ExpandWildcardsReply, error) { + nss, serr := pc.control.MatchQueryToNamespaces(common.ToCoreNamespace(r.Namespace)) + reply := &rpc.ExpandWildcardsReply{} + if nss != nil { + reply.NSS = convertNSS(nss) + } + if serr != nil { + reply.Error = common.NewErrors([]serror.SnapError{serr})[0] + } + return reply, nil +} + +//-------- util --------------- + +func convertNSS(nss []core.Namespace) []*rpc.ArrString { + res := make([]*rpc.ArrString, len(nss)) + for i := range nss { + var tmp rpc.ArrString + tmp.S = common.ToNamespace(nss[i]) + res[i] = &tmp + } + return res +} + +func errorsToStrings(in []error) []string { + if len(in) == 0 { + return []string{} + } + erro := make([]string, len(in)) + for i, e := range in { + erro[i] = e.Error() + } + return erro +} diff --git a/control/control_grpc_server_test.go b/control/control_grpc_server_test.go new file mode 100644 index 000000000..375ac26d3 --- /dev/null +++ b/control/control_grpc_server_test.go @@ -0,0 +1,373 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package control + +import ( + "bytes" + "encoding/gob" + "net" + "path" + "testing" + "time" + + "golang.org/x/net/context" + + log "github.com/Sirupsen/logrus" + "github.com/intelsdi-x/snap/control/plugin" + "github.com/intelsdi-x/snap/control/strategy" + "github.com/intelsdi-x/snap/core" + "github.com/intelsdi-x/snap/core/cdata" + "github.com/intelsdi-x/snap/core/ctypes" + "github.com/intelsdi-x/snap/grpc/common" + "github.com/intelsdi-x/snap/grpc/controlproxy" + "github.com/intelsdi-x/snap/grpc/controlproxy/rpc" + "github.com/intelsdi-x/snap/pkg/rpcutil" + . "github.com/smartystreets/goconvey/convey" +) + +// This test is meant to cover the grpc implementation of the subset of control +// features that scheduler uses. It is not intended to test the control features +// themselves, only that we are correctly passing data over grpc and correctly +// passing success/errors. +func TestGRPCServerScheduler(t *testing.T) { + l, _ := net.Listen("tcp", ":0") + l.Close() + cfg := GetDefaultConfig() + cfg.ListenPort = l.Addr().(*net.TCPAddr).Port + c := New(cfg) + err := c.Start() + + Convey("Starting control_proxy server/client", t, func() { + Convey("So err should be nil", func() { + So(err, ShouldBeNil) + }) + }) + // Load 3 plugins + // collector -- mock + // processor -- passthru + // publisher -- file + mock, err := core.NewRequestedPlugin(JSONRPCPluginPath) + if err != nil { + log.Fatal(err) + } + c.Load(mock) + passthru, err := core.NewRequestedPlugin(path.Join(SnapPath, "plugin", "snap-processor-passthru")) + if err != nil { + log.Fatal(err) + } + c.Load(passthru) + filepub, err := core.NewRequestedPlugin(path.Join(SnapPath, "plugin", "snap-publisher-file")) + if err != nil { + log.Fatal(err) + } + c.Load(filepub) + + conn, err := rpcutil.GetClientConnection(c.Config.ListenAddr, c.Config.ListenPort) + + Convey("Creating an rpc connection", t, func() { + Convey("Should not error", func() { + So(err, ShouldBeNil) + }) + }) + + client := rpc.NewMetricManagerClient(conn) + + Convey("Creating an RPC client to control RPC server", t, func() { + Convey("And a client should exist", func() { + So(client, ShouldNotBeNil) + }) + }) + //GetContentTypes + Convey("Getting Content Types", t, func() { + Convey("Should err if invalid plugin given", func() { + req := &rpc.GetPluginContentTypesRequest{ + Name: "bogus", + PluginType: int32(0), + Version: int32(0), + } + reply, err := client.GetPluginContentTypes(context.Background(), req) + // We don't expect rpc errors + So(err, ShouldBeNil) + So(reply.Error, ShouldNotEqual, "") + So(reply.Error, ShouldResemble, "plugin not found") + }) + Convey("Should return content types with valid plugin", func() { + req := &rpc.GetPluginContentTypesRequest{ + Name: "mock", + PluginType: int32(0), + Version: 0, + } + reply, err := client.GetPluginContentTypes(context.Background(), req) + So(err, ShouldBeNil) + So(reply.Error, ShouldEqual, "") + So(reply.AcceptedTypes, ShouldContain, "snap.gob") + So(reply.ReturnedTypes, ShouldContain, "snap.gob") + }) + }) + + validMetric := MockMetricType{ + namespace: core.NewNamespace([]string{"intel", "mock", "foo"}...), + cfg: cdata.NewNode(), + ver: 0, + } + invalidMetric := MockMetricType{ + namespace: core.NewNamespace([]string{"this", "is", "invalid"}...), + cfg: cdata.NewNode(), + ver: 1000, + } + + // Verify that validate deps is properly passing through errors + Convey("Validating Deps", t, func() { + Convey("Should Fail if given invalid info", func() { + req := &rpc.ValidateDepsRequest{ + Metrics: common.NewMetrics([]core.Metric{invalidMetric}), + Plugins: common.ToSubPluginsMsg([]core.SubscribedPlugin{}), + } + reply, err := client.ValidateDeps(context.Background(), req) + // we don't expect rpc errors + So(err, ShouldBeNil) + So(len(reply.Errors), ShouldNotEqual, 0) + }) + Convey("with valid metrics", func() { + req := &rpc.ValidateDepsRequest{ + Metrics: common.NewMetrics([]core.Metric{validMetric}), + Plugins: common.ToSubPluginsMsg([]core.SubscribedPlugin{}), + } + reply, err := client.ValidateDeps(context.Background(), req) + // we don't expect rpc errors + So(err, ShouldBeNil) + So(len(reply.Errors), ShouldNotEqual, 0) + }) + }) + //Subscribe Deps: valid/invalid + Convey("SubscribeDeps", t, func() { + Convey("Should Error with invalid inputs", func() { + req := &rpc.SubscribeDepsRequest{ + Metrics: common.NewMetrics([]core.Metric{invalidMetric}), + Plugins: common.ToCorePluginsMsg([]core.Plugin{}), + TaskId: "my-snowflake-id", + } + reply, err := client.SubscribeDeps(context.Background(), req) + // we don't expect rpc errors + So(err, ShouldBeNil) + So(len(reply.Errors), ShouldNotEqual, 0) + So(reply.Errors[0].ErrorString, ShouldResemble, "Metric not found: /this/is/invalid") + }) + Convey("Should not error with valid inputs", func() { + req := &rpc.SubscribeDepsRequest{ + Metrics: common.NewMetrics([]core.Metric{validMetric}), + Plugins: common.ToCorePluginsMsg([]core.Plugin{}), + TaskId: "my-snowflake-id", + } + reply, err := client.SubscribeDeps(context.Background(), req) + // we don't expect rpc errors + So(err, ShouldBeNil) + So(len(reply.Errors), ShouldEqual, 0) + }) + }) + // unsubscribedeps -- valid/invalid + Convey("UnsubscribeDeps", t, func() { + Convey("Should Error with invalid inputs", func() { + req := &rpc.SubscribeDepsRequest{ + Metrics: common.NewMetrics([]core.Metric{invalidMetric}), + Plugins: common.ToCorePluginsMsg([]core.Plugin{}), + TaskId: "my-snowflake-id", + } + reply, err := client.UnsubscribeDeps(context.Background(), req) + // we don't expect rpc errors + So(err, ShouldBeNil) + So(len(reply.Errors), ShouldNotEqual, 0) + So(reply.Errors[0].ErrorString, ShouldResemble, "Metric not found: /this/is/invalid") + }) + Convey("Should not error with valid inputs", func() { + req := &rpc.SubscribeDepsRequest{ + Metrics: common.NewMetrics([]core.Metric{validMetric}), + Plugins: common.ToCorePluginsMsg([]core.Plugin{}), + TaskId: "my-snowflake-id", + } + reply, err := client.UnsubscribeDeps(context.Background(), req) + // we don't expect rpc errors + So(err, ShouldBeNil) + So(len(reply.Errors), ShouldEqual, 0) + }) + }) + //matchquerytonamespaces -- valid/invalid + Convey("MatchingQueryToNamespaces", t, func() { + Convey("Should error with invalid inputs", func() { + req := &rpc.ExpandWildcardsRequest{ + Namespace: common.ToNamespace(invalidMetric.Namespace()), + } + reply, err := client.MatchQueryToNamespaces(context.Background(), req) + // we don't expect rpc.errors + So(err, ShouldBeNil) + So(reply.Error, ShouldNotBeNil) + So(reply.Error.ErrorString, ShouldResemble, "Metric not found: /this/is/invalid") + }) + Convey("Should not error with invalid inputs", func() { + req := &rpc.ExpandWildcardsRequest{ + Namespace: common.ToNamespace(validMetric.Namespace()), + } + reply, err := client.MatchQueryToNamespaces(context.Background(), req) + // we don't expect rpc.errors + So(err, ShouldBeNil) + So(reply.Error, ShouldBeNil) + }) + }) + //expandwildcards -- valid/invalid + Convey("ExpandWildcards", t, func() { + Convey("Should error with invalid inputs", func() { + req := &rpc.ExpandWildcardsRequest{ + Namespace: common.ToNamespace(invalidMetric.Namespace()), + } + reply, err := client.ExpandWildcards(context.Background(), req) + // we don't expect rpc errors + So(err, ShouldBeNil) + So(reply.Error, ShouldNotBeNil) + So(reply.Error.ErrorString, ShouldResemble, "Metric not found: /this/is/invalid") + }) + Convey("Should not error with valid inputs", func() { + req := &rpc.ExpandWildcardsRequest{ + Namespace: common.ToNamespace(validMetric.Namespace()), + } + reply, err := client.ExpandWildcards(context.Background(), req) + // we don't expect rpc errors + So(err, ShouldBeNil) + So(reply.Error, ShouldBeNil) + }) + }) + // start plugin pools/provide task info so we can do collect/process/publishMetrics + // errors here indicate problems outside the scope of this test. + plugins := []string{"collector:mock:1", "processor:passthru:1", "publisher:file:3"} + lps := make([]*loadedPlugin, len(plugins)) + pools := make([]strategy.Pool, len(plugins)) + for i, v := range plugins { + lps[i], err = c.pluginManager.get(v) + if err != nil { + log.Fatal(err) + } + pools[i], err = c.pluginRunner.AvailablePlugins().getOrCreatePool(v) + if err != nil { + log.Fatal(err) + } + pools[i].Subscribe("my-snowflake-id", strategy.BoundSubscriptionType) + err = c.pluginRunner.runPlugin(lps[i].Details) + if err != nil { + log.Fatal(err) + } + } + //our returned metrics + var mts []core.Metric + //collect + Convey("CollectMetrics", t, func() { + Convey("Should error with invalid inputs", func() { + req := &rpc.CollectMetricsRequest{ + Metrics: common.NewMetrics([]core.Metric{invalidMetric}), + Deadline: &common.Time{ + Sec: int64(time.Now().Unix()), + Nsec: int64(time.Now().Nanosecond()), + }, + TaskID: "my-snowflake-id", + } + reply, err := client.CollectMetrics(context.Background(), req) + So(err, ShouldBeNil) + So(len(reply.Errors), ShouldNotEqual, 0) + }) + Convey("should not error with valid inputs", func() { + req := &rpc.CollectMetricsRequest{ + Metrics: common.NewMetrics([]core.Metric{validMetric}), + Deadline: &common.Time{ + Sec: int64(time.Now().Unix()), + Nsec: int64(time.Now().Nanosecond()), + }, + TaskID: "my-snowflake-id", + } + reply, err := client.CollectMetrics(context.Background(), req) + So(err, ShouldBeNil) + So(len(reply.Errors), ShouldEqual, 0) + So(reply.Metrics[0].Namespace, ShouldResemble, common.ToNamespace(validMetric.Namespace())) + // Used in a later test as metrics to be passed to processor + mts = common.ToCoreMetrics(reply.Metrics) + }) + }) + //our content to pass to publish + var content []byte + //process + Convey("ProcessMetrics", t, func() { + Convey("Should error with invalid inputs", func() { + req := controlproxy.GetPubProcReq("snap.gob", []byte{}, "bad name", 1, map[string]ctypes.ConfigValue{}, "my-snowflake-id") + reply, err := client.ProcessMetrics(context.Background(), req) + // we don't expect rpc errors + So(err, ShouldBeNil) + So(len(reply.Errors), ShouldNotEqual, 0) + So(reply.Errors[0], ShouldResemble, "bad key") + }) + Convey("should not error with valid inputs", func() { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + metrics := make([]plugin.MetricType, len(mts)) + for i, m := range mts { + metrics[i] = m.(plugin.MetricType) + } + enc.Encode(metrics) + req := controlproxy.GetPubProcReq("snap.gob", buf.Bytes(), "passthru", 1, map[string]ctypes.ConfigValue{}, "my-snowflake-id") + reply, err := client.ProcessMetrics(context.Background(), req) + // we don't expect rpc errors + So(err, ShouldBeNil) + So(len(reply.Errors), ShouldEqual, 0) + // content to pass to publisher + content = reply.Content + + }) + }) + //publishmetrics + Convey("PublishMetrics", t, func() { + Convey("Should error with invalid inputs", func() { + req := controlproxy.GetPubProcReq("snap.gob", []byte{}, "bad name", 1, map[string]ctypes.ConfigValue{}, "my-snowflake-id") + reply, err := client.PublishMetrics(context.Background(), req) + // we don't expect rpc errors + So(err, ShouldBeNil) + So(len(reply.Errors), ShouldNotEqual, 0) + + So(reply.Errors[0], ShouldResemble, "bad key") + }) + // Publish only returns no errors on success + Convey("should not error with valid inputs", func() { + config := make(map[string]ctypes.ConfigValue) + config["file"] = ctypes.ConfigValueStr{Value: "/tmp/grpcservertest.snap"} + req := controlproxy.GetPubProcReq("snap.gob", content, "file", 3, config, "my-snowflake-id") + reply, err := client.PublishMetrics(context.Background(), req) + // we don't expect rpc errors + So(err, ShouldBeNil) + So(len(reply.Errors), ShouldEqual, 0) + }) + }) +} + +func equal(a []string, b []string) bool { + if len(a) != len(b) { + return false + } + for idx := range a { + if a[idx] != b[idx] { + return false + } + } + return true +} diff --git a/control/flags.go b/control/flags.go index 6e1e267da..bb40b245a 100644 --- a/control/flags.go +++ b/control/flags.go @@ -34,6 +34,7 @@ var ( Value: defaultPluginTrust, EnvVar: "SNAP_TRUST_LEVEL", } + flAutoDiscover = cli.StringFlag{ Name: "auto-discover, a", Usage: "Auto discover paths separated by colons.", @@ -51,6 +52,17 @@ var ( EnvVar: "SNAP_CACHE_EXPIRATION", } - // Flags consumed by snapd - Flags = []cli.Flag{flNumberOfPLs, flPluginTrust, flAutoDiscover, flKeyringPaths, flCache} + flControlRpcPort = cli.IntFlag{ + Name: "control-listen-port", + Usage: "Listen port for control RPC server", + EnvVar: "SNAP_CONTROL_LISTEN_PORT", + } + + flControlRpcAddr = cli.StringFlag{ + Name: "control-listen-addr", + Usage: "Listen address for control RPC server", + EnvVar: "SNAP_CONTROL_LISTEN_ADDR", + } + + Flags = []cli.Flag{flNumberOfPLs, flAutoDiscover, flPluginTrust, flKeyringPaths, flCache, flControlRpcPort, flControlRpcAddr} ) diff --git a/snapd.go b/snapd.go index c9c53ec86..3806d250b 100644 --- a/snapd.go +++ b/snapd.go @@ -182,8 +182,8 @@ func main() { flMaxProcs, flConfig, } - app.Flags = append(app.Flags, scheduler.Flags...) app.Flags = append(app.Flags, control.Flags...) + app.Flags = append(app.Flags, scheduler.Flags...) app.Flags = append(app.Flags, rest.Flags...) app.Flags = append(app.Flags, tribe.Flags...) @@ -566,6 +566,8 @@ func applyCmdLineFlags(cfg *Config, ctx *cli.Context) { cfg.Control.AutoDiscoverPath = setStringVal(cfg.Control.AutoDiscoverPath, ctx, "auto-discover") cfg.Control.KeyringPaths = setStringVal(cfg.Control.KeyringPaths, ctx, "keyring-paths") cfg.Control.CacheExpiration = jsonutil.Duration{setDurationVal(cfg.Control.CacheExpiration.Duration, ctx, "cache-expiration")} + cfg.Control.ListenAddr = setStringVal(cfg.Control.ListenAddr, ctx, "control-listen-addr") + cfg.Control.ListenPort = setIntVal(cfg.Control.ListenPort, ctx, "control-listen-port") // next for the RESTful server related flags cfg.RestAPI.Enable = setBoolVal(cfg.RestAPI.Enable, ctx, "disable-api", invertBoolean) cfg.RestAPI.Port = setIntVal(cfg.RestAPI.Port, ctx, "api-port")