diff --git a/Documentation/dev-guide/grpc_naming.md b/Documentation/dev-guide/grpc_naming.md index 77287c6e1c72..c279bd948310 100644 --- a/Documentation/dev-guide/grpc_naming.md +++ b/Documentation/dev-guide/grpc_naming.md @@ -6,40 +6,41 @@ etcd provides a gRPC resolver to support an alternative name system that fetches ## Using etcd discovery with go-grpc -The etcd client provides a gRPC resolver for resolving gRPC endpoints with an etcd backend. The resolver is initialized with an etcd client and given a target for resolution: +The etcd client provides a gRPC resolver for resolving gRPC endpoints with an etcd backend. The resolver is initialized with an etcd client: ```go import ( - "go.etcd.io/etcd/clientv3" - etcdnaming "go.etcd.io/etcd/clientv3/naming" + "go.etcd.io/etcd/v3/clientv3" + resolver "go.etcd.io/etcd/v3/clientv3/naming/resolver" "google.golang.org/grpc" ) -... - cli, cerr := clientv3.NewFromURL("http://localhost:2379") -r := &etcdnaming.GRPCResolver{Client: cli} -b := grpc.RoundRobin(r) -conn, gerr := grpc.Dial("my-service", grpc.WithBalancer(b), grpc.WithBlock(), ...) +etcdResolver, err := resolver.NewBuilder(clus.RandClient()); +conn, gerr := grpc.Dial("etcd://foo/bar/my-service", grpc.WithResolvers(etcdResolver)) ``` ## Managing service endpoints -The etcd resolver treats all keys under the prefix of the resolution target following a "/" (e.g., "my-service/") with JSON-encoded go-grpc `naming.Update` values as potential service endpoints. Endpoints are added to the service by creating new keys and removed from the service by deleting keys. +The etcd resolver treats all keys under the prefix of the resolution target following a "/" (e.g., "foo/bar/my-service/") +with JSON-encoded (historically go-grpc `naming.Update`) values as potential service endpoints. +Endpoints are added to the service by creating new keys and removed from the service by deleting keys. ### Adding an endpoint New endpoints can be added to the service through `etcdctl`: ```sh -ETCDCTL_API=3 etcdctl put my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}' +ETCDCTL_API=3 etcdctl put foo/bar/my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}' ``` -The etcd client's `GRPCResolver.Update` method can also register new endpoints with a key matching the `Addr`: +The etcd client's `endpoints.Manager` method can also register new endpoints with a key matching the `Addr`: ```go -r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Add, Addr: "1.2.3.4", Metadata: "..."}) + +em := endpoints.NewManager(client, "foo/bar/my-service") +err := em.AddEndpoint(context.TODO(),"foo/bar/my-service/e1", endpoints.Endpoint{Addr:"1.2.3.4"}); ``` ### Deleting an endpoint @@ -47,13 +48,14 @@ r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Add, Addr: "1.2. Hosts can be deleted from the service through `etcdctl`: ```sh -ETCDCTL_API=3 etcdctl del my-service/1.2.3.4 +ETCDCTL_API=3 etcdctl del foo/bar/my-service/1.2.3.4 ``` -The etcd client's `GRPCResolver.Update` method also supports deleting endpoints: +The etcd client's `endpoints.Manager` method also supports deleting endpoints: ```go -r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Delete, Addr: "1.2.3.4"}) +em := endpoints.NewManager(client, "foo/bar/my-service") +err := em.DeleteEndpoint(context.TODO(), "foo/bar/my-service/e1"); ``` ### Registering an endpoint with a lease @@ -65,3 +67,21 @@ lease=`ETCDCTL_API=3 etcdctl lease grant 5 | cut -f2 -d' '` ETCDCTL_API=3 etcdctl put --lease=$lease my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}' ETCDCTL_API=3 etcdctl lease keep-alive $lease ``` +In the golang: + +```go +em := endpoints.NewManager(client, "foo/bar/my-service") +err := endpoints.AddEndpoint(context.TODO(), "foo/bar/my-service/e1", endpoints.Endpoint{Addr:"1.2.3.4"}); +``` + +### Atomically updating endpoints + +If it's desired to modify multiple endpoints in a single transaction, `endpoints.Manager` can be used directly: + +``` +em := endpoints.NewManager(c, "foo") + +err := em.Update(context.TODO(), []*endpoints.UpdateWithOpts{ + endpoints.NewDeleteUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.4"}), + endpoints.NewAddUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.14"})}) +``` \ No newline at end of file diff --git a/clientv3/naming/doc.go b/clientv3/naming/doc.go index 4b618a3d817c..213bf3918491 100644 --- a/clientv3/naming/doc.go +++ b/clientv3/naming/doc.go @@ -12,44 +12,47 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package naming provides an etcd-backed gRPC resolver for discovering gRPC services. +// Package naming provides: +// - subpackage endpoints: an abstraction layer to store and read endpoints +// information from etcd. +// - subpackage resolver: an etcd-backed gRPC resolver for discovering gRPC +// services based on the endpoints configuration // // To use, first import the packages: // // import ( // "go.etcd.io/etcd/clientv3" -// etcdnaming "go.etcd.io/etcd/clientv3/naming" -// +// "go.etcd.io/etcd/clientv3/naming/endpoints" +// "go.etcd.io/etcd/clientv3/naming/resolver" // "google.golang.org/grpc" -// "google.golang.org/grpc/naming" // ) // // First, register new endpoint addresses for a service: // // func etcdAdd(c *clientv3.Client, service, addr string) error { -// r := &etcdnaming.GRPCResolver{Client: c} -// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Add, Addr: addr}) +// em := endpoints.NewManager(c, service) +// return em.AddEndpoint(c.Ctx(), service+"/"+addr, endpoints.Endpoint{Addr:addr}); // } // // Dial an RPC service using the etcd gRPC resolver and a gRPC Balancer: // // func etcdDial(c *clientv3.Client, service string) (*grpc.ClientConn, error) { -// r := &etcdnaming.GRPCResolver{Client: c} -// b := grpc.RoundRobin(r) -// return grpc.Dial(service, grpc.WithBalancer(b)) +// etcdResolver, err := resolver.NewBuilder(c); +// if err { return nil, err } +// return grpc.Dial("etc://foo", grpc.WithResolvers(etcdResolver)) // } // // Optionally, force delete an endpoint: // // func etcdDelete(c *clientv3, service, addr string) error { -// r := &etcdnaming.GRPCResolver{Client: c} -// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Delete, Addr: "1.2.3.4"}) +// em := endpoints.NewManager(c, service) +// return em.DeleteEndpoint(c.Ctx(), service+"/"+addr) // } // // Or register an expiring endpoint with a lease: // -// func etcdLeaseAdd(c *clientv3.Client, lid clientv3.LeaseID, service, addr string) error { -// r := &etcdnaming.GRPCResolver{Client: c} -// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Add, Addr: addr}, clientv3.WithLease(lid)) +// func etcdAdd(c *clientv3.Client, lid clientv3.LeaseID, service, addr string) error { +// em := endpoints.NewManager(c, service) +// return em.AddEndpoint(c.Ctx(), service+"/"+addr, endpoints.Endpoint{Addr:addr}, clientv3.WithLease(lid)); // } package naming diff --git a/clientv3/naming/endpoints/endpoints.go b/clientv3/naming/endpoints/endpoints.go new file mode 100644 index 000000000000..d0220105236f --- /dev/null +++ b/clientv3/naming/endpoints/endpoints.go @@ -0,0 +1,82 @@ +package endpoints + +import ( + "context" + + "go.etcd.io/etcd/clientv3" +) + +// Endpoint represents a single address the connection can be established with. +// +// Inspired by: https://pkg.go.dev/google.golang.org/grpc/resolver#Address. +// Please document etcd version since which version each field is supported. +type Endpoint struct { + // Addr is the server address on which a connection will be established. + // Since etcd 3.1 + Addr string + + // Metadata is the information associated with Addr, which may be used + // to make load balancing decision. + // Since etcd 3.1 + Metadata interface{} +} + +type Operation uint8 + +const ( + // Add indicates an Endpoint is added. + Add Operation = iota + // Delete indicates an existing address is deleted. + Delete +) + +// Update describes a single edit action of an Endpoint. +type Update struct { + // Op - action Add or Delete. + Op Operation + Key string + Endpoint Endpoint +} + +// WatchChannel is used to deliver notifications about endpoints updates. +type WatchChannel chan []*Update + +// Key2EndpointMap maps etcd key into struct describing the endpoint. +type Key2EndpointMap map[string]Endpoint + +// UpdateWithOpts describes endpoint update (add or delete) together +// with etcd options (e.g. to attach an endpoint to a lease). +type UpdateWithOpts struct { + Update + Opts []clientv3.OpOption +} + +// NewAddUpdateOpts constructs UpdateWithOpts for endpoint registration. +func NewAddUpdateOpts(key string, endpoint Endpoint, opts ...clientv3.OpOption) *UpdateWithOpts { + return &UpdateWithOpts{Update: Update{Op: Add, Key: key, Endpoint: endpoint}, Opts: opts} +} + +// NewDeleteUpdateOpts constructs UpdateWithOpts for endpoint deletion. +func NewDeleteUpdateOpts(key string, opts ...clientv3.OpOption) *UpdateWithOpts { + return &UpdateWithOpts{Update: Update{Op: Delete, Key: key}, Opts: opts} +} + +// Manager can be used to add/remove & inspect endpoints stored in etcd for +// a particular target. +type Manager interface { + // Update allows to atomically add/remove a few endpoints from etcd. + Update(ctx context.Context, updates []*UpdateWithOpts) error + + // AddEndpoint registers a single endpoint in etcd. + // For more advanced use-cases use the Update method. + AddEndpoint(ctx context.Context, key string, endpoint Endpoint, opts ...clientv3.OpOption) error + // DeleteEndpoint deletes a single endpoint stored in etcd. + // For more advanced use-cases use the Update method. + DeleteEndpoint(ctx context.Context, key string, opts ...clientv3.OpOption) error + + // List returns all the endpoints for the current target as a map. + List(ctx context.Context) (Key2EndpointMap, error) + // NewWatchChannel creates a channel that populates or endpoint updates. + // Cancel the 'ctx' to close the watcher. + NewWatchChannel(ctx context.Context) (WatchChannel, error) +} diff --git a/clientv3/naming/endpoints/endpoints_impl.go b/clientv3/naming/endpoints/endpoints_impl.go new file mode 100644 index 000000000000..b33fc6fbad73 --- /dev/null +++ b/clientv3/naming/endpoints/endpoints_impl.go @@ -0,0 +1,121 @@ +package endpoints + +// TODO: The API is not yet implemented. + +import ( + "context" + "fmt" + + "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/clientv3/naming/endpoints/internal" +) + +type endpointManager struct { + // TODO: To be implemented, tracked by: https://github.com/etcd-io/etcd/issues/12652 +} + +func NewManager(client *clientv3.Client, target string) (Manager, error) { + // To be implemented (https://github.com/etcd-io/etcd/issues/12652) + return nil, fmt.Errorf("Not implemented yet") +} + +func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts) error { + // TODO: For loop in a single transaction: + internalUpdate := &internal.Update{} // translate UpdateWithOpts into json format. + switch internalUpdate.Op { + //case internal.Add: + // var v []byte + // if v, err = json.Marshal(internalUpdate); err != nil { + // return status.Error(codes.InvalidArgument, err.Error()) + // } + // _, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...) + //case internal.Delete: + // _, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...) + //default: + // return status.Error(codes.InvalidArgument, "naming: bad naming op") + } + return fmt.Errorf("Not implemented yet") +} + +func (m *endpointManager) AddEndpoint(ctx context.Context, key string, endpoint Endpoint, opts ...clientv3.OpOption) error { + return m.Update(ctx, []*UpdateWithOpts{NewAddUpdateOpts(key, endpoint, opts...)}) +} + +func (m *endpointManager) DeleteEndpoint(ctx context.Context, key string, opts ...clientv3.OpOption) error { + return m.Update(ctx, []*UpdateWithOpts{NewDeleteUpdateOpts(key, opts...)}) +} + +func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) { + return nil, fmt.Errorf("Not implemented yet") + + // TODO: Implementation to be inspired by: + // Next gets the next set of updates from the etcd resolver. + //// Calls to Next should be serialized; concurrent calls are not safe since + //// there is no way to reconcile the update ordering. + //func (gw *gRPCWatcher) Next() ([]*naming.Update, error) { + // if gw.wch == nil { + // // first Next() returns all addresses + // return gw.firstNext() + // } + // if gw.err != nil { + // return nil, gw.err + // } + // + // // process new events on target/* + // wr, ok := <-gw.wch + // if !ok { + // gw.err = status.Error(codes.Unavailable, ErrWatcherClosed.Error()) + // return nil, gw.err + // } + // if gw.err = wr.Err(); gw.err != nil { + // return nil, gw.err + // } + // + // updates := make([]*naming.Update, 0, len(wr.Events)) + // for _, e := range wr.Events { + // var jupdate naming.Update + // var err error + // switch e.Type { + // case etcd.EventTypePut: + // err = json.Unmarshal(e.Kv.Value, &jupdate) + // jupdate.Op = naming.Add + // case etcd.EventTypeDelete: + // err = json.Unmarshal(e.PrevKv.Value, &jupdate) + // jupdate.Op = naming.Delete + // default: + // continue + // } + // if err == nil { + // updates = append(updates, &jupdate) + // } + // } + // return updates, nil + //} + // + //func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) { + // // Use serialized request so resolution still works if the target etcd + // // server is partitioned away from the quorum. + // resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable()) + // if gw.err = err; err != nil { + // return nil, err + // } + // + // updates := make([]*naming.Update, 0, len(resp.Kvs)) + // for _, kv := range resp.Kvs { + // var jupdate naming.Update + // if err := json.Unmarshal(kv.Value, &jupdate); err != nil { + // continue + // } + // updates = append(updates, &jupdate) + // } + // + // opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()} + // gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...) + // return updates, nil + //} +} + +func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) { + // TODO: Implementation + return nil, fmt.Errorf("Not implemented yet") +} diff --git a/clientv3/naming/endpoints/internal/update.go b/clientv3/naming/endpoints/internal/update.go new file mode 100644 index 000000000000..71aa83fed4cf --- /dev/null +++ b/clientv3/naming/endpoints/internal/update.go @@ -0,0 +1,38 @@ +package internal + +// Operation describes action performed on endpoint (addition vs deletion). +// Must stay JSON-format compatible with: +// https://pkg.go.dev/google.golang.org/grpc@v1.29.1/naming#Operation +type Operation uint8 + +const ( + // Add indicates a new address is added. + Add Operation = iota + // Delete indicates an existing address is deleted. + Delete +) + +// Update defines a persistent (JSON marshalled) format representing +// endpoint within the etcd storage. +// +// As the format can be persisted by one version of etcd client library and +// read by other the format must be kept backward compatible and +// in particular must be superset of the grpc(<=1.29.1) naming.Update structure: +// https://pkg.go.dev/google.golang.org/grpc@v1.29.1/naming#Update +// +// Please document since which version of etcd-client given property is supported. +// Please keep the naming consistent with e.g. https://pkg.go.dev/google.golang.org/grpc/resolver#Address. +// +// Notice that it is not valid having both empty string Addr and nil Metadata in an Update. +type Update struct { + // Op indicates the operation of the update. + // Since etcd 3.1. + Op Operation + // Addr is the updated address. It is empty string if there is no address update. + // Since etcd 3.1. + Addr string + // Metadata is the updated metadata. It is nil if there is no metadata update. + // Metadata is not required for a custom naming implementation. + // Since etcd 3.1. + Metadata interface{} +} diff --git a/clientv3/naming/resolver/resolver.go b/clientv3/naming/resolver/resolver.go new file mode 100644 index 000000000000..dc0f79164e88 --- /dev/null +++ b/clientv3/naming/resolver/resolver.go @@ -0,0 +1,25 @@ +package resolver + +import ( + "google.golang.org/grpc/resolver" + + "go.etcd.io/etcd/clientv3" +) + +type builder struct { + // ... +} + +func (b builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + // To be implemented... + // Using endpoints.NewWatcher() to subscribe for endpoints changes. + return nil, nil +} + +func (b builder) Scheme() string { + return "etcd" +} + +func NewBuilder(client *clientv3.Client) (resolver.Builder, error) { + return builder{}, nil +} diff --git a/go.mod b/go.mod index 6ced2410e9b9..acad658e0581 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( golang.org/x/sync v0.1.0 golang.org/x/sys v0.13.0 golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 - google.golang.org/grpc v1.26.0 + google.golang.org/grpc v1.29.1 gopkg.in/cheggaaa/pb.v1 v1.0.25 gopkg.in/yaml.v2 v2.4.0 sigs.k8s.io/yaml v1.1.0 diff --git a/go.sum b/go.sum index 95c7df6b565b..26550c4438b9 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,7 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa h1:OaNxuTZr7kxeODyLWsRMC+OD03aFUH+mW6r2d+MWa5Y= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/coreos/go-semver v0.2.0 h1:3Jm3tLmsgAYcjC+4Up7hJrFBPr+n7rAqYeSw/SZazuY= @@ -31,7 +32,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4 h1:qk/FSDDxo05wdJH28W+p5yivv7LuLYLRXPPD8KQCtZs= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= -github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= @@ -48,7 +50,6 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c= github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903 h1:LbsanbbD6LieFkXbj9YNNBupiGHJgFeLpO0j0Fza1h8= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -56,6 +57,7 @@ github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= @@ -268,8 +270,9 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2El google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= -google.golang.org/grpc v1.26.0 h1:2dTRdpdFEEhJYQD8EMLB61nnrzSCTbG38PhqdhvOltg= -google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.29.1 h1:EC2SB8S04d2r73uptxphDSUG+kTKVgjRPF+N3xpxRB4= +google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/integration/clientv3/naming/endpoints_test.go b/integration/clientv3/naming/endpoints_test.go new file mode 100644 index 000000000000..30172bb4c5f8 --- /dev/null +++ b/integration/clientv3/naming/endpoints_test.go @@ -0,0 +1,135 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package naming_test + +import ( + "context" + "reflect" + "testing" + + etcd "go.etcd.io/etcd/clientv3" + endpoints2 "go.etcd.io/etcd/clientv3/naming/endpoints" + + "go.etcd.io/etcd/integration" + "go.etcd.io/etcd/pkg/testutil" +) + +func TestEndpointManager(t *testing.T) { + t.Skip("Not implemented yet") + + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + em, err := endpoints2.NewManager(clus.RandClient(), "foo") + if err != nil { + t.Fatal("failed to create EndpointManager", err) + } + ctx, watchCancel := context.WithCancel(context.Background()) + defer watchCancel() + w, err := em.NewWatchChannel(ctx) + if err != nil { + t.Fatal("failed to establish watch", err) + } + + e1 := endpoints2.Endpoint{Addr: "127.0.0.1", Metadata: "metadata"} + err = em.AddEndpoint(context.TODO(), "foo/a1", e1) + if err != nil { + t.Fatal("failed to add foo", err) + } + + us := <-w + + if us == nil { + t.Fatal("failed to get update", err) + } + + wu := endpoints2.Update{ + Op: endpoints2.Add, + Key: "foo/a1", + Endpoint: e1, + } + + if !reflect.DeepEqual(us[0], wu) { + t.Fatalf("up = %#v, want %#v", us[0], wu) + } + + err = em.DeleteEndpoint(context.TODO(), "foo/a1") + if err != nil { + t.Fatalf("failed to udpate %v", err) + } + + us = <-w + if err != nil { + t.Fatalf("failed to get udpate %v", err) + } + + wu = endpoints2.Update{ + Op: endpoints2.Delete, + Key: "foo/a1", + } + + if !reflect.DeepEqual(us, wu) { + t.Fatalf("up = %#v, want %#v", us[1], wu) + } +} + +// TestEndpointManagerAtomicity ensures the resolver will initialize +// correctly with multiple hosts and correctly receive multiple +// updates in a single revision. +func TestEndpointManagerAtomicity(t *testing.T) { + t.Skip("Not implemented yet") + + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + c := clus.RandClient() + em, err := endpoints2.NewManager(c, "foo") + if err != nil { + t.Fatal("failed to create EndpointManager", err) + } + + err = em.Update(context.TODO(), []*endpoints2.UpdateWithOpts{ + endpoints2.NewAddUpdateOpts("foo/host", endpoints2.Endpoint{Addr: "127.0.0.1:2000"}), + endpoints2.NewAddUpdateOpts("foo/host2", endpoints2.Endpoint{Addr: "127.0.0.1:2001"})}) + if err != nil { + t.Fatal(err) + } + + ctx, watchCancel := context.WithCancel(context.Background()) + defer watchCancel() + w, err := em.NewWatchChannel(ctx) + if err != nil { + t.Fatal(err) + } + + updates := <-w + if len(updates) != 2 { + t.Fatalf("expected two updates, got %+v", updates) + } + + _, err = c.Txn(context.TODO()).Then(etcd.OpDelete("foo/host"), etcd.OpDelete("foo/host2")).Commit() + if err != nil { + t.Fatal(err) + } + + updates = <-w + if len(updates) != 2 || (updates[0].Op != endpoints2.Delete && updates[1].Op != endpoints2.Delete) { + t.Fatalf("expected two delete updates, got %+v", updates) + } +} diff --git a/integration/clientv3/naming/resolver_test.go b/integration/clientv3/naming/resolver_test.go new file mode 100644 index 000000000000..bb3f1ddd1777 --- /dev/null +++ b/integration/clientv3/naming/resolver_test.go @@ -0,0 +1,71 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package naming_test + +import ( + "context" + "testing" + + endpoints2 "go.etcd.io/etcd/clientv3/naming/endpoints" + "go.etcd.io/etcd/clientv3/naming/resolver" + + "google.golang.org/grpc" + + "go.etcd.io/etcd/integration" + "go.etcd.io/etcd/pkg/testutil" +) + +// This test mimics scenario described in grpc_naming.md doc. + +func TestEtcdGrpcResolver(t *testing.T) { + t.Skip("Not implemented yet") + + defer testutil.AfterTest(t) + + // s1 := // TODO: Dummy GRPC service listening on 127.0.0.1:20000 + // s2 := // TODO: Dummy GRPC service listening on 127.0.0.1:20001 + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + em, err := endpoints2.NewManager(clus.RandClient(), "foo") + if err != nil { + t.Fatal("failed to create EndpointManager", err) + } + + e1 := endpoints2.Endpoint{Addr: "127.0.0.1:20000"} + e2 := endpoints2.Endpoint{Addr: "127.0.0.1:20001"} + + err = em.AddEndpoint(context.TODO(), "foo/e1", e1) + if err != nil { + t.Fatal("failed to add foo", err) + } + etcdResolver, err := resolver.NewBuilder(clus.RandClient()) + + conn, err := grpc.Dial("etc://foo", grpc.WithResolvers(etcdResolver)) + if err != nil { + t.Fatal("failed to connect to foo (e1)", err) + } + + // TODO: send requests to conn, ensure s1 received it. + + em.DeleteEndpoint(context.TODO(), "foo/e1") + em.AddEndpoint(context.TODO(), "foo/e2", e2) + + // TODO: Send requests to conn and make sure s2 receive it. + // Might require restarting s1 to break the existing (open) connection. + + conn.GetState() // this line is to avoid compiler warning that conn is unused. +}