diff --git a/Documentation/dev-guide/grpc_naming.md b/Documentation/dev-guide/grpc_naming.md index 6038b4c0b81..c75ec5315e4 100644 --- a/Documentation/dev-guide/grpc_naming.md +++ b/Documentation/dev-guide/grpc_naming.md @@ -20,7 +20,7 @@ import ( cli, cerr := clientv3.NewFromURL("http://localhost:2379") etcdResolver, err := resolver.NewBuilder(clus.RandClient()); -conn, gerr := grpc.Dial("etcd://foo/bar/my-service", grpc.WithResolvers(etcdResolver)) +conn, gerr := grpc.Dial("etcd:///foo/bar/my-service", grpc.WithResolvers(etcdResolver)) ``` ## Managing service endpoints @@ -86,4 +86,4 @@ em := endpoints.NewManager(c, "foo") err := em.Update(context.TODO(), []*endpoints.UpdateWithOpts{ endpoints.NewDeleteUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.4"}), endpoints.NewAddUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.14"})}) -``` \ No newline at end of file +``` diff --git a/client/v3/client.go b/client/v3/client.go index 9a037d7b44a..0db6302c5d8 100644 --- a/client/v3/client.go +++ b/client/v3/client.go @@ -95,7 +95,8 @@ type Client struct { callOpts []grpc.CallOption - lg *zap.Logger + lgMu *sync.RWMutex + lg *zap.Logger } // New creates a new etcdv3 client from a given configuration. @@ -112,7 +113,7 @@ func New(cfg Config) (*Client, error) { // service interface implementations and do not need connection management. func NewCtxClient(ctx context.Context) *Client { cctx, cancel := context.WithCancel(ctx) - return &Client{ctx: cctx, cancel: cancel, lg: zap.NewNop()} + return &Client{ctx: cctx, cancel: cancel, lgMu: new(sync.RWMutex), lg: zap.NewNop()} } // NewFromURL creates a new etcdv3 client from a URL. @@ -127,10 +128,21 @@ func NewFromURLs(urls []string) (*Client, error) { // WithLogger sets a logger func (c *Client) WithLogger(lg *zap.Logger) *Client { + c.lgMu.Lock() c.lg = lg + c.lgMu.Unlock() return c } +// GetLogger gets the logger. +// NOTE: This method is for internal use of etcd-client library and should not be used as general-purpose logger. +func (c *Client) GetLogger() *zap.Logger { + c.lgMu.RLock() + l := c.lg + c.lgMu.RUnlock() + return l +} + // Close shuts down the client's etcd connections. func (c *Client) Close() error { c.cancel() @@ -382,6 +394,7 @@ func newClient(cfg *Config) (*Client, error) { cancel: cancel, mu: new(sync.RWMutex), callOpts: defaultCallOpts, + lgMu: new(sync.RWMutex), } lcfg := logutil.DefaultZapLoggerConfig diff --git a/client/v3/naming/endpoints/endpoints.go b/client/v3/naming/endpoints/endpoints.go index 329117af964..72bd2278749 100644 --- a/client/v3/naming/endpoints/endpoints.go +++ b/client/v3/naming/endpoints/endpoints.go @@ -39,7 +39,7 @@ type Update struct { } // WatchChannel is used to deliver notifications about endpoints updates. -type WatchChannel chan []*Update +type WatchChannel <-chan []*Update // Key2EndpointMap maps etcd key into struct describing the endpoint. type Key2EndpointMap map[string]Endpoint diff --git a/client/v3/naming/endpoints/endpoints_impl.go b/client/v3/naming/endpoints/endpoints_impl.go index 0fb848fa285..61abb832e8e 100644 --- a/client/v3/naming/endpoints/endpoints_impl.go +++ b/client/v3/naming/endpoints/endpoints_impl.go @@ -6,12 +6,12 @@ import ( "context" "encoding/json" "errors" - "fmt" "strings" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/naming/endpoints/internal" + "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -78,73 +78,82 @@ func (m *endpointManager) DeleteEndpoint(ctx context.Context, key string, opts . } func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) { - return nil, fmt.Errorf("Not implemented yet") - - // TODO: Implementation to be inspired by: - // Next gets the next set of updates from the etcd resolver. - //// Calls to Next should be serialized; concurrent calls are not safe since - //// there is no way to reconcile the update ordering. - //func (gw *gRPCWatcher) Next() ([]*naming.Update, error) { - // if gw.wch == nil { - // // first Next() returns all addresses - // return gw.firstNext() - // } - // if gw.err != nil { - // return nil, gw.err - // } - // - // // process new events on target/* - // wr, ok := <-gw.wch - // if !ok { - // gw.err = status.Error(codes.Unavailable, ErrWatcherClosed.Error()) - // return nil, gw.err - // } - // if gw.err = wr.Err(); gw.err != nil { - // return nil, gw.err - // } - // - // updates := make([]*naming.Update, 0, len(wr.Events)) - // for _, e := range wr.Events { - // var jupdate naming.Update - // var err error - // switch e.Type { - // case etcd.EventTypePut: - // err = json.Unmarshal(e.Kv.Value, &jupdate) - // jupdate.Op = naming.Add - // case etcd.EventTypeDelete: - // err = json.Unmarshal(e.PrevKv.Value, &jupdate) - // jupdate.Op = naming.Delete - // default: - // continue - // } - // if err == nil { - // updates = append(updates, &jupdate) - // } - // } - // return updates, nil - //} - // - //func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) { - // // Use serialized request so resolution still works if the target etcd - // // server is partitioned away from the quorum. - // resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable()) - // if gw.err = err; err != nil { - // return nil, err - // } - // - // updates := make([]*naming.Update, 0, len(resp.Kvs)) - // for _, kv := range resp.Kvs { - // var jupdate naming.Update - // if err := json.Unmarshal(kv.Value, &jupdate); err != nil { - // continue - // } - // updates = append(updates, &jupdate) - // } - // - // opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()} - // gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...) - // return updates, nil - //} + resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable()) + if err != nil { + return nil, err + } + + lg := m.client.GetLogger() + initUpdates := make([]*Update, 0, len(resp.Kvs)) + for _, kv := range resp.Kvs { + var iup internal.Update + if err := json.Unmarshal(kv.Value, &iup); err != nil { + lg.Warn("unmarshal endpoint update failed", zap.String("key", string(kv.Key)), zap.Error(err)) + continue + } + up := &Update{ + Op: Add, + Key: string(kv.Key), + Endpoint: Endpoint{Addr: iup.Addr, Metadata: iup.Metadata}, + } + initUpdates = append(initUpdates, up) + } + + upch := make(chan []*Update, 1) + if len(initUpdates) > 0 { + upch <- initUpdates + } + go m.watch(ctx, resp.Header.Revision+1, upch) + return upch, nil +} + +func (m *endpointManager) watch(ctx context.Context, rev int64, upch chan []*Update) { + defer close(upch) + + lg := m.client.GetLogger() + opts := []clientv3.OpOption{clientv3.WithRev(rev), clientv3.WithPrefix()} + wch := m.client.Watch(ctx, m.target, opts...) + for { + select { + case <-ctx.Done(): + return + case wresp, ok := <-wch: + if !ok { + lg.Warn("watch closed", zap.String("target", m.target)) + return + } + if wresp.Err() != nil { + lg.Warn("watch failed", zap.String("target", m.target), zap.Error(wresp.Err())) + return + } + + deltaUps := make([]*Update, 0, len(wresp.Events)) + for _, e := range wresp.Events { + var iup internal.Update + var err error + var op Operation + switch e.Type { + case clientv3.EventTypePut: + err = json.Unmarshal(e.Kv.Value, &iup) + op = Add + if err != nil { + lg.Warn("unmarshal endpoint update failed", zap.String("key", string(e.Kv.Key)), zap.Error(err)) + continue + } + case clientv3.EventTypeDelete: + iup = internal.Update{Op: internal.Delete} + op = Delete + default: + continue + } + up := &Update{Op: op, Key: string(e.Kv.Key), Endpoint: Endpoint{Addr: iup.Addr, Metadata: iup.Metadata}} + deltaUps = append(deltaUps, up) + } + if len(deltaUps) > 0 { + upch <- deltaUps + } + } + } } func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) { diff --git a/client/v3/naming/resolver/resolver.go b/client/v3/naming/resolver/resolver.go index 0c453c22e89..a44f61ae04c 100644 --- a/client/v3/naming/resolver/resolver.go +++ b/client/v3/naming/resolver/resolver.go @@ -1,24 +1,107 @@ package resolver import ( + "context" + "sync" + clientv3 "go.etcd.io/etcd/client/v3" - "google.golang.org/grpc/resolver" + "go.etcd.io/etcd/client/v3/naming/endpoints" + + "google.golang.org/grpc/codes" + gresolver "google.golang.org/grpc/resolver" + "google.golang.org/grpc/status" ) type builder struct { - // ... + c *clientv3.Client } -func (b builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { - // To be implemented... - // Using endpoints.NewWatcher() to subscribe for endpoints changes. - return nil, nil +func (b builder) Build(target gresolver.Target, cc gresolver.ClientConn, opts gresolver.BuildOptions) (gresolver.Resolver, error) { + r := &resolver{ + c: b.c, + target: target.Endpoint, + cc: cc, + } + r.ctx, r.cancel = context.WithCancel(context.Background()) + + em, err := endpoints.NewManager(r.c, r.target) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "resolver: failed to new endpoint manager: %s", err) + } + r.wch, err = em.NewWatchChannel(r.ctx) + if err != nil { + return nil, status.Errorf(codes.Internal, "resolver: failed to new watch channer: %s", err) + } + + r.wg.Add(1) + go r.watch() + return r, nil } func (b builder) Scheme() string { return "etcd" } -func NewBuilder(client *clientv3.Client) (resolver.Builder, error) { - return builder{}, nil +// NewBuilder creates a resolver builder. +func NewBuilder(client *clientv3.Client) (gresolver.Builder, error) { + return builder{c: client}, nil +} + +type resolver struct { + c *clientv3.Client + target string + cc gresolver.ClientConn + wch endpoints.WatchChannel + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup +} + +func (r *resolver) watch() { + defer r.wg.Done() + + allUps := make(map[string]*endpoints.Update) + for { + select { + case <-r.ctx.Done(): + return + case ups, ok := <-r.wch: + if !ok { + return + } + + for _, up := range ups { + switch up.Op { + case endpoints.Add: + allUps[up.Key] = up + case endpoints.Delete: + delete(allUps, up.Key) + } + } + + addrs := convertToGRPCAddress(allUps) + r.cc.UpdateState(gresolver.State{Addresses: addrs}) + } + } +} + +func convertToGRPCAddress(ups map[string]*endpoints.Update) []gresolver.Address { + var addrs []gresolver.Address + for _, up := range ups { + addr := gresolver.Address{ + Addr: up.Endpoint.Addr, + Metadata: up.Endpoint.Metadata, + } + addrs = append(addrs, addr) + } + return addrs +} + +// ResolveNow is a no-op here. +// It's just a hint, resolver can ignore this if it's not necessary. +func (r *resolver) ResolveNow(gresolver.ResolveNowOptions) {} + +func (r *resolver) Close() { + r.cancel() + r.wg.Wait() } diff --git a/tests/integration/clientv3/naming/endpoints_test.go b/tests/integration/clientv3/naming/endpoints_test.go index 606d0e53734..19a1a27c09e 100644 --- a/tests/integration/clientv3/naming/endpoints_test.go +++ b/tests/integration/clientv3/naming/endpoints_test.go @@ -27,8 +27,6 @@ import ( ) func TestEndpointManager(t *testing.T) { - t.Skip("Not implemented yet") - defer testutil.AfterTest(t) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) @@ -54,10 +52,10 @@ func TestEndpointManager(t *testing.T) { us := <-w if us == nil { - t.Fatal("failed to get update", err) + t.Fatal("failed to get update") } - wu := endpoints.Update{ + wu := &endpoints.Update{ Op: endpoints.Add, Key: "foo/a1", Endpoint: e1, @@ -73,16 +71,16 @@ func TestEndpointManager(t *testing.T) { } us = <-w - if err != nil { - t.Fatalf("failed to get udpate %v", err) + if us == nil { + t.Fatal("failed to get udpate") } - wu = endpoints.Update{ + wu = &endpoints.Update{ Op: endpoints.Delete, Key: "foo/a1", } - if !reflect.DeepEqual(us, wu) { + if !reflect.DeepEqual(us[0], wu) { t.Fatalf("up = %#v, want %#v", us[1], wu) } } @@ -91,8 +89,6 @@ func TestEndpointManager(t *testing.T) { // correctly with multiple hosts and correctly receive multiple // updates in a single revision. func TestEndpointManagerAtomicity(t *testing.T) { - t.Skip("Not implemented yet") - defer testutil.AfterTest(t) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) diff --git a/tests/integration/clientv3/naming/resolver_test.go b/tests/integration/clientv3/naming/resolver_test.go index 5081ba7c21a..1209d535627 100644 --- a/tests/integration/clientv3/naming/resolver_test.go +++ b/tests/integration/clientv3/naming/resolver_test.go @@ -20,8 +20,8 @@ import ( "testing" "time" - etcdnaming "go.etcd.io/etcd/client/v3/naming" "go.etcd.io/etcd/client/v3/naming/endpoints" + "go.etcd.io/etcd/client/v3/naming/resolver" grpctest "go.etcd.io/etcd/pkg/v3/grpc_testing" "go.etcd.io/etcd/pkg/v3/testutil" "go.etcd.io/etcd/tests/v3/integration" @@ -65,10 +65,12 @@ func TestEtcdGrpcResolver(t *testing.T) { t.Fatal("failed to add foo", err) } - r := &etcdnaming.GRPCResolver{Client: clus.Client(1)} - b := grpc.RoundRobin(r) + b, err := resolver.NewBuilder(clus.Client(1)) + if err != nil { + t.Fatal("failed to new resolver builder", err) + } - conn, err := grpc.Dial("foo", grpc.WithInsecure(), grpc.WithBalancer(b)) + conn, err := grpc.Dial("etcd:///foo", grpc.WithInsecure(), grpc.WithResolvers(b)) if err != nil { t.Fatal("failed to connect to foo", err) } @@ -108,7 +110,6 @@ func TestEtcdGrpcResolver(t *testing.T) { } t.Fatalf("unexpected response from foo (e2): %s", resp.GetPayload().GetBody()) } - break } }