diff --git a/control/available_plugin.go b/control/available_plugin.go index 9e318d74e..c4687c0df 100644 --- a/control/available_plugin.go +++ b/control/available_plugin.go @@ -1,7 +1,6 @@ package control import ( - "crypto/rsa" "errors" "fmt" "strconv" @@ -68,7 +67,7 @@ type availablePlugin struct { // newAvailablePlugin returns an availablePlugin with information from a // plugin.Response -func newAvailablePlugin(resp *plugin.Response, privKey *rsa.PrivateKey, emitter gomit.Emitter, ep executablePlugin) (*availablePlugin, error) { +func newAvailablePlugin(resp *plugin.Response, emitter gomit.Emitter, ep executablePlugin) (*availablePlugin, error) { if resp.Type != plugin.CollectorPluginType && resp.Type != plugin.ProcessorPluginType && resp.Type != plugin.PublisherPluginType { return nil, ErrBadType } @@ -90,13 +89,13 @@ func newAvailablePlugin(resp *plugin.Response, privKey *rsa.PrivateKey, emitter case plugin.CollectorPluginType: switch resp.Meta.RPCType { case plugin.JSONRPC: - c, e := client.NewCollectorHttpJSONRPCClient(listenUrl, DefaultClientTimeout, resp.PublicKey, privKey) + c, e := client.NewCollectorHttpJSONRPCClient(listenUrl, DefaultClientTimeout, resp.PublicKey, !resp.Meta.Unsecure) if e != nil { return nil, errors.New("error while creating client connection: " + e.Error()) } ap.client = c case plugin.NativeRPC: - c, e := client.NewCollectorNativeClient(resp.ListenAddress, DefaultClientTimeout, resp.PublicKey, privKey) + c, e := client.NewCollectorNativeClient(resp.ListenAddress, DefaultClientTimeout, resp.PublicKey, !resp.Meta.Unsecure) if e != nil { return nil, errors.New("error while creating client connection: " + e.Error()) } @@ -105,13 +104,13 @@ func newAvailablePlugin(resp *plugin.Response, privKey *rsa.PrivateKey, emitter case plugin.PublisherPluginType: switch resp.Meta.RPCType { case plugin.JSONRPC: - c, e := client.NewPublisherHttpJSONRPCClient(listenUrl, DefaultClientTimeout, resp.PublicKey, privKey) + c, e := client.NewPublisherHttpJSONRPCClient(listenUrl, DefaultClientTimeout, resp.PublicKey, !resp.Meta.Unsecure) if e != nil { return nil, errors.New("error while creating client connection: " + e.Error()) } ap.client = c case plugin.NativeRPC: - c, e := client.NewPublisherNativeClient(resp.ListenAddress, DefaultClientTimeout, resp.PublicKey, privKey) + c, e := client.NewPublisherNativeClient(resp.ListenAddress, DefaultClientTimeout, resp.PublicKey, !resp.Meta.Unsecure) if e != nil { return nil, errors.New("error while creating client connection: " + e.Error()) } @@ -120,13 +119,13 @@ func newAvailablePlugin(resp *plugin.Response, privKey *rsa.PrivateKey, emitter case plugin.ProcessorPluginType: switch resp.Meta.RPCType { case plugin.JSONRPC: - c, e := client.NewProcessorHttpJSONRPCClient(listenUrl, DefaultClientTimeout, resp.PublicKey, privKey) + c, e := client.NewProcessorHttpJSONRPCClient(listenUrl, DefaultClientTimeout, resp.PublicKey, !resp.Meta.Unsecure) if e != nil { return nil, errors.New("error while creating client connection: " + e.Error()) } ap.client = c case plugin.NativeRPC: - c, e := client.NewProcessorNativeClient(resp.ListenAddress, DefaultClientTimeout, resp.PublicKey, privKey) + c, e := client.NewProcessorNativeClient(resp.ListenAddress, DefaultClientTimeout, resp.PublicKey, !resp.Meta.Unsecure) if e != nil { return nil, errors.New("error while creating client connection: " + e.Error()) } diff --git a/control/available_plugin_test.go b/control/available_plugin_test.go index 10dd72186..c86329cdb 100644 --- a/control/available_plugin_test.go +++ b/control/available_plugin_test.go @@ -1,8 +1,6 @@ package control import ( - "crypto/rand" - "crypto/rsa" "errors" "net" "testing" @@ -14,8 +12,6 @@ import ( func TestAvailablePlugin(t *testing.T) { Convey("newAvailablePlugin()", t, func() { - key, err := rsa.GenerateKey(rand.Reader, 2048) - So(err, ShouldBeNil) Convey("returns an availablePlugin", func() { ln, _ := net.Listen("tcp", ":4000") defer ln.Close() @@ -27,7 +23,7 @@ func TestAvailablePlugin(t *testing.T) { Type: plugin.CollectorPluginType, ListenAddress: "127.0.0.1:4000", } - ap, err := newAvailablePlugin(resp, key, nil, nil) + ap, err := newAvailablePlugin(resp, nil, nil) So(ap, ShouldHaveSameTypeAs, new(availablePlugin)) So(err, ShouldBeNil) }) @@ -35,9 +31,7 @@ func TestAvailablePlugin(t *testing.T) { Convey("Stop()", t, func() { Convey("returns nil if plugin successfully stopped", func() { - key, err := rsa.GenerateKey(rand.Reader, 2048) - So(err, ShouldBeNil) - r := newRunner(&routing.RoundRobinStrategy{}, key) + r := newRunner(&routing.RoundRobinStrategy{}) a := plugin.Arg{ PluginLogPath: "/tmp/pulse-test-plugin-stop.log", } @@ -89,8 +83,6 @@ func TestAvailablePlugins(t *testing.T) { }) }) Convey("it returns an error if client cannot be created", t, func() { - key, err := rsa.GenerateKey(rand.Reader, 2048) - So(err, ShouldBeNil) resp := &plugin.Response{ Meta: plugin.PluginMeta{ Name: "test", @@ -99,7 +91,7 @@ func TestAvailablePlugins(t *testing.T) { Type: plugin.CollectorPluginType, ListenAddress: "localhost:", } - ap, err := newAvailablePlugin(resp, key, nil, nil) + ap, err := newAvailablePlugin(resp, nil, nil) So(ap, ShouldBeNil) So(err, ShouldNotBeNil) }) diff --git a/control/control.go b/control/control.go index 6c9574842..b495b4acf 100644 --- a/control/control.go +++ b/control/control.go @@ -1,8 +1,6 @@ package control import ( - "crypto/rand" - "crypto/rsa" "errors" "fmt" "strconv" @@ -24,11 +22,7 @@ import ( "github.com/intelsdi-x/pulse/pkg/psigning" ) -// control private key (RSA private key) -// control public key (RSA public key) // Plugin token = token generated by plugin and passed to control -// Session token = plugin seed encrypted by control private key, verified by plugin using control public key -// var ( controlLogger = log.WithFields(log.Fields{ @@ -47,8 +41,6 @@ type pluginControl struct { Started bool autodiscoverPaths []string - privKey *rsa.PrivateKey - pubKey *rsa.PublicKey eventManager *gomit.EventController pluginManager managesPlugins @@ -117,14 +109,7 @@ func CacheExpiration(t time.Duration) controlOpt { // New returns a new pluginControl instance func New(opts ...controlOpt) *pluginControl { - key, err := rsa.GenerateKey(rand.Reader, 2048) - if err != nil { - panic(err) - } - c := &pluginControl{ - pubKey: &key.PublicKey, - privKey: key, - } + c := &pluginControl{} // Initialize components // // Event Manager @@ -141,7 +126,7 @@ func New(opts ...controlOpt) *pluginControl { }).Debug("metric catalog created") // Plugin Manager - c.pluginManager = newPluginManager(c.pubKey, c.privKey) + c.pluginManager = newPluginManager() controlLogger.WithFields(log.Fields{ "_block": "new", }).Debug("plugin manager created") @@ -156,7 +141,7 @@ func New(opts ...controlOpt) *pluginControl { // Plugin Runner // TODO (danielscottt): handle routing strat changes via events - c.pluginRunner = newRunner(&routing.RoundRobinStrategy{}, c.privKey) + c.pluginRunner = newRunner(&routing.RoundRobinStrategy{}) controlLogger.WithFields(log.Fields{ "_block": "new", }).Debug("runner created") @@ -169,7 +154,7 @@ func New(opts ...controlOpt) *pluginControl { // Wire event manager // Start stuff - err = c.pluginRunner.Start() + err := c.pluginRunner.Start() if err != nil { panic(err) } diff --git a/control/control_test.go b/control/control_test.go index 9555a0c0f..cf4ceeaa3 100644 --- a/control/control_test.go +++ b/control/control_test.go @@ -2,8 +2,6 @@ package control import ( "bytes" - "crypto/rand" - "crypto/rsa" "encoding/gob" "encoding/json" "errors" @@ -363,7 +361,6 @@ func TestStop(t *testing.T) { } func TestPluginCatalog(t *testing.T) { - key, _ := rsa.GenerateKey(rand.Reader, 2048) ts := time.Now() c := New() @@ -371,7 +368,7 @@ func TestPluginCatalog(t *testing.T) { // We need our own plugin manager to drop mock // loaded plugins into. Aribitrarily adding // plugins from the pm is no longer supported. - tpm := newPluginManager(&key.PublicKey, key) + tpm := newPluginManager() c.pluginManager = tpm lp1 := new(loadedPlugin) diff --git a/control/monitor_test.go b/control/monitor_test.go index a52268253..9f10733e8 100644 --- a/control/monitor_test.go +++ b/control/monitor_test.go @@ -12,20 +12,6 @@ import ( . "github.com/smartystreets/goconvey/convey" ) -type mockPluginClient struct{} - -func (mp *mockPluginClient) Ping() error { - return nil -} - -func (mp *mockPluginClient) Kill(r string) error { - return nil -} - -func (mp *mockPluginClient) GetConfigPolicy() error { - return nil -} - func TestMonitor(t *testing.T) { Convey("monitor", t, func() { aps := newAvailablePlugins(&routing.RoundRobinStrategy{}) diff --git a/control/plugin/client/httpjsonrpc.go b/control/plugin/client/httpjsonrpc.go index a9e687825..401488dd1 100644 --- a/control/plugin/client/httpjsonrpc.go +++ b/control/plugin/client/httpjsonrpc.go @@ -33,58 +33,64 @@ type httpJSONRPCClient struct { } // NewCollectorHttpJSONRPCClient returns CollectorHttpJSONRPCClient -func NewCollectorHttpJSONRPCClient(u string, timeout time.Duration, pub *rsa.PublicKey, priv *rsa.PrivateKey) (PluginCollectorClient, error) { - key, err := encrypter.GenerateKey() - if err != nil { - return nil, err - } - e := encrypter.New(pub, priv) - e.Key = key - enc := encoding.NewJsonEncoder() - enc.SetEncrypter(e) - return &httpJSONRPCClient{ +func NewCollectorHttpJSONRPCClient(u string, timeout time.Duration, pub *rsa.PublicKey, secure bool) (PluginCollectorClient, error) { + hjr := &httpJSONRPCClient{ url: u, timeout: timeout, pluginType: plugin.CollectorPluginType, - encrypter: e, - encoder: enc, - }, nil + encoder: encoding.NewJsonEncoder(), + } + if secure { + key, err := encrypter.GenerateKey() + if err != nil { + return nil, err + } + e := encrypter.New(pub, nil) + e.Key = key + hjr.encoder.SetEncrypter(e) + hjr.encrypter = e + } + return hjr, nil } -func NewProcessorHttpJSONRPCClient(u string, timeout time.Duration, pub *rsa.PublicKey, priv *rsa.PrivateKey) (PluginProcessorClient, error) { - key, err := encrypter.GenerateKey() - if err != nil { - return nil, err - } - e := encrypter.New(pub, priv) - e.Key = key - enc := encoding.NewJsonEncoder() - enc.SetEncrypter(e) - return &httpJSONRPCClient{ +func NewProcessorHttpJSONRPCClient(u string, timeout time.Duration, pub *rsa.PublicKey, secure bool) (PluginProcessorClient, error) { + hjr := &httpJSONRPCClient{ url: u, timeout: timeout, pluginType: plugin.ProcessorPluginType, - encrypter: e, - encoder: enc, - }, nil + encoder: encoding.NewJsonEncoder(), + } + if secure { + key, err := encrypter.GenerateKey() + if err != nil { + return nil, err + } + e := encrypter.New(pub, nil) + e.Key = key + hjr.encoder.SetEncrypter(e) + hjr.encrypter = e + } + return hjr, nil } -func NewPublisherHttpJSONRPCClient(u string, timeout time.Duration, pub *rsa.PublicKey, priv *rsa.PrivateKey) (PluginPublisherClient, error) { - key, err := encrypter.GenerateKey() - if err != nil { - return nil, err - } - e := encrypter.New(pub, priv) - e.Key = key - enc := encoding.NewJsonEncoder() - enc.SetEncrypter(e) - return &httpJSONRPCClient{ +func NewPublisherHttpJSONRPCClient(u string, timeout time.Duration, pub *rsa.PublicKey, secure bool) (PluginPublisherClient, error) { + hjr := &httpJSONRPCClient{ url: u, timeout: timeout, pluginType: plugin.PublisherPluginType, - encrypter: e, - encoder: enc, - }, nil + encoder: encoding.NewJsonEncoder(), + } + if secure { + key, err := encrypter.GenerateKey() + if err != nil { + return nil, err + } + e := encrypter.New(pub, nil) + e.Key = key + hjr.encoder.SetEncrypter(e) + hjr.encrypter = e + } + return hjr, nil } // Ping diff --git a/control/plugin/client/httpjsonrpc_test.go b/control/plugin/client/httpjsonrpc_test.go index 6efaaf013..81b8116a5 100644 --- a/control/plugin/client/httpjsonrpc_test.go +++ b/control/plugin/client/httpjsonrpc_test.go @@ -26,7 +26,7 @@ import ( ) var ( - key, _ = rsa.GenerateKey(crand.Reader, 2048) + key, _ = rsa.GenerateKey(crand.Reader, 1024) symkey, _ = encrypter.GenerateKey() ) @@ -122,7 +122,7 @@ func (m *mockSessionStatePluginProxy) Kill(arg []byte, b *[]byte) error { var httpStarted = false func startHTTPJSONRPC() (string, *mockSessionStatePluginProxy) { - encr := encrypter.New(&key.PublicKey, key) + encr := encrypter.New(&key.PublicKey, nil) encr.Key = symkey ee := encoding.NewJsonEncoder() ee.SetEncrypter(encr) @@ -159,7 +159,7 @@ func TestHTTPJSONRPC(t *testing.T) { Convey("Collector Client", t, func() { session.c = true - c, err := NewCollectorHttpJSONRPCClient(fmt.Sprintf("http://%v", addr), 1*time.Second, &key.PublicKey, key) + c, err := NewCollectorHttpJSONRPCClient(fmt.Sprintf("http://%v", addr), 1*time.Second, &key.PublicKey, true) So(err, ShouldBeNil) So(c, ShouldNotBeNil) cl := c.(*httpJSONRPCClient) @@ -253,7 +253,7 @@ func TestHTTPJSONRPC(t *testing.T) { Convey("Processor Client", t, func() { session.c = false - p, _ := NewProcessorHttpJSONRPCClient(fmt.Sprintf("http://%v", addr), 1*time.Second, &key.PublicKey, key) + p, _ := NewProcessorHttpJSONRPCClient(fmt.Sprintf("http://%v", addr), 1*time.Second, &key.PublicKey, true) cl := p.(*httpJSONRPCClient) cl.encrypter.Key = symkey So(p, ShouldNotBeNil) @@ -293,7 +293,7 @@ func TestHTTPJSONRPC(t *testing.T) { Convey("Publisher Client", t, func() { session.c = false - p, _ := NewPublisherHttpJSONRPCClient(fmt.Sprintf("http://%v", addr), 1*time.Second, &key.PublicKey, key) + p, _ := NewPublisherHttpJSONRPCClient(fmt.Sprintf("http://%v", addr), 1*time.Second, &key.PublicKey, true) cl := p.(*httpJSONRPCClient) cl.encrypter.Key = symkey So(p, ShouldNotBeNil) diff --git a/control/plugin/client/native.go b/control/plugin/client/native.go index b2a193e5d..c730970c8 100644 --- a/control/plugin/client/native.go +++ b/control/plugin/client/native.go @@ -30,16 +30,16 @@ type PluginNativeClient struct { encrypter *encrypter.Encrypter } -func NewCollectorNativeClient(address string, timeout time.Duration, pub *rsa.PublicKey, priv *rsa.PrivateKey) (PluginCollectorClient, error) { - return newNativeClient(address, timeout, plugin.CollectorPluginType, pub, priv) +func NewCollectorNativeClient(address string, timeout time.Duration, pub *rsa.PublicKey, secure bool) (PluginCollectorClient, error) { + return newNativeClient(address, timeout, plugin.CollectorPluginType, pub, secure) } -func NewPublisherNativeClient(address string, timeout time.Duration, pub *rsa.PublicKey, priv *rsa.PrivateKey) (PluginPublisherClient, error) { - return newNativeClient(address, timeout, plugin.PublisherPluginType, pub, priv) +func NewPublisherNativeClient(address string, timeout time.Duration, pub *rsa.PublicKey, secure bool) (PluginPublisherClient, error) { + return newNativeClient(address, timeout, plugin.PublisherPluginType, pub, secure) } -func NewProcessorNativeClient(address string, timeout time.Duration, pub *rsa.PublicKey, priv *rsa.PrivateKey) (PluginProcessorClient, error) { - return newNativeClient(address, timeout, plugin.ProcessorPluginType, pub, priv) +func NewProcessorNativeClient(address string, timeout time.Duration, pub *rsa.PublicKey, secure bool) (PluginProcessorClient, error) { + return newNativeClient(address, timeout, plugin.ProcessorPluginType, pub, secure) } func (p *PluginNativeClient) Ping() error { @@ -220,7 +220,7 @@ func (p *PluginNativeClient) GetType() string { return upcaseInitial(p.pluginType.String()) } -func newNativeClient(address string, timeout time.Duration, t plugin.PluginType, pub *rsa.PublicKey, priv *rsa.PrivateKey) (*PluginNativeClient, error) { +func newNativeClient(address string, timeout time.Duration, t plugin.PluginType, pub *rsa.PublicKey, secure bool) (*PluginNativeClient, error) { // Attempt to dial address error on timeout or problem conn, err := net.DialTimeout("tcp", address, timeout) // Return nil RPCClient and err if encoutered @@ -233,16 +233,18 @@ func newNativeClient(address string, timeout time.Duration, t plugin.PluginType, pluginType: t, } - key, err := encrypter.GenerateKey() - if err != nil { - return nil, err - } p.encoder = encoding.NewGobEncoder() - encrypter := encrypter.New(pub, priv) - encrypter.Key = key - p.encrypter = encrypter - p.encoder.SetEncrypter(encrypter) + if secure { + key, err := encrypter.GenerateKey() + if err != nil { + return nil, err + } + encrypter := encrypter.New(pub, nil) + encrypter.Key = key + p.encrypter = encrypter + p.encoder.SetEncrypter(encrypter) + } return p, nil } diff --git a/control/plugin/plugin.go b/control/plugin/plugin.go index fe94b7017..aad63d0f2 100644 --- a/control/plugin/plugin.go +++ b/control/plugin/plugin.go @@ -85,6 +85,8 @@ type PluginMeta struct { ConcurrencyCount int // should always only be one instance of this plugin running Exclusive bool + // do not encrypt communication with this plugin + Unsecure bool } type metaOp func(m *PluginMeta) @@ -101,6 +103,12 @@ func Exclusive(e bool) metaOp { } } +func Unsecure(e bool) metaOp { + return func(m *PluginMeta) { + m.Unsecure = e + } +} + // NewPluginMeta constructs and returns a PluginMeta struct func NewPluginMeta(name string, version int, pluginType PluginType, acceptContentTypes, returnContentTypes []string, opts ...metaOp) *PluginMeta { // An empty accepted content type default to "pulse.*" @@ -149,8 +157,6 @@ func NewPluginMeta(name string, version int, pluginType PluginType, acceptConten type Arg struct { // Plugin file path to binary PluginLogPath string - // A public key from control used to verify RPC calls - not implemented yet - ControlPubKey *rsa.PublicKey // Ping timeout duration PingTimeoutDuration time.Duration @@ -159,9 +165,8 @@ type Arg struct { listenPort string } -func NewArg(pubkey *rsa.PublicKey, logpath string) Arg { +func NewArg(logpath string) Arg { return Arg{ - ControlPubKey: pubkey, PluginLogPath: logpath, PingTimeoutDuration: PingTimeoutDurationDefault, } @@ -207,17 +212,21 @@ func Start(m *PluginMeta, c Plugin, requestString string) (error, int) { rpc.RegisterName("Collector", proxy) r = &Response{ - Type: CollectorPluginType, - State: PluginSuccess, - Meta: *m, - PublicKey: &s.privateKey.PublicKey, + Type: CollectorPluginType, + State: PluginSuccess, + Meta: *m, + } + if !m.Unsecure { + r.PublicKey = &s.privateKey.PublicKey } case PublisherPluginType: r = &Response{ - Type: PublisherPluginType, - State: PluginSuccess, - Meta: *m, - PublicKey: &s.privateKey.PublicKey, + Type: PublisherPluginType, + State: PluginSuccess, + Meta: *m, + } + if !m.Unsecure { + r.PublicKey = &s.privateKey.PublicKey } // Create our proxy proxy := &publisherPluginProxy{ @@ -229,10 +238,12 @@ func Start(m *PluginMeta, c Plugin, requestString string) (error, int) { rpc.RegisterName("Publisher", proxy) case ProcessorPluginType: r = &Response{ - Type: ProcessorPluginType, - State: PluginSuccess, - Meta: *m, - PublicKey: &s.privateKey.PublicKey, + Type: ProcessorPluginType, + State: PluginSuccess, + Meta: *m, + } + if !m.Unsecure { + r.PublicKey = &s.privateKey.PublicKey } // Create our proxy proxy := &processorPluginProxy{ diff --git a/control/plugin/plugin_test.go b/control/plugin/plugin_test.go index 3743fca25..7af49d241 100644 --- a/control/plugin/plugin_test.go +++ b/control/plugin/plugin_test.go @@ -38,7 +38,7 @@ func TestMetricType(t *testing.T) { func TestArg(t *testing.T) { Convey("NewArg", t, func() { - arg := NewArg(nil, "/tmp/pulse/plugin.log") + arg := NewArg("/tmp/pulse/plugin.log") So(arg, ShouldNotBeNil) }) } diff --git a/control/plugin/session.go b/control/plugin/session.go index 3c87a1b9e..bc43c47c2 100644 --- a/control/plugin/session.go +++ b/control/plugin/session.go @@ -248,24 +248,27 @@ func NewSessionState(pluginArgsMsg string, plugin Plugin, meta *PluginMeta) (*Se case NativeRPC: enc = encoding.NewGobEncoder() } - key, err := rsa.GenerateKey(rand.Reader, 2048) - if err != nil { - return nil, err, 2 + ss := &SessionState{ + Arg: pluginArg, + Encoder: enc, + + plugin: plugin, + token: rs, + killChan: make(chan int), + logger: logger, + } + + if !meta.Unsecure { + key, err := rsa.GenerateKey(rand.Reader, 2048) + if err != nil { + return nil, err, 2 + } + encrypt := encrypter.New(nil, key) + enc.SetEncrypter(encrypt) + ss.Encrypter = encrypt + ss.privateKey = key } - encrypt := encrypter.New(pluginArg.ControlPubKey, key) - enc.SetEncrypter(encrypt) - - return &SessionState{ - Arg: pluginArg, - Encoder: enc, - Encrypter: encrypt, - - plugin: plugin, - token: rs, - killChan: make(chan int), - logger: logger, - privateKey: key, - }, nil, 0 + return ss, nil, 0 } func init() { diff --git a/control/plugin_manager.go b/control/plugin_manager.go index ec5ddfa48..24c0b6738 100644 --- a/control/plugin_manager.go +++ b/control/plugin_manager.go @@ -3,7 +3,6 @@ package control import ( - "crypto/rsa" "errors" "fmt" "os" @@ -183,17 +182,13 @@ func (lp *loadedPlugin) LoadedTimestamp() *time.Time { type pluginManager struct { metricCatalog catalogsMetrics loadedPlugins *loadedPlugins - privateKey *rsa.PrivateKey - publicKey *rsa.PublicKey logPath string } -func newPluginManager(pubKey *rsa.PublicKey, privKey *rsa.PrivateKey) *pluginManager { +func newPluginManager() *pluginManager { p := &pluginManager{ loadedPlugins: newLoadedPlugins(), logPath: "/tmp", - privateKey: privKey, - publicKey: pubKey, } return p } @@ -242,7 +237,7 @@ func (p *pluginManager) LoadPlugin(path string, emitter gomit.Emitter) (*loadedP return nil, perror.New(err) } - ap, err := newAvailablePlugin(resp, p.privateKey, emitter, ePlugin) + ap, err := newAvailablePlugin(resp, emitter, ePlugin) if err != nil { pmLogger.WithFields(log.Fields{ "_block": "load-plugin", @@ -251,7 +246,11 @@ func (p *pluginManager) LoadPlugin(path string, emitter gomit.Emitter) (*loadedP return nil, perror.New(err) } - err = ap.client.SetKey() + if resp.Meta.Unsecure { + err = ap.client.Ping() + } else { + err = ap.client.SetKey() + } if err != nil { pmLogger.WithFields(log.Fields{ "_block": "load-plugin", @@ -425,7 +424,7 @@ func (p *pluginManager) UnloadPlugin(pl core.Plugin) (*loadedPlugin, perror.Puls func (p *pluginManager) GenerateArgs(pluginPath string) plugin.Arg { pluginLog := filepath.Join(p.logPath, filepath.Base(pluginPath)) + ".log" - return plugin.NewArg(p.publicKey, pluginLog) + return plugin.NewArg(pluginLog) } func (p *pluginManager) teardown() { diff --git a/control/plugin_manager_test.go b/control/plugin_manager_test.go index 1585a8ec8..ab7fa22ba 100644 --- a/control/plugin_manager_test.go +++ b/control/plugin_manager_test.go @@ -1,8 +1,6 @@ package control import ( - "crypto/rand" - "crypto/rsa" "errors" "os" "path" @@ -59,11 +57,10 @@ func TestLoadPlugin(t *testing.T) { // build the plugins first into the build dir if PulsePath != "" { - key, _ := rsa.GenerateKey(rand.Reader, 2048) Convey("PluginManager.LoadPlugin", t, func() { Convey("loads plugin successfully", func() { - p := newPluginManager(&key.PublicKey, key) + p := newPluginManager() p.SetMetricCatalog(newMetricCatalog()) lp, err := p.LoadPlugin(PluginPath, nil) @@ -74,7 +71,7 @@ func TestLoadPlugin(t *testing.T) { }) Convey("loads json-rpc plugin successfully", func() { - p := newPluginManager(&key.PublicKey, key) + p := newPluginManager() p.SetMetricCatalog(newMetricCatalog()) lp, err := p.LoadPlugin(JSONRPC_PluginPath, nil) @@ -99,12 +96,11 @@ func TestLoadPlugin(t *testing.T) { func TestUnloadPlugin(t *testing.T) { if PulsePath != "" { - key, _ := rsa.GenerateKey(rand.Reader, 2048) Convey("pluginManager.UnloadPlugin", t, func() { Convey("when a loaded plugin is unloaded", func() { Convey("then it is removed from the loadedPlugins", func() { - p := newPluginManager(&key.PublicKey, key) + p := newPluginManager() p.SetMetricCatalog(newMetricCatalog()) _, err := p.LoadPlugin(PluginPath, nil) So(err, ShouldBeNil) @@ -121,7 +117,7 @@ func TestUnloadPlugin(t *testing.T) { Convey("when a loaded plugin is not in a PluginLoaded state", func() { Convey("then an error is thrown", func() { - p := newPluginManager(&key.PublicKey, key) + p := newPluginManager() p.SetMetricCatalog(newMetricCatalog()) lp, err := p.LoadPlugin(PluginPath, nil) glp, err2 := p.get("collector:dummy1:1") @@ -134,7 +130,7 @@ func TestUnloadPlugin(t *testing.T) { Convey("when a plugin is already unloaded", func() { Convey("then an error is thrown", func() { - p := newPluginManager(&key.PublicKey, key) + p := newPluginManager() p.SetMetricCatalog(newMetricCatalog()) _, err := p.LoadPlugin(PluginPath, nil) diff --git a/control/runner.go b/control/runner.go index a83d690ff..2e34956c1 100644 --- a/control/runner.go +++ b/control/runner.go @@ -1,7 +1,6 @@ package control import ( - "crypto/rsa" "errors" "fmt" "strings" @@ -46,12 +45,10 @@ type runner struct { metricCatalog catalogsMetrics pluginManager managesPlugins routingStrategy RoutingStrategy - privKey *rsa.PrivateKey } -func newRunner(routingStrategy RoutingStrategy, privKey *rsa.PrivateKey) *runner { +func newRunner(routingStrategy RoutingStrategy) *runner { r := &runner{ - privKey: privKey, monitor: newMonitor(), availablePlugins: newAvailablePlugins(routingStrategy), routingStrategy: routingStrategy, @@ -181,12 +178,16 @@ func (r *runner) startPlugin(p executablePlugin) (*availablePlugin, error) { } // build availablePlugin - ap, err := newAvailablePlugin(resp, r.privKey, r.emitter, p) + ap, err := newAvailablePlugin(resp, r.emitter, p) if err != nil { return nil, err } - err = ap.client.SetKey() + if resp.Meta.Unsecure { + err = ap.client.Ping() + } else { + err = ap.client.SetKey() + } if err != nil { return nil, err } diff --git a/control/runner_test.go b/control/runner_test.go index 1d87aa431..71f3cf7e6 100644 --- a/control/runner_test.go +++ b/control/runner_test.go @@ -1,8 +1,6 @@ package control import ( - "crypto/rand" - "crypto/rsa" "errors" "io" "testing" @@ -149,7 +147,6 @@ func TestRunnerState(t *testing.T) { // log.SetLevel(log.DebugLevel) // log.SetOutput(os.Stdout) - key, _ := rsa.GenerateKey(rand.Reader, 2048) Convey("pulse/control", t, func() { Convey("Runner", func() { @@ -157,7 +154,7 @@ func TestRunnerState(t *testing.T) { Convey(".AddDelegates", func() { Convey("adds a handler delegate", func() { - r := newRunner(&routing.RoundRobinStrategy{}, key) + r := newRunner(&routing.RoundRobinStrategy{}) r.AddDelegates(new(MockHandlerDelegate)) r.SetEmitter(new(MockEmitter)) @@ -165,7 +162,7 @@ func TestRunnerState(t *testing.T) { }) Convey("adds multiple delegates", func() { - r := newRunner(&routing.RoundRobinStrategy{}, key) + r := newRunner(&routing.RoundRobinStrategy{}) r.AddDelegates(new(MockHandlerDelegate)) r.AddDelegates(new(MockHandlerDelegate)) @@ -173,7 +170,7 @@ func TestRunnerState(t *testing.T) { }) Convey("adds multiple delegates (batch)", func() { - r := newRunner(&routing.RoundRobinStrategy{}, key) + r := newRunner(&routing.RoundRobinStrategy{}) r.AddDelegates(new(MockHandlerDelegate), new(MockHandlerDelegate)) So(len(r.delegates), ShouldEqual, 2) @@ -184,7 +181,7 @@ func TestRunnerState(t *testing.T) { Convey(".Start", func() { Convey("returns error without adding delegates", func() { - r := newRunner(&routing.RoundRobinStrategy{}, key) + r := newRunner(&routing.RoundRobinStrategy{}) e := r.Start() So(e, ShouldNotBeNil) @@ -192,7 +189,7 @@ func TestRunnerState(t *testing.T) { }) Convey("starts after adding one delegates", func() { - r := newRunner(&routing.RoundRobinStrategy{}, key) + r := newRunner(&routing.RoundRobinStrategy{}) m1 := new(MockHandlerDelegate) r.AddDelegates(m1) e := r.Start() @@ -202,7 +199,7 @@ func TestRunnerState(t *testing.T) { }) Convey("starts after after adding multiple delegates", func() { - r := newRunner(&routing.RoundRobinStrategy{}, key) + r := newRunner(&routing.RoundRobinStrategy{}) m1 := new(MockHandlerDelegate) m2 := new(MockHandlerDelegate) m3 := new(MockHandlerDelegate) @@ -217,7 +214,7 @@ func TestRunnerState(t *testing.T) { }) Convey("error if delegate cannot RegisterHandler", func() { - r := newRunner(&routing.RoundRobinStrategy{}, key) + r := newRunner(&routing.RoundRobinStrategy{}) me := new(MockHandlerDelegate) me.ErrorMode = true r.AddDelegates(me) @@ -232,7 +229,7 @@ func TestRunnerState(t *testing.T) { Convey(".Stop", func() { Convey("removes handlers from delegates", func() { - r := newRunner(&routing.RoundRobinStrategy{}, key) + r := newRunner(&routing.RoundRobinStrategy{}) m1 := new(MockHandlerDelegate) m2 := new(MockHandlerDelegate) m3 := new(MockHandlerDelegate) @@ -249,7 +246,7 @@ func TestRunnerState(t *testing.T) { }) Convey("returns errors for handlers errors on stop", func() { - r := newRunner(&routing.RoundRobinStrategy{}, key) + r := newRunner(&routing.RoundRobinStrategy{}) m1 := new(MockHandlerDelegate) m1.StopError = errors.New("0") m2 := new(MockHandlerDelegate) @@ -278,16 +275,13 @@ func TestRunnerState(t *testing.T) { func TestRunnerPluginRunning(t *testing.T) { // log.SetLevel(log.DebugLevel) Convey("pulse/control", t, func() { - key, err := rsa.GenerateKey(rand.Reader, 2048) - So(err, ShouldBeNil) - Convey("Runner", func() { Convey("startPlugin", func() { // These tests only work if Pulse Path is known to discover dummy plugin used for testing if PulsePath != "" { Convey("should return an AvailablePlugin", func() { - r := newRunner(&routing.RoundRobinStrategy{}, key) + r := newRunner(&routing.RoundRobinStrategy{}) r.SetEmitter(new(MockEmitter)) a := plugin.Arg{ PluginLogPath: "/tmp/pulse-test-plugin.log", @@ -312,7 +306,7 @@ func TestRunnerPluginRunning(t *testing.T) { }) Convey("availablePlugins should include returned availablePlugin", func() { - r := newRunner(&routing.RoundRobinStrategy{}, key) + r := newRunner(&routing.RoundRobinStrategy{}) r.SetEmitter(new(MockEmitter)) a := plugin.Arg{ PluginLogPath: "/tmp/pulse-test-plugin.log", @@ -332,7 +326,7 @@ func TestRunnerPluginRunning(t *testing.T) { }) Convey("healthcheck on healthy plugin does not increment failedHealthChecks", func() { - r := newRunner(&routing.RoundRobinStrategy{}, key) + r := newRunner(&routing.RoundRobinStrategy{}) r.SetEmitter(new(MockEmitter)) a := plugin.Arg{ PluginLogPath: "/tmp/pulse-test-plugin.log", @@ -351,7 +345,7 @@ func TestRunnerPluginRunning(t *testing.T) { }) Convey("healthcheck on unhealthy plugin increments failedHealthChecks", func() { - r := newRunner(&routing.RoundRobinStrategy{}, key) + r := newRunner(&routing.RoundRobinStrategy{}) r.SetEmitter(new(MockEmitter)) a := plugin.Arg{ PluginLogPath: "/tmp/pulse-test-plugin.log", @@ -370,7 +364,7 @@ func TestRunnerPluginRunning(t *testing.T) { }) Convey("successful healthcheck resets failedHealthChecks", func() { - r := newRunner(&routing.RoundRobinStrategy{}, key) + r := newRunner(&routing.RoundRobinStrategy{}) r.SetEmitter(new(MockEmitter)) a := plugin.Arg{ PluginLogPath: "/tmp/pulse-test-plugin-foo.log", @@ -393,7 +387,7 @@ func TestRunnerPluginRunning(t *testing.T) { }) Convey("three consecutive failedHealthChecks disables the plugin", func() { - r := newRunner(&routing.RoundRobinStrategy{}, key) + r := newRunner(&routing.RoundRobinStrategy{}) r.SetEmitter(new(MockEmitter)) a := plugin.Arg{ PluginLogPath: "/tmp/pulse-test-plugin.log", @@ -414,7 +408,7 @@ func TestRunnerPluginRunning(t *testing.T) { }) Convey("should return error for WaitForResponse error", func() { - r := newRunner(&routing.RoundRobinStrategy{}, key) + r := newRunner(&routing.RoundRobinStrategy{}) r.SetEmitter(new(MockEmitter)) exPlugin := new(MockExecutablePlugin) exPlugin.Timeout = true // set to not response @@ -425,7 +419,7 @@ func TestRunnerPluginRunning(t *testing.T) { }) Convey("should return error for nil availablePlugin", func() { - r := newRunner(&routing.RoundRobinStrategy{}, key) + r := newRunner(&routing.RoundRobinStrategy{}) exPlugin := new(MockExecutablePlugin) exPlugin.NilResponse = true // set to not response ap, e := r.startPlugin(exPlugin) @@ -435,7 +429,7 @@ func TestRunnerPluginRunning(t *testing.T) { }) Convey("should return error if plugin fails while starting", func() { - r := newRunner(&routing.RoundRobinStrategy{}, key) + r := newRunner(&routing.RoundRobinStrategy{}) exPlugin := &MockExecutablePlugin{ StartError: true, } @@ -446,7 +440,7 @@ func TestRunnerPluginRunning(t *testing.T) { }) Convey("should return error if plugin fails to start", func() { - r := newRunner(&routing.RoundRobinStrategy{}, key) + r := newRunner(&routing.RoundRobinStrategy{}) exPlugin := &MockExecutablePlugin{ PluginFailure: true, } @@ -461,7 +455,7 @@ func TestRunnerPluginRunning(t *testing.T) { Convey("stopPlugin", func() { Convey("should return an AvailablePlugin in a Running state", func() { - r := newRunner(&routing.RoundRobinStrategy{}, key) + r := newRunner(&routing.RoundRobinStrategy{}) a := plugin.Arg{ PluginLogPath: "/tmp/pulse-test-plugin-stop.log", } diff --git a/plugin/collector/pulse-collector-dummy1/dummy/dummy.go b/plugin/collector/pulse-collector-dummy1/dummy/dummy.go index 26a5c29a6..3e882c24d 100644 --- a/plugin/collector/pulse-collector-dummy1/dummy/dummy.go +++ b/plugin/collector/pulse-collector-dummy1/dummy/dummy.go @@ -58,5 +58,5 @@ func (f *Dummy) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) { //Meta returns meta data for testing func Meta() *plugin.PluginMeta { - return plugin.NewPluginMeta(Name, Version, Type, []string{plugin.PulseGOBContentType}, []string{plugin.PulseGOBContentType}) + return plugin.NewPluginMeta(Name, Version, Type, []string{plugin.PulseGOBContentType}, []string{plugin.PulseGOBContentType}, plugin.Unsecure(true)) }