Implement endpoint watch and resolver #12669

Merged
Documentation/dev-guide/grpc_naming.md (2 additions & 2 deletions)

@@ -20,7 +20,7 @@ import (

cli, cerr := clientv3.NewFromURL("http://localhost:2379")
etcdResolver, err := resolver.NewBuilder(clus.RandClient());
conn, gerr := grpc.Dial("etcd://foo/bar/my-service", grpc.WithResolvers(etcdResolver))
conn, gerr := grpc.Dial("etcd:///foo/bar/my-service", grpc.WithResolvers(etcdResolver))
```

## Managing service endpoints
@@ -86,4 +86,4 @@ em := endpoints.NewManager(c, "foo")
err := em.Update(context.TODO(), []*endpoints.UpdateWithOpts{
endpoints.NewDeleteUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.4"}),
endpoints.NewAddUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.14"})})
```
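For orientation (not part of this diff), here is how a service endpoint gets registered so that the resolver above can discover it. A sketch based on the endpoints API used in this PR; the key and address are illustrative values, and cli is assumed to be a working *clientv3.Client.

```go
// Sketch: register one endpoint under the "foo/bar/my-service" prefix.
em, err := endpoints.NewManager(cli, "foo/bar/my-service")
if err != nil {
	log.Fatal(err)
}
if err := em.AddEndpoint(context.TODO(), "foo/bar/my-service/e1",
	endpoints.Endpoint{Addr: "1.2.3.4"}); err != nil {
	log.Fatal(err)
}
```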
client/v3/client.go (15 additions & 2 deletions)

@@ -95,7 +95,8 @@ type Client struct {

callOpts []grpc.CallOption

-	lg *zap.Logger
+	lgMu *sync.RWMutex
+	lg   *zap.Logger
}

// New creates a new etcdv3 client from a given configuration.
@@ -112,7 +113,7 @@ func New(cfg Config) (*Client, error) {
// service interface implementations and do not need connection management.
func NewCtxClient(ctx context.Context) *Client {
cctx, cancel := context.WithCancel(ctx)
return &Client{ctx: cctx, cancel: cancel, lg: zap.NewNop()}
return &Client{ctx: cctx, cancel: cancel, lgMu: new(sync.RWMutex), lg: zap.NewNop()}
Member: @chaochn47 It seems that you missed this change in the second commit in #16800?

Member: Thanks for capturing that!

The zero value of RWMutex should behave the same as new(sync.RWMutex), though.

Member (replying):

> The zero value of RWMutex should behave the same as new(sync.RWMutex), though.

That isn't true: the zero value of a pointer is nil, while the result of new(sync.RWMutex) isn't nil.

Member: You are right.
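
To make the point concrete, a standalone sketch (mine, not from the PR): the lgMu field is a pointer, so its zero value is nil and locking it would panic, while new(sync.RWMutex) yields a usable non-nil mutex.

```go
package main

import (
	"fmt"
	"sync"
)

// mini mirrors the pointer-typed lgMu field discussed above.
type mini struct {
	lgMu *sync.RWMutex
}

func main() {
	var m mini                 // zero value: m.lgMu is a nil pointer
	fmt.Println(m.lgMu == nil) // true; calling m.lgMu.Lock() here would panic

	m.lgMu = new(sync.RWMutex) // non-nil and ready to use
	m.lgMu.Lock()
	m.lgMu.Unlock()
	fmt.Println(m.lgMu == nil) // false
}
```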

}

// NewFromURL creates a new etcdv3 client from a URL.
@@ -127,10 +128,21 @@ func NewFromURLs(urls []string) (*Client, error) {

// WithLogger sets a logger
func (c *Client) WithLogger(lg *zap.Logger) *Client {
c.lgMu.Lock()
c.lg = lg
c.lgMu.Unlock()
return c
}

// GetLogger gets the logger.
// NOTE: This method is for internal use of etcd-client library and should not be used as general-purpose logger.
func (c *Client) GetLogger() *zap.Logger {
c.lgMu.RLock()
l := c.lg
c.lgMu.RUnlock()
return l
}

// Close shuts down the client's etcd connections.
func (c *Client) Close() error {
c.cancel()
@@ -382,6 +394,7 @@ func newClient(cfg *Config) (*Client, error) {
cancel: cancel,
mu: new(sync.RWMutex),
callOpts: defaultCallOpts,
lgMu: new(sync.RWMutex),
}

lcfg := logutil.DefaultZapLoggerConfig
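Not part of the diff, but the motivation for lgMu: WithLogger may now be called while background goroutines (such as the endpoint watcher added in this PR) read the logger through GetLogger, and without the mutex that is a data race. A minimal reproduction sketch with hypothetical names, observable under go run -race:

```go
package main

import (
	"sync"

	"go.uber.org/zap"
)

// miniClient mirrors just the logger fields of clientv3.Client.
type miniClient struct {
	lgMu *sync.RWMutex
	lg   *zap.Logger
}

func (c *miniClient) WithLogger(lg *zap.Logger) *miniClient {
	c.lgMu.Lock() // without this lock, -race flags the write below
	c.lg = lg
	c.lgMu.Unlock()
	return c
}

func (c *miniClient) GetLogger() *zap.Logger {
	c.lgMu.RLock()
	l := c.lg
	c.lgMu.RUnlock()
	return l
}

func main() {
	c := &miniClient{lgMu: new(sync.RWMutex), lg: zap.NewNop()}
	done := make(chan struct{})
	go func() { // reader, standing in for the watch goroutine
		for i := 0; i < 1000; i++ {
			_ = c.GetLogger()
		}
		close(done)
	}()
	for i := 0; i < 1000; i++ { // concurrent writer
		c.WithLogger(zap.NewNop())
	}
	<-done
}
```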
client/v3/naming/endpoints/endpoints.go (1 addition & 1 deletion)

@@ -39,7 +39,7 @@ type Update struct {
}

// WatchChannel is used to deliver notifications about endpoints updates.
-type WatchChannel chan []*Update
+type WatchChannel <-chan []*Update

// Key2EndpointMap maps etcd key into struct describing the endpoint.
type Key2EndpointMap map[string]Endpoint
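A side note on this type change (my illustration, not from the diff): declaring WatchChannel as <-chan makes it receive-only for consumers, so the compiler guarantees that only the endpoint manager, which keeps the bidirectional channel it created, can send on or close it. A minimal sketch of the pattern with hypothetical names:

```go
package main

import "fmt"

type update struct{ key string }

// watchChannel mirrors the receive-only alias in the diff.
type watchChannel <-chan []*update

func newWatch() watchChannel {
	ch := make(chan []*update, 1) // producer keeps the bidirectional handle
	go func() {
		defer close(ch) // only the producer may close
		ch <- []*update{{key: "foo/bar/my-service/e1"}}
	}()
	return ch // implicitly converted to receive-only
}

func main() {
	for ups := range newWatch() {
		for _, u := range ups {
			fmt.Println(u.key)
		}
	}
	// A send such as `wch <- nil` on a watchChannel value does not compile.
}
```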
client/v3/naming/endpoints/endpoints_impl.go (77 additions & 68 deletions)

@@ -6,12 +6,12 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/naming/endpoints/internal"

"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -78,73 +78,82 @@ func (m *endpointManager) DeleteEndpoint(ctx context.Context, key string, opts .
}

func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) {
-	return nil, fmt.Errorf("Not implemented yet")
-
-	// TODO: Implementation to be inspired by:
-	// Next gets the next set of updates from the etcd resolver.
-	//// Calls to Next should be serialized; concurrent calls are not safe since
-	//// there is no way to reconcile the update ordering.
-	//func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
-	//	if gw.wch == nil {
-	//		// first Next() returns all addresses
-	//		return gw.firstNext()
-	//	}
-	//	if gw.err != nil {
-	//		return nil, gw.err
-	//	}
-	//
-	//	// process new events on target/*
-	//	wr, ok := <-gw.wch
-	//	if !ok {
-	//		gw.err = status.Error(codes.Unavailable, ErrWatcherClosed.Error())
-	//		return nil, gw.err
-	//	}
-	//	if gw.err = wr.Err(); gw.err != nil {
-	//		return nil, gw.err
-	//	}
-	//
-	//	updates := make([]*naming.Update, 0, len(wr.Events))
-	//	for _, e := range wr.Events {
-	//		var jupdate naming.Update
-	//		var err error
-	//		switch e.Type {
-	//		case etcd.EventTypePut:
-	//			err = json.Unmarshal(e.Kv.Value, &jupdate)
-	//			jupdate.Op = naming.Add
-	//		case etcd.EventTypeDelete:
-	//			err = json.Unmarshal(e.PrevKv.Value, &jupdate)
-	//			jupdate.Op = naming.Delete
-	//		default:
-	//			continue
-	//		}
-	//		if err == nil {
-	//			updates = append(updates, &jupdate)
-	//		}
-	//	}
-	//	return updates, nil
-	//}
-	//
-	//func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
-	//	// Use serialized request so resolution still works if the target etcd
-	//	// server is partitioned away from the quorum.
-	//	resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())
-	//	if gw.err = err; err != nil {
-	//		return nil, err
-	//	}
-	//
-	//	updates := make([]*naming.Update, 0, len(resp.Kvs))
-	//	for _, kv := range resp.Kvs {
-	//		var jupdate naming.Update
-	//		if err := json.Unmarshal(kv.Value, &jupdate); err != nil {
-	//			continue
-	//		}
-	//		updates = append(updates, &jupdate)
-	//	}
-	//
-	//	opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
-	//	gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...)
-	//	return updates, nil
-	//}
resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable())
if err != nil {
return nil, err
}

lg := m.client.GetLogger()
initUpdates := make([]*Update, 0, len(resp.Kvs))
for _, kv := range resp.Kvs {
var iup internal.Update
if err := json.Unmarshal(kv.Value, &iup); err != nil {
lg.Warn("unmarshal endpoint update failed", zap.String("key", string(kv.Key)), zap.Error(err))
continue
Contributor: Maybe worth logging the error at least (as a WARNING with the 'corrupted' key).

Author: Should the logger (*zap.Logger) be created internally or passed in from outside?

Contributor: Good question.

  1. I assumed clientv3.lg, but this one is private.
  2. logger.go is practically dead - I will prepare a PR to delete it.

So there are 2 options:

  1. Expose a public c.Logger() getter and use it.
  2. Expand the signature of NewManager to take a logger parameter.

@gyuho @jingyih @jpbetz -> do you have an opinion here?

Contributor: OK, let's go for 1. It's in scope of the same 'library', and it's hard to imagine a use case where the user wants a separate logger for endpoint resolution.

Please comment that this method is for internal use of the etcd-client library and should not be used as a general-purpose logger.

Author: OK

}
up := &Update{
Op: Add,
Key: string(kv.Key),
Endpoint: Endpoint{Addr: iup.Addr, Metadata: iup.Metadata},
}
initUpdates = append(initUpdates, up)
}

upch := make(chan []*Update, 1)
if len(initUpdates) > 0 {
upch <- initUpdates
}
go m.watch(ctx, resp.Header.Revision+1, upch)
return upch, nil
}
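
Putting NewWatchChannel to use (an illustrative fragment, not part of the diff): the first message on the channel carries the initial snapshot from the Get above, and later messages carry deltas from the watch. Assumes a working *clientv3.Client c, a context ctx, and an illustrative target prefix.

```go
// Hypothetical consumer of the watch channel.
em, err := endpoints.NewManager(c, "foo/bar/my-service")
if err != nil {
	log.Fatal(err)
}
wch, err := em.NewWatchChannel(ctx)
if err != nil {
	log.Fatal(err)
}
for ups := range wch { // channel is closed when ctx ends or the watch fails
	for _, up := range ups {
		fmt.Println(up.Op, up.Key, up.Endpoint.Addr)
	}
}
```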

func (m *endpointManager) watch(ctx context.Context, rev int64, upch chan []*Update) {
defer close(upch)

lg := m.client.GetLogger()
opts := []clientv3.OpOption{clientv3.WithRev(rev), clientv3.WithPrefix()}
wch := m.client.Watch(ctx, m.target, opts...)
for {
select {
case <-ctx.Done():
return
case wresp, ok := <-wch:
if !ok {
lg.Warn("watch closed", zap.String("target", m.target))
return
}
if wresp.Err() != nil {
lg.Warn("watch failed", zap.String("target", m.target), zap.Error(wresp.Err()))
return
}

deltaUps := make([]*Update, 0, len(wresp.Events))
for _, e := range wresp.Events {
var iup internal.Update
var err error
var op Operation
switch e.Type {
case clientv3.EventTypePut:
err = json.Unmarshal(e.Kv.Value, &iup)
op = Add
if err != nil {
lg.Warn("unmarshal endpoint update failed", zap.String("key", string(e.Kv.Key)), zap.Error(err))
continue
}
case clientv3.EventTypeDelete:
iup = internal.Update{Op: internal.Delete}
op = Delete
default:
continue
}
up := &Update{Op: op, Key: string(e.Kv.Key), Endpoint: Endpoint{Addr: iup.Addr, Metadata: iup.Metadata}}
deltaUps = append(deltaUps, up)
}
if len(deltaUps) > 0 {
upch <- deltaUps
}
}
}
}

func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) {
Expand Down
client/v3/naming/resolver/resolver.go (91 additions & 8 deletions)

@@ -1,24 +1,107 @@
package resolver

 import (
+	"context"
+	"sync"
 
 	clientv3 "go.etcd.io/etcd/client/v3"
-	"google.golang.org/grpc/resolver"
+	"go.etcd.io/etcd/client/v3/naming/endpoints"
 
+	"google.golang.org/grpc/codes"
+	gresolver "google.golang.org/grpc/resolver"
+	"google.golang.org/grpc/status"
 )

type builder struct {
-	// ...
+	c *clientv3.Client
}

-func (b builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
-	// To be implemented...
-	// Using endpoints.NewWatcher() to subscribe for endpoints changes.
-	return nil, nil
func (b builder) Build(target gresolver.Target, cc gresolver.ClientConn, opts gresolver.BuildOptions) (gresolver.Resolver, error) {
r := &resolver{
c: b.c,
target: target.Endpoint,
cc: cc,
}
r.ctx, r.cancel = context.WithCancel(context.Background())

em, err := endpoints.NewManager(r.c, r.target)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "resolver: failed to new endpoint manager: %s", err)
}
r.wch, err = em.NewWatchChannel(r.ctx)
if err != nil {
return nil, status.Errorf(codes.Internal, "resolver: failed to new watch channel: %s", err)
}

r.wg.Add(1)
go r.watch()
return r, nil
}

func (b builder) Scheme() string {
return "etcd"
}

-func NewBuilder(client *clientv3.Client) (resolver.Builder, error) {
-	return builder{}, nil
+// NewBuilder creates a resolver builder.
+func NewBuilder(client *clientv3.Client) (gresolver.Builder, error) {
+	return builder{c: client}, nil
 }

type resolver struct {
c *clientv3.Client
target string
cc gresolver.ClientConn
wch endpoints.WatchChannel
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}

func (r *resolver) watch() {
defer r.wg.Done()

allUps := make(map[string]*endpoints.Update)
for {
select {
case <-r.ctx.Done():
return
case ups, ok := <-r.wch:
if !ok {
return
}

for _, up := range ups {
switch up.Op {
case endpoints.Add:
allUps[up.Key] = up
case endpoints.Delete:
delete(allUps, up.Key)
}
}

addrs := convertToGRPCAddress(allUps)
r.cc.UpdateState(gresolver.State{Addresses: addrs})
}
}
}

func convertToGRPCAddress(ups map[string]*endpoints.Update) []gresolver.Address {
var addrs []gresolver.Address
for _, up := range ups {
addr := gresolver.Address{
Addr: up.Endpoint.Addr,
Metadata: up.Endpoint.Metadata,
}
addrs = append(addrs, addr)
}
return addrs
}

// ResolveNow is a no-op here.
// It's only a hint; the resolver can ignore it if it's not necessary.
func (r *resolver) ResolveNow(gresolver.ResolveNowOptions) {}

func (r *resolver) Close() {
r.cancel()
r.wg.Wait()
}
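
Tying the pieces together (an illustrative sketch, not from the diff): the builder registers under the "etcd" scheme, so a gRPC client can dial by key prefix and receive address updates as endpoints come and go. Assumes an etcd server at localhost:2379 and the gRPC API of that era (grpc.WithInsecure was still current).

```go
package main

import (
	"log"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/naming/resolver"
	"google.golang.org/grpc"
)

func main() {
	cli, err := clientv3.NewFromURL("http://localhost:2379")
	if err != nil {
		log.Fatal(err)
	}
	etcdResolver, err := resolver.NewBuilder(cli)
	if err != nil {
		log.Fatal(err)
	}
	// "etcd:///foo/bar/my-service" resolves every endpoint registered
	// under the "foo/bar/my-service" key prefix.
	conn, err := grpc.Dial("etcd:///foo/bar/my-service",
		grpc.WithInsecure(), grpc.WithResolvers(etcdResolver))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}
```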