diff --git a/clientv3/naming/endpoints/endpoints_impl.go b/clientv3/naming/endpoints/endpoints_impl.go index 3634027cd7b9..ffba6a925d56 100644 --- a/clientv3/naming/endpoints/endpoints_impl.go +++ b/clientv3/naming/endpoints/endpoints_impl.go @@ -55,7 +55,7 @@ func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts) ops := make([]clientv3.Op, 0, len(updates)) for _, update := range updates { if !strings.HasPrefix(update.Key, m.target+"/") { - return status.Errorf(codes.InvalidArgument, "endpoints: endpoint key should be prefixed with %s/", m.target) + return status.Errorf(codes.InvalidArgument, "endpoints: endpoint key should be prefixed with '%s/' got: '%s'", m.target, update.Key) } switch update.Op { case Add: diff --git a/clientv3/naming/grpc.go b/clientv3/naming/grpc.go deleted file mode 100644 index 7eed84bfb187..000000000000 --- a/clientv3/naming/grpc.go +++ /dev/null @@ -1,133 +0,0 @@ -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package naming - -import ( - "context" - "encoding/json" - "fmt" - - etcd "go.etcd.io/etcd/clientv3" - - "google.golang.org/grpc/codes" - "google.golang.org/grpc/naming" - "google.golang.org/grpc/status" -) - -var ErrWatcherClosed = fmt.Errorf("naming: watch closed") - -// GRPCResolver creates a grpc.Watcher for a target to track its resolution changes. -type GRPCResolver struct { - // Client is an initialized etcd client. - Client *etcd.Client -} - -func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update, opts ...etcd.OpOption) (err error) { - switch nm.Op { - case naming.Add: - var v []byte - if v, err = json.Marshal(nm); err != nil { - return status.Error(codes.InvalidArgument, err.Error()) - } - _, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...) - case naming.Delete: - _, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...) - default: - return status.Error(codes.InvalidArgument, "naming: bad naming op") - } - return err -} - -func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) { - ctx, cancel := context.WithCancel(context.Background()) - w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel} - return w, nil -} - -type gRPCWatcher struct { - c *etcd.Client - target string - ctx context.Context - cancel context.CancelFunc - wch etcd.WatchChan - err error -} - -// Next gets the next set of updates from the etcd resolver. -// Calls to Next should be serialized; concurrent calls are not safe since -// there is no way to reconcile the update ordering. -func (gw *gRPCWatcher) Next() ([]*naming.Update, error) { - if gw.wch == nil { - // first Next() returns all addresses - return gw.firstNext() - } - if gw.err != nil { - return nil, gw.err - } - - // process new events on target/* - wr, ok := <-gw.wch - if !ok { - gw.err = status.Error(codes.Unavailable, ErrWatcherClosed.Error()) - return nil, gw.err - } - if gw.err = wr.Err(); gw.err != nil { - return nil, gw.err - } - - updates := make([]*naming.Update, 0, len(wr.Events)) - for _, e := range wr.Events { - var jupdate naming.Update - var err error - switch e.Type { - case etcd.EventTypePut: - err = json.Unmarshal(e.Kv.Value, &jupdate) - jupdate.Op = naming.Add - case etcd.EventTypeDelete: - err = json.Unmarshal(e.PrevKv.Value, &jupdate) - jupdate.Op = naming.Delete - default: - continue - } - if err == nil { - updates = append(updates, &jupdate) - } - } - return updates, nil -} - -func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) { - // Use serialized request so resolution still works if the target etcd - // server is partitioned away from the quorum. - resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable()) - if gw.err = err; err != nil { - return nil, err - } - - updates := make([]*naming.Update, 0, len(resp.Kvs)) - for _, kv := range resp.Kvs { - var jupdate naming.Update - if err := json.Unmarshal(kv.Value, &jupdate); err != nil { - continue - } - updates = append(updates, &jupdate) - } - - opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()} - gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...) - return updates, nil -} - -func (gw *gRPCWatcher) Close() { gw.cancel() } diff --git a/clientv3/naming/grpc_test.go b/clientv3/naming/grpc_test.go deleted file mode 100644 index 0041a89a807d..000000000000 --- a/clientv3/naming/grpc_test.go +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package naming_test - -import ( - "context" - "encoding/json" - "reflect" - "testing" - - etcd "go.etcd.io/etcd/clientv3" - namingv3 "go.etcd.io/etcd/clientv3/naming" - "go.etcd.io/etcd/integration" - "go.etcd.io/etcd/pkg/testutil" - - "google.golang.org/grpc/naming" -) - -func TestGRPCResolver(t *testing.T) { - defer testutil.AfterTest(t) - - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - defer clus.Terminate(t) - - r := namingv3.GRPCResolver{ - Client: clus.RandClient(), - } - - w, err := r.Resolve("foo") - if err != nil { - t.Fatal("failed to resolve foo", err) - } - defer w.Close() - - addOp := naming.Update{Op: naming.Add, Addr: "127.0.0.1", Metadata: "metadata"} - err = r.Update(context.TODO(), "foo", addOp) - if err != nil { - t.Fatal("failed to add foo", err) - } - - us, err := w.Next() - if err != nil { - t.Fatal("failed to get udpate", err) - } - - wu := &naming.Update{ - Op: naming.Add, - Addr: "127.0.0.1", - Metadata: "metadata", - } - - if !reflect.DeepEqual(us[0], wu) { - t.Fatalf("up = %#v, want %#v", us[0], wu) - } - - delOp := naming.Update{Op: naming.Delete, Addr: "127.0.0.1"} - err = r.Update(context.TODO(), "foo", delOp) - if err != nil { - t.Fatalf("failed to udpate %v", err) - } - - us, err = w.Next() - if err != nil { - t.Fatalf("failed to get udpate %v", err) - } - - wu = &naming.Update{ - Op: naming.Delete, - Addr: "127.0.0.1", - Metadata: "metadata", - } - - if !reflect.DeepEqual(us[0], wu) { - t.Fatalf("up = %#v, want %#v", us[0], wu) - } -} - -// TestGRPCResolverMulti ensures the resolver will initialize -// correctly with multiple hosts and correctly receive multiple -// updates in a single revision. -func TestGRPCResolverMulti(t *testing.T) { - defer testutil.AfterTest(t) - - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - defer clus.Terminate(t) - c := clus.RandClient() - - v, verr := json.Marshal(naming.Update{Addr: "127.0.0.1", Metadata: "md"}) - if verr != nil { - t.Fatal(verr) - } - if _, err := c.Put(context.TODO(), "foo/host", string(v)); err != nil { - t.Fatal(err) - } - if _, err := c.Put(context.TODO(), "foo/host2", string(v)); err != nil { - t.Fatal(err) - } - - r := namingv3.GRPCResolver{c} - - w, err := r.Resolve("foo") - if err != nil { - t.Fatal("failed to resolve foo", err) - } - defer w.Close() - - updates, nerr := w.Next() - if nerr != nil { - t.Fatal(nerr) - } - if len(updates) != 2 { - t.Fatalf("expected two updates, got %+v", updates) - } - - _, err = c.Txn(context.TODO()).Then(etcd.OpDelete("foo/host"), etcd.OpDelete("foo/host2")).Commit() - if err != nil { - t.Fatal(err) - } - - updates, nerr = w.Next() - if nerr != nil { - t.Fatal(nerr) - } - if len(updates) != 2 || (updates[0].Op != naming.Delete && updates[1].Op != naming.Delete) { - t.Fatalf("expected two updates, got %+v", updates) - } -} diff --git a/proxy/grpcproxy/cluster.go b/proxy/grpcproxy/cluster.go index 7e5059cfb31b..338827d46445 100644 --- a/proxy/grpcproxy/cluster.go +++ b/proxy/grpcproxy/cluster.go @@ -22,12 +22,10 @@ import ( "sync" "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/clientv3/naming" - "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" + "go.etcd.io/etcd/clientv3/naming/endpoints" pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "golang.org/x/time/rate" - gnaming "google.golang.org/grpc/naming" ) // allow maximum 1 retry per second @@ -36,35 +34,46 @@ const resolveRetryRate = 1 type clusterProxy struct { clus clientv3.Cluster ctx context.Context - gr *naming.GRPCResolver // advertise client URL advaddr string prefix string + em endpoints.Manager + umu sync.RWMutex - umap map[string]gnaming.Update + umap map[string]endpoints.Endpoint } // NewClusterProxy takes optional prefix to fetch grpc-proxy member endpoints. // The returned channel is closed when there is grpc-proxy endpoint registered // and the client's context is canceled so the 'register' loop returns. +// TODO: Expand the API to report creation errors func NewClusterProxy(c *clientv3.Client, advaddr string, prefix string) (pb.ClusterServer, <-chan struct{}) { + var em endpoints.Manager + if advaddr != "" && prefix != "" { + var err error + if em, err = endpoints.NewManager(c, prefix); err != nil { + plog.Errorf("failed to provision endpointsManager %q (%v)", prefix, err) + return nil, nil + } + } + cp := &clusterProxy{ clus: c.Cluster, ctx: c.Ctx(), - gr: &naming.GRPCResolver{Client: c}, advaddr: advaddr, prefix: prefix, - umap: make(map[string]gnaming.Update), + umap: make(map[string]endpoints.Endpoint), + em: em, } donec := make(chan struct{}) - if advaddr != "" && prefix != "" { + if em != nil { go func() { defer close(donec) - cp.resolve(prefix) + cp.establishEndpointWatch(prefix) }() return cp, donec } @@ -73,38 +82,36 @@ func NewClusterProxy(c *clientv3.Client, advaddr string, prefix string) (pb.Clus return cp, donec } -func (cp *clusterProxy) resolve(prefix string) { +func (cp *clusterProxy) establishEndpointWatch(prefix string) { rm := rate.NewLimiter(rate.Limit(resolveRetryRate), resolveRetryRate) for rm.Wait(cp.ctx) == nil { - wa, err := cp.gr.Resolve(prefix) + wc, err := cp.em.NewWatchChannel(cp.ctx) if err != nil { - plog.Warningf("failed to resolve %q (%v)", prefix, err) + plog.Warningf("failed to establish endpoint watch %q (%v)", prefix, err) continue } - cp.monitor(wa) + cp.monitor(wc) } } -func (cp *clusterProxy) monitor(wa gnaming.Watcher) { - for cp.ctx.Err() == nil { - ups, err := wa.Next() - if err != nil { - plog.Warningf("clusterProxy watcher error (%v)", err) - if rpctypes.ErrorDesc(err) == naming.ErrWatcherClosed.Error() { - return - } - } - - cp.umu.Lock() - for i := range ups { - switch ups[i].Op { - case gnaming.Add: - cp.umap[ups[i].Addr] = *ups[i] - case gnaming.Delete: - delete(cp.umap, ups[i].Addr) +func (cp *clusterProxy) monitor(wc endpoints.WatchChannel) { + for { + select { + case <-cp.ctx.Done(): + plog.Info("watching endpoints interrupted (%v)", cp.ctx.Err()) + return + case updates := <-wc: + cp.umu.Lock() + for _, up := range updates { + switch up.Op { + case endpoints.Add: + cp.umap[up.Endpoint.Addr] = up.Endpoint + case endpoints.Delete: + delete(cp.umap, up.Endpoint.Addr) + } } + cp.umu.Unlock() } - cp.umu.Unlock() } } diff --git a/proxy/grpcproxy/register.go b/proxy/grpcproxy/register.go index ba628c3ebcaa..b02faeb83ba8 100644 --- a/proxy/grpcproxy/register.go +++ b/proxy/grpcproxy/register.go @@ -20,10 +20,9 @@ import ( "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3/concurrency" - "go.etcd.io/etcd/clientv3/naming" + "go.etcd.io/etcd/clientv3/naming/endpoints" "golang.org/x/time/rate" - gnaming "google.golang.org/grpc/naming" ) // allow maximum 1 retry per second @@ -67,8 +66,12 @@ func registerSession(c *clientv3.Client, prefix string, addr string, ttl int) (* return nil, err } - gr := &naming.GRPCResolver{Client: c} - if err = gr.Update(c.Ctx(), prefix, gnaming.Update{Op: gnaming.Add, Addr: addr, Metadata: getMeta()}, clientv3.WithLease(ss.Lease())); err != nil { + em, err := endpoints.NewManager(c, prefix) + if err != nil { + return nil, err + } + endpoint := endpoints.Endpoint{Addr: addr, Metadata: getMeta()} + if err = em.AddEndpoint(c.Ctx(), prefix+"/"+addr, endpoint, clientv3.WithLease(ss.Lease())); err != nil { return nil, err } diff --git a/proxy/grpcproxy/register_test.go b/proxy/grpcproxy/register_test.go index 33b01547c0e7..0a15bcda563c 100644 --- a/proxy/grpcproxy/register_test.go +++ b/proxy/grpcproxy/register_test.go @@ -19,11 +19,9 @@ import ( "time" "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/clientv3/naming" + "go.etcd.io/etcd/clientv3/naming/endpoints" "go.etcd.io/etcd/integration" "go.etcd.io/etcd/pkg/testutil" - - gnaming "google.golang.org/grpc/naming" ) func TestRegister(t *testing.T) { @@ -35,26 +33,16 @@ func TestRegister(t *testing.T) { paddr := clus.Members[0].GRPCAddr() testPrefix := "test-name" - wa := createWatcher(t, cli, testPrefix) - ups, err := wa.Next() - if err != nil { - t.Fatal(err) - } - if len(ups) != 0 { - t.Fatalf("len(ups) expected 0, got %d (%v)", len(ups), ups) - } + wa := mustCreateWatcher(t, cli, testPrefix) donec := Register(cli, testPrefix, paddr, 5) - ups, err = wa.Next() - if err != nil { - t.Fatal(err) - } + ups := <-wa if len(ups) != 1 { t.Fatalf("len(ups) expected 1, got %d (%v)", len(ups), ups) } - if ups[0].Addr != paddr { - t.Fatalf("ups[0].Addr expected %q, got %q", paddr, ups[0].Addr) + if ups[0].Endpoint.Addr != paddr { + t.Fatalf("ups[0].Addr expected %q, got %q", paddr, ups[0].Endpoint.Addr) } cli.Close() @@ -66,11 +54,14 @@ func TestRegister(t *testing.T) { } } -func createWatcher(t *testing.T, c *clientv3.Client, prefix string) gnaming.Watcher { - gr := &naming.GRPCResolver{Client: c} - watcher, err := gr.Resolve(prefix) +func mustCreateWatcher(t *testing.T, c *clientv3.Client, prefix string) endpoints.WatchChannel { + em, err := endpoints.NewManager(c, prefix) + if err != nil { + t.Fatalf("failed to create endpoints.Manager: %v", err) + } + wc, err := em.NewWatchChannel(c.Ctx()) if err != nil { t.Fatalf("failed to resolve %q (%v)", prefix, err) } - return watcher + return wc } diff --git a/test b/test index 31a686dbe396..d90f1133e3f2 100755 --- a/test +++ b/test @@ -192,7 +192,8 @@ function integration_pass { INTEGTESTPKG=("${REPO_PATH}/integration" "${REPO_PATH}/client/integration" "${REPO_PATH}/clientv3/integration/..." - "${REPO_PATH}/contrib/raftexample") + "${REPO_PATH}/contrib/raftexample" + "${REPO_PATH}/proxy/grpcproxy") else INTEGTESTPKG=("${TEST[@]}") fi @@ -205,6 +206,7 @@ function integration_extra { go test -timeout 25m -v ${RACE} -cpu "${TEST_CPUS}" "$@" "${REPO_PATH}/clientv3/integration/..." go test -timeout 1m -v -cpu "${TEST_CPUS}" "$@" "${REPO_PATH}/contrib/raftexample" go test -timeout 5m -v ${RACE} -tags v2v3 "$@" "${REPO_PATH}/etcdserver/api/v2store" + go test -timeout 5m -v ${RACE} -cpu "${TEST_CPUS}" "$@" "${REPO_PATH}/proxy/grpcproxy" go test -timeout 1m -v ${RACE} -cpu "${TEST_CPUS}" -run=Example "$@" "${TEST[@]}" }