diff --git a/CHANGELOG-3.5.md b/CHANGELOG-3.5.md
index 731456feeba3..b350aaed6f0a 100644
--- a/CHANGELOG-3.5.md
+++ b/CHANGELOG-3.5.md
@@ -9,7 +9,7 @@ The minimum recommended etcd versions to run in **production** are 3.2.28+, 3.3.
-## v3.5.0 (2020 TBD)
+## v3.5.0 (2021 TBD)
See [code changes](https://github.com/etcd-io/etcd/compare/v3.4.0...v3.5.0) and [v3.5 upgrade guide](https://github.com/etcd-io/etcd/blob/master/Documentation/upgrades/upgrade_3_5.md) for any breaking changes.
@@ -63,6 +63,10 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.4.0...v3.5.0) and
- Changed `pkg/flags` function signature to [support structured logger](https://github.com/etcd-io/etcd/pull/11616).
- Previously, `SetFlagsFromEnv(prefix string, fs *flag.FlagSet) error`, now `SetFlagsFromEnv(lg *zap.Logger, prefix string, fs *flag.FlagSet) error`.
- Previously, `SetPflagsFromEnv(prefix string, fs *pflag.FlagSet) error`, now `SetPflagsFromEnv(lg *zap.Logger, prefix string, fs *pflag.FlagSet) error`.
+- ClientV3 supports [grpc resolver API](https://github.com/etcd-io/etcd/blob/master/client/v3/naming/resolver/resolver.go).
+ - Endpoints can be managed using [endpoints.Manager](https://github.com/etcd-io/etcd/blob/master/client/v3/naming/endpoints/endpoints.go)
+ - Previously supported [GRPCResolver was decomissioned](https://github.com/etcd-io/etcd/pull/12675). Use [resolver](https://github.com/etcd-io/etcd/blob/master/client/v3/naming/resolver/resolver.go) instead.
+
### `etcdctl`
@@ -174,6 +178,8 @@ Note that any `etcd_debugging_*` metrics are experimental and subject to change.
- Make sure [save snapshot downloads checksum for integrity checks](https://github.com/etcd-io/etcd/pull/11896).
- Fix [auth token invalid after watch reconnects](https://github.com/etcd-io/etcd/pull/12264). Get AuthToken automatically when clientConn is ready.
- Improve [clientv3:get AuthToken gracefully without extra connection](https://github.com/etcd-io/etcd/pull/12165).
+- Changed [clientv3 dialing code]() to use grpc resolver API instead of custom balancer.
+ - Endpoints self identify now as `etcd-endpoints://{id}/#initially={list of endpoints}` e.g. `etcd-endpoints://0xc0009d8540/#initially=[localhost:2079]`
### Package `lease`
diff --git a/client/v3/client.go b/client/v3/client.go
index d0efaee68bf5..e0fc5eb51d29 100644
--- a/client/v3/client.go
+++ b/client/v3/client.go
@@ -206,12 +206,10 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts
} else {
opts = append(opts, grpc.WithInsecure())
}
- grpc.WithDisableRetry()
// Interceptor retry and backoff.
- // TODO: Replace all of clientv3/retry.go with interceptor based retry, or with
- // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy
- // once it is available.
+ // TODO: Replace all of clientv3/retry.go with RetryPolicy:
+ // https://github.com/grpc/grpc-proto/blob/cdd9ed5c3d3f87aef62f373b93361cf7bddc620d/grpc/service_config/service_config.proto#L130
rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction))
opts = append(opts,
// Disable stream retry by default since go-grpc-middleware/retry does not support client streams.
@@ -252,8 +250,8 @@ func (c *Client) getToken(ctx context.Context) error {
// dialWithBalancer dials the client's current load balanced resolver group. The scheme of the host
// of the provided endpoint determines the scheme used for all endpoints of the client connection.
-func (c *Client) dialWithBalancer(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
- creds := c.credentialsForEndpoint(ep)
+func (c *Client) dialWithBalancer(dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
+ creds := c.credentialsForEndpoint(c.Endpoints()[0])
opts := append(dopts, grpc.WithResolvers(c.resolver))
return c.dial(creds, opts...)
}
@@ -278,7 +276,9 @@ func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.
defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
}
- conn, err := grpc.DialContext(dctx, c.resolver.Scheme()+":///", opts...)
+ initialEndpoints := strings.Join(c.cfg.Endpoints, ";")
+ target := fmt.Sprintf("%s://%p/#initially=[%s]", resolver.Schema, c, initialEndpoints)
+ conn, err := grpc.DialContext(dctx, target, opts...)
if err != nil {
return nil, err
}
@@ -286,10 +286,10 @@ func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.
}
func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials {
- if c.creds != nil {
- return c.creds
- }
if endpoint.RequiresCredentials(ep) {
+ if c.creds != nil {
+ return c.creds
+ }
return credentials.NewBundle(credentials.Config{}).TransportCredentials()
}
return nil
@@ -360,18 +360,15 @@ func newClient(cfg *Config) (*Client, error) {
client.cancel()
return nil, fmt.Errorf("at least one Endpoint is required in client config")
}
- dialEndpoint := cfg.Endpoints[0]
-
// Use a provided endpoint target so that for https:// without any tls config given, then
// grpc will assume the certificate server name is the endpoint host.
- conn, err := client.dialWithBalancer(dialEndpoint)
+ conn, err := client.dialWithBalancer()
if err != nil {
client.cancel()
client.resolver.Close()
+ // TODO: Error like `fmt.Errorf(dialing [%s] failed: %v, strings.Join(cfg.Endpoints, ";"), err)` would help with debugging a lot.
return nil, err
}
- // TODO: With the old grpc balancer interface, we waited until the dial timeout
- // for the balancer to be ready. Is there an equivalent wait we should do with the new grpc balancer interface?
client.conn = conn
client.Cluster = NewCluster(client)
@@ -390,6 +387,7 @@ func newClient(cfg *Config) (*Client, error) {
if err != nil {
client.Close()
cancel()
+ //TODO: Consider fmt.Errorf("communicating with [%s] failed: %v", strings.Join(cfg.Endpoints, ";"), err)
return nil, err
}
cancel()
diff --git a/client/v3/client_test.go b/client/v3/client_test.go
index e6918f12c0b2..50704e38c18a 100644
--- a/client/v3/client_test.go
+++ b/client/v3/client_test.go
@@ -82,6 +82,8 @@ func TestDialCancel(t *testing.T) {
func TestDialTimeout(t *testing.T) {
defer testutil.AfterTest(t)
+ wantError := context.DeadlineExceeded
+
// grpc.WithBlock to block until connection up or timeout
testCfgs := []Config{
{
@@ -121,8 +123,8 @@ func TestDialTimeout(t *testing.T) {
case <-time.After(5 * time.Second):
t.Errorf("#%d: failed to timeout dial on time", i)
case err := <-donec:
- if err != context.DeadlineExceeded {
- t.Errorf("#%d: unexpected error %v, want %v", i, err, context.DeadlineExceeded)
+ if err.Error() != wantError.Error() {
+ t.Errorf("#%d: unexpected error '%v', want '%v'", i, err, wantError)
}
}
}
diff --git a/client/v3/internal/endpoint/endpoint.go b/client/v3/internal/endpoint/endpoint.go
index c0a2f9acbfa4..014d9882460e 100644
--- a/client/v3/internal/endpoint/endpoint.go
+++ b/client/v3/internal/endpoint/endpoint.go
@@ -15,40 +15,77 @@
package endpoint
import (
+ "net"
"net/url"
- "regexp"
+ "path"
+ "strings"
)
-var (
- STRIP_PORT_REGEXP = regexp.MustCompile("(.*):([0-9]+)")
-)
+func extractHostFromHostPort(ep string) string {
+ host, _, err := net.SplitHostPort(ep)
+ if err != nil {
+ return ep
+ }
+ return host
+}
+
+func extractHostFromPath(pathStr string) string {
+ return extractHostFromHostPort(path.Base(pathStr))
+}
-func stripPort(ep string) string {
- return STRIP_PORT_REGEXP.ReplaceAllString(ep, "$1")
+//split2 returns the values from strings.SplitN(s, sep, 2).
+//If sep is not found, it returns ("", "", false) instead.
+func split2(s, sep string) (string, string, bool) {
+ spl := strings.SplitN(s, sep, 2)
+ if len(spl) < 2 {
+ return "", "", false
+ }
+ return spl[0], spl[1], true
}
+// This function translates endpoints names supported by etcd server into
+// endpoints as supported by grpc with additional information
+// (server_name for cert validation, requireCreds - whether certs are needed).
+// The main differences:
+// - etcd supports unixs & https names as opposed to unix & http to
+// distinguish need to configure certificates.
+// - etcd support http(s) names as opposed to tcp supported by grpc/dial method.
+// - etcd supports unix(s)://local-file naming schema
+// (as opposed to unit:local-file canonical name).
+// - Within the unix(s) schemas, the last segment (filename) without 'port' (content after colon)
+// is considered serverName - to allow local testing of cert-protected communication.
+// See more:
+// - https://github.com/grpc/grpc-go/blob/26c143bd5f59344a4b8a1e491e0f5e18aa97abc7/internal/grpcutil/target.go#L47
+// - https://golang.org/pkg/net/#Dial
+// - https://github.com/grpc/grpc/blob/master/doc/naming.md
func translateEndpoint(ep string) (addr string, serverName string, requireCreds bool) {
- url, err := url.Parse(ep)
- if err != nil {
- return ep, stripPort(ep), false
+ if strings.HasPrefix(ep, "unix:") || strings.HasPrefix(ep, "unixs:") {
+ if strings.HasPrefix(ep, "unix:///") || strings.HasPrefix(ep, "unixs:///") {
+ // absolute path case
+ schema, absolutePath, _ := split2(ep, "://")
+ return "unix://" + absolutePath, extractHostFromPath(absolutePath), schema == "unixs"
+ }
+ if strings.HasPrefix(ep, "unix://") || strings.HasPrefix(ep, "unixs://") {
+ // legacy etcd local path
+ schema, localPath, _ := split2(ep, "://")
+ return "unix:" + localPath, extractHostFromPath(localPath), schema == "unixs"
+ }
+ schema, localPath, _ := split2(ep, ":")
+ return "unix:" + localPath, extractHostFromPath(localPath), schema == "unixs"
}
- switch url.Scheme {
- case "http", "https":
- return url.Host, url.Hostname(), url.Scheme == "https"
- case "unix", "unixs":
- requireCreds = url.Scheme == "unixs"
- if url.Opaque != "" {
- return "unix:" + url.Opaque, stripPort(url.Opaque), requireCreds
- } else if url.Path != "" {
- return "unix://" + url.Host + url.Path, url.Host + url.Path, requireCreds
- } else {
- return "unix:" + url.Host, url.Hostname(), requireCreds
+
+ if strings.Contains(ep, "://") {
+ url, err := url.Parse(ep)
+ if err != nil {
+ return ep, extractHostFromHostPort(ep), false
+ }
+ if url.Scheme == "http" || url.Scheme == "https" {
+ return url.Host, url.Hostname(), url.Scheme == "https"
}
- case "":
- return url.Host + url.Path, url.Host + url.Path, false
- default:
- return ep, stripPort(ep), false
+ return ep, url.Hostname(), false
}
+ // Handles plain addresses like 10.0.0.44:437
+ return ep, extractHostFromHostPort(ep), false
}
// RequiresCredentials returns whether given endpoint requires
diff --git a/client/v3/internal/endpoint/endpoint_test.go b/client/v3/internal/endpoint/endpoint_test.go
index 6eb810cfae91..dbb59df7c581 100644
--- a/client/v3/internal/endpoint/endpoint_test.go
+++ b/client/v3/internal/endpoint/endpoint_test.go
@@ -18,41 +18,48 @@ import (
"testing"
)
-func TestInterpret(t *testing.T) {
+func Test_interpret(t *testing.T) {
tests := []struct {
- endpoint string
- wantAddress string
- wantServerName string
+ endpoint string
+ wantAddress string
+ wantServerName string
+ wantRequiresCreds bool
}{
- {"127.0.0.1", "127.0.0.1", "127.0.0.1"},
- {"localhost", "localhost", "localhost"},
- {"localhost:8080", "localhost:8080", "localhost"},
+ {"127.0.0.1", "127.0.0.1", "127.0.0.1", false},
+ {"localhost", "localhost", "localhost", false},
+ {"localhost:8080", "localhost:8080", "localhost", false},
- {"unix:127.0.0.1", "unix:127.0.0.1", "127.0.0.1"},
- {"unix:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"},
+ {"unix:127.0.0.1", "unix:127.0.0.1", "127.0.0.1", false},
+ {"unix:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1", false},
- {"unix://127.0.0.1", "unix:127.0.0.1", "127.0.0.1"},
- {"unix://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"},
+ {"unix://127.0.0.1", "unix:127.0.0.1", "127.0.0.1", false},
+ {"unix://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1", false},
- {"unixs:127.0.0.1", "unix:127.0.0.1", "127.0.0.1"},
- {"unixs:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"},
- {"unixs://127.0.0.1", "unix:127.0.0.1", "127.0.0.1"},
- {"unixs://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"},
+ {"unixs:127.0.0.1", "unix:127.0.0.1", "127.0.0.1", true},
+ {"unixs:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1", true},
+ {"unixs://127.0.0.1", "unix:127.0.0.1", "127.0.0.1", true},
+ {"unixs://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1", true},
- {"http://127.0.0.1", "127.0.0.1", "127.0.0.1"},
- {"http://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1"},
- {"https://127.0.0.1", "127.0.0.1", "127.0.0.1"},
- {"https://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1"},
- {"https://localhost:20000", "localhost:20000", "localhost"},
+ {"http://127.0.0.1", "127.0.0.1", "127.0.0.1", false},
+ {"http://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1", false},
+ {"https://127.0.0.1", "127.0.0.1", "127.0.0.1", true},
+ {"https://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1", true},
+ {"https://localhost:20000", "localhost:20000", "localhost", true},
- {"unix:///tmp/abc", "unix:///tmp/abc", "/tmp/abc"},
- {"unixs:///tmp/abc", "unix:///tmp/abc", "/tmp/abc"},
- {"etcd.io", "etcd.io", "etcd.io"},
- {"http://etcd.io/abc", "etcd.io", "etcd.io"},
- {"dns://something-other", "dns://something-other", "dns://something-other"},
+ {"unix:///tmp/abc", "unix:///tmp/abc", "abc", false},
+ {"unixs:///tmp/abc", "unix:///tmp/abc", "abc", true},
+ {"unix:///tmp/abc:1234", "unix:///tmp/abc:1234", "abc", false},
+ {"unixs:///tmp/abc:1234", "unix:///tmp/abc:1234", "abc", true},
+ {"etcd.io", "etcd.io", "etcd.io", false},
+ {"http://etcd.io/abc", "etcd.io", "etcd.io", false},
+ {"dns://something-other", "dns://something-other", "something-other", false},
+
+ {"http://[2001:db8:1f70::999:de8:7648:6e8]:100/", "[2001:db8:1f70::999:de8:7648:6e8]:100", "2001:db8:1f70::999:de8:7648:6e8", false},
+ {"[2001:db8:1f70::999:de8:7648:6e8]:100", "[2001:db8:1f70::999:de8:7648:6e8]:100", "2001:db8:1f70::999:de8:7648:6e8", false},
+ {"unix:unexpected-file_name#123$456", "unix:unexpected-file_name#123$456", "unexpected-file_name#123$456", false},
}
for _, tt := range tests {
- t.Run(tt.endpoint, func(t *testing.T) {
+ t.Run("Interpret_"+tt.endpoint, func(t *testing.T) {
gotAddress, gotServerName := Interpret(tt.endpoint)
if gotAddress != tt.wantAddress {
t.Errorf("Interpret() gotAddress = %v, want %v", gotAddress, tt.wantAddress)
@@ -61,5 +68,32 @@ func TestInterpret(t *testing.T) {
t.Errorf("Interpret() gotServerName = %v, want %v", gotServerName, tt.wantServerName)
}
})
+ t.Run("RequiresCredentials_"+tt.endpoint, func(t *testing.T) {
+ requiresCreds := RequiresCredentials(tt.endpoint)
+ if requiresCreds != tt.wantRequiresCreds {
+ t.Errorf("RequiresCredentials() got = %v, want %v", requiresCreds, tt.wantRequiresCreds)
+ }
+ })
+ }
+}
+
+func Test_extractHostFromHostPort(t *testing.T) {
+ tests := []struct {
+ ep string
+ want string
+ }{
+ {ep: "localhost", want: "localhost"},
+ {ep: "localhost:8080", want: "localhost"},
+ {ep: "192.158.7.14:8080", want: "192.158.7.14"},
+ {ep: "192.158.7.14:8080", want: "192.158.7.14"},
+ {ep: "[2001:db8:1f70::999:de8:7648:6e8]", want: "[2001:db8:1f70::999:de8:7648:6e8]"},
+ {ep: "[2001:db8:1f70::999:de8:7648:6e8]:100", want: "2001:db8:1f70::999:de8:7648:6e8"},
+ }
+ for _, tt := range tests {
+ t.Run(tt.ep, func(t *testing.T) {
+ if got := extractHostFromHostPort(tt.ep); got != tt.want {
+ t.Errorf("extractHostFromHostPort() = %v, want %v", got, tt.want)
+ }
+ })
}
}
diff --git a/client/v3/internal/resolver/resolver.go b/client/v3/internal/resolver/resolver.go
index d684a311607d..3ee3cb8e2bb9 100644
--- a/client/v3/internal/resolver/resolver.go
+++ b/client/v3/internal/resolver/resolver.go
@@ -21,6 +21,10 @@ import (
"google.golang.org/grpc/serviceconfig"
)
+const (
+ Schema = "etcd-endpoints"
+)
+
// EtcdManualResolver is a Resolver (and resolver.Builder) that can be updated
// using SetEndpoints.
type EtcdManualResolver struct {
@@ -30,7 +34,7 @@ type EtcdManualResolver struct {
}
func New(endpoints ...string) *EtcdManualResolver {
- r := manual.NewBuilderWithScheme("etcd-endpoints")
+ r := manual.NewBuilderWithScheme(Schema)
return &EtcdManualResolver{Resolver: r, endpoints: endpoints, serviceConfig: nil}
}
diff --git a/server/etcdmain/grpc_proxy.go b/server/etcdmain/grpc_proxy.go
index 83bee50159f2..0c1e551bb04b 100644
--- a/server/etcdmain/grpc_proxy.go
+++ b/server/etcdmain/grpc_proxy.go
@@ -208,7 +208,13 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
}()
client := mustNewClient(lg)
- proxyClient := mustNewProxyClient(lg, tlsinfo)
+
+ // The proxy client is used for self-healthchecking.
+ // TODO: The mechanism should be refactored to use internal connection.
+ var proxyClient *clientv3.Client
+ if grpcProxyAdvertiseClientURL != "" {
+ proxyClient = mustNewProxyClient(lg, tlsinfo)
+ }
httpClient := mustNewHTTPClient(lg)
srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client, proxyClient)
diff --git a/server/proxy/grpcproxy/health.go b/server/proxy/grpcproxy/health.go
index 38546429485e..5a77b819e569 100644
--- a/server/proxy/grpcproxy/health.go
+++ b/server/proxy/grpcproxy/health.go
@@ -56,6 +56,9 @@ func checkHealth(c *clientv3.Client) etcdhttp.Health {
}
func checkProxyHealth(c *clientv3.Client) etcdhttp.Health {
+ if c == nil {
+ return etcdhttp.Health{Health: "false", Reason: "no connection to proxy"}
+ }
h := checkHealth(c)
if h.Health != "true" {
return h
diff --git a/tests/integration/clientv3/lease/lease_test.go b/tests/integration/clientv3/lease/lease_test.go
index e29c3e97e5cc..78196dfc3552 100644
--- a/tests/integration/clientv3/lease/lease_test.go
+++ b/tests/integration/clientv3/lease/lease_test.go
@@ -728,6 +728,7 @@ func TestLeaseKeepAliveLoopExit(t *testing.T) {
// before, during, and after quorum loss to confirm Grant/KeepAlive tolerates
// transient cluster failure.
func TestV3LeaseFailureOverlap(t *testing.T) {
+ defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
defer clus.Terminate(t)
diff --git a/tests/integration/cluster.go b/tests/integration/cluster.go
index e237dcf36aed..13ecf5a796d3 100644
--- a/tests/integration/cluster.go
+++ b/tests/integration/cluster.go
@@ -63,7 +63,7 @@ import (
const (
// RequestWaitTimeout is the time duration to wait for a request to go through or detect leader loss.
- RequestWaitTimeout = 3 * time.Second
+ RequestWaitTimeout = 5 * time.Second
tickDuration = 10 * time.Millisecond
requestTimeout = 20 * time.Second
@@ -1275,7 +1275,11 @@ func NewClusterV3(t testing.TB, cfg *ClusterConfig) *ClusterV3 {
for _, m := range clus.Members {
client, err := NewClientV3(m)
if err != nil {
- t.Fatalf("cannot create client: %v", err)
+ if t != nil {
+ t.Fatalf("cannot create client: %v", err)
+ } else {
+ log.Fatalf("cannot create client: %v", err)
+ }
}
clus.clients = append(clus.clients, client)
}