Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Adds encryption by default.
Browse files Browse the repository at this point in the history
This commit adds payload encryption to both types of RPC currently
supported by Pulse.
  • Loading branch information
pittma committed Sep 28, 2015
1 parent 4249909 commit 3e035d0
Show file tree
Hide file tree
Showing 49 changed files with 1,195 additions and 746 deletions.
27 changes: 20 additions & 7 deletions control/available_plugin.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package control

import (
"crypto/rsa"
"errors"
"fmt"
"strconv"
Expand Down Expand Up @@ -67,7 +68,7 @@ type availablePlugin struct {

// newAvailablePlugin returns an availablePlugin with information from a
// plugin.Response
func newAvailablePlugin(resp *plugin.Response, emitter gomit.Emitter, ep executablePlugin) (*availablePlugin, error) {
func newAvailablePlugin(resp *plugin.Response, privKey *rsa.PrivateKey, emitter gomit.Emitter, ep executablePlugin) (*availablePlugin, error) {
if resp.Type != plugin.CollectorPluginType && resp.Type != plugin.ProcessorPluginType && resp.Type != plugin.PublisherPluginType {
return nil, ErrBadType
}
Expand All @@ -89,9 +90,13 @@ func newAvailablePlugin(resp *plugin.Response, emitter gomit.Emitter, ep executa
case plugin.CollectorPluginType:
switch resp.Meta.RPCType {
case plugin.JSONRPC:
ap.client = client.NewCollectorHttpJSONRPCClient(listenUrl, DefaultClientTimeout)
c, e := client.NewCollectorHttpJSONRPCClient(listenUrl, DefaultClientTimeout, resp.PublicKey, privKey)
if e != nil {
return nil, errors.New("error while creating client connection: " + e.Error())
}
ap.client = c
case plugin.NativeRPC:
c, e := client.NewCollectorNativeClient(resp.ListenAddress, DefaultClientTimeout)
c, e := client.NewCollectorNativeClient(resp.ListenAddress, DefaultClientTimeout, resp.PublicKey, privKey)
if e != nil {
return nil, errors.New("error while creating client connection: " + e.Error())
}
Expand All @@ -100,9 +105,13 @@ func newAvailablePlugin(resp *plugin.Response, emitter gomit.Emitter, ep executa
case plugin.PublisherPluginType:
switch resp.Meta.RPCType {
case plugin.JSONRPC:
ap.client = client.NewPublisherHttpJSONRPCClient(listenUrl, DefaultClientTimeout)
c, e := client.NewPublisherHttpJSONRPCClient(listenUrl, DefaultClientTimeout, resp.PublicKey, privKey)
if e != nil {
return nil, errors.New("error while creating client connection: " + e.Error())
}
ap.client = c
case plugin.NativeRPC:
c, e := client.NewPublisherNativeClient(resp.ListenAddress, DefaultClientTimeout)
c, e := client.NewPublisherNativeClient(resp.ListenAddress, DefaultClientTimeout, resp.PublicKey, privKey)
if e != nil {
return nil, errors.New("error while creating client connection: " + e.Error())
}
Expand All @@ -111,9 +120,13 @@ func newAvailablePlugin(resp *plugin.Response, emitter gomit.Emitter, ep executa
case plugin.ProcessorPluginType:
switch resp.Meta.RPCType {
case plugin.JSONRPC:
ap.client = client.NewProcessorHttpJSONRPCClient(listenUrl, DefaultClientTimeout)
c, e := client.NewProcessorHttpJSONRPCClient(listenUrl, DefaultClientTimeout, resp.PublicKey, privKey)
if e != nil {
return nil, errors.New("error while creating client connection: " + e.Error())
}
ap.client = c
case plugin.NativeRPC:
c, e := client.NewProcessorNativeClient(resp.ListenAddress, DefaultClientTimeout)
c, e := client.NewProcessorNativeClient(resp.ListenAddress, DefaultClientTimeout, resp.PublicKey, privKey)
if e != nil {
return nil, errors.New("error while creating client connection: " + e.Error())
}
Expand Down
14 changes: 11 additions & 3 deletions control/available_plugin_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package control

import (
"crypto/rand"
"crypto/rsa"
"errors"
"net"
"testing"
Expand All @@ -12,6 +14,8 @@ import (

func TestAvailablePlugin(t *testing.T) {
Convey("newAvailablePlugin()", t, func() {
key, err := rsa.GenerateKey(rand.Reader, 2048)
So(err, ShouldBeNil)
Convey("returns an availablePlugin", func() {
ln, _ := net.Listen("tcp", ":4000")
defer ln.Close()
Expand All @@ -23,15 +27,17 @@ func TestAvailablePlugin(t *testing.T) {
Type: plugin.CollectorPluginType,
ListenAddress: "127.0.0.1:4000",
}
ap, err := newAvailablePlugin(resp, nil, nil)
ap, err := newAvailablePlugin(resp, key, nil, nil)
So(ap, ShouldHaveSameTypeAs, new(availablePlugin))
So(err, ShouldBeNil)
})
})

Convey("Stop()", t, func() {
Convey("returns nil if plugin successfully stopped", func() {
r := newRunner(&routing.RoundRobinStrategy{})
key, err := rsa.GenerateKey(rand.Reader, 2048)
So(err, ShouldBeNil)
r := newRunner(&routing.RoundRobinStrategy{}, key)
a := plugin.Arg{
PluginLogPath: "/tmp/pulse-test-plugin-stop.log",
}
Expand Down Expand Up @@ -83,6 +89,8 @@ func TestAvailablePlugins(t *testing.T) {
})
})
Convey("it returns an error if client cannot be created", t, func() {
key, err := rsa.GenerateKey(rand.Reader, 2048)
So(err, ShouldBeNil)
resp := &plugin.Response{
Meta: plugin.PluginMeta{
Name: "test",
Expand All @@ -91,7 +99,7 @@ func TestAvailablePlugins(t *testing.T) {
Type: plugin.CollectorPluginType,
ListenAddress: "localhost:",
}
ap, err := newAvailablePlugin(resp, nil, nil)
ap, err := newAvailablePlugin(resp, key, nil, nil)
So(ap, ShouldBeNil)
So(err, ShouldNotBeNil)
})
Expand Down
20 changes: 14 additions & 6 deletions control/control.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package control

import (
"crypto/rand"
"crypto/rsa"
"errors"
"fmt"
Expand Down Expand Up @@ -46,8 +47,8 @@ type pluginControl struct {
Started bool

autodiscoverPaths []string
controlPrivKey *rsa.PrivateKey
controlPubKey *rsa.PublicKey
privKey *rsa.PrivateKey
pubKey *rsa.PublicKey
eventManager *gomit.EventController

pluginManager managesPlugins
Expand Down Expand Up @@ -116,7 +117,14 @@ func CacheExpiration(t time.Duration) controlOpt {
// New returns a new pluginControl instance
func New(opts ...controlOpt) *pluginControl {

c := &pluginControl{}
key, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
panic(err)
}
c := &pluginControl{
pubKey: &key.PublicKey,
privKey: key,
}
// Initialize components
//
// Event Manager
Expand All @@ -133,7 +141,7 @@ func New(opts ...controlOpt) *pluginControl {
}).Debug("metric catalog created")

// Plugin Manager
c.pluginManager = newPluginManager()
c.pluginManager = newPluginManager(c.pubKey, c.privKey)
controlLogger.WithFields(log.Fields{
"_block": "new",
}).Debug("plugin manager created")
Expand All @@ -148,7 +156,7 @@ func New(opts ...controlOpt) *pluginControl {

// Plugin Runner
// TODO (danielscottt): handle routing strat changes via events
c.pluginRunner = newRunner(&routing.RoundRobinStrategy{})
c.pluginRunner = newRunner(&routing.RoundRobinStrategy{}, c.privKey)
controlLogger.WithFields(log.Fields{
"_block": "new",
}).Debug("runner created")
Expand All @@ -161,7 +169,7 @@ func New(opts ...controlOpt) *pluginControl {
// Wire event manager

// Start stuff
err := c.pluginRunner.Start()
err = c.pluginRunner.Start()
if err != nil {
panic(err)
}
Expand Down
5 changes: 4 additions & 1 deletion control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package control

import (
"bytes"
"crypto/rand"
"crypto/rsa"
"encoding/gob"
"encoding/json"
"errors"
Expand Down Expand Up @@ -361,14 +363,15 @@ func TestStop(t *testing.T) {
}

func TestPluginCatalog(t *testing.T) {
key, _ := rsa.GenerateKey(rand.Reader, 2048)
ts := time.Now()

c := New()

// We need our own plugin manager to drop mock
// loaded plugins into. Aribitrarily adding
// plugins from the pm is no longer supported.
tpm := newPluginManager()
tpm := newPluginManager(&key.PublicKey, key)
c.pluginManager = tpm

lp1 := new(loadedPlugin)
Expand Down
4 changes: 4 additions & 0 deletions control/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ func (mp *mockPluginClient) Kill(r string) error {
return nil
}

func (mp *mockPluginClient) GetConfigPolicy() error {
return nil
}

func TestMonitor(t *testing.T) {
Convey("monitor", t, func() {
aps := newAvailablePlugins(&routing.RoundRobinStrategy{})
Expand Down
5 changes: 2 additions & 3 deletions control/plugin/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,27 @@ import (

// PluginClient A client providing common plugin method calls.
type PluginClient interface {
SetKey() error
Ping() error
Kill(string) error
GetConfigPolicy() (*cpolicy.ConfigPolicy, error)
}

// PluginCollectorClient A client providing collector specific plugin method calls.
type PluginCollectorClient interface {
PluginClient
CollectMetrics([]core.Metric) ([]core.Metric, error)
GetMetricTypes() ([]core.Metric, error)
GetConfigPolicy() (cpolicy.ConfigPolicy, error)
}

// PluginProcessorClient A client providing processor specific plugin method calls.
type PluginProcessorClient interface {
PluginClient
Process(contentType string, content []byte, config map[string]ctypes.ConfigValue) (string, []byte, error)
GetConfigPolicy() (cpolicy.ConfigPolicy, error)
}

// PluginPublisherClient A client providing publishing specific plugin method calls.
type PluginPublisherClient interface {
PluginClient
Publish(contentType string, content []byte, config map[string]ctypes.ConfigValue) error
GetConfigPolicy() (cpolicy.ConfigPolicy, error)
}
Loading

0 comments on commit 3e035d0

Please sign in to comment.