Skip to content

Commit

Permalink
Add support for GetItems to all backends.
Browse files Browse the repository at this point in the history
  • Loading branch information
russjones committed Jun 21, 2018
1 parent f91faab commit 8d20382
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 96 deletions.
15 changes: 5 additions & 10 deletions lib/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ const (
type Backend interface {
// GetKeys returns a list of keys for a given path
GetKeys(bucket []string) ([]string, error)
// GetItems returns a list of items (key value pairs) for a bucket.
GetItems(bucket []string) ([]Item, error)
// CreateVal creates value with a given TTL and key in the bucket
// if the value already exists, it must return trace.AlreadyExistsError
CreateVal(bucket []string, key string, val []byte, ttl time.Duration) error
Expand Down Expand Up @@ -66,21 +68,14 @@ type Backend interface {
Clock() clockwork.Clock
}

// Item is a pair of key and value
// Item is a pair of key and value.
type Item struct {
// Key is an item key
// Key is an item key.
Key string
// Value is an item value
// Value is an item value.
Value []byte
}

// ItemsGetter is an interface that allows gettings all
// items in the bucket at once
type ItemsGetter interface {
// GetItems returns a list of items - key value pairs
GetItems(bucket []string) ([]Item, error)
}

// backend.Params type defines a flexible unified back-end configuration API.
// It is just a map of key/value pairs which gets populated by `storage` section
// in Teleport YAML config.
Expand Down
26 changes: 26 additions & 0 deletions lib/backend/boltbk/boltbk.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,32 @@ func (b *BoltBackend) Close() error {
return b.db.Close()
}

// GetItems fetches keys and values and returns them to the caller.
func (b *BoltBackend) GetItems(path []string) ([]backend.Item, error) {
keys, err := b.GetKeys(path)
if err != nil {
return nil, trace.Wrap(err)
}

// This is a very inefficient approach. It's here to satisfy the
// backend.Backend interface since the Bolt backend is slated for removal
// in 2.7.0 anyway.
items := make([]backend.Item, 0, len(keys))
for _, e := range keys {
val, err := b.GetVal(path, e)
if err != nil {
continue
}

items = append(items, backend.Item{
Key: e,
Value: val,
})
}

return items, nil
}

func (b *BoltBackend) GetKeys(path []string) ([]string, error) {
keys, err := b.getKeys(path)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions lib/backend/boltbk/boltbk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func (s *BoltSuite) TestBasicCRUD(c *C) {
s.suite.BasicCRUD(c)
}

func (s *BoltSuite) TestBatchCRUD(c *C) {
s.suite.BatchCRUD(c)
}

func (s *BoltSuite) TestCompareAndSwap(c *C) {
s.suite.CompareAndSwap(c)
}
Expand Down
56 changes: 45 additions & 11 deletions lib/backend/etcdbk/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,29 @@ func (b *bk) reconnect() error {
return nil
}

// GetItems fetches keys and values and returns them to the caller.
func (b *bk) GetItems(path []string) ([]backend.Item, error) {
items, err := b.getItems(b.key(path...))
if err != nil {
return nil, trace.Wrap(err)
}

return items, nil
}

// GetKeys fetches keys (and values) but only returns keys to the caller.
func (b *bk) GetKeys(path []string) ([]string, error) {
keys, err := b.getKeys(b.key(path...))
items, err := b.getItems(b.key(path...))
if err != nil {
return nil, trace.Wrap(err)
}
sort.Sort(sort.StringSlice(keys))

// Convert from []backend.Item to []string and return keys.
keys := make([]string, len(items))
for i, e := range items {
keys[i] = e.Key
}

return keys, nil
}

Expand Down Expand Up @@ -248,22 +265,39 @@ func (b *bk) ReleaseLock(token string) error {
return convertErr(err)
}

func (b *bk) getKeys(key string) ([]string, error) {
var vals []string
re, err := b.api.Get(context.Background(), key, nil)
err = convertErr(err)
if err != nil {
if trace.IsNotFound(err) {
// getItems fetches keys and values and returns them to the caller.
func (b *bk) getItems(path string) ([]backend.Item, error) {
var vals []backend.Item

re, err := b.api.Get(context.Background(), path, nil)
if er := convertErr(err); er != nil {
if trace.IsNotFound(er) {
return vals, nil
}
return nil, trace.Wrap(err)
return nil, trace.Wrap(er)
}
if !isDir(re.Node) {
return nil, trace.BadParameter("'%v': expected directory", key)
return nil, trace.BadParameter("'%v': expected directory", path)
}

// Convert etcd response of *client.Response to backend.Item.
for _, n := range re.Node.Nodes {
vals = append(vals, suffix(n.Key))
valueBytes, err := base64.StdEncoding.DecodeString(n.Value)
if err != nil {
return nil, trace.Wrap(err)
}

vals = append(vals, backend.Item{
Key: suffix(n.Key),
Value: valueBytes,
})
}

// Sort and return results.
sort.Slice(vals, func(i, j int) bool {
return vals[i].Key < vals[j].Key
})

return vals, nil
}

Expand Down
4 changes: 4 additions & 0 deletions lib/backend/etcdbk/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func (s *EtcdSuite) TestBasicCRUD(c *C) {
s.suite.BasicCRUD(c)
}

func (s *EtcdSuite) TestBatchCRUD(c *C) {
s.suite.BatchCRUD(c)
}

func (s *EtcdSuite) TestCompareAndSwap(c *C) {
s.suite.CompareAndSwap(c)
}
Expand Down
9 changes: 9 additions & 0 deletions lib/backend/sanitize.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ func (s *Sanitizer) GetKeys(bucket []string) ([]string, error) {
return s.backend.GetKeys(bucket)
}

// GetItems returns a list of items (key value pairs) for a bucket.
func (s *Sanitizer) GetItems(bucket []string) ([]Item, error) {
if !isSliceSafe(bucket) {
return nil, trace.BadParameter(errorMessage)
}

return s.backend.GetItems(bucket)
}

// CreateVal creates value with a given TTL and key in the bucket. If the
// value already exists, returns trace.AlreadyExistsError.
func (s *Sanitizer) CreateVal(bucket []string, key string, val []byte, ttl time.Duration) error {
Expand Down
4 changes: 4 additions & 0 deletions lib/backend/sanitize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ func (n *nopBackend) GetKeys(bucket []string) ([]string, error) {
return []string{"foo"}, nil
}

func (n *nopBackend) GetItems(bucket []string) ([]Item, error) {
return []Item{Item{Key: "foo", Value: []byte("bar")}}, nil
}

func (n *nopBackend) CreateVal(bucket []string, key string, val []byte, ttl time.Duration) error {
return nil
}
Expand Down
9 changes: 2 additions & 7 deletions lib/backend/test/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,17 +145,12 @@ func (s *BackendSuite) CompareAndSwap(c *C) {
c.Assert(string(val), Equals, "2")
}

// BatchCRUD tests batch CRUD operations if supported by the backend
// BatchCRUD tests batch CRUD operations.
func (s *BackendSuite) BatchCRUD(c *C) {
getter, ok := s.B.(backend.ItemsGetter)
if !ok {
c.Skip("backend does not support batch get")
return
}
c.Assert(s.B.UpsertVal([]string{"a", "b"}, "bkey", []byte("val1"), 0), IsNil)
c.Assert(s.B.UpsertVal([]string{"a", "b"}, "akey", []byte("val2"), 0), IsNil)

items, err := getter.GetItems([]string{"a", "b"})
items, err := s.B.GetItems([]string{"a", "b"})
c.Assert(err, IsNil)
c.Assert(len(items), Equals, 2)
c.Assert(string(items[0].Value), Equals, "val2")
Expand Down
53 changes: 11 additions & 42 deletions lib/services/local/presence.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,21 @@ import (
"github.com/gravitational/teleport/lib/services"

"github.com/gravitational/trace"
log "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus"
)

// PresenceService records and reports the presence of all components
// of the cluster - Nodes, Proxies and SSH nodes
type PresenceService struct {
*log.Entry
log *logrus.Entry
backend.Backend
// getter is used to make batch requests to the backend.
getter backend.ItemsGetter
}

// NewPresenceService returns new presence service instance
func NewPresenceService(b backend.Backend) *PresenceService {
getter, _ := b.(backend.ItemsGetter)
return &PresenceService{
Entry: log.WithFields(log.Fields{trace.Component: "Presence"}),
log: logrus.WithFields(logrus.Fields{trace.Component: "Presence"}),
Backend: b,
getter: getter,
}
}

Expand Down Expand Up @@ -172,43 +168,17 @@ func (s *PresenceService) GetNodes(namespace string) ([]services.Server, error)
if namespace == "" {
return nil, trace.BadParameter("missing namespace value")
}
if s.getter != nil {
return s.batchGetNodes(namespace)
}
start := time.Now()
keys, err := s.GetKeys([]string{namespacesPrefix, namespace, nodesPrefix})
if err != nil {
return nil, trace.Wrap(err)
}
servers := make([]services.Server, 0, len(keys))
for _, key := range keys {
data, err := s.GetVal([]string{namespacesPrefix, namespace, nodesPrefix}, key)
if err != nil {
if trace.IsNotFound(err) {
continue
}
return nil, trace.Wrap(err)
}
server, err := services.GetServerMarshaler().UnmarshalServer(data, services.KindNode)
if err != nil {
return nil, trace.Wrap(err)
}
servers = append(servers, server)
}
s.Debugf("GetServers(%v) in %v", len(servers), time.Now().Sub(start))
// sorting helps with tests and makes it all deterministic
sort.Sort(services.SortedServers(servers))
return servers, nil
}

// batchGetNodes returns a list of registered servers by using fast batch get
func (s *PresenceService) batchGetNodes(namespace string) ([]services.Server, error) {
start := time.Now()

// Get all items in the bucket.
bucket := []string{namespacesPrefix, namespace, nodesPrefix}
items, err := s.getter.GetItems(bucket)
items, err := s.GetItems(bucket)
if err != nil {
return nil, trace.Wrap(err)
}

// Marshal values into a []services.Server slice.
servers := make([]services.Server, len(items))
for i, item := range items {
server, err := services.GetServerMarshaler().UnmarshalServer(item.Value, services.KindNode)
Expand All @@ -218,9 +188,8 @@ func (s *PresenceService) batchGetNodes(namespace string) ([]services.Server, er
servers[i] = server
}

s.Debugf("GetServers(%v) in %v", len(servers), time.Now().Sub(start))
// sorting helps with tests and makes it all deterministic
sort.Sort(services.SortedServers(servers))
s.log.Debugf("GetServers(%v) in %v", len(servers), time.Now().Sub(start))

return servers, nil
}

Expand Down Expand Up @@ -409,7 +378,7 @@ func (s *PresenceService) GetTunnelConnection(clusterName, connectionName string
}
conn, err := services.UnmarshalTunnelConnection(data)
if err != nil {
log.Debugf("got some problem with data: %q", string(data))
s.log.Debugf("got some problem with data: %q", string(data))
}
return conn, err
}
Expand Down
31 changes: 5 additions & 26 deletions lib/services/local/trust.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,11 @@ import (
// is using local backend
type CA struct {
backend.Backend
// getter is used to make batch requests to the backend.
getter backend.ItemsGetter
}

// NewCAService returns new instance of CAService
func NewCAService(b backend.Backend) *CA {
getter, _ := b.(backend.ItemsGetter)
return &CA{
getter: getter,
Backend: b,
}
}
Expand Down Expand Up @@ -201,36 +197,18 @@ func setSigningKeys(ca services.CertAuthority, loadSigningKeys bool) {
// GetCertAuthorities returns a list of authorities of a given type
// loadSigningKeys controls whether signing keys should be loaded or not
func (s *CA) GetCertAuthorities(caType services.CertAuthType, loadSigningKeys bool) ([]services.CertAuthority, error) {
cas := []services.CertAuthority{}
if err := caType.Check(); err != nil {
return nil, trace.Wrap(err)
}
if s.getter != nil {
return s.batchGetCertAuthorities(caType, loadSigningKeys)
}
domains, err := s.GetKeys([]string{"authorities", string(caType)})
if err != nil {
if trace.IsNotFound(err) {
return cas, nil
}
return nil, trace.Wrap(err)
}
for _, domain := range domains {
ca, err := s.GetCertAuthority(services.CertAuthID{DomainName: domain, Type: caType}, loadSigningKeys)
if err != nil {
return nil, trace.Wrap(err)
}
cas = append(cas, ca)
}
return cas, nil
}

func (s *CA) batchGetCertAuthorities(caType services.CertAuthType, loadSigningKeys bool) ([]services.CertAuthority, error) {
// Get all items in the bucket.
bucket := []string{"authorities", string(caType)}
items, err := s.getter.GetItems(bucket)
items, err := s.GetItems(bucket)
if err != nil {
return nil, trace.Wrap(err)
}

// Marshal values into a []services.CertAuthority slice.
cas := make([]services.CertAuthority, len(items))
for i, item := range items {
ca, err := services.GetCertAuthorityMarshaler().UnmarshalCertAuthority(item.Value)
Expand All @@ -243,5 +221,6 @@ func (s *CA) batchGetCertAuthorities(caType services.CertAuthType, loadSigningKe
setSigningKeys(ca, loadSigningKeys)
cas[i] = ca
}

return cas, nil
}

0 comments on commit 8d20382

Please sign in to comment.