Skip to content

Commit

Permalink
[3.4] backport 12669: Implement Endpoint Watch and new Resolver
Browse files Browse the repository at this point in the history
Signed-off-by: Chao Chen <chaochn@amazon.com>
  • Loading branch information
chaochn47 committed Oct 19, 2023
1 parent 95321a4 commit 9b18c4f
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 103 deletions.
4 changes: 2 additions & 2 deletions Documentation/dev-guide/grpc_naming.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

cli, cerr := clientv3.NewFromURL("http://localhost:2379")
etcdResolver, err := resolver.NewBuilder(clus.RandClient());
conn, gerr := grpc.Dial("etcd://foo/bar/my-service", grpc.WithResolvers(etcdResolver))
conn, gerr := grpc.Dial("etcd:///foo/bar/my-service", grpc.WithResolvers(etcdResolver))
```

## Managing service endpoints
Expand Down Expand Up @@ -84,4 +84,4 @@ em := endpoints.NewManager(c, "foo")
err := em.Update(context.TODO(), []*endpoints.UpdateWithOpts{
endpoints.NewDeleteUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.4"}),
endpoints.NewAddUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.14"})})
```
```
34 changes: 27 additions & 7 deletions clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,19 @@ import (
"time"

"github.com/google/uuid"
"go.etcd.io/etcd/clientv3/balancer"
"go.etcd.io/etcd/clientv3/balancer/picker"
"go.etcd.io/etcd/clientv3/balancer/resolver/endpoint"
"go.etcd.io/etcd/clientv3/credentials"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.etcd.io/etcd/pkg/logutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
grpccredentials "google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"

"go.etcd.io/etcd/clientv3/balancer"
"go.etcd.io/etcd/clientv3/balancer/picker"
"go.etcd.io/etcd/clientv3/balancer/resolver/endpoint"
"go.etcd.io/etcd/clientv3/credentials"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.etcd.io/etcd/pkg/logutil"
)

var (
Expand Down Expand Up @@ -95,7 +96,8 @@ type Client struct {

callOpts []grpc.CallOption

lg *zap.Logger
lgMu *sync.RWMutex
lg *zap.Logger
}

// New creates a new etcdv3 client from a given configuration.
Expand Down Expand Up @@ -125,6 +127,23 @@ func NewFromURLs(urls []string) (*Client, error) {
return New(Config{Endpoints: urls})
}

// WithLogger sets a logger
func (c *Client) WithLogger(lg *zap.Logger) *Client {
c.lgMu.Lock()
c.lg = lg
c.lgMu.Unlock()
return c
}

// GetLogger gets the logger.
// NOTE: This method is for internal use of etcd-client library and should not be used as general-purpose logger.
func (c *Client) GetLogger() *zap.Logger {
c.lgMu.RLock()
l := c.lg
c.lgMu.RUnlock()
return l
}

// Close shuts down the client's etcd connections.
func (c *Client) Close() error {
c.cancel()
Expand Down Expand Up @@ -423,6 +442,7 @@ func newClient(cfg *Config) (*Client, error) {
cancel: cancel,
mu: new(sync.RWMutex),
callOpts: defaultCallOpts,
lgMu: new(sync.RWMutex),
}

lcfg := logutil.DefaultZapLoggerConfig
Expand Down
20 changes: 8 additions & 12 deletions clientv3/integration/naming/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import (
)

func TestEndpointManager(t *testing.T) {
t.Skip("Not implemented yet")

defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
Expand All @@ -54,10 +52,10 @@ func TestEndpointManager(t *testing.T) {
us := <-w

if us == nil {
t.Fatal("failed to get update", err)
t.Fatal("failed to get update")
}

wu := endpoints.Update{
wu := &endpoints.Update{
Op: endpoints.Add,
Key: "foo/a1",
Endpoint: e1,
Expand All @@ -69,30 +67,28 @@ func TestEndpointManager(t *testing.T) {

err = em.DeleteEndpoint(context.TODO(), "foo/a1")
if err != nil {
t.Fatalf("failed to udpate %v", err)
t.Fatalf("failed to update %v", err)
}

us = <-w
if err != nil {
t.Fatalf("failed to get udpate %v", err)
if us == nil {
t.Fatal("failed to get update")
}

wu = endpoints.Update{
wu = &endpoints.Update{
Op: endpoints.Delete,
Key: "foo/a1",
}

if !reflect.DeepEqual(us, wu) {
t.Fatalf("up = %#v, want %#v", us[1], wu)
if !reflect.DeepEqual(us[0], wu) {
t.Fatalf("up = %#v, want %#v", us[0], wu)
}
}

// TestEndpointManagerAtomicity ensures the resolver will initialize
// correctly with multiple hosts and correctly receive multiple
// updates in a single revision.
func TestEndpointManagerAtomicity(t *testing.T) {
t.Skip("Not implemented yet")

defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
Expand Down
11 changes: 6 additions & 5 deletions clientv3/integration/naming/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"google.golang.org/grpc"
testpb "google.golang.org/grpc/test/grpc_testing"

etcdnaming "go.etcd.io/etcd/clientv3/naming"
"go.etcd.io/etcd/clientv3/naming/endpoints"
"go.etcd.io/etcd/clientv3/naming/resolver"
"go.etcd.io/etcd/integration"
grpctest "go.etcd.io/etcd/pkg/grpc_testing"
"go.etcd.io/etcd/pkg/testutil"
Expand Down Expand Up @@ -63,10 +63,12 @@ func TestEtcdGrpcResolver(t *testing.T) {
if err != nil {
t.Fatal("failed to add foo", err)
}
r := &etcdnaming.GRPCResolver{Client: clus.Client(1)}
b := grpc.RoundRobin(r)

conn, err := grpc.Dial("foo", grpc.WithInsecure(), grpc.WithBalancer(b))
b, err := resolver.NewBuilder(clus.Client(1))
if err != nil {
t.Fatal("failed to new resolver builder", err)
}
conn, err := grpc.Dial("etcd:///foo", grpc.WithResolvers(b))
if err != nil {
t.Fatal("failed to connect to foo", err)
}
Expand Down Expand Up @@ -106,7 +108,6 @@ func TestEtcdGrpcResolver(t *testing.T) {
}
t.Fatalf("unexpected response from foo (e2): %s", resp.GetPayload().GetBody())
}

break
}
}
Expand Down
2 changes: 1 addition & 1 deletion clientv3/naming/endpoints/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Update struct {
}

// WatchChannel is used to deliver notifications about endpoints updates.
type WatchChannel chan []*Update
type WatchChannel <-chan []*Update

// Key2EndpointMap maps etcd key into struct describing the endpoint.
type Key2EndpointMap map[string]Endpoint
Expand Down
145 changes: 77 additions & 68 deletions clientv3/naming/endpoints/endpoints_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"

"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand Down Expand Up @@ -89,73 +89,82 @@ func (m *endpointManager) DeleteEndpoint(ctx context.Context, key string, opts .
}

func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) {
return nil, fmt.Errorf("Not implemented yet")

// TODO: Implementation to be inspired by:
// Next gets the next set of updates from the etcd resolver.
//// Calls to Next should be serialized; concurrent calls are not safe since
//// there is no way to reconcile the update ordering.
//func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
// if gw.wch == nil {
// // first Next() returns all addresses
// return gw.firstNext()
// }
// if gw.err != nil {
// return nil, gw.err
// }
//
// // process new events on target/*
// wr, ok := <-gw.wch
// if !ok {
// gw.err = status.Error(codes.Unavailable, ErrWatcherClosed.Error())
// return nil, gw.err
// }
// if gw.err = wr.Err(); gw.err != nil {
// return nil, gw.err
// }
//
// updates := make([]*naming.Update, 0, len(wr.Events))
// for _, e := range wr.Events {
// var jupdate naming.Update
// var err error
// switch e.Type {
// case etcd.EventTypePut:
// err = json.Unmarshal(e.Kv.Value, &jupdate)
// jupdate.Op = naming.Add
// case etcd.EventTypeDelete:
// err = json.Unmarshal(e.PrevKv.Value, &jupdate)
// jupdate.Op = naming.Delete
// default:
// continue
// }
// if err == nil {
// updates = append(updates, &jupdate)
// }
// }
// return updates, nil
//}
//
//func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
// // Use serialized request so resolution still works if the target etcd
// // server is partitioned away from the quorum.
// resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())
// if gw.err = err; err != nil {
// return nil, err
// }
//
// updates := make([]*naming.Update, 0, len(resp.Kvs))
// for _, kv := range resp.Kvs {
// var jupdate naming.Update
// if err := json.Unmarshal(kv.Value, &jupdate); err != nil {
// continue
// }
// updates = append(updates, &jupdate)
// }
//
// opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
// gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...)
// return updates, nil
//}
resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable())
if err != nil {
return nil, err
}

lg := m.client.GetLogger()
initUpdates := make([]*Update, 0, len(resp.Kvs))
for _, kv := range resp.Kvs {
var iup internal.Update
if err := json.Unmarshal(kv.Value, &iup); err != nil {
lg.Warn("unmarshal endpoint update failed", zap.String("key", string(kv.Key)), zap.Error(err))
continue
}
up := &Update{
Op: Add,
Key: string(kv.Key),
Endpoint: Endpoint{Addr: iup.Addr, Metadata: iup.Metadata},
}
initUpdates = append(initUpdates, up)
}

upch := make(chan []*Update, 1)
if len(initUpdates) > 0 {
upch <- initUpdates
}
go m.watch(ctx, resp.Header.Revision+1, upch)
return upch, nil
}

func (m *endpointManager) watch(ctx context.Context, rev int64, upch chan []*Update) {
defer close(upch)

lg := m.client.GetLogger()
opts := []clientv3.OpOption{clientv3.WithRev(rev), clientv3.WithPrefix()}
wch := m.client.Watch(ctx, m.target, opts...)
for {
select {
case <-ctx.Done():
return
case wresp, ok := <-wch:
if !ok {
lg.Warn("watch closed", zap.String("target", m.target))
return
}
if wresp.Err() != nil {
lg.Warn("watch failed", zap.String("target", m.target), zap.Error(wresp.Err()))
return
}

deltaUps := make([]*Update, 0, len(wresp.Events))
for _, e := range wresp.Events {
var iup internal.Update
var err error
var op Operation
switch e.Type {
case clientv3.EventTypePut:
err = json.Unmarshal(e.Kv.Value, &iup)
op = Add
if err != nil {
lg.Warn("unmarshal endpoint update failed", zap.String("key", string(e.Kv.Key)), zap.Error(err))
continue
}
case clientv3.EventTypeDelete:
iup = internal.Update{Op: internal.Delete}
op = Delete
default:
continue
}
up := &Update{Op: op, Key: string(e.Kv.Key), Endpoint: Endpoint{Addr: iup.Addr, Metadata: iup.Metadata}}
deltaUps = append(deltaUps, up)
}
if len(deltaUps) > 0 {
upch <- deltaUps
}
}
}
}

func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) {
Expand Down
Loading

0 comments on commit 9b18c4f

Please sign in to comment.