Skip to content

Commit

Permalink
Introduce grpc-1.30+ compatible client/v3/naming API.
Browse files Browse the repository at this point in the history
This is not yet implementation, just API and tests to be filled
with implementation in next CLs,
tracked by: etcd-io#12652

We propose here 3 packages:
 - clientv3/naming/endpoints ->
    That is abstraction layer over etcd that allows to write, read &
    watch Endpoints information. It's independent from GRPC API. It hides
    the storage details.

 - clientv3/naming/endpoints/internal ->
    That contains the grpc's compatible Update class to preserve the
    internal JSON mashalling format.

 - clientv3/naming/resolver ->
   That implements the GRPC resolver API, such that etcd can be
   used for connection.Dial in grpc.

Please see the grpc_naming.md document changes & grpcproxy/cluster.go
new integration, to see how the new abstractions work.

Signed-off-by: Chao Chen <chaochn@amazon.com>
  • Loading branch information
ptabor authored and chaochn47 committed Oct 19, 2023
1 parent 3663ae1 commit 6c0e4d9
Show file tree
Hide file tree
Showing 12 changed files with 590 additions and 38 deletions.
48 changes: 34 additions & 14 deletions Documentation/dev-guide/grpc_naming.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,54 +6,56 @@ etcd provides a gRPC resolver to support an alternative name system that fetches

## Using etcd discovery with go-grpc

The etcd client provides a gRPC resolver for resolving gRPC endpoints with an etcd backend. The resolver is initialized with an etcd client and given a target for resolution:
The etcd client provides a gRPC resolver for resolving gRPC endpoints with an etcd backend. The resolver is initialized with an etcd client:

```go
import (
"go.etcd.io/etcd/clientv3"
etcdnaming "go.etcd.io/etcd/clientv3/naming"
resolver "go.etcd.io/etcd/clientv3/naming/resolver"

"google.golang.org/grpc"
)

...

cli, cerr := clientv3.NewFromURL("http://localhost:2379")
r := &etcdnaming.GRPCResolver{Client: cli}
b := grpc.RoundRobin(r)
conn, gerr := grpc.Dial("my-service", grpc.WithBalancer(b), grpc.WithBlock(), ...)
etcdResolver, err := resolver.NewBuilder(clus.RandClient());
conn, gerr := grpc.Dial("etcd://foo/bar/my-service", grpc.WithResolvers(etcdResolver))
```

## Managing service endpoints

The etcd resolver treats all keys under the prefix of the resolution target following a "/" (e.g., "my-service/") with JSON-encoded go-grpc `naming.Update` values as potential service endpoints. Endpoints are added to the service by creating new keys and removed from the service by deleting keys.
The etcd resolver treats all keys under the prefix of the resolution target following a "/" (e.g., "foo/bar/my-service/")
with JSON-encoded (historically go-grpc `naming.Update`) values as potential service endpoints.
Endpoints are added to the service by creating new keys and removed from the service by deleting keys.

### Adding an endpoint

New endpoints can be added to the service through `etcdctl`:

```sh
ETCDCTL_API=3 etcdctl put my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}'
ETCDCTL_API=3 etcdctl put foo/bar/my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}'
```

The etcd client's `GRPCResolver.Update` method can also register new endpoints with a key matching the `Addr`:
The etcd client's `endpoints.Manager` method can also register new endpoints with a key matching the `Addr`:

```go
r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Add, Addr: "1.2.3.4", Metadata: "..."})

em := endpoints.NewManager(client, "foo/bar/my-service")
err := em.AddEndpoint(context.TODO(),"foo/bar/my-service/e1", endpoints.Endpoint{Addr:"1.2.3.4"});
```

### Deleting an endpoint

Hosts can be deleted from the service through `etcdctl`:

```sh
ETCDCTL_API=3 etcdctl del my-service/1.2.3.4
ETCDCTL_API=3 etcdctl del foo/bar/my-service/1.2.3.4
```

The etcd client's `GRPCResolver.Update` method also supports deleting endpoints:
The etcd client's `endpoints.Manager` method also supports deleting endpoints:

```go
r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Delete, Addr: "1.2.3.4"})
em := endpoints.NewManager(client, "foo/bar/my-service")
err := em.DeleteEndpoint(context.TODO(), "foo/bar/my-service/e1");
```

### Registering an endpoint with a lease
Expand All @@ -65,3 +67,21 @@ lease=`ETCDCTL_API=3 etcdctl lease grant 5 | cut -f2 -d' '`
ETCDCTL_API=3 etcdctl put --lease=$lease my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}'
ETCDCTL_API=3 etcdctl lease keep-alive $lease
```
In the golang:

```go
em := endpoints.NewManager(client, "foo/bar/my-service")
err := endpoints.AddEndpoint(context.TODO(), "foo/bar/my-service/e1", endpoints.Endpoint{Addr:"1.2.3.4"});
```

### Atomically updating endpoints

If it's desired to modify multiple endpoints in a single transaction, `endpoints.Manager` can be used directly:

```
em := endpoints.NewManager(c, "foo")
err := em.Update(context.TODO(), []*endpoints.UpdateWithOpts{
endpoints.NewDeleteUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.4"}),
endpoints.NewAddUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.14"})})
```
135 changes: 135 additions & 0 deletions clientv3/integration/naming/endpoints_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package naming

import (
"context"
"reflect"
"testing"

etcd "go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/naming/endpoints"

"go.etcd.io/etcd/integration"
"go.etcd.io/etcd/pkg/testutil"
)

func TestEndpointManager(t *testing.T) {
t.Skip("Not implemented yet")

defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

em, err := endpoints.NewManager(clus.RandClient(), "foo")
if err != nil {
t.Fatal("failed to create EndpointManager", err)
}
ctx, watchCancel := context.WithCancel(context.Background())
defer watchCancel()
w, err := em.NewWatchChannel(ctx)
if err != nil {
t.Fatal("failed to establish watch", err)
}

e1 := endpoints.Endpoint{Addr: "127.0.0.1", Metadata: "metadata"}
err = em.AddEndpoint(context.TODO(), "foo/a1", e1)
if err != nil {
t.Fatal("failed to add foo", err)
}

us := <-w

if us == nil {
t.Fatal("failed to get update", err)
}

wu := endpoints.Update{
Op: endpoints.Add,
Key: "foo/a1",
Endpoint: e1,
}

if !reflect.DeepEqual(us[0], wu) {
t.Fatalf("up = %#v, want %#v", us[0], wu)
}

err = em.DeleteEndpoint(context.TODO(), "foo/a1")
if err != nil {
t.Fatalf("failed to udpate %v", err)
}

us = <-w
if err != nil {
t.Fatalf("failed to get udpate %v", err)
}

wu = endpoints.Update{
Op: endpoints.Delete,
Key: "foo/a1",
}

if !reflect.DeepEqual(us, wu) {
t.Fatalf("up = %#v, want %#v", us[1], wu)
}
}

// TestEndpointManagerAtomicity ensures the resolver will initialize
// correctly with multiple hosts and correctly receive multiple
// updates in a single revision.
func TestEndpointManagerAtomicity(t *testing.T) {
t.Skip("Not implemented yet")

defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

c := clus.RandClient()
em, err := endpoints.NewManager(c, "foo")
if err != nil {
t.Fatal("failed to create EndpointManager", err)
}

err = em.Update(context.TODO(), []*endpoints.UpdateWithOpts{
endpoints.NewAddUpdateOpts("foo/host", endpoints.Endpoint{Addr: "127.0.0.1:2000"}),
endpoints.NewAddUpdateOpts("foo/host2", endpoints.Endpoint{Addr: "127.0.0.1:2001"})})
if err != nil {
t.Fatal(err)
}

ctx, watchCancel := context.WithCancel(context.Background())
defer watchCancel()
w, err := em.NewWatchChannel(ctx)
if err != nil {
t.Fatal(err)
}

updates := <-w
if len(updates) != 2 {
t.Fatalf("expected two updates, got %+v", updates)
}

_, err = c.Txn(context.TODO()).Then(etcd.OpDelete("foo/host"), etcd.OpDelete("foo/host2")).Commit()
if err != nil {
t.Fatal(err)
}

updates = <-w
if len(updates) != 2 || (updates[0].Op != endpoints.Delete && updates[1].Op != endpoints.Delete) {
t.Fatalf("expected two delete updates, got %+v", updates)
}
}
70 changes: 70 additions & 0 deletions clientv3/integration/naming/resolver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package naming

import (
"context"
"testing"

"google.golang.org/grpc"

"go.etcd.io/etcd/clientv3/naming/endpoints"
"go.etcd.io/etcd/clientv3/naming/resolver"
"go.etcd.io/etcd/integration"
"go.etcd.io/etcd/pkg/testutil"
)

// This test mimics scenario described in grpc_naming.md doc.

func TestEtcdGrpcResolver(t *testing.T) {
t.Skip("Not implemented yet")

defer testutil.AfterTest(t)

// s1 := // TODO: Dummy GRPC service listening on 127.0.0.1:20000
// s2 := // TODO: Dummy GRPC service listening on 127.0.0.1:20001

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

em, err := endpoints.NewManager(clus.RandClient(), "foo")
if err != nil {
t.Fatal("failed to create EndpointManager", err)
}

e1 := endpoints.Endpoint{Addr: "127.0.0.1:20000"}
e2 := endpoints.Endpoint{Addr: "127.0.0.1:20001"}

err = em.AddEndpoint(context.TODO(), "foo/e1", e1)
if err != nil {
t.Fatal("failed to add foo", err)
}
etcdResolver, err := resolver.NewBuilder(clus.RandClient())

conn, err := grpc.Dial("etc://foo", grpc.WithResolvers(etcdResolver))
if err != nil {
t.Fatal("failed to connect to foo (e1)", err)
}

// TODO: send requests to conn, ensure s1 received it.

em.DeleteEndpoint(context.TODO(), "foo/e1")
em.AddEndpoint(context.TODO(), "foo/e2", e2)

// TODO: Send requests to conn and make sure s2 receive it.
// Might require restarting s1 to break the existing (open) connection.

conn.GetState() // this line is to avoid compiler warning that conn is unused.
}
31 changes: 17 additions & 14 deletions clientv3/naming/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,44 +12,47 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package naming provides an etcd-backed gRPC resolver for discovering gRPC services.
// Package naming provides:
// - subpackage endpoints: an abstraction layer to store and read endpoints
// information from etcd.
// - subpackage resolver: an etcd-backed gRPC resolver for discovering gRPC
// services based on the endpoints configuration
//
// To use, first import the packages:
//
// import (
// "go.etcd.io/etcd/clientv3"
// etcdnaming "go.etcd.io/etcd/clientv3/naming"
//
// "go.etcd.io/etcd/clientv3/naming/endpoints"
// "go.etcd.io/etcd/clientv3/naming/resolver"
// "google.golang.org/grpc"
// "google.golang.org/grpc/naming"
// )
//
// First, register new endpoint addresses for a service:
//
// func etcdAdd(c *clientv3.Client, service, addr string) error {
// r := &etcdnaming.GRPCResolver{Client: c}
// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Add, Addr: addr})
// em := endpoints.NewManager(c, service)
// return em.AddEndpoint(c.Ctx(), service+"/"+addr, endpoints.Endpoint{Addr:addr})
// }
//
// Dial an RPC service using the etcd gRPC resolver and a gRPC Balancer:
//
// func etcdDial(c *clientv3.Client, service string) (*grpc.ClientConn, error) {
// r := &etcdnaming.GRPCResolver{Client: c}
// b := grpc.RoundRobin(r)
// return grpc.Dial(service, grpc.WithBalancer(b))
// etcdResolver, err := resolver.NewBuilder(c);
// if err { return nil, err }
// return grpc.Dial("etc://foo", grpc.WithResolvers(etcdResolver))
// }
//
// Optionally, force delete an endpoint:
//
// func etcdDelete(c *clientv3, service, addr string) error {
// r := &etcdnaming.GRPCResolver{Client: c}
// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Delete, Addr: "1.2.3.4"})
// em := endpoints.NewManager(c, service)
// return em.DeleteEndpoint(c.Ctx(), service+"/"+addr)
// }
//
// Or register an expiring endpoint with a lease:
//
// func etcdLeaseAdd(c *clientv3.Client, lid clientv3.LeaseID, service, addr string) error {
// r := &etcdnaming.GRPCResolver{Client: c}
// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Add, Addr: addr}, clientv3.WithLease(lid))
// func etcdAdd(c *clientv3.Client, lid clientv3.LeaseID, service, addr string) error {
// em := endpoints.NewManager(c, service)
// return em.AddEndpoint(c.Ctx(), service+"/"+addr, endpoints.Endpoint{Addr:addr}, clientv3.WithLease(lid))
// }
package naming
Loading

0 comments on commit 6c0e4d9

Please sign in to comment.