Skip to content

Commit

Permalink
grpc: Remove health check func dial option used for testing (#7820)
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal authored Nov 8, 2024
1 parent 5b40f07 commit 74738cf
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 54 deletions.
4 changes: 2 additions & 2 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1445,7 +1445,7 @@ func (ac *addrConn) startHealthCheck(ctx context.Context) {
if !ac.scopts.HealthCheckEnabled {
return
}
healthCheckFunc := ac.cc.dopts.healthCheckFunc
healthCheckFunc := internal.HealthCheckFunc
if healthCheckFunc == nil {
// The health package is not imported to set health check function.
//
Expand Down Expand Up @@ -1477,7 +1477,7 @@ func (ac *addrConn) startHealthCheck(ctx context.Context) {
}
// Start the health checking stream.
go func() {
err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
err := healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
if err != nil {
if status.Code(err) == codes.Unimplemented {
channelz.Error(logger, ac.channelz, "Subchannel health check is unimplemented at server side, thus health check is disabled")
Expand Down
16 changes: 0 additions & 16 deletions dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ type dialOptions struct {
disableServiceConfig bool
disableRetry bool
disableHealthCheck bool
healthCheckFunc internal.HealthChecker
minConnectTimeout func() time.Duration
defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON.
defaultServiceConfigRawJSON *string
Expand Down Expand Up @@ -445,10 +444,6 @@ func WithContextDialer(f func(context.Context, string) (net.Conn, error)) DialOp
})
}

func init() {
internal.WithHealthCheckFunc = withHealthCheckFunc
}

// WithDialer returns a DialOption that specifies a function to use for dialing
// network addresses. If FailOnNonTempDialError() is set to true, and an error
// is returned by f, gRPC checks the error's Temporary() method to decide if it
Expand Down Expand Up @@ -662,16 +657,6 @@ func WithDisableHealthCheck() DialOption {
})
}

// withHealthCheckFunc replaces the default health check function with the
// provided one. It makes tests easier to change the health check function.
//
// For testing purpose only.
func withHealthCheckFunc(f internal.HealthChecker) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.healthCheckFunc = f
})
}

func defaultDialOptions() dialOptions {
return dialOptions{
copts: transport.ConnectOptions{
Expand All @@ -682,7 +667,6 @@ func defaultDialOptions() dialOptions {
BufferPool: mem.DefaultBufferPool(),
},
bs: internalbackoff.DefaultExponential,
healthCheckFunc: internal.HealthCheckFunc,
idleTimeout: 30 * time.Minute,
defaultScheme: "dns",
maxCallAttempts: defaultMaxCallAttempts,
Expand Down
2 changes: 0 additions & 2 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
)

var (
// WithHealthCheckFunc is set by dialoptions.go
WithHealthCheckFunc any // func (HealthChecker) DialOption
// HealthCheckFunc is used to provide client-side LB channel health checking
HealthCheckFunc HealthChecker
// BalancerUnregister is exported by package balancer to unregister a balancer.
Expand Down
65 changes: 31 additions & 34 deletions test/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ import (
testpb "google.golang.org/grpc/interop/grpc_testing"
)

var testHealthCheckFunc = internal.HealthCheckFunc

func newTestHealthServer() *testHealthServer {
return newTestHealthServerWithWatchFunc(defaultWatchFunc)
}
Expand Down Expand Up @@ -119,14 +117,22 @@ func (s *testHealthServer) SetServingStatus(service string, status healthpb.Heal
s.mu.Unlock()
}

func setupHealthCheckWrapper() (hcEnterChan chan struct{}, hcExitChan chan struct{}, wrapper internal.HealthChecker) {
func setupHealthCheckWrapper(t *testing.T) (hcEnterChan chan struct{}, hcExitChan chan struct{}) {
t.Helper()

hcEnterChan = make(chan struct{})
hcExitChan = make(chan struct{})
wrapper = func(ctx context.Context, newStream func(string) (any, error), update func(connectivity.State, error), service string) error {
origHealthCheckFn := internal.HealthCheckFunc
internal.HealthCheckFunc = func(ctx context.Context, newStream func(string) (any, error), update func(connectivity.State, error), service string) error {
close(hcEnterChan)
defer close(hcExitChan)
return testHealthCheckFunc(ctx, newStream, update, service)
return origHealthCheckFn(ctx, newStream, update, service)
}

t.Cleanup(func() {
internal.HealthCheckFunc = origHealthCheckFn
})

return
}

Expand All @@ -153,9 +159,8 @@ func setupServer(t *testing.T, watchFunc healthWatchFunc) (*grpc.Server, net.Lis
}

type clientConfig struct {
balancerName string
testHealthCheckFuncWrapper internal.HealthChecker
extraDialOption []grpc.DialOption
balancerName string
extraDialOption []grpc.DialOption
}

func setupClient(t *testing.T, c *clientConfig) (*grpc.ClientConn, *manual.Resolver) {
Expand All @@ -170,9 +175,6 @@ func setupClient(t *testing.T, c *clientConfig) (*grpc.ClientConn, *manual.Resol
if c.balancerName != "" {
opts = append(opts, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, c.balancerName)))
}
if c.testHealthCheckFuncWrapper != nil {
opts = append(opts, internal.WithHealthCheckFunc.(func(internal.HealthChecker) grpc.DialOption)(c.testHealthCheckFuncWrapper))
}
opts = append(opts, c.extraDialOption...)
}

Expand Down Expand Up @@ -281,8 +283,8 @@ func (s) TestHealthCheckWithGoAway(t *testing.T) {
s, lis, ts := setupServer(t, nil)
ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)

hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
cc, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})
hcEnterChan, hcExitChan := setupHealthCheckWrapper(t)
cc, r := setupClient(t, &clientConfig{})
tc := testgrpc.NewTestServiceClient(cc)
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
Expand Down Expand Up @@ -359,8 +361,8 @@ func (s) TestHealthCheckWithConnClose(t *testing.T) {
s, lis, ts := setupServer(t, nil)
ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)

hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
cc, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})
hcEnterChan, hcExitChan := setupHealthCheckWrapper(t)
cc, r := setupClient(t, &clientConfig{})
tc := testgrpc.NewTestServiceClient(cc)
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
Expand Down Expand Up @@ -409,8 +411,8 @@ func (s) TestHealthCheckWithAddrConnDrain(t *testing.T) {
_, lis, ts := setupServer(t, nil)
ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)

hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
cc, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})
hcEnterChan, hcExitChan := setupHealthCheckWrapper(t)
cc, r := setupClient(t, &clientConfig{})
tc := testgrpc.NewTestServiceClient(cc)
sc := parseServiceConfig(t, r, `{
"healthCheckConfig": {
Expand Down Expand Up @@ -489,8 +491,8 @@ func (s) TestHealthCheckWithClientConnClose(t *testing.T) {
_, lis, ts := setupServer(t, nil)
ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING)

hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
cc, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})
hcEnterChan, hcExitChan := setupHealthCheckWrapper(t)
cc, r := setupClient(t, &clientConfig{})
tc := testgrpc.NewTestServiceClient(cc)
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
Expand Down Expand Up @@ -555,8 +557,8 @@ func (s) TestHealthCheckWithoutSetConnectivityStateCalledAddrConnShutDown(t *tes
_, lis, ts := setupServer(t, watchFunc)
ts.SetServingStatus("delay", healthpb.HealthCheckResponse_SERVING)

hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
_, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})
hcEnterChan, hcExitChan := setupHealthCheckWrapper(t)
_, r := setupClient(t, &clientConfig{})

// The serviceName "delay" is specially handled at server side, where response will not be sent
// back to client immediately upon receiving the request (client should receive no response until
Expand Down Expand Up @@ -618,8 +620,8 @@ func (s) TestHealthCheckWithoutSetConnectivityStateCalled(t *testing.T) {
s, lis, ts := setupServer(t, watchFunc)
ts.SetServingStatus("delay", healthpb.HealthCheckResponse_SERVING)

hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
_, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})
hcEnterChan, hcExitChan := setupHealthCheckWrapper(t)
_, r := setupClient(t, &clientConfig{})

// The serviceName "delay" is specially handled at server side, where response will not be sent
// back to client immediately upon receiving the request (client should receive no response until
Expand Down Expand Up @@ -659,11 +661,8 @@ func (s) TestHealthCheckWithoutSetConnectivityStateCalled(t *testing.T) {
}

func testHealthCheckDisableWithDialOption(t *testing.T, addr string) {
hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
cc, r := setupClient(t, &clientConfig{
testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
extraDialOption: []grpc.DialOption{grpc.WithDisableHealthCheck()},
})
hcEnterChan, _ := setupHealthCheckWrapper(t)
cc, r := setupClient(t, &clientConfig{extraDialOption: []grpc.DialOption{grpc.WithDisableHealthCheck()}})
tc := testgrpc.NewTestServiceClient(cc)
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: addr}},
Expand Down Expand Up @@ -694,10 +693,8 @@ func testHealthCheckDisableWithDialOption(t *testing.T, addr string) {
}

func testHealthCheckDisableWithBalancer(t *testing.T, addr string) {
hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
cc, r := setupClient(t, &clientConfig{
testHealthCheckFuncWrapper: testHealthCheckFuncWrapper,
})
hcEnterChan, _ := setupHealthCheckWrapper(t)
cc, r := setupClient(t, &clientConfig{})
tc := testgrpc.NewTestServiceClient(cc)
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: addr}},
Expand Down Expand Up @@ -728,8 +725,8 @@ func testHealthCheckDisableWithBalancer(t *testing.T, addr string) {
}

func testHealthCheckDisableWithServiceConfig(t *testing.T, addr string) {
hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper()
cc, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper})
hcEnterChan, _ := setupHealthCheckWrapper(t)
cc, r := setupClient(t, &clientConfig{})
tc := testgrpc.NewTestServiceClient(cc)
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: addr}}})

Expand Down

0 comments on commit 74738cf

Please sign in to comment.