Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for GetItems to all backends. #2031

Merged
merged 1 commit into from
Jun 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 5 additions & 10 deletions lib/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ const (
type Backend interface {
// GetKeys returns a list of keys for a given path
GetKeys(bucket []string) ([]string, error)
// GetItems returns a list of items (key value pairs) for a bucket.
GetItems(bucket []string) ([]Item, error)
// CreateVal creates value with a given TTL and key in the bucket
// if the value already exists, it must return trace.AlreadyExistsError
CreateVal(bucket []string, key string, val []byte, ttl time.Duration) error
Expand Down Expand Up @@ -66,21 +68,14 @@ type Backend interface {
Clock() clockwork.Clock
}

// Item is a pair of key and value
// Item is a pair of key and value.
type Item struct {
// Key is an item key
// Key is an item key.
Key string
// Value is an item value
// Value is an item value.
Value []byte
}

// ItemsGetter is an interface that allows gettings all
// items in the bucket at once
type ItemsGetter interface {
// GetItems returns a list of items - key value pairs
GetItems(bucket []string) ([]Item, error)
}

// backend.Params type defines a flexible unified back-end configuration API.
// It is just a map of key/value pairs which gets populated by `storage` section
// in Teleport YAML config.
Expand Down
26 changes: 26 additions & 0 deletions lib/backend/boltbk/boltbk.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,32 @@ func (b *BoltBackend) Close() error {
return b.db.Close()
}

// GetItems fetches keys and values and returns them to the caller.
func (b *BoltBackend) GetItems(path []string) ([]backend.Item, error) {
keys, err := b.GetKeys(path)
if err != nil {
return nil, trace.Wrap(err)
}

// This is a very inefficient approach. It's here to satisfy the
// backend.Backend interface since the Bolt backend is slated for removal
// in 2.7.0 anyway.
items := make([]backend.Item, 0, len(keys))
for _, e := range keys {
val, err := b.GetVal(path, e)
if err != nil {
continue
}

items = append(items, backend.Item{
Key: e,
Value: val,
})
}

return items, nil
}

func (b *BoltBackend) GetKeys(path []string) ([]string, error) {
keys, err := b.getKeys(path)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions lib/backend/boltbk/boltbk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func (s *BoltSuite) TestBasicCRUD(c *C) {
s.suite.BasicCRUD(c)
}

func (s *BoltSuite) TestBatchCRUD(c *C) {
s.suite.BatchCRUD(c)
}

func (s *BoltSuite) TestCompareAndSwap(c *C) {
s.suite.CompareAndSwap(c)
}
Expand Down
56 changes: 45 additions & 11 deletions lib/backend/etcdbk/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,29 @@ func (b *bk) reconnect() error {
return nil
}

// GetItems fetches keys and values and returns them to the caller.
func (b *bk) GetItems(path []string) ([]backend.Item, error) {
items, err := b.getItems(b.key(path...))
if err != nil {
return nil, trace.Wrap(err)
}

return items, nil
}

// GetKeys fetches keys (and values) but only returns keys to the caller.
func (b *bk) GetKeys(path []string) ([]string, error) {
keys, err := b.getKeys(b.key(path...))
items, err := b.getItems(b.key(path...))
if err != nil {
return nil, trace.Wrap(err)
}
sort.Sort(sort.StringSlice(keys))

// Convert from []backend.Item to []string and return keys.
keys := make([]string, len(items))
for i, e := range items {
keys[i] = e.Key
}

return keys, nil
}

Expand Down Expand Up @@ -248,22 +265,39 @@ func (b *bk) ReleaseLock(token string) error {
return convertErr(err)
}

func (b *bk) getKeys(key string) ([]string, error) {
var vals []string
re, err := b.api.Get(context.Background(), key, nil)
err = convertErr(err)
if err != nil {
if trace.IsNotFound(err) {
// getItems fetches keys and values and returns them to the caller.
func (b *bk) getItems(path string) ([]backend.Item, error) {
var vals []backend.Item

re, err := b.api.Get(context.Background(), path, nil)
if er := convertErr(err); er != nil {
if trace.IsNotFound(er) {
return vals, nil
}
return nil, trace.Wrap(err)
return nil, trace.Wrap(er)
}
if !isDir(re.Node) {
return nil, trace.BadParameter("'%v': expected directory", key)
return nil, trace.BadParameter("'%v': expected directory", path)
}

// Convert etcd response of *client.Response to backend.Item.
for _, n := range re.Node.Nodes {
vals = append(vals, suffix(n.Key))
valueBytes, err := base64.StdEncoding.DecodeString(n.Value)
if err != nil {
return nil, trace.Wrap(err)
}

vals = append(vals, backend.Item{
Key: suffix(n.Key),
Value: valueBytes,
})
}

// Sort and return results.
sort.Slice(vals, func(i, j int) bool {
return vals[i].Key < vals[j].Key
})

return vals, nil
}

Expand Down
4 changes: 4 additions & 0 deletions lib/backend/etcdbk/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func (s *EtcdSuite) TestBasicCRUD(c *C) {
s.suite.BasicCRUD(c)
}

func (s *EtcdSuite) TestBatchCRUD(c *C) {
s.suite.BatchCRUD(c)
}

func (s *EtcdSuite) TestCompareAndSwap(c *C) {
s.suite.CompareAndSwap(c)
}
Expand Down
9 changes: 9 additions & 0 deletions lib/backend/sanitize.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ func (s *Sanitizer) GetKeys(bucket []string) ([]string, error) {
return s.backend.GetKeys(bucket)
}

// GetItems returns a list of items (key value pairs) for a bucket.
func (s *Sanitizer) GetItems(bucket []string) ([]Item, error) {
if !isSliceSafe(bucket) {
return nil, trace.BadParameter(errorMessage)
}

return s.backend.GetItems(bucket)
}

// CreateVal creates value with a given TTL and key in the bucket. If the
// value already exists, returns trace.AlreadyExistsError.
func (s *Sanitizer) CreateVal(bucket []string, key string, val []byte, ttl time.Duration) error {
Expand Down
4 changes: 4 additions & 0 deletions lib/backend/sanitize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ func (n *nopBackend) GetKeys(bucket []string) ([]string, error) {
return []string{"foo"}, nil
}

func (n *nopBackend) GetItems(bucket []string) ([]Item, error) {
return []Item{Item{Key: "foo", Value: []byte("bar")}}, nil
}

func (n *nopBackend) CreateVal(bucket []string, key string, val []byte, ttl time.Duration) error {
return nil
}
Expand Down
9 changes: 2 additions & 7 deletions lib/backend/test/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,17 +145,12 @@ func (s *BackendSuite) CompareAndSwap(c *C) {
c.Assert(string(val), Equals, "2")
}

// BatchCRUD tests batch CRUD operations if supported by the backend
// BatchCRUD tests batch CRUD operations.
func (s *BackendSuite) BatchCRUD(c *C) {
getter, ok := s.B.(backend.ItemsGetter)
if !ok {
c.Skip("backend does not support batch get")
return
}
c.Assert(s.B.UpsertVal([]string{"a", "b"}, "bkey", []byte("val1"), 0), IsNil)
c.Assert(s.B.UpsertVal([]string{"a", "b"}, "akey", []byte("val2"), 0), IsNil)

items, err := getter.GetItems([]string{"a", "b"})
items, err := s.B.GetItems([]string{"a", "b"})
c.Assert(err, IsNil)
c.Assert(len(items), Equals, 2)
c.Assert(string(items[0].Value), Equals, "val2")
Expand Down
53 changes: 11 additions & 42 deletions lib/services/local/presence.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,21 @@ import (
"github.com/gravitational/teleport/lib/services"

"github.com/gravitational/trace"
log "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus"
)

// PresenceService records and reports the presence of all components
// of the cluster - Nodes, Proxies and SSH nodes
type PresenceService struct {
*log.Entry
log *logrus.Entry
backend.Backend
// getter is used to make batch requests to the backend.
getter backend.ItemsGetter
}

// NewPresenceService returns new presence service instance
func NewPresenceService(b backend.Backend) *PresenceService {
getter, _ := b.(backend.ItemsGetter)
return &PresenceService{
Entry: log.WithFields(log.Fields{trace.Component: "Presence"}),
log: logrus.WithFields(logrus.Fields{trace.Component: "Presence"}),
Backend: b,
getter: getter,
}
}

Expand Down Expand Up @@ -172,43 +168,17 @@ func (s *PresenceService) GetNodes(namespace string) ([]services.Server, error)
if namespace == "" {
return nil, trace.BadParameter("missing namespace value")
}
if s.getter != nil {
return s.batchGetNodes(namespace)
}
start := time.Now()
keys, err := s.GetKeys([]string{namespacesPrefix, namespace, nodesPrefix})
if err != nil {
return nil, trace.Wrap(err)
}
servers := make([]services.Server, 0, len(keys))
for _, key := range keys {
data, err := s.GetVal([]string{namespacesPrefix, namespace, nodesPrefix}, key)
if err != nil {
if trace.IsNotFound(err) {
continue
}
return nil, trace.Wrap(err)
}
server, err := services.GetServerMarshaler().UnmarshalServer(data, services.KindNode)
if err != nil {
return nil, trace.Wrap(err)
}
servers = append(servers, server)
}
s.Debugf("GetServers(%v) in %v", len(servers), time.Now().Sub(start))
// sorting helps with tests and makes it all deterministic
sort.Sort(services.SortedServers(servers))
return servers, nil
}

// batchGetNodes returns a list of registered servers by using fast batch get
func (s *PresenceService) batchGetNodes(namespace string) ([]services.Server, error) {
start := time.Now()

// Get all items in the bucket.
bucket := []string{namespacesPrefix, namespace, nodesPrefix}
items, err := s.getter.GetItems(bucket)
items, err := s.GetItems(bucket)
if err != nil {
return nil, trace.Wrap(err)
}

// Marshal values into a []services.Server slice.
servers := make([]services.Server, len(items))
for i, item := range items {
server, err := services.GetServerMarshaler().UnmarshalServer(item.Value, services.KindNode)
Expand All @@ -218,9 +188,8 @@ func (s *PresenceService) batchGetNodes(namespace string) ([]services.Server, er
servers[i] = server
}

s.Debugf("GetServers(%v) in %v", len(servers), time.Now().Sub(start))
// sorting helps with tests and makes it all deterministic
sort.Sort(services.SortedServers(servers))
s.log.Debugf("GetServers(%v) in %v", len(servers), time.Now().Sub(start))

return servers, nil
}

Expand Down Expand Up @@ -409,7 +378,7 @@ func (s *PresenceService) GetTunnelConnection(clusterName, connectionName string
}
conn, err := services.UnmarshalTunnelConnection(data)
if err != nil {
log.Debugf("got some problem with data: %q", string(data))
s.log.Debugf("got some problem with data: %q", string(data))
}
return conn, err
}
Expand Down
31 changes: 5 additions & 26 deletions lib/services/local/trust.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,11 @@ import (
// is using local backend
type CA struct {
backend.Backend
// getter is used to make batch requests to the backend.
getter backend.ItemsGetter
}

// NewCAService returns new instance of CAService
func NewCAService(b backend.Backend) *CA {
getter, _ := b.(backend.ItemsGetter)
return &CA{
getter: getter,
Backend: b,
}
}
Expand Down Expand Up @@ -201,36 +197,18 @@ func setSigningKeys(ca services.CertAuthority, loadSigningKeys bool) {
// GetCertAuthorities returns a list of authorities of a given type
// loadSigningKeys controls whether signing keys should be loaded or not
func (s *CA) GetCertAuthorities(caType services.CertAuthType, loadSigningKeys bool) ([]services.CertAuthority, error) {
cas := []services.CertAuthority{}
if err := caType.Check(); err != nil {
return nil, trace.Wrap(err)
}
if s.getter != nil {
return s.batchGetCertAuthorities(caType, loadSigningKeys)
}
domains, err := s.GetKeys([]string{"authorities", string(caType)})
if err != nil {
if trace.IsNotFound(err) {
return cas, nil
}
return nil, trace.Wrap(err)
}
for _, domain := range domains {
ca, err := s.GetCertAuthority(services.CertAuthID{DomainName: domain, Type: caType}, loadSigningKeys)
if err != nil {
return nil, trace.Wrap(err)
}
cas = append(cas, ca)
}
return cas, nil
}

func (s *CA) batchGetCertAuthorities(caType services.CertAuthType, loadSigningKeys bool) ([]services.CertAuthority, error) {
// Get all items in the bucket.
bucket := []string{"authorities", string(caType)}
items, err := s.getter.GetItems(bucket)
items, err := s.GetItems(bucket)
if err != nil {
return nil, trace.Wrap(err)
}

// Marshal values into a []services.CertAuthority slice.
cas := make([]services.CertAuthority, len(items))
for i, item := range items {
ca, err := services.GetCertAuthorityMarshaler().UnmarshalCertAuthority(item.Value)
Expand All @@ -243,5 +221,6 @@ func (s *CA) batchGetCertAuthorities(caType services.CertAuthType, loadSigningKe
setSigningKeys(ca, loadSigningKeys)
cas[i] = ca
}

return cas, nil
}