Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.4] Backport clientv3 naming implementation #16800

Merged
merged 3 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Documentation/dev-guide/grpc_naming.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

cli, cerr := clientv3.NewFromURL("http://localhost:2379")
etcdResolver, err := resolver.NewBuilder(clus.RandClient());
conn, gerr := grpc.Dial("etcd://foo/bar/my-service", grpc.WithResolvers(etcdResolver))
conn, gerr := grpc.Dial("etcd:///foo/bar/my-service", grpc.WithResolvers(etcdResolver))
```

## Managing service endpoints
Expand Down Expand Up @@ -84,4 +84,4 @@ em := endpoints.NewManager(c, "foo")
err := em.Update(context.TODO(), []*endpoints.UpdateWithOpts{
endpoints.NewDeleteUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.4"}),
endpoints.NewAddUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.14"})})
```
```
36 changes: 28 additions & 8 deletions clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,19 @@ import (
"time"

"github.com/google/uuid"
"go.etcd.io/etcd/clientv3/balancer"
"go.etcd.io/etcd/clientv3/balancer/picker"
"go.etcd.io/etcd/clientv3/balancer/resolver/endpoint"
"go.etcd.io/etcd/clientv3/credentials"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.etcd.io/etcd/pkg/logutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
grpccredentials "google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"

"go.etcd.io/etcd/clientv3/balancer"
"go.etcd.io/etcd/clientv3/balancer/picker"
"go.etcd.io/etcd/clientv3/balancer/resolver/endpoint"
"go.etcd.io/etcd/clientv3/credentials"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.etcd.io/etcd/pkg/logutil"
)

var (
Expand Down Expand Up @@ -95,7 +96,8 @@ type Client struct {

callOpts []grpc.CallOption

lg *zap.Logger
lgMu *sync.RWMutex
lg *zap.Logger
}

// New creates a new etcdv3 client from a given configuration.
Expand All @@ -112,7 +114,7 @@ func New(cfg Config) (*Client, error) {
// service interface implementations and do not need connection management.
func NewCtxClient(ctx context.Context) *Client {
cctx, cancel := context.WithCancel(ctx)
return &Client{ctx: cctx, cancel: cancel}
return &Client{ctx: cctx, cancel: cancel, lgMu: new(sync.RWMutex), lg: zap.NewNop()}
}

// NewFromURL creates a new etcdv3 client from a URL.
Expand All @@ -125,6 +127,23 @@ func NewFromURLs(urls []string) (*Client, error) {
return New(Config{Endpoints: urls})
}

// WithLogger sets a logger
func (c *Client) WithLogger(lg *zap.Logger) *Client {
c.lgMu.Lock()
c.lg = lg
c.lgMu.Unlock()
return c
}

// GetLogger gets the logger.
// NOTE: This method is for internal use of etcd-client library and should not be used as general-purpose logger.
func (c *Client) GetLogger() *zap.Logger {
c.lgMu.RLock()
l := c.lg
c.lgMu.RUnlock()
return l
}

// Close shuts down the client's etcd connections.
func (c *Client) Close() error {
c.cancel()
Expand Down Expand Up @@ -423,6 +442,7 @@ func newClient(cfg *Config) (*Client, error) {
cancel: cancel,
mu: new(sync.RWMutex),
callOpts: defaultCallOpts,
lgMu: new(sync.RWMutex),
}

lcfg := logutil.DefaultZapLoggerConfig
Expand Down
101 changes: 89 additions & 12 deletions clientv3/integration/naming/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import (
)

func TestEndpointManager(t *testing.T) {
t.Skip("Not implemented yet")

defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
Expand All @@ -54,10 +52,10 @@ func TestEndpointManager(t *testing.T) {
us := <-w

if us == nil {
t.Fatal("failed to get update", err)
t.Fatal("failed to get update")
}

wu := endpoints.Update{
wu := &endpoints.Update{
Op: endpoints.Add,
Key: "foo/a1",
Endpoint: e1,
Expand All @@ -69,30 +67,28 @@ func TestEndpointManager(t *testing.T) {

err = em.DeleteEndpoint(context.TODO(), "foo/a1")
if err != nil {
t.Fatalf("failed to udpate %v", err)
t.Fatalf("failed to update %v", err)
}

us = <-w
if err != nil {
t.Fatalf("failed to get udpate %v", err)
if us == nil {
t.Fatal("failed to get update")
}

wu = endpoints.Update{
wu = &endpoints.Update{
Op: endpoints.Delete,
Key: "foo/a1",
}

if !reflect.DeepEqual(us, wu) {
t.Fatalf("up = %#v, want %#v", us[1], wu)
if !reflect.DeepEqual(us[0], wu) {
t.Fatalf("up = %#v, want %#v", us[0], wu)
}
}

// TestEndpointManagerAtomicity ensures the resolver will initialize
// correctly with multiple hosts and correctly receive multiple
// updates in a single revision.
func TestEndpointManagerAtomicity(t *testing.T) {
t.Skip("Not implemented yet")

defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
Expand Down Expand Up @@ -133,3 +129,84 @@ func TestEndpointManagerAtomicity(t *testing.T) {
t.Fatalf("expected two delete updates, got %+v", updates)
}
}

func TestEndpointManagerCRUD(t *testing.T) {
defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

em, err := endpoints.NewManager(clus.RandClient(), "foo")
if err != nil {
t.Fatal("failed to create EndpointManager", err)
}

// Add
k1 := "foo/a1"
e1 := endpoints.Endpoint{Addr: "127.0.0.1", Metadata: "metadata1"}
err = em.AddEndpoint(context.TODO(), k1, e1)
if err != nil {
t.Fatal("failed to add", k1, err)
}

k2 := "foo/a2"
e2 := endpoints.Endpoint{Addr: "127.0.0.2", Metadata: "metadata2"}
err = em.AddEndpoint(context.TODO(), k2, e2)
if err != nil {
t.Fatal("failed to add", k2, err)
}

eps, err := em.List(context.TODO())
if err != nil {
t.Fatal("failed to list foo")
}
if len(eps) != 2 {
t.Fatalf("unexpected the number of endpoints: %d", len(eps))
}
if !reflect.DeepEqual(eps[k1], e1) {
t.Fatalf("unexpected endpoints: %s", k1)
}
if !reflect.DeepEqual(eps[k2], e2) {
t.Fatalf("unexpected endpoints: %s", k2)
}

// Delete
err = em.DeleteEndpoint(context.TODO(), k1)
if err != nil {
t.Fatal("failed to delete", k2, err)
}

eps, err = em.List(context.TODO())
if err != nil {
t.Fatal("failed to list foo")
}
if len(eps) != 1 {
t.Fatalf("unexpected the number of endpoints: %d", len(eps))
}
if !reflect.DeepEqual(eps[k2], e2) {
t.Fatalf("unexpected endpoints: %s", k2)
}

// Update
k3 := "foo/a3"
e3 := endpoints.Endpoint{Addr: "127.0.0.3", Metadata: "metadata3"}
updates := []*endpoints.UpdateWithOpts{
{Update: endpoints.Update{Op: endpoints.Add, Key: k3, Endpoint: e3}},
{Update: endpoints.Update{Op: endpoints.Delete, Key: k2}},
}
err = em.Update(context.TODO(), updates)
if err != nil {
t.Fatal("failed to update", err)
}

eps, err = em.List(context.TODO())
if err != nil {
t.Fatal("failed to list foo")
}
if len(eps) != 1 {
t.Fatalf("unexpected the number of endpoints: %d", len(eps))
}
if !reflect.DeepEqual(eps[k3], e3) {
t.Fatalf("unexpected endpoints: %s", k3)
}
}
86 changes: 71 additions & 15 deletions clientv3/integration/naming/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,56 +15,112 @@
package naming

import (
"bytes"
"context"
"testing"
"time"

"google.golang.org/grpc"
testpb "google.golang.org/grpc/test/grpc_testing"

"go.etcd.io/etcd/clientv3/naming/endpoints"
"go.etcd.io/etcd/clientv3/naming/resolver"
"go.etcd.io/etcd/integration"
grpctest "go.etcd.io/etcd/pkg/grpc_testing"
"go.etcd.io/etcd/pkg/testutil"
)

// This test mimics scenario described in grpc_naming.md doc.

func TestEtcdGrpcResolver(t *testing.T) {
t.Skip("Not implemented yet")

defer testutil.AfterTest(t)
s1PayloadBody := []byte{'1'}
s1 := newDummyStubServer(s1PayloadBody)
if err := s1.Start(nil); err != nil {
t.Fatal("failed to start dummy grpc server (s1)", err)
}
defer s1.Stop()

// s1 := // TODO: Dummy GRPC service listening on 127.0.0.1:20000
// s2 := // TODO: Dummy GRPC service listening on 127.0.0.1:20001
s2PayloadBody := []byte{'2'}
s2 := newDummyStubServer(s2PayloadBody)
if err := s2.Start(nil); err != nil {
t.Fatal("failed to start dummy grpc server (s2)", err)
}
defer s2.Stop()

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)

em, err := endpoints.NewManager(clus.RandClient(), "foo")
em, err := endpoints.NewManager(clus.Client(0), "foo")
if err != nil {
t.Fatal("failed to create EndpointManager", err)
}

e1 := endpoints.Endpoint{Addr: "127.0.0.1:20000"}
e2 := endpoints.Endpoint{Addr: "127.0.0.1:20001"}
e1 := endpoints.Endpoint{Addr: s1.Addr()}
e2 := endpoints.Endpoint{Addr: s2.Addr()}

err = em.AddEndpoint(context.TODO(), "foo/e1", e1)
if err != nil {
t.Fatal("failed to add foo", err)
}
etcdResolver, err := resolver.NewBuilder(clus.RandClient())

conn, err := grpc.Dial("etc://foo", grpc.WithResolvers(etcdResolver))
b, err := resolver.NewBuilder(clus.Client(1))
if err != nil {
t.Fatal("failed to connect to foo (e1)", err)
t.Fatal("failed to new resolver builder", err)
}
conn, err := grpc.Dial("etcd:///foo", grpc.WithInsecure(), grpc.WithResolvers(b))
if err != nil {
t.Fatal("failed to connect to foo", err)
}
defer conn.Close()

// TODO: send requests to conn, ensure s1 received it.
c := testpb.NewTestServiceClient(conn)
resp, err := c.UnaryCall(context.TODO(), &testpb.SimpleRequest{}, grpc.WaitForReady(true))
if err != nil {
t.Fatal("failed to invoke rpc to foo (e1)", err)
}
if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), s1PayloadBody) {
t.Fatalf("unexpected response from foo (e1): %s", resp.GetPayload().GetBody())
}

em.DeleteEndpoint(context.TODO(), "foo/e1")
em.AddEndpoint(context.TODO(), "foo/e2", e2)

// TODO: Send requests to conn and make sure s2 receive it.
// Might require restarting s1 to break the existing (open) connection.
// We use a loop with deadline of 30s to avoid test getting flake
// as it's asynchronous for gRPC Client to update underlying connections.
maxRetries := 300
retryPeriod := 100 * time.Millisecond
retries := 0
for {
time.Sleep(retryPeriod)
retries++

resp, err = c.UnaryCall(context.TODO(), &testpb.SimpleRequest{})
if err != nil {
if retries < maxRetries {
continue
}
t.Fatal("failed to invoke rpc to foo (e2)", err)
}
if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), s2PayloadBody) {
if retries < maxRetries {
continue
}
t.Fatalf("unexpected response from foo (e2): %s", resp.GetPayload().GetBody())
}
break
}
}

conn.GetState() // this line is to avoid compiler warning that conn is unused.
func newDummyStubServer(body []byte) *grpctest.StubServer {
return &grpctest.StubServer{
UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{
Payload: &testpb.Payload{
Type: testpb.PayloadType_COMPRESSABLE,
Body: body,
},
}, nil
},
}
}
2 changes: 1 addition & 1 deletion clientv3/naming/endpoints/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Update struct {
}

// WatchChannel is used to deliver notifications about endpoints updates.
type WatchChannel chan []*Update
type WatchChannel <-chan []*Update

// Key2EndpointMap maps etcd key into struct describing the endpoint.
type Key2EndpointMap map[string]Endpoint
Expand Down
Loading
Loading