From dc49de8acd511e4d47ad0cbf58bd77f4775c165f Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Thu, 21 Nov 2019 10:27:29 -0800 Subject: [PATCH] balancer: add V2Picker, ClientConn.UpdateState, SubConnState.ConnectionError (#3186) Also implement V2 versions of base.*, xds, pickfirst, grpclb, and round robin balancers. --- balancer/balancer.go | 110 +++++++++-- balancer/base/balancer.go | 110 ++++++++--- balancer/base/base.go | 29 +++ balancer/grpclb/grpclb.go | 4 +- balancer/grpclb/grpclb_picker.go | 17 +- balancer/grpclb/grpclb_test.go | 37 ---- balancer/grpclb/grpclb_util.go | 5 +- balancer/roundrobin/roundrobin.go | 18 +- balancer_conn_wrappers.go | 23 ++- balancer_conn_wrappers_test.go | 4 +- balancer_switching_test.go | 1 - balancer_test.go | 1 - balancer_v1_wrapper.go | 34 ++-- clientconn.go | 35 ++-- grpc_test.go | 2 + health/client.go | 14 +- health/client_test.go | 2 +- internal/internal.go | 2 +- picker_wrapper.go | 172 +++++++++++------- picker_wrapper_test.go | 15 +- pickfirst.go | 89 ++++++--- resolver_conn_wrapper_test.go | 20 +- test/balancer_test.go | 33 ++-- test/healthcheck_test.go | 2 +- vet.sh | 2 + .../balancer/edsbalancer/balancergroup.go | 66 +++---- .../edsbalancer/balancergroup_test.go | 119 ++++-------- .../balancer/edsbalancer/edsbalancer.go | 53 +++--- .../balancer/edsbalancer/edsbalancer_test.go | 95 ++++------ xds/internal/balancer/edsbalancer/priority.go | 19 +- .../balancer/edsbalancer/priority_test.go | 98 ++++------ .../balancer/edsbalancer/test_util_test.go | 14 +- xds/internal/balancer/xds.go | 7 +- xds/internal/balancer/xds_test.go | 9 +- 34 files changed, 701 insertions(+), 560 deletions(-) diff --git a/balancer/balancer.go b/balancer/balancer.go index 0c4c0a70bbaa..531a174a8399 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -117,6 +117,15 @@ type NewSubConnOptions struct { HealthCheckEnabled bool } +// State contains the balancer's state relevant to the gRPC ClientConn. +type State struct { + // State contains the connectivity state of the balancer, which is used to + // determine the state of the ClientConn. + ConnectivityState connectivity.State + // Picker is used to choose connections (SubConns) for RPCs. + Picker V2Picker +} + // ClientConn represents a gRPC ClientConn. // // This interface is to be implemented by gRPC. Users should not need a @@ -137,8 +146,17 @@ type ClientConn interface { // // gRPC will update the connectivity state of the ClientConn, and will call pick // on the new picker to pick new SubConn. + // + // Deprecated: use UpdateState instead UpdateBalancerState(s connectivity.State, p Picker) + // UpdateState notifies gRPC that the balancer's internal state has + // changed. + // + // gRPC will update the connectivity state of the ClientConn, and will call pick + // on the new picker to pick new SubConns. + UpdateState(State) + // ResolveNow is called by balancer to notify gRPC to do a name resolving. ResolveNow(resolver.ResolveNowOptions) @@ -185,11 +203,19 @@ type ConfigParser interface { ParseConfig(LoadBalancingConfigJSON json.RawMessage) (serviceconfig.LoadBalancingConfig, error) } -// PickOptions contains addition information for the Pick operation. -type PickOptions struct { +// PickOptions is a type alias of PickInfo for legacy reasons. +// +// Deprecated: use PickInfo instead. +type PickOptions = PickInfo + +// PickInfo contains additional information for the Pick operation. +type PickInfo struct { // FullMethodName is the method name that NewClientStream() is called // with. 
The canonical format is /service/Method. FullMethodName string + // Ctx is the RPC's context, and may contain relevant RPC-level information + // like the outgoing header metadata. + Ctx context.Context } // DoneInfo contains additional information for done. @@ -215,7 +241,7 @@ var ( ErrNoSubConnAvailable = errors.New("no SubConn is available") // ErrTransientFailure indicates all SubConns are in TransientFailure. // WaitForReady RPCs will block, non-WaitForReady RPCs will fail. - ErrTransientFailure = errors.New("all SubConns are in TransientFailure") + ErrTransientFailure = TransientFailureError(errors.New("all SubConns are in TransientFailure")) ) // Picker is used by gRPC to pick a SubConn to send an RPC. @@ -223,6 +249,8 @@ var ( // internal state has changed. // // The pickers used by gRPC can be updated by ClientConn.UpdateBalancerState(). +// +// Deprecated: use V2Picker instead type Picker interface { // Pick returns the SubConn to be used to send the RPC. // The returned SubConn must be one returned by NewSubConn(). @@ -243,18 +271,76 @@ type Picker interface { // // If the returned error is not nil: // - If the error is ErrNoSubConnAvailable, gRPC will block until UpdateBalancerState() - // - If the error is ErrTransientFailure: + // - If the error is ErrTransientFailure or implements IsTransientFailure() + // bool, returning true: // - If the RPC is wait-for-ready, gRPC will block until UpdateBalancerState() // is called to pick again; // - Otherwise, RPC will fail with unavailable error. // - Else (error is other non-nil error): - // - The RPC will fail with unavailable error. + // - The RPC will fail with the error's status code, or Unknown if it is + // not a status error. // // The returned done() function will be called once the rpc has finished, // with the final status of that RPC. If the SubConn returned is not a // valid SubConn type, done may not be called. done may be nil if balancer // doesn't care about the RPC status. - Pick(ctx context.Context, opts PickOptions) (conn SubConn, done func(DoneInfo), err error) + Pick(ctx context.Context, info PickInfo) (conn SubConn, done func(DoneInfo), err error) +} + +// PickResult contains information related to a connection chosen for an RPC. +type PickResult struct { + // SubConn is the connection to use for this pick, if its state is Ready. + // If the state is not Ready, gRPC will block the RPC until a new Picker is + // provided by the balancer (using ClientConn.UpdateState). The SubConn + // must be one returned by ClientConn.NewSubConn. + SubConn SubConn + + // Done is called when the RPC is completed. If the SubConn is not ready, + // this will be called with a nil parameter. If the SubConn is not a valid + // type, Done may not be called. May be nil if the balancer does not wish + // to be notified when the RPC completes. + Done func(DoneInfo) +} + +type transientFailureError struct { + error +} + +func (e *transientFailureError) IsTransientFailure() bool { return true } + +// TransientFailureError wraps err in an error implementing +// IsTransientFailure() bool, returning true. +func TransientFailureError(err error) error { + return &transientFailureError{error: err} +} + +// V2Picker is used by gRPC to pick a SubConn to send an RPC. +// Balancer is expected to generate a new picker from its snapshot every time its +// internal state has changed. +// +// The pickers used by gRPC can be updated by ClientConn.UpdateBalancerState(). 
+type V2Picker interface { + // Pick returns the connection to use for this RPC and related information. + // + // Pick should not block. If the balancer needs to do I/O or any blocking + // or time-consuming work to service this call, it should return + // ErrNoSubConnAvailable, and the Pick call will be repeated by gRPC when + // the Picker is updated (using ClientConn.UpdateState). + // + // If an error is returned: + // + // - If the error is ErrNoSubConnAvailable, gRPC will block until a new + // Picker is provided by the balancer (using ClientConn.UpdateState). + // + // - If the error implements IsTransientFailure() bool, returning true, + // wait for ready RPCs will wait, but non-wait for ready RPCs will be + // terminated with this error's Error() string and status code + // Unavailable. + // + // - Any other errors terminate all RPCs with the code and message + // provided. If the error is not a status error, it will be converted by + // gRPC to a status error with code Unknown. + Pick(info PickInfo) (PickResult, error) } // Balancer takes input from gRPC, manages SubConns, and collects and aggregates @@ -292,8 +378,11 @@ type Balancer interface { // SubConnState describes the state of a SubConn. type SubConnState struct { + // ConnectivityState is the connectivity state of the SubConn. ConnectivityState connectivity.State - // TODO: add last connection error + // ConnectionError is set if the ConnectivityState is TransientFailure, + // describing the reason the SubConn failed. Otherwise, it is nil. + ConnectionError error } // ClientConnState describes the state of a ClientConn relevant to the @@ -335,9 +424,8 @@ type V2Balancer interface { // // It's not thread safe. type ConnectivityStateEvaluator struct { - numReady uint64 // Number of addrConns in ready state. - numConnecting uint64 // Number of addrConns in connecting state. - numTransientFailure uint64 // Number of addrConns in transientFailure. + numReady uint64 // Number of addrConns in ready state. + numConnecting uint64 // Number of addrConns in connecting state. } // RecordTransition records state change happening in subConn and based on that @@ -357,8 +445,6 @@ func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState conne cse.numReady += updateVal case connectivity.Connecting: cse.numConnecting += updateVal - case connectivity.TransientFailure: - cse.numTransientFailure += updateVal } } diff --git a/balancer/base/balancer.go b/balancer/base/balancer.go index 1a5c1aa7e909..d952f09f345a 100644 --- a/balancer/base/balancer.go +++ b/balancer/base/balancer.go @@ -20,6 +20,7 @@ package base import ( "context" + "errors" "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" @@ -28,25 +29,32 @@ import ( ) type baseBuilder struct { - name string - pickerBuilder PickerBuilder - config Config + name string + pickerBuilder PickerBuilder + v2PickerBuilder V2PickerBuilder + config Config } func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { - return &baseBalancer{ - cc: cc, - pickerBuilder: bb.pickerBuilder, + bal := &baseBalancer{ + cc: cc, + pickerBuilder: bb.pickerBuilder, + v2PickerBuilder: bb.v2PickerBuilder, subConns: make(map[resolver.Address]balancer.SubConn), scStates: make(map[balancer.SubConn]connectivity.State), csEvltr: &balancer.ConnectivityStateEvaluator{}, - // Initialize picker to a picker that always return - // ErrNoSubConnAvailable, because when state of a SubConn changes, we - // may call UpdateBalancerState with this picker. 
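To make the new Pick contract concrete, the following is a minimal sketch of a V2Picker as a balancer author might write one against the API added above. Only balancer.PickInfo, PickResult, DoneInfo and TransientFailureError come from this patch; the package name, the constPicker type and its fields are invented for illustration.

package example

import "google.golang.org/grpc/balancer"

// constPicker is a toy V2Picker that always returns the same READY SubConn.
// A real balancer rebuilds its picker whenever the set of ready SubConns
// changes and installs the new one with ClientConn.UpdateState.
type constPicker struct {
	sc  balancer.SubConn // assumed READY; provided by the owning balancer
	err error            // set instead of sc when no SubConn is usable
}

func (p *constPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	if p.err != nil {
		// Wrapping with TransientFailureError makes wait-for-ready RPCs block
		// until a new picker arrives instead of failing with Unavailable.
		return balancer.PickResult{}, balancer.TransientFailureError(p.err)
	}
	return balancer.PickResult{
		SubConn: p.sc,
		// Done is optional; gRPC calls it with the RPC's final status.
		Done: func(di balancer.DoneInfo) { _ = di.Err },
	}, nil
}

The owning balancer would hand such a picker to gRPC with something like cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &constPicker{sc: sc}}).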
- picker: NewErrPicker(balancer.ErrNoSubConnAvailable), - config: bb.config, + config: bb.config, } + // Initialize picker to a picker that always returns + // ErrNoSubConnAvailable, because when state of a SubConn changes, we + // may call UpdateState with this picker. + if bb.pickerBuilder != nil { + bal.picker = NewErrPicker(balancer.ErrNoSubConnAvailable) + } else { + bal.v2Picker = NewErrPickerV2(balancer.ErrNoSubConnAvailable) + } + return bal } func (bb *baseBuilder) Name() string { @@ -56,8 +64,9 @@ func (bb *baseBuilder) Name() string { var _ balancer.V2Balancer = (*baseBalancer)(nil) // Assert that we implement V2Balancer type baseBalancer struct { - cc balancer.ClientConn - pickerBuilder PickerBuilder + cc balancer.ClientConn + pickerBuilder PickerBuilder + v2PickerBuilder V2PickerBuilder csEvltr *balancer.ConnectivityStateEvaluator state connectivity.State @@ -65,6 +74,7 @@ type baseBalancer struct { subConns map[resolver.Address]balancer.SubConn scStates map[balancer.SubConn]connectivity.State picker balancer.Picker + v2Picker balancer.V2Picker config Config } @@ -72,8 +82,15 @@ func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) panic("not implemented") } -func (b *baseBalancer) ResolverError(error) { - // Ignore +func (b *baseBalancer) ResolverError(err error) { + switch b.state { + case connectivity.TransientFailure, connectivity.Idle, connectivity.Connecting: + if b.picker != nil { + b.picker = NewErrPicker(err) + } else { + b.v2Picker = NewErrPickerV2(err) + } + } } func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error { @@ -114,20 +131,44 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error { // from it. The picker is // - errPicker with ErrTransientFailure if the balancer is in TransientFailure, // - built by the pickerBuilder with all READY SubConns otherwise. -func (b *baseBalancer) regeneratePicker() { +func (b *baseBalancer) regeneratePicker(err error) { if b.state == connectivity.TransientFailure { - b.picker = NewErrPicker(balancer.ErrTransientFailure) + if b.pickerBuilder != nil { + b.picker = NewErrPicker(balancer.ErrTransientFailure) + } else { + if err != nil { + b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(err)) + } else { + // This means the last subchannel transition was not to + // TransientFailure (otherwise err must be set), but the + // aggregate state of the balancer is TransientFailure, meaning + // there are no other addresses. + b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(errors.New("resolver returned no addresses"))) + } + } return } - readySCs := make(map[resolver.Address]balancer.SubConn) + if b.pickerBuilder != nil { + readySCs := make(map[resolver.Address]balancer.SubConn) - // Filter out all ready SCs from full subConn map. - for addr, sc := range b.subConns { - if st, ok := b.scStates[sc]; ok && st == connectivity.Ready { - readySCs[addr] = sc + // Filter out all ready SCs from full subConn map. + for addr, sc := range b.subConns { + if st, ok := b.scStates[sc]; ok && st == connectivity.Ready { + readySCs[addr] = sc + } + } + b.picker = b.pickerBuilder.Build(readySCs) + } else { + readySCs := make(map[balancer.SubConn]SubConnInfo) + + // Filter out all ready SCs from full subConn map. 
+ for addr, sc := range b.subConns { + if st, ok := b.scStates[sc]; ok && st == connectivity.Ready { + readySCs[sc] = SubConnInfo{Address: addr} + } } + b.v2Picker = b.v2PickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs}) } - b.picker = b.pickerBuilder.Build(readySCs) } func (b *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { @@ -166,10 +207,14 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su // - the aggregated state of balancer became non-TransientFailure from TransientFailure if (s == connectivity.Ready) != (oldS == connectivity.Ready) || (b.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) { - b.regeneratePicker() + b.regeneratePicker(state.ConnectionError) } - b.cc.UpdateBalancerState(b.state, b.picker) + if b.picker != nil { + b.cc.UpdateBalancerState(b.state, b.picker) + } else { + b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.v2Picker}) + } } // Close is a nop because base balancer doesn't have internal state to clean up, @@ -186,6 +231,19 @@ type errPicker struct { err error // Pick() always returns this err. } -func (p *errPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) { +func (p *errPicker) Pick(context.Context, balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) { return nil, nil, p.err } + +// NewErrPickerV2 returns a V2Picker that always returns err on Pick(). +func NewErrPickerV2(err error) balancer.V2Picker { + return &errPickerV2{err: err} +} + +type errPickerV2 struct { + err error // Pick() always returns this err. +} + +func (p *errPickerV2) Pick(info balancer.PickInfo) (balancer.PickResult, error) { + return balancer.PickResult{}, p.err +} diff --git a/balancer/base/base.go b/balancer/base/base.go index 34b1f2994a7d..4192918b9e28 100644 --- a/balancer/base/base.go +++ b/balancer/base/base.go @@ -42,6 +42,26 @@ type PickerBuilder interface { Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker } +// V2PickerBuilder creates balancer.V2Picker. +type V2PickerBuilder interface { + // Build returns a picker that will be used by gRPC to pick a SubConn. + Build(info PickerBuildInfo) balancer.V2Picker +} + +// PickerBuildInfo contains information needed by the picker builder to +// construct a picker. +type PickerBuildInfo struct { + // ReadySCs is a map from all ready SubConns to the Addresses used to + // create them. + ReadySCs map[balancer.SubConn]SubConnInfo +} + +// SubConnInfo contains information about a SubConn created by the base +// balancer. +type SubConnInfo struct { + Address resolver.Address // the address used to create this SubConn +} + // NewBalancerBuilder returns a balancer builder. The balancers // built by this builder will use the picker builder to build pickers. func NewBalancerBuilder(name string, pb PickerBuilder) balancer.Builder { @@ -62,3 +82,12 @@ func NewBalancerBuilderWithConfig(name string, pb PickerBuilder, config Config) config: config, } } + +// NewBalancerBuilderV2 returns a base balancer builder configured by the provided config. 
+func NewBalancerBuilderV2(name string, pb V2PickerBuilder, config Config) balancer.Builder { + return &baseBuilder{ + name: name, + v2PickerBuilder: pb, + config: config, + } +} diff --git a/balancer/grpclb/grpclb.go b/balancer/grpclb/grpclb.go index 06d9f2aff1cd..1e34786834b2 100644 --- a/balancer/grpclb/grpclb.go +++ b/balancer/grpclb/grpclb.go @@ -214,7 +214,7 @@ type lbBalancer struct { state connectivity.State subConns map[resolver.Address]balancer.SubConn // Used to new/remove SubConn. scStates map[balancer.SubConn]connectivity.State // Used to filter READY SubConns. - picker balancer.Picker + picker balancer.V2Picker // Support fallback to resolved backend addresses if there's no response // from remote balancer within fallbackTimeout. remoteBalancerConnected bool @@ -369,7 +369,7 @@ func (lb *lbBalancer) updateStateAndPicker(forceRegeneratePicker bool, resetDrop lb.regeneratePicker(resetDrop) } - lb.cc.UpdateBalancerState(lb.state, lb.picker) + lb.cc.UpdateState(balancer.State{ConnectivityState: lb.state, Picker: lb.picker}) } // fallbackToBackendsAfter blocks for fallbackTimeout and falls back to use diff --git a/balancer/grpclb/grpclb_picker.go b/balancer/grpclb/grpclb_picker.go index 6f023bc5eed3..421dbfa1682e 100644 --- a/balancer/grpclb/grpclb_picker.go +++ b/balancer/grpclb/grpclb_picker.go @@ -19,7 +19,6 @@ package grpclb import ( - "context" "sync" "sync/atomic" @@ -96,8 +95,8 @@ type errPicker struct { err error } -func (p *errPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) { - return nil, nil, p.err +func (p *errPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) { + return balancer.PickResult{}, p.err } // rrPicker does roundrobin on subConns. It's typically used when there's no @@ -118,12 +117,12 @@ func newRRPicker(readySCs []balancer.SubConn) *rrPicker { } } -func (p *rrPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) { +func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) { p.mu.Lock() defer p.mu.Unlock() sc := p.subConns[p.subConnsNext] p.subConnsNext = (p.subConnsNext + 1) % len(p.subConns) - return sc, nil, nil + return balancer.PickResult{SubConn: sc}, nil } // lbPicker does two layers of picks: @@ -154,7 +153,7 @@ func newLBPicker(serverList []*lbpb.Server, readySCs []balancer.SubConn, stats * } } -func (p *lbPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) { +func (p *lbPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) { p.mu.Lock() defer p.mu.Unlock() @@ -165,12 +164,12 @@ func (p *lbPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balance // If it's a drop, return an error and fail the RPC. if s.Drop { p.stats.drop(s.LoadBalanceToken) - return nil, nil, status.Errorf(codes.Unavailable, "request dropped by grpclb") + return balancer.PickResult{}, status.Errorf(codes.Unavailable, "request dropped by grpclb") } // If not a drop but there's no ready subConns. if len(p.subConns) <= 0 { - return nil, nil, balancer.ErrNoSubConnAvailable + return balancer.PickResult{}, balancer.ErrNoSubConnAvailable } // Return the next ready subConn in the list, also collect rpc stats. 
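For authors of balancers built on the base package, here is a hedged sketch of how the new base.V2PickerBuilder and NewBalancerBuilderV2 fit together; round_robin is converted in the same way later in this patch. The "first_ready" name and the picker/builder types below are invented for illustration; only the balancer and base APIs come from this patch.

package example

import (
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
)

// firstReadyPicker is a toy picker that always returns one fixed READY SubConn.
type firstReadyPicker struct {
	sc balancer.SubConn
}

func (p *firstReadyPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
	return balancer.PickResult{SubConn: p.sc}, nil
}

// firstReadyPickerBuilder implements base.V2PickerBuilder: the base balancer
// hands it the READY SubConns (keyed by SubConn, with the creating address in
// SubConnInfo) and it returns the V2Picker gRPC should use.
type firstReadyPickerBuilder struct{}

func (firstReadyPickerBuilder) Build(info base.PickerBuildInfo) balancer.V2Picker {
	for sc := range info.ReadySCs {
		return &firstReadyPicker{sc: sc}
	}
	// No ready SubConns yet: return a picker that queues RPCs until an update.
	return base.NewErrPickerV2(balancer.ErrNoSubConnAvailable)
}

func init() {
	balancer.Register(base.NewBalancerBuilderV2(
		"first_ready", firstReadyPickerBuilder{}, base.Config{HealthCheck: true}))
}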
@@ -183,7 +182,7 @@ func (p *lbPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balance p.stats.knownReceived() } } - return sc, done, nil + return balancer.PickResult{SubConn: sc, Done: done}, nil } func (p *lbPicker) updateReadySCs(readySCs []balancer.SubConn) { diff --git a/balancer/grpclb/grpclb_test.go b/balancer/grpclb/grpclb_test.go index a38a3a136164..dc2665be6e10 100644 --- a/balancer/grpclb/grpclb_test.go +++ b/balancer/grpclb/grpclb_test.go @@ -829,46 +829,9 @@ func TestFallback(t *testing.T) { } } -type pickfirstFailOnEmptyAddrsListBuilder struct { - balancer.Builder // pick_first builder. -} - -func (b *pickfirstFailOnEmptyAddrsListBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { - pf := b.Builder.Build(cc, opts) - return &pickfirstFailOnEmptyAddrsList{pf} -} - -type pickfirstFailOnEmptyAddrsList struct { - balancer.Balancer // pick_first balancer. -} - -func (b *pickfirstFailOnEmptyAddrsList) UpdateClientConnState(s balancer.ClientConnState) error { - addrs := s.ResolverState.Addresses - if len(addrs) == 0 { - return balancer.ErrBadResolverState - } - b.Balancer.HandleResolvedAddrs(addrs, nil) - return nil -} - -func (b *pickfirstFailOnEmptyAddrsList) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { - b.Balancer.HandleSubConnStateChange(sc, state.ConnectivityState) -} - -func (b *pickfirstFailOnEmptyAddrsList) ResolverError(error) {} - func TestFallBackWithNoServerAddress(t *testing.T) { defer leakcheck.Check(t) - defer func() func() { - // Override pick_first with a balancer that returns error to trigger - // re-resolve, to test that when grpclb accepts no server address, - // re-resolve is never triggered. - pfb := balancer.Get("pick_first") - balancer.Register(&pickfirstFailOnEmptyAddrsListBuilder{pfb}) - return func() { balancer.Register(pfb) } - }()() - resolveNowCh := make(chan struct{}, 1) r, cleanup := manual.GenerateAndRegisterManualResolver() r.ResolveNowCallback = func(resolver.ResolveNowOptions) { diff --git a/balancer/grpclb/grpclb_util.go b/balancer/grpclb/grpclb_util.go index e916ca3d2d36..636725e5416d 100644 --- a/balancer/grpclb/grpclb_util.go +++ b/balancer/grpclb/grpclb_util.go @@ -24,7 +24,6 @@ import ( "time" "google.golang.org/grpc/balancer" - "google.golang.org/grpc/connectivity" "google.golang.org/grpc/resolver" ) @@ -195,8 +194,8 @@ func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) { } } -func (ccc *lbCacheClientConn) UpdateBalancerState(s connectivity.State, p balancer.Picker) { - ccc.cc.UpdateBalancerState(s, p) +func (ccc *lbCacheClientConn) UpdateState(s balancer.State) { + ccc.cc.UpdateState(s) } func (ccc *lbCacheClientConn) close() { diff --git a/balancer/roundrobin/roundrobin.go b/balancer/roundrobin/roundrobin.go index 29f7a4ddd68f..d4d645501c14 100644 --- a/balancer/roundrobin/roundrobin.go +++ b/balancer/roundrobin/roundrobin.go @@ -22,14 +22,12 @@ package roundrobin import ( - "context" "sync" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/grpcrand" - "google.golang.org/grpc/resolver" ) // Name is the name of round_robin balancer. @@ -37,7 +35,7 @@ const Name = "round_robin" // newBuilder creates a new roundrobin balancer builder. 
func newBuilder() balancer.Builder { - return base.NewBalancerBuilderWithConfig(Name, &rrPickerBuilder{}, base.Config{HealthCheck: true}) + return base.NewBalancerBuilderV2(Name, &rrPickerBuilder{}, base.Config{HealthCheck: true}) } func init() { @@ -46,13 +44,13 @@ func init() { type rrPickerBuilder struct{} -func (*rrPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker { - grpclog.Infof("roundrobinPicker: newPicker called with readySCs: %v", readySCs) - if len(readySCs) == 0 { - return base.NewErrPicker(balancer.ErrNoSubConnAvailable) +func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.V2Picker { + grpclog.Infof("roundrobinPicker: newPicker called with info: %v", info) + if len(info.ReadySCs) == 0 { + return base.NewErrPickerV2(balancer.ErrNoSubConnAvailable) } var scs []balancer.SubConn - for _, sc := range readySCs { + for sc := range info.ReadySCs { scs = append(scs, sc) } return &rrPicker{ @@ -74,10 +72,10 @@ type rrPicker struct { next int } -func (p *rrPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) { +func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) { p.mu.Lock() sc := p.subConns[p.next] p.next = (p.next + 1) % len(p.subConns) p.mu.Unlock() - return sc, nil, nil + return balancer.PickResult{SubConn: sc}, nil } diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index e98fe765e36a..824f28e740a9 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -34,6 +34,7 @@ import ( type scStateUpdate struct { sc balancer.SubConn state connectivity.State + err error } // ccBalancerWrapper is a wrapper on top of cc for balancers. @@ -74,7 +75,7 @@ func (ccb *ccBalancerWrapper) watcher() { ccb.balancerMu.Lock() su := t.(*scStateUpdate) if ub, ok := ccb.balancer.(balancer.V2Balancer); ok { - ub.UpdateSubConnState(su.sc, balancer.SubConnState{ConnectivityState: su.state}) + ub.UpdateSubConnState(su.sc, balancer.SubConnState{ConnectivityState: su.state, ConnectionError: su.err}) } else { ccb.balancer.HandleSubConnStateChange(su.sc, su.state) } @@ -91,7 +92,7 @@ func (ccb *ccBalancerWrapper) watcher() { for acbw := range scs { ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) } - ccb.UpdateBalancerState(connectivity.Connecting, nil) + ccb.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: nil}) return } } @@ -101,7 +102,7 @@ func (ccb *ccBalancerWrapper) close() { ccb.done.Fire() } -func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { +func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) { // When updating addresses for a SubConn, if the address in use is not in // the new addresses, the old ac will be tearDown() and a new ac will be // created. tearDown() generates a state change with Shutdown state, we @@ -115,6 +116,7 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s co ccb.scBuffer.Put(&scStateUpdate{ sc: sc, state: s, + err: err, }) } @@ -186,6 +188,21 @@ func (ccb *ccBalancerWrapper) UpdateBalancerState(s connectivity.State, p balanc ccb.cc.csMgr.updateState(s) } +func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { + ccb.mu.Lock() + defer ccb.mu.Unlock() + if ccb.subConns == nil { + return + } + // Update picker before updating state. 
Even though the ordering here does + // not matter, it can lead to multiple calls of Pick in the common start-up + // case where we wait for ready and then perform an RPC. If the picker is + // updated later, we could call the "connecting" picker when the state is + // updated, and then call the "ready" picker after the picker gets updated. + ccb.cc.blockingpicker.updatePickerV2(s.Picker) + ccb.cc.csMgr.updateState(s.ConnectivityState) +} + func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) { ccb.cc.resolveNow(o) } diff --git a/balancer_conn_wrappers_test.go b/balancer_conn_wrappers_test.go index 1b4ad1c19161..a29a7dd8fade 100644 --- a/balancer_conn_wrappers_test.go +++ b/balancer_conn_wrappers_test.go @@ -43,9 +43,7 @@ func (*funcBalancer) HandleResolvedAddrs([]resolver.Address, error) { func (b *funcBalancer) UpdateClientConnState(s balancer.ClientConnState) error { return b.updateClientConnState(s) } -func (*funcBalancer) ResolverError(error) { - panic("unimplemented") // resolver never reports error -} +func (*funcBalancer) ResolverError(error) {} func (*funcBalancer) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) { panic("unimplemented") // we never have sub-conns } diff --git a/balancer_switching_test.go b/balancer_switching_test.go index 7a6449fc0f32..1ccbaba6cdc0 100644 --- a/balancer_switching_test.go +++ b/balancer_switching_test.go @@ -28,7 +28,6 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/connectivity" - _ "google.golang.org/grpc/grpclog/glogger" "google.golang.org/grpc/internal" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" diff --git a/balancer_test.go b/balancer_test.go index 77e937cb2fb7..524fd43a6f52 100644 --- a/balancer_test.go +++ b/balancer_test.go @@ -28,7 +28,6 @@ import ( "time" "google.golang.org/grpc/codes" - _ "google.golang.org/grpc/grpclog/glogger" "google.golang.org/grpc/naming" "google.golang.org/grpc/status" ) diff --git a/balancer_v1_wrapper.go b/balancer_v1_wrapper.go index 66e9a44ac4da..db04b08b8429 100644 --- a/balancer_v1_wrapper.go +++ b/balancer_v1_wrapper.go @@ -19,7 +19,6 @@ package grpc import ( - "context" "sync" "google.golang.org/grpc/balancer" @@ -49,7 +48,7 @@ func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.B csEvltr: &balancer.ConnectivityStateEvaluator{}, state: connectivity.Idle, } - cc.UpdateBalancerState(connectivity.Idle, bw) + cc.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: bw}) go bw.lbWatcher() return bw } @@ -243,7 +242,7 @@ func (bw *balancerWrapper) HandleSubConnStateChange(sc balancer.SubConn, s conne if bw.state != sa { bw.state = sa } - bw.cc.UpdateBalancerState(bw.state, bw) + bw.cc.UpdateState(balancer.State{ConnectivityState: bw.state, Picker: bw}) if s == connectivity.Shutdown { // Remove state for this sc. delete(bw.connSt, sc) @@ -275,17 +274,17 @@ func (bw *balancerWrapper) Close() { // The picker is the balancerWrapper itself. // It either blocks or returns error, consistent with v1 balancer Get(). -func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions) (sc balancer.SubConn, done func(balancer.DoneInfo), err error) { +func (bw *balancerWrapper) Pick(info balancer.PickInfo) (result balancer.PickResult, err error) { failfast := true // Default failfast is true. 
- if ss, ok := rpcInfoFromContext(ctx); ok { + if ss, ok := rpcInfoFromContext(info.Ctx); ok { failfast = ss.failfast } - a, p, err := bw.balancer.Get(ctx, BalancerGetOptions{BlockingWait: !failfast}) + a, p, err := bw.balancer.Get(info.Ctx, BalancerGetOptions{BlockingWait: !failfast}) if err != nil { - return nil, nil, err + return balancer.PickResult{}, toRPCErr(err) } if p != nil { - done = func(balancer.DoneInfo) { p() } + result.Done = func(balancer.DoneInfo) { p() } defer func() { if err != nil { p() @@ -297,38 +296,39 @@ func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions) defer bw.mu.Unlock() if bw.pickfirst { // Get the first sc in conns. - for _, sc := range bw.conns { - return sc, done, nil + for _, result.SubConn = range bw.conns { + return result, nil } - return nil, nil, balancer.ErrNoSubConnAvailable + return balancer.PickResult{}, balancer.ErrNoSubConnAvailable } - sc, ok1 := bw.conns[resolver.Address{ + var ok1 bool + result.SubConn, ok1 = bw.conns[resolver.Address{ Addr: a.Addr, Type: resolver.Backend, ServerName: "", Metadata: a.Metadata, }] - s, ok2 := bw.connSt[sc] + s, ok2 := bw.connSt[result.SubConn] if !ok1 || !ok2 { // This can only happen due to a race where Get() returned an address // that was subsequently removed by Notify. In this case we should // retry always. - return nil, nil, balancer.ErrNoSubConnAvailable + return balancer.PickResult{}, balancer.ErrNoSubConnAvailable } switch s.s { case connectivity.Ready, connectivity.Idle: - return sc, done, nil + return result, nil case connectivity.Shutdown, connectivity.TransientFailure: // If the returned sc has been shut down or is in transient failure, // return error, and this RPC will fail or wait for another picker (if // non-failfast). - return nil, nil, balancer.ErrTransientFailure + return balancer.PickResult{}, balancer.ErrTransientFailure default: // For other states (connecting or unknown), the v1 balancer would // traditionally wait until ready and then issue the RPC. Returning // ErrNoSubConnAvailable will be a slight improvement in that it will // allow the balancer to choose another address in case others are // connected. - return nil, nil, balancer.ErrNoSubConnAvailable + return balancer.PickResult{}, balancer.ErrNoSubConnAvailable } } diff --git a/clientconn.go b/clientconn.go index e0d3642ea474..14ce9c76aa37 100644 --- a/clientconn.go +++ b/clientconn.go @@ -688,7 +688,7 @@ func (cc *ClientConn) switchBalancer(name string) { cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts) } -func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { +func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) { cc.mu.Lock() if cc.conns == nil { cc.mu.Unlock() @@ -696,7 +696,7 @@ func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivi } // TODO(bar switching) send updates to all balancer wrappers when balancer // gracefully switching is supported. - cc.balancerWrapper.handleSubConnStateChange(sc, s) + cc.balancerWrapper.handleSubConnStateChange(sc, s, err) cc.mu.Unlock() } @@ -793,7 +793,7 @@ func (ac *addrConn) connect() error { } // Update connectivity state within the lock to prevent subsequent or // concurrent calls from resetting the transport more than once. - ac.updateConnectivityState(connectivity.Connecting) + ac.updateConnectivityState(connectivity.Connecting, nil) ac.mu.Unlock() // Start a goroutine connecting to the server asynchronously. 
@@ -879,7 +879,8 @@ func (cc *ClientConn) healthCheckConfig() *healthCheckConfig { } func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) { - t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{ + t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{ + Ctx: ctx, FullMethodName: method, }) if err != nil { @@ -1048,7 +1049,7 @@ type addrConn struct { } // Note: this requires a lock on ac.mu. -func (ac *addrConn) updateConnectivityState(s connectivity.State) { +func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) { if ac.state == s { return } @@ -1061,7 +1062,7 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State) { Severity: channelz.CtINFO, }) } - ac.cc.handleSubConnStateChange(ac.acbw, s) + ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr) } // adjustParams updates parameters used to create transports upon @@ -1110,7 +1111,7 @@ func (ac *addrConn) resetTransport() { // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm connectDeadline := time.Now().Add(dialDuration) - ac.updateConnectivityState(connectivity.Connecting) + ac.updateConnectivityState(connectivity.Connecting, nil) ac.transport = nil ac.mu.Unlock() @@ -1123,7 +1124,7 @@ func (ac *addrConn) resetTransport() { ac.mu.Unlock() return } - ac.updateConnectivityState(connectivity.TransientFailure) + ac.updateConnectivityState(connectivity.TransientFailure, err) // Backoff. b := ac.resetBackoff @@ -1179,6 +1180,7 @@ func (ac *addrConn) resetTransport() { // first successful one. It returns the transport, the address and a Event in // the successful case. The Event fires when the returned transport disconnects. func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) { + var firstConnErr error for _, addr := range addrs { ac.mu.Lock() if ac.state == connectivity.Shutdown { @@ -1207,11 +1209,14 @@ func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.T if err == nil { return newTr, addr, reconnect, nil } + if firstConnErr == nil { + firstConnErr = err + } ac.cc.blockingpicker.updateConnectionError(err) } // Couldn't connect to any address. - return nil, resolver.Address{}, nil, fmt.Errorf("couldn't connect to any address") + return nil, resolver.Address{}, nil, firstConnErr } // createTransport creates a connection to addr. It returns the transport and a @@ -1244,7 +1249,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne // state to Connecting. // // TODO: this should be Idle when grpc-go properly supports it. - ac.updateConnectivityState(connectivity.Connecting) + ac.updateConnectivityState(connectivity.Connecting, nil) } }) ac.mu.Unlock() @@ -1259,7 +1264,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne // state to Connecting. // // TODO: this should be Idle when grpc-go properly supports it. 
- ac.updateConnectivityState(connectivity.Connecting) + ac.updateConnectivityState(connectivity.Connecting, nil) } }) ac.mu.Unlock() @@ -1316,7 +1321,7 @@ func (ac *addrConn) startHealthCheck(ctx context.Context) { var healthcheckManagingState bool defer func() { if !healthcheckManagingState { - ac.updateConnectivityState(connectivity.Ready) + ac.updateConnectivityState(connectivity.Ready, nil) } }() @@ -1352,13 +1357,13 @@ func (ac *addrConn) startHealthCheck(ctx context.Context) { ac.mu.Unlock() return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac) } - setConnectivityState := func(s connectivity.State) { + setConnectivityState := func(s connectivity.State, lastErr error) { ac.mu.Lock() defer ac.mu.Unlock() if ac.transport != currentTr { return } - ac.updateConnectivityState(s) + ac.updateConnectivityState(s, lastErr) } // Start the health checking stream. go func() { @@ -1424,7 +1429,7 @@ func (ac *addrConn) tearDown(err error) { ac.transport = nil // We have to set the state to Shutdown before anything else to prevent races // between setting the state and logic that waits on context cancellation / etc. - ac.updateConnectivityState(connectivity.Shutdown) + ac.updateConnectivityState(connectivity.Shutdown, nil) ac.cancel() ac.curAddr = resolver.Address{} if err == errConnDrain && curTr != nil { diff --git a/grpc_test.go b/grpc_test.go index 5c132dd97379..7c41c57ae6d7 100644 --- a/grpc_test.go +++ b/grpc_test.go @@ -24,6 +24,8 @@ import ( "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/leakcheck" + + _ "google.golang.org/grpc/grpclog/glogger" ) type s struct{} diff --git a/health/client.go b/health/client.go index 419f06040d1d..b5bee4838024 100644 --- a/health/client.go +++ b/health/client.go @@ -56,7 +56,7 @@ const healthCheckMethod = "/grpc.health.v1.Health/Watch" // This function implements the protocol defined at: // https://github.com/grpc/grpc/blob/master/doc/health-checking.md -func clientHealthCheck(ctx context.Context, newStream func(string) (interface{}, error), setConnectivityState func(connectivity.State), service string) error { +func clientHealthCheck(ctx context.Context, newStream func(string) (interface{}, error), setConnectivityState func(connectivity.State, error), service string) error { tryCnt := 0 retryConnection: @@ -70,7 +70,7 @@ retryConnection: if ctx.Err() != nil { return nil } - setConnectivityState(connectivity.Connecting) + setConnectivityState(connectivity.Connecting, nil) rawS, err := newStream(healthCheckMethod) if err != nil { continue retryConnection @@ -79,7 +79,7 @@ retryConnection: s, ok := rawS.(grpc.ClientStream) // Ideally, this should never happen. But if it happens, the server is marked as healthy for LBing purposes. if !ok { - setConnectivityState(connectivity.Ready) + setConnectivityState(connectivity.Ready, nil) return fmt.Errorf("newStream returned %v (type %T); want grpc.ClientStream", rawS, rawS) } @@ -95,22 +95,22 @@ retryConnection: // Reports healthy for the LBing purposes if health check is not implemented in the server. if status.Code(err) == codes.Unimplemented { - setConnectivityState(connectivity.Ready) + setConnectivityState(connectivity.Ready, nil) return err } // Reports unhealthy if server's Watch method gives an error other than UNIMPLEMENTED. 
if err != nil { - setConnectivityState(connectivity.TransientFailure) + setConnectivityState(connectivity.TransientFailure, fmt.Errorf("connection active but received health check RPC error: %v", err)) continue retryConnection } // As a message has been received, removes the need for backoff for the next retry by resetting the try count. tryCnt = 0 if resp.Status == healthpb.HealthCheckResponse_SERVING { - setConnectivityState(connectivity.Ready) + setConnectivityState(connectivity.Ready, nil) } else { - setConnectivityState(connectivity.TransientFailure) + setConnectivityState(connectivity.TransientFailure, fmt.Errorf("connection active but health check failed. status=%s", resp.Status)) } } } diff --git a/health/client_test.go b/health/client_test.go index 394c62d168e5..ee7129814581 100644 --- a/health/client_test.go +++ b/health/client_test.go @@ -51,7 +51,7 @@ func TestClientHealthCheckBackoff(t *testing.T) { } defer func() { backoffFunc = oldBackoffFunc }() - clientHealthCheck(context.Background(), newStream, func(connectivity.State) {}, "test") + clientHealthCheck(context.Background(), newStream, func(connectivity.State, error) {}, "test") if !reflect.DeepEqual(got, want) { t.Fatalf("Backoff durations for %v retries are %v. (expected: %v)", maxRetries, got, want) diff --git a/internal/internal.go b/internal/internal.go index b96b3597caa1..eae18e18ccd7 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -60,7 +60,7 @@ var ( // // The health checking protocol is defined at: // https://github.com/grpc/grpc/blob/master/doc/health-checking.md -type HealthChecker func(ctx context.Context, newStream func(string) (interface{}, error), setConnectivityState func(connectivity.State), serviceName string) error +type HealthChecker func(ctx context.Context, newStream func(string) (interface{}, error), setConnectivityState func(connectivity.State, error), serviceName string) error const ( // CredsBundleModeFallback switches GoogleDefaultCreds to fallback mode. diff --git a/picker_wrapper.go b/picker_wrapper.go index 45baa2ae13da..00447894f07b 100644 --- a/picker_wrapper.go +++ b/picker_wrapper.go @@ -20,6 +20,7 @@ package grpc import ( "context" + "fmt" "io" "sync" @@ -31,49 +32,78 @@ import ( "google.golang.org/grpc/status" ) +// v2PickerWrapper wraps a balancer.Picker while providing the +// balancer.V2Picker API. It requires a pickerWrapper to generate errors +// including the latest connectionError. To be deleted when balancer.Picker is +// updated to the balancer.V2Picker API. +type v2PickerWrapper struct { + picker balancer.Picker + connErr *connErr +} + +func (v *v2PickerWrapper) Pick(info balancer.PickInfo) (balancer.PickResult, error) { + sc, done, err := v.picker.Pick(info.Ctx, info) + if err != nil { + if err == balancer.ErrTransientFailure { + return balancer.PickResult{}, balancer.TransientFailureError(fmt.Errorf("%v, latest connection error: %v", err, v.connErr.connectionError())) + } + return balancer.PickResult{}, err + } + return balancer.PickResult{SubConn: sc, Done: done}, nil +} + // pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick // actions and unblock when there's a picker update. type pickerWrapper struct { mu sync.Mutex done bool blockingCh chan struct{} - picker balancer.Picker + picker balancer.V2Picker - // The latest connection happened. - connErrMu sync.Mutex - connErr error + // The latest connection error. TODO: remove when V1 picker is deprecated; + // balancer should be responsible for providing the error. 
+ *connErr } -func newPickerWrapper() *pickerWrapper { - bp := &pickerWrapper{blockingCh: make(chan struct{})} - return bp +type connErr struct { + mu sync.Mutex + err error } -func (bp *pickerWrapper) updateConnectionError(err error) { - bp.connErrMu.Lock() - bp.connErr = err - bp.connErrMu.Unlock() +func (c *connErr) updateConnectionError(err error) { + c.mu.Lock() + c.err = err + c.mu.Unlock() } -func (bp *pickerWrapper) connectionError() error { - bp.connErrMu.Lock() - err := bp.connErr - bp.connErrMu.Unlock() +func (c *connErr) connectionError() error { + c.mu.Lock() + err := c.err + c.mu.Unlock() return err } +func newPickerWrapper() *pickerWrapper { + return &pickerWrapper{blockingCh: make(chan struct{}), connErr: &connErr{}} +} + // updatePicker is called by UpdateBalancerState. It unblocks all blocked pick. -func (bp *pickerWrapper) updatePicker(p balancer.Picker) { - bp.mu.Lock() - if bp.done { - bp.mu.Unlock() +func (pw *pickerWrapper) updatePicker(p balancer.Picker) { + pw.updatePickerV2(&v2PickerWrapper{picker: p, connErr: pw.connErr}) +} + +// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick. +func (pw *pickerWrapper) updatePickerV2(p balancer.V2Picker) { + pw.mu.Lock() + if pw.done { + pw.mu.Unlock() return } - bp.picker = p - // bp.blockingCh should never be nil. - close(bp.blockingCh) - bp.blockingCh = make(chan struct{}) - bp.mu.Unlock() + pw.picker = p + // pw.blockingCh should never be nil. + close(pw.blockingCh) + pw.blockingCh = make(chan struct{}) + pw.mu.Unlock() } func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) func(balancer.DoneInfo) { @@ -100,83 +130,85 @@ func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) f // - the current picker returns other errors and failfast is false. // - the subConn returned by the current picker is not READY // When one of these situations happens, pick blocks until the picker gets updated. -func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.PickOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) { +func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) { var ch chan struct{} + var lastPickErr error for { - bp.mu.Lock() - if bp.done { - bp.mu.Unlock() + pw.mu.Lock() + if pw.done { + pw.mu.Unlock() return nil, nil, ErrClientConnClosing } - if bp.picker == nil { - ch = bp.blockingCh + if pw.picker == nil { + ch = pw.blockingCh } - if ch == bp.blockingCh { + if ch == pw.blockingCh { // This could happen when either: - // - bp.picker is nil (the previous if condition), or + // - pw.picker is nil (the previous if condition), or // - has called pick on the current picker. 
- bp.mu.Unlock() + pw.mu.Unlock() select { case <-ctx.Done(): - if connectionErr := bp.connectionError(); connectionErr != nil { - switch ctx.Err() { - case context.DeadlineExceeded: - return nil, nil, status.Errorf(codes.DeadlineExceeded, "latest connection error: %v", connectionErr) - case context.Canceled: - return nil, nil, status.Errorf(codes.Canceled, "latest connection error: %v", connectionErr) - } + var errStr string + if lastPickErr != nil { + errStr = "latest balancer error: " + lastPickErr.Error() + } else if connectionErr := pw.connectionError(); connectionErr != nil { + errStr = "latest connection error: " + connectionErr.Error() + } else { + errStr = ctx.Err().Error() + } + switch ctx.Err() { + case context.DeadlineExceeded: + return nil, nil, status.Error(codes.DeadlineExceeded, errStr) + case context.Canceled: + return nil, nil, status.Error(codes.Canceled, errStr) } - return nil, nil, ctx.Err() case <-ch: } continue } - ch = bp.blockingCh - p := bp.picker - bp.mu.Unlock() + ch = pw.blockingCh + p := pw.picker + pw.mu.Unlock() - subConn, done, err := p.Pick(ctx, opts) + pickResult, err := p.Pick(info) if err != nil { - switch err { - case balancer.ErrNoSubConnAvailable: + if err == balancer.ErrNoSubConnAvailable { continue - case balancer.ErrTransientFailure: + } + if tfe, ok := err.(interface{ IsTransientFailure() bool }); ok && tfe.IsTransientFailure() { if !failfast { + lastPickErr = err continue } - return nil, nil, status.Errorf(codes.Unavailable, "%v, latest connection error: %v", err, bp.connectionError()) - case context.DeadlineExceeded: - return nil, nil, status.Error(codes.DeadlineExceeded, err.Error()) - case context.Canceled: - return nil, nil, status.Error(codes.Canceled, err.Error()) - default: - if _, ok := status.FromError(err); ok { - return nil, nil, err - } - // err is some other error. - return nil, nil, status.Error(codes.Unknown, err.Error()) + return nil, nil, status.Error(codes.Unavailable, err.Error()) } + if _, ok := status.FromError(err); ok { + return nil, nil, err + } + // err is some other error. + return nil, nil, status.Error(codes.Unknown, err.Error()) } - acw, ok := subConn.(*acBalancerWrapper) + acw, ok := pickResult.SubConn.(*acBalancerWrapper) if !ok { grpclog.Error("subconn returned from pick is not *acBalancerWrapper") continue } if t, ok := acw.getAddrConn().getReadyTransport(); ok { if channelz.IsOn() { - return t, doneChannelzWrapper(acw, done), nil + return t, doneChannelzWrapper(acw, pickResult.Done), nil } - return t, done, nil + return t, pickResult.Done, nil } - if done != nil { + if pickResult.Done != nil { // Calling done with nil error, no bytes sent and no bytes received. // DoneInfo with default value works. - done(balancer.DoneInfo{}) + pickResult.Done(balancer.DoneInfo{}) } grpclog.Infof("blockingPicker: the picked transport is not ready, loop back to repick") // If ok == false, ac.state is not READY. @@ -186,12 +218,12 @@ func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer. 
} } -func (bp *pickerWrapper) close() { - bp.mu.Lock() - defer bp.mu.Unlock() - if bp.done { +func (pw *pickerWrapper) close() { + pw.mu.Lock() + defer pw.mu.Unlock() + if pw.done { return } - bp.done = true - close(bp.blockingCh) + pw.done = true + close(pw.blockingCh) } diff --git a/picker_wrapper_test.go b/picker_wrapper_test.go index d6e8c36361be..50916a2dfde0 100644 --- a/picker_wrapper_test.go +++ b/picker_wrapper_test.go @@ -26,9 +26,10 @@ import ( "time" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" - _ "google.golang.org/grpc/grpclog/glogger" "google.golang.org/grpc/internal/transport" + "google.golang.org/grpc/status" ) const goroutineCount = 5 @@ -54,7 +55,7 @@ type testingPicker struct { maxCalled int64 } -func (p *testingPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) { +func (p *testingPicker) Pick(ctx context.Context, info balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) { if atomic.AddInt64(&p.maxCalled, -1) < 0 { return nil, nil, fmt.Errorf("pick called to many times (> goroutineCount)") } @@ -68,7 +69,7 @@ func (s) TestBlockingPickTimeout(t *testing.T) { bp := newPickerWrapper() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) defer cancel() - if _, _, err := bp.pick(ctx, true, balancer.PickOptions{}); err != context.DeadlineExceeded { + if _, _, err := bp.pick(ctx, true, balancer.PickInfo{}); status.Code(err) != codes.DeadlineExceeded { t.Errorf("bp.pick returned error %v, want DeadlineExceeded", err) } } @@ -79,7 +80,7 @@ func (s) TestBlockingPick(t *testing.T) { var finishedCount uint64 for i := goroutineCount; i > 0; i-- { go func() { - if tr, _, err := bp.pick(context.Background(), true, balancer.PickOptions{}); err != nil || tr != testT { + if tr, _, err := bp.pick(context.Background(), true, balancer.PickInfo{}); err != nil || tr != testT { t.Errorf("bp.pick returned non-nil error: %v", err) } atomic.AddUint64(&finishedCount, 1) @@ -99,7 +100,7 @@ func (s) TestBlockingPickNoSubAvailable(t *testing.T) { // All goroutines should block because picker returns no sc available. for i := goroutineCount; i > 0; i-- { go func() { - if tr, _, err := bp.pick(context.Background(), true, balancer.PickOptions{}); err != nil || tr != testT { + if tr, _, err := bp.pick(context.Background(), true, balancer.PickInfo{}); err != nil || tr != testT { t.Errorf("bp.pick returned non-nil error: %v", err) } atomic.AddUint64(&finishedCount, 1) @@ -120,7 +121,7 @@ func (s) TestBlockingPickTransientWaitforready(t *testing.T) { // picks are not failfast. for i := goroutineCount; i > 0; i-- { go func() { - if tr, _, err := bp.pick(context.Background(), false, balancer.PickOptions{}); err != nil || tr != testT { + if tr, _, err := bp.pick(context.Background(), false, balancer.PickInfo{}); err != nil || tr != testT { t.Errorf("bp.pick returned non-nil error: %v", err) } atomic.AddUint64(&finishedCount, 1) @@ -140,7 +141,7 @@ func (s) TestBlockingPickSCNotReady(t *testing.T) { // All goroutines should block because sc is not ready. 
for i := goroutineCount; i > 0; i-- { go func() { - if tr, _, err := bp.pick(context.Background(), true, balancer.PickOptions{}); err != nil || tr != testT { + if tr, _, err := bp.pick(context.Background(), true, balancer.PickInfo{}); err != nil || tr != testT { t.Errorf("bp.pick returned non-nil error: %v", err) } atomic.AddUint64(&finishedCount, 1) diff --git a/pickfirst.go b/pickfirst.go index ed05b02ed96e..c43dac9ad842 100644 --- a/pickfirst.go +++ b/pickfirst.go @@ -19,12 +19,14 @@ package grpc import ( - "context" + "errors" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/resolver" + "google.golang.org/grpc/status" ) // PickFirstBalancerName is the name of the pick_first balancer. @@ -45,35 +47,67 @@ func (*pickfirstBuilder) Name() string { } type pickfirstBalancer struct { - cc balancer.ClientConn - sc balancer.SubConn + state connectivity.State + cc balancer.ClientConn + sc balancer.SubConn } +var _ balancer.V2Balancer = &pickfirstBalancer{} // Assert we implement v2 + func (b *pickfirstBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { if err != nil { - if grpclog.V(2) { - grpclog.Infof("pickfirstBalancer: HandleResolvedAddrs called with error %v", err) - } + b.ResolverError(err) return } + b.UpdateClientConnState(balancer.ClientConnState{ResolverState: resolver.State{Addresses: addrs}}) // Ignore error +} + +func (b *pickfirstBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { + b.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: s}) +} + +func (b *pickfirstBalancer) ResolverError(err error) { + switch b.state { + case connectivity.TransientFailure, connectivity.Idle, connectivity.Connecting: + // Set a failing picker if we don't have a good picker. + b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, + Picker: &picker{err: status.Errorf(codes.Unavailable, "name resolver error: %v", err)}}, + ) + } + if grpclog.V(2) { + grpclog.Infof("pickfirstBalancer: ResolverError called with error %v", err) + } +} + +func (b *pickfirstBalancer) UpdateClientConnState(cs balancer.ClientConnState) error { + if len(cs.ResolverState.Addresses) == 0 { + b.ResolverError(errors.New("produced zero addresses")) + return balancer.ErrBadResolverState + } if b.sc == nil { - b.sc, err = b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{}) + var err error + b.sc, err = b.cc.NewSubConn(cs.ResolverState.Addresses, balancer.NewSubConnOptions{}) if err != nil { - //TODO(yuxuanli): why not change the cc state to Idle? 
if grpclog.V(2) { grpclog.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err) } - return + b.state = connectivity.TransientFailure + b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, + Picker: &picker{err: status.Errorf(codes.Unavailable, "error creating connection: %v", err)}}, + ) + return balancer.ErrBadResolverState } - b.cc.UpdateBalancerState(connectivity.Idle, &picker{sc: b.sc}) + b.state = connectivity.Idle + b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: &picker{result: balancer.PickResult{SubConn: b.sc}}}) b.sc.Connect() } else { - b.sc.UpdateAddresses(addrs) + b.sc.UpdateAddresses(cs.ResolverState.Addresses) b.sc.Connect() } + return nil } -func (b *pickfirstBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { +func (b *pickfirstBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) { if grpclog.V(2) { grpclog.Infof("pickfirstBalancer: HandleSubConnStateChange: %p, %v", sc, s) } @@ -83,18 +117,28 @@ func (b *pickfirstBalancer) HandleSubConnStateChange(sc balancer.SubConn, s conn } return } - if s == connectivity.Shutdown { + b.state = s.ConnectivityState + if s.ConnectivityState == connectivity.Shutdown { b.sc = nil return } - switch s { + switch s.ConnectivityState { case connectivity.Ready, connectivity.Idle: - b.cc.UpdateBalancerState(s, &picker{sc: sc}) + b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{result: balancer.PickResult{SubConn: sc}}}) case connectivity.Connecting: - b.cc.UpdateBalancerState(s, &picker{err: balancer.ErrNoSubConnAvailable}) + b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{err: balancer.ErrNoSubConnAvailable}}) case connectivity.TransientFailure: - b.cc.UpdateBalancerState(s, &picker{err: balancer.ErrTransientFailure}) + err := balancer.ErrTransientFailure + // TODO: this can be unconditional after the V1 API is removed, as + // SubConnState will always contain a connection error. + if s.ConnectionError != nil { + err = balancer.TransientFailureError(s.ConnectionError) + } + b.cc.UpdateState(balancer.State{ + ConnectivityState: s.ConnectivityState, + Picker: &picker{err: err}, + }) } } @@ -102,15 +146,12 @@ func (b *pickfirstBalancer) Close() { } type picker struct { - err error - sc balancer.SubConn + result balancer.PickResult + err error } -func (p *picker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) { - if p.err != nil { - return nil, nil, p.err - } - return p.sc, nil, nil +func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { + return p.result, p.err } func init() { diff --git a/resolver_conn_wrapper_test.go b/resolver_conn_wrapper_test.go index b1c352a1d17d..bf9c3c293ff4 100644 --- a/resolver_conn_wrapper_test.go +++ b/resolver_conn_wrapper_test.go @@ -27,6 +27,7 @@ import ( "testing" "time" + "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" @@ -178,6 +179,19 @@ func testResolverErrorPolling(t *testing.T, badUpdate func(*manual.Resolver), go } } +const happyBalancerName = "happy balancer" + +func init() { + // Register a balancer that never returns an error from + // UpdateClientConnState, and doesn't do anything else either. 
+ fb := &funcBalancer{ + updateClientConnState: func(s balancer.ClientConnState) error { + return nil + }, + } + balancer.Register(&funcBalancerBuilder{name: happyBalancerName, instance: fb}) +} + // TestResolverErrorPolling injects resolver errors and verifies ResolveNow is // called with the appropriate backoff strategy being consulted between // ResolveNow calls. @@ -188,7 +202,8 @@ func (s) TestResolverErrorPolling(t *testing.T) { // UpdateState will block if ResolveNow is being called (which blocks on // rn), so call it in a goroutine. go r.CC.UpdateState(resolver.State{}) - }) + }, + WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, happyBalancerName))) } // TestServiceConfigErrorPolling injects a service config error and verifies @@ -202,7 +217,8 @@ func (s) TestServiceConfigErrorPolling(t *testing.T) { // UpdateState will block if ResolveNow is being called (which blocks on // rn), so call it in a goroutine. go r.CC.UpdateState(resolver.State{}) - }) + }, + WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, happyBalancerName))) } // TestResolverErrorInBuild makes the resolver.Builder call into the ClientConn diff --git a/test/balancer_test.go b/test/balancer_test.go index 30c01eb03755..afa9ec1b7ea2 100644 --- a/test/balancer_test.go +++ b/test/balancer_test.go @@ -49,7 +49,7 @@ type testBalancer struct { sc balancer.SubConn newSubConnOptions balancer.NewSubConnOptions - pickOptions []balancer.PickOptions + pickInfos []balancer.PickInfo doneInfo []balancer.DoneInfo } @@ -70,7 +70,7 @@ func (b *testBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) grpclog.Errorf("testBalancer: failed to NewSubConn: %v", err) return } - b.cc.UpdateBalancerState(connectivity.Connecting, &picker{sc: b.sc, bal: b}) + b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: &picker{sc: b.sc, bal: b}}) b.sc.Connect() } } @@ -88,11 +88,11 @@ func (b *testBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectiv switch s { case connectivity.Ready, connectivity.Idle: - b.cc.UpdateBalancerState(s, &picker{sc: sc, bal: b}) + b.cc.UpdateState(balancer.State{ConnectivityState: s, Picker: &picker{sc: sc, bal: b}}) case connectivity.Connecting: - b.cc.UpdateBalancerState(s, &picker{err: balancer.ErrNoSubConnAvailable, bal: b}) + b.cc.UpdateState(balancer.State{ConnectivityState: s, Picker: &picker{err: balancer.ErrNoSubConnAvailable, bal: b}}) case connectivity.TransientFailure: - b.cc.UpdateBalancerState(s, &picker{err: balancer.ErrTransientFailure, bal: b}) + b.cc.UpdateState(balancer.State{ConnectivityState: s, Picker: &picker{err: balancer.ErrTransientFailure, bal: b}}) } } @@ -105,12 +105,13 @@ type picker struct { bal *testBalancer } -func (p *picker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) { +func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { if p.err != nil { - return nil, nil, p.err + return balancer.PickResult{}, p.err } - p.bal.pickOptions = append(p.bal.pickOptions, opts) - return p.sc, func(d balancer.DoneInfo) { p.bal.doneInfo = append(p.bal.doneInfo, d) }, nil + info.Ctx = nil // Do not validate context. 
+ p.bal.pickInfos = append(p.bal.pickInfos, info) + return balancer.PickResult{SubConn: p.sc, Done: func(d balancer.DoneInfo) { p.bal.doneInfo = append(p.bal.doneInfo, d) }}, nil } func (s) TestCredsBundleFromBalancer(t *testing.T) { @@ -177,8 +178,8 @@ func testDoneInfo(t *testing.T, e env) { if len(b.doneInfo) < 2 || !reflect.DeepEqual(b.doneInfo[1].Trailer, testTrailerMetadata) { t.Fatalf("b.doneInfo = %v; want b.doneInfo[1].Trailer = %v", b.doneInfo, testTrailerMetadata) } - if len(b.pickOptions) != len(b.doneInfo) { - t.Fatalf("Got %d picks, but %d doneInfo, want equal amount", len(b.pickOptions), len(b.doneInfo)) + if len(b.pickInfos) != len(b.doneInfo) { + t.Fatalf("Got %d picks, but %d doneInfo, want equal amount", len(b.pickInfos), len(b.doneInfo)) } // To test done() is always called, even if it's returned with a non-Ready // SubConn. @@ -194,8 +195,8 @@ func testDoneInfo(t *testing.T, e env) { }() te.srv.Stop() <-finished - if len(b.pickOptions) != len(b.doneInfo) { - t.Fatalf("Got %d picks, %d doneInfo, want equal amount", len(b.pickOptions), len(b.doneInfo)) + if len(b.pickInfos) != len(b.doneInfo) { + t.Fatalf("Got %d picks, %d doneInfo, want equal amount", len(b.pickInfos), len(b.doneInfo)) } } @@ -246,11 +247,11 @@ func testDoneLoads(t *testing.T, e env) { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %v", err, nil) } - poWant := []balancer.PickOptions{ + piWant := []balancer.PickInfo{ {FullMethodName: "/grpc.testing.TestService/EmptyCall"}, } - if !reflect.DeepEqual(b.pickOptions, poWant) { - t.Fatalf("b.pickOptions = %v; want %v", b.pickOptions, poWant) + if !reflect.DeepEqual(b.pickInfos, piWant) { + t.Fatalf("b.pickInfos = %v; want %v", b.pickInfos, piWant) } if len(b.doneInfo) < 1 { diff --git a/test/healthcheck_test.go b/test/healthcheck_test.go index 5b95c6a257b0..1aa9d6b47c20 100644 --- a/test/healthcheck_test.go +++ b/test/healthcheck_test.go @@ -115,7 +115,7 @@ func (s *testHealthServer) SetServingStatus(service string, status healthpb.Heal func setupHealthCheckWrapper() (hcEnterChan chan struct{}, hcExitChan chan struct{}, wrapper internal.HealthChecker) { hcEnterChan = make(chan struct{}) hcExitChan = make(chan struct{}) - wrapper = func(ctx context.Context, newStream func(string) (interface{}, error), update func(state connectivity.State), service string) error { + wrapper = func(ctx context.Context, newStream func(string) (interface{}, error), update func(connectivity.State, error), service string) error { close(hcEnterChan) defer close(hcExitChan) return testHealthCheckFunc(ctx, newStream, update, service) diff --git a/vet.sh b/vet.sh index b798aa255c74..798921acc852 100755 --- a/vet.sh +++ b/vet.sh @@ -128,6 +128,8 @@ staticcheck -go 1.9 -checks 'inherit,-ST1015' ./... > "${SC_OUT}" || true .NewServiceConfig .Metadata is deprecated: use Attributes .Type is deprecated: use Attributes +.UpdateBalancerState +balancer.Picker grpc.CallCustomCodec grpc.Code grpc.Compressor diff --git a/xds/internal/balancer/edsbalancer/balancergroup.go b/xds/internal/balancer/edsbalancer/balancergroup.go index 20c4f3824747..fc10f3d34123 100644 --- a/xds/internal/balancer/edsbalancer/balancergroup.go +++ b/xds/internal/balancer/edsbalancer/balancergroup.go @@ -17,7 +17,6 @@ package edsbalancer import ( - "context" "fmt" "sync" "time" @@ -55,9 +54,8 @@ type subBalancerWithConfig struct { id internal.Locality group *balancerGroup - mu sync.Mutex - state connectivity.State - picker balancer.Picker + mu sync.Mutex + state balancer.State // The static part of sub-balancer. 
Keeps balancerBuilders and addresses. // To be used when restarting sub-balancer. @@ -68,11 +66,15 @@ type subBalancerWithConfig struct { balancer balancer.Balancer } -// UpdateBalancerState overrides balancer.ClientConn, to keep state and picker. func (sbc *subBalancerWithConfig) UpdateBalancerState(state connectivity.State, picker balancer.Picker) { + grpclog.Fatalln("not implemented") +} + +// UpdateState overrides balancer.ClientConn, to keep state and picker. +func (sbc *subBalancerWithConfig) UpdateState(state balancer.State) { sbc.mu.Lock() - sbc.state, sbc.picker = state, picker - sbc.group.updateBalancerState(sbc.id, state, picker) + sbc.state = state + sbc.group.updateBalancerState(sbc.id, state) sbc.mu.Unlock() } @@ -84,8 +86,8 @@ func (sbc *subBalancerWithConfig) NewSubConn(addrs []resolver.Address, opts bala func (sbc *subBalancerWithConfig) updateBalancerStateWithCachedPicker() { sbc.mu.Lock() - if sbc.picker != nil { - sbc.group.updateBalancerState(sbc.id, sbc.state, sbc.picker) + if sbc.state.Picker != nil { + sbc.group.updateBalancerState(sbc.id, sbc.state) } sbc.mu.Unlock() } @@ -144,7 +146,7 @@ func (sbc *subBalancerWithConfig) stopBalancer() { type pickerState struct { weight uint32 - picker balancer.Picker + picker balancer.V2Picker state connectivity.State } @@ -359,7 +361,7 @@ func (bg *balancerGroup) remove(id internal.Locality) { // Normally picker update is triggered by SubConn state change. But we // want to update state and picker to reflect the changes, too. Because // we don't want `ClientConn` to pick this sub-balancer anymore. - bg.cc.UpdateBalancerState(buildPickerAndState(bg.idToPickerState)) + bg.cc.UpdateState(buildPickerAndState(bg.idToPickerState)) } bg.incomingMu.Unlock() } @@ -411,7 +413,7 @@ func (bg *balancerGroup) changeWeight(id internal.Locality, newWeight uint32) { // Normally picker update is triggered by SubConn state change. But we // want to update state and picker to reflect the changes, too. Because // `ClientConn` should do pick with the new weights now. - bg.cc.UpdateBalancerState(buildPickerAndState(bg.idToPickerState)) + bg.cc.UpdateState(buildPickerAndState(bg.idToPickerState)) } } @@ -481,8 +483,8 @@ func (bg *balancerGroup) newSubConn(config *subBalancerWithConfig, addrs []resol // updateBalancerState: create an aggregated picker and an aggregated // connectivity state, then forward to ClientConn. 
-func (bg *balancerGroup) updateBalancerState(id internal.Locality, state connectivity.State, picker balancer.Picker) { - grpclog.Infof("balancer group: update balancer state: %v, %v, %p", id, state, picker) +func (bg *balancerGroup) updateBalancerState(id internal.Locality, state balancer.State) { + grpclog.Infof("balancer group: update balancer state: %v, %v", id, state) bg.incomingMu.Lock() defer bg.incomingMu.Unlock() @@ -493,10 +495,10 @@ func (bg *balancerGroup) updateBalancerState(id internal.Locality, state connect grpclog.Warningf("balancer group: pickerState for %v not found when update picker/state", id) return } - pickerSt.picker = newLoadReportPicker(picker, id, bg.loadStore) - pickerSt.state = state + pickerSt.picker = newLoadReportPicker(state.Picker, id, bg.loadStore) + pickerSt.state = state.ConnectivityState if bg.incomingStarted { - bg.cc.UpdateBalancerState(buildPickerAndState(bg.idToPickerState)) + bg.cc.UpdateState(buildPickerAndState(bg.idToPickerState)) } } @@ -533,7 +535,7 @@ func (bg *balancerGroup) close() { bg.balancerCache.Clear(true) } -func buildPickerAndState(m map[internal.Locality]*pickerState) (connectivity.State, balancer.Picker) { +func buildPickerAndState(m map[internal.Locality]*pickerState) balancer.State { var readyN, connectingN int readyPickerWithWeights := make([]pickerState, 0, len(m)) for _, ps := range m { @@ -555,9 +557,9 @@ func buildPickerAndState(m map[internal.Locality]*pickerState) (connectivity.Sta aggregatedState = connectivity.TransientFailure } if aggregatedState == connectivity.TransientFailure { - return aggregatedState, base.NewErrPicker(balancer.ErrTransientFailure) + return balancer.State{aggregatedState, base.NewErrPickerV2(balancer.ErrTransientFailure)} } - return aggregatedState, newPickerGroup(readyPickerWithWeights) + return balancer.State{aggregatedState, newPickerGroup(readyPickerWithWeights)} } // RandomWRR constructor, to be modified in tests. 
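// ---------------------------------------------------------------------------
// [Editor's sketch, not part of the patch] A minimal illustration of the API
// shape the hunks above migrate to: connectivity state and picker now travel
// together in a balancer.State handed to ClientConn.UpdateState, and the
// picker follows the V2 contract (Pick takes a PickInfo and returns a
// PickResult). The names constPicker and reportReady below are hypothetical;
// only the balancer and connectivity package identifiers are real.
// ---------------------------------------------------------------------------
package example

import (
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/connectivity"
)

// constPicker always returns the same SubConn, similar in spirit to the
// pickfirst picker rewritten earlier in this patch.
type constPicker struct {
	sc  balancer.SubConn
	err error
}

func (p *constPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
	if p.err != nil {
		return balancer.PickResult{}, p.err
	}
	return balancer.PickResult{SubConn: p.sc}, nil
}

// reportReady bundles state and picker into a single update, replacing the
// deprecated UpdateBalancerState(state, picker) call pair used by the V1 API.
func reportReady(cc balancer.ClientConn, sc balancer.SubConn) {
	cc.UpdateState(balancer.State{
		ConnectivityState: connectivity.Ready,
		Picker:            &constPicker{sc: sc},
	})
}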
@@ -587,12 +589,12 @@ func newPickerGroup(readyPickerWithWeights []pickerState) *pickerGroup { } } -func (pg *pickerGroup) Pick(ctx context.Context, opts balancer.PickOptions) (conn balancer.SubConn, done func(balancer.DoneInfo), err error) { +func (pg *pickerGroup) Pick(info balancer.PickInfo) (balancer.PickResult, error) { if pg.length <= 0 { - return nil, nil, balancer.ErrNoSubConnAvailable + return balancer.PickResult{}, balancer.ErrNoSubConnAvailable } - p := pg.w.Next().(balancer.Picker) - return p.Pick(ctx, opts) + p := pg.w.Next().(balancer.V2Picker) + return p.Pick(info) } const ( @@ -601,26 +603,26 @@ const ( ) type loadReportPicker struct { - balancer.Picker + p balancer.V2Picker id internal.Locality loadStore lrs.Store } -func newLoadReportPicker(p balancer.Picker, id internal.Locality, loadStore lrs.Store) *loadReportPicker { +func newLoadReportPicker(p balancer.V2Picker, id internal.Locality, loadStore lrs.Store) *loadReportPicker { return &loadReportPicker{ - Picker: p, + p: p, id: id, loadStore: loadStore, } } -func (lrp *loadReportPicker) Pick(ctx context.Context, opts balancer.PickOptions) (conn balancer.SubConn, done func(balancer.DoneInfo), err error) { - conn, done, err = lrp.Picker.Pick(ctx, opts) +func (lrp *loadReportPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { + res, err := lrp.p.Pick(info) if lrp.loadStore != nil && err == nil { lrp.loadStore.CallStarted(lrp.id) - td := done - done = func(info balancer.DoneInfo) { + td := res.Done + res.Done = func(info balancer.DoneInfo) { lrp.loadStore.CallFinished(lrp.id, info.Err) if load, ok := info.ServerLoad.(*orcapb.OrcaLoadReport); ok { lrp.loadStore.CallServerLoad(lrp.id, serverLoadCPUName, load.CpuUtilization) @@ -637,5 +639,5 @@ func (lrp *loadReportPicker) Pick(ctx context.Context, opts balancer.PickOptions } } } - return + return res, err } diff --git a/xds/internal/balancer/edsbalancer/balancergroup_test.go b/xds/internal/balancer/edsbalancer/balancergroup_test.go index 58cae6e58e29..d64296b8f0c5 100644 --- a/xds/internal/balancer/edsbalancer/balancergroup_test.go +++ b/xds/internal/balancer/edsbalancer/balancergroup_test.go @@ -17,7 +17,6 @@ package edsbalancer import ( - "context" "fmt" "reflect" "testing" @@ -49,6 +48,13 @@ func init() { defaultSubBalancerCloseTimeout = time.Millisecond } +func subConnFromPicker(p balancer.V2Picker) func() balancer.SubConn { + return func() balancer.SubConn { + scst, _ := p.Pick(balancer.PickInfo{}) + return scst.SubConn + } +} + // 1 balancer, 1 backend -> 2 backends -> 1 backend. func TestBalancerGroup_OneRR_AddRemoveBackend(t *testing.T) { cc := newTestClientConn(t) @@ -68,9 +74,9 @@ func TestBalancerGroup_OneRR_AddRemoveBackend(t *testing.T) { // Test pick with one backend. p1 := <-cc.newPickerCh for i := 0; i < 5; i++ { - gotSC, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) - if !reflect.DeepEqual(gotSC, sc1) { - t.Fatalf("picker.Pick, got %v, want %v", gotSC, sc1) + gotSCSt, _ := p1.Pick(balancer.PickInfo{}) + if !reflect.DeepEqual(gotSCSt.SubConn, sc1) { + t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1) } } @@ -84,10 +90,7 @@ func TestBalancerGroup_OneRR_AddRemoveBackend(t *testing.T) { // Test roundrobin pick. 
p2 := <-cc.newPickerCh want := []balancer.SubConn{sc1, sc2} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p2.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil { t.Fatalf("want %v, got %v", want, err) } @@ -102,9 +105,9 @@ func TestBalancerGroup_OneRR_AddRemoveBackend(t *testing.T) { // Test pick with only the second subconn. p3 := <-cc.newPickerCh for i := 0; i < 5; i++ { - gotSC, _, _ := p3.Pick(context.Background(), balancer.PickOptions{}) - if !reflect.DeepEqual(gotSC, sc2) { - t.Fatalf("picker.Pick, got %v, want %v", gotSC, sc2) + gotSC, _ := p3.Pick(balancer.PickInfo{}) + if !reflect.DeepEqual(gotSC.SubConn, sc2) { + t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSC, sc2) } } } @@ -134,10 +137,7 @@ func TestBalancerGroup_TwoRR_OneBackend(t *testing.T) { // Test roundrobin on the last picker. p1 := <-cc.newPickerCh want := []balancer.SubConn{sc1, sc2} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil { t.Fatalf("want %v, got %v", want, err) } } @@ -173,10 +173,7 @@ func TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) { // Test roundrobin on the last picker. p1 := <-cc.newPickerCh want := []balancer.SubConn{sc1, sc2, sc3, sc4} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil { t.Fatalf("want %v, got %v", want, err) } @@ -186,10 +183,7 @@ func TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) { // Expect two sc1's in the result, because balancer1 will be picked twice, // but there's only one sc in it. 
want = []balancer.SubConn{sc1, sc1, sc3, sc4} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p2.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil { t.Fatalf("want %v, got %v", want, err) } @@ -202,10 +196,7 @@ func TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) { bg.handleSubConnStateChange(scToRemove, connectivity.Shutdown) p3 := <-cc.newPickerCh want = []balancer.SubConn{sc1, sc4} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p3.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p3)); err != nil { t.Fatalf("want %v, got %v", want, err) } @@ -213,10 +204,7 @@ func TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) { bg.handleSubConnStateChange(sc1, connectivity.TransientFailure) p4 := <-cc.newPickerCh want = []balancer.SubConn{sc4} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p4.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p4)); err != nil { t.Fatalf("want %v, got %v", want, err) } @@ -224,7 +212,7 @@ func TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) { bg.handleSubConnStateChange(sc4, connectivity.Connecting) p5 := <-cc.newPickerCh for i := 0; i < 5; i++ { - if _, _, err := p5.Pick(context.Background(), balancer.PickOptions{}); err != balancer.ErrNoSubConnAvailable { + if _, err := p5.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable { t.Fatalf("want pick error %v, got %v", balancer.ErrNoSubConnAvailable, err) } } @@ -233,7 +221,7 @@ func TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) { bg.handleSubConnStateChange(sc4, connectivity.TransientFailure) p6 := <-cc.newPickerCh for i := 0; i < 5; i++ { - if _, _, err := p6.Pick(context.Background(), balancer.PickOptions{}); err != balancer.ErrTransientFailure { + if _, err := p6.Pick(balancer.PickInfo{}); err != balancer.ErrTransientFailure { t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err) } } @@ -270,10 +258,7 @@ func TestBalancerGroup_TwoRR_DifferentWeight_MoreBackends(t *testing.T) { // Test roundrobin on the last picker. 
p1 := <-cc.newPickerCh want := []balancer.SubConn{sc1, sc1, sc2, sc2, sc3, sc4} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil { t.Fatalf("want %v, got %v", want, err) } } @@ -308,10 +293,7 @@ func TestBalancerGroup_ThreeRR_RemoveBalancer(t *testing.T) { p1 := <-cc.newPickerCh want := []balancer.SubConn{sc1, sc2, sc3} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil { t.Fatalf("want %v, got %v", want, err) } @@ -323,10 +305,7 @@ func TestBalancerGroup_ThreeRR_RemoveBalancer(t *testing.T) { } p2 := <-cc.newPickerCh want = []balancer.SubConn{sc1, sc3} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p2.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil { t.Fatalf("want %v, got %v", want, err) } @@ -340,7 +319,7 @@ func TestBalancerGroup_ThreeRR_RemoveBalancer(t *testing.T) { } p3 := <-cc.newPickerCh for i := 0; i < 5; i++ { - if _, _, err := p3.Pick(context.Background(), balancer.PickOptions{}); err != balancer.ErrTransientFailure { + if _, err := p3.Pick(balancer.PickInfo{}); err != balancer.ErrTransientFailure { t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err) } } @@ -377,10 +356,7 @@ func TestBalancerGroup_TwoRR_ChangeWeight_MoreBackends(t *testing.T) { // Test roundrobin on the last picker. p1 := <-cc.newPickerCh want := []balancer.SubConn{sc1, sc1, sc2, sc2, sc3, sc4} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil { t.Fatalf("want %v, got %v", want, err) } @@ -389,10 +365,7 @@ func TestBalancerGroup_TwoRR_ChangeWeight_MoreBackends(t *testing.T) { // Test roundrobin with new weight. 
p2 := <-cc.newPickerCh want = []balancer.SubConn{sc1, sc1, sc1, sc2, sc2, sc2, sc3, sc4} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p2.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil { t.Fatalf("want %v, got %v", want, err) } } @@ -440,11 +413,11 @@ func TestBalancerGroup_LoadReport(t *testing.T) { wantCost []testServerLoad ) for i := 0; i < 10; i++ { - sc, done, _ := p1.Pick(context.Background(), balancer.PickOptions{}) - locality := backendToBalancerID[sc] + scst, _ := p1.Pick(balancer.PickInfo{}) + locality := backendToBalancerID[scst.SubConn] wantStart = append(wantStart, locality) - if done != nil && sc != sc1 { - done(balancer.DoneInfo{ + if scst.Done != nil && scst.SubConn != sc1 { + scst.Done(balancer.DoneInfo{ ServerLoad: &orcapb.OrcaLoadReport{ CpuUtilization: 10, MemUtilization: 5, @@ -510,10 +483,7 @@ func TestBalancerGroup_start_close(t *testing.T) { m1[testBackendAddrs[1]], m1[testBackendAddrs[1]], m1[testBackendAddrs[2]], m1[testBackendAddrs[3]], } - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil { t.Fatalf("want %v, got %v", want, err) } @@ -551,10 +521,7 @@ func TestBalancerGroup_start_close(t *testing.T) { m2[testBackendAddrs[3]], m2[testBackendAddrs[3]], m2[testBackendAddrs[3]], m2[testBackendAddrs[1]], m2[testBackendAddrs[2]], } - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p2.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil { t.Fatalf("want %v, got %v", want, err) } } @@ -624,10 +591,7 @@ func initBalancerGroupForCachingTest(t *testing.T) (*balancerGroup, *testClientC m1[testBackendAddrs[1]], m1[testBackendAddrs[1]], m1[testBackendAddrs[2]], m1[testBackendAddrs[3]], } - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil { t.Fatalf("want %v, got %v", want, err) } @@ -647,10 +611,7 @@ func initBalancerGroupForCachingTest(t *testing.T) (*balancerGroup, *testClientC want = []balancer.SubConn{ m1[testBackendAddrs[0]], m1[testBackendAddrs[1]], } - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p2.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil { t.Fatalf("want %v, got %v", want, err) } @@ -690,10 +651,7 @@ func TestBalancerGroup_locality_caching(t *testing.T) { // addr2 is down, b2 only has addr3 in READY state. addrToSC[testBackendAddrs[3]], addrToSC[testBackendAddrs[3]], } - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p3.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p3)); err != nil { t.Fatalf("want %v, got %v", want, err) } @@ -832,10 +790,7 @@ func TestBalancerGroup_locality_caching_readd_with_different_builder(t *testing. 
addrToSC[testBackendAddrs[1]], addrToSC[testBackendAddrs[1]], addrToSC[testBackendAddrs[4]], addrToSC[testBackendAddrs[5]], } - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p3.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p3)); err != nil { t.Fatalf("want %v, got %v", want, err) } } diff --git a/xds/internal/balancer/edsbalancer/edsbalancer.go b/xds/internal/balancer/edsbalancer/edsbalancer.go index a13bcc54802e..0d75d3bac5ed 100644 --- a/xds/internal/balancer/edsbalancer/edsbalancer.go +++ b/xds/internal/balancer/edsbalancer/edsbalancer.go @@ -18,7 +18,6 @@ package edsbalancer import ( - "context" "encoding/json" "reflect" "sync" @@ -52,14 +51,6 @@ type balancerGroupWithConfig struct { configs map[internal.Locality]*localityConfig } -// balancerState keeps the previous state updated by a priority. It's kept so if -// a lower priority is removed, the higher priority's state will be sent to -// parent ClientConn (even if it's not Ready). -type balancerState struct { - state connectivity.State - picker balancer.Picker -} - // EDSBalancer does load balancing based on the EDS responses. Note that it // doesn't implement the balancer interface. It's intended to be used by a high // level balancer implementation. @@ -82,7 +73,7 @@ type EDSBalancer struct { // priorities are pointers, and will be nil when EDS returns empty result. priorityInUse priorityType priorityLowest priorityType - priorityToState map[priorityType]*balancerState + priorityToState map[priorityType]*balancer.State // The timer to give a priority 10 seconds to connect. And if the priority // doesn't go into Ready/Failure, start the next priority. // @@ -93,10 +84,9 @@ type EDSBalancer struct { subConnMu sync.Mutex subConnToPriority map[balancer.SubConn]priorityType - pickerMu sync.Mutex - drops []*dropper - innerPicker balancer.Picker // The picker without drop support. - innerState connectivity.State // The state of the picker. + pickerMu sync.Mutex + drops []*dropper + innerState balancer.State // The state of the picker without drop support. } // NewXDSBalancer create a new EDSBalancer. @@ -106,7 +96,7 @@ func NewXDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) *EDSBalancer { subBalancerBuilder: balancer.Get(roundrobin.Name), priorityToLocalities: make(map[priorityType]*balancerGroupWithConfig), - priorityToState: make(map[priorityType]*balancerState), + priorityToState: make(map[priorityType]*balancer.State), subConnToPriority: make(map[balancer.SubConn]priorityType), loadStore: loadStore, } @@ -177,9 +167,12 @@ func (xdsB *EDSBalancer) updateDrops(dropPolicies []xdsclient.OverloadDropConfig if dropsChanged { xdsB.pickerMu.Lock() xdsB.drops = newDrops - if xdsB.innerPicker != nil { + if xdsB.innerState.Picker != nil { // Update picker with old inner picker, new drops. - xdsB.cc.UpdateBalancerState(xdsB.innerState, newDropPicker(xdsB.innerPicker, newDrops, xdsB.loadStore)) + xdsB.cc.UpdateState(balancer.State{ + ConnectivityState: xdsB.innerState.ConnectivityState, + Picker: newDropPicker(xdsB.innerState.Picker, newDrops, xdsB.loadStore)}, + ) } xdsB.pickerMu.Unlock() } @@ -363,22 +356,21 @@ func (xdsB *EDSBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connect } } -// updateBalancerState first handles priority, and then wraps picker in a drop -// picker before forwarding the update. 
-func (xdsB *EDSBalancer) updateBalancerState(priority priorityType, s connectivity.State, p balancer.Picker) { +// updateState first handles priority, and then wraps picker in a drop picker +// before forwarding the update. +func (xdsB *EDSBalancer) updateState(priority priorityType, s balancer.State) { _, ok := xdsB.priorityToLocalities[priority] if !ok { grpclog.Infof("eds: received picker update from unknown priority") return } - if xdsB.handlePriorityWithNewState(priority, s, p) { + if xdsB.handlePriorityWithNewState(priority, s) { xdsB.pickerMu.Lock() defer xdsB.pickerMu.Unlock() - xdsB.innerPicker = p xdsB.innerState = s // Don't reset drops when it's a state change. - xdsB.cc.UpdateBalancerState(s, newDropPicker(p, xdsB.drops, xdsB.loadStore)) + xdsB.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: newDropPicker(s.Picker, xdsB.drops, xdsB.loadStore)}) } } @@ -402,7 +394,10 @@ func (ebwcc *edsBalancerWrapperCC) NewSubConn(addrs []resolver.Address, opts bal return ebwcc.parent.newSubConn(ebwcc.priority, addrs, opts) } func (ebwcc *edsBalancerWrapperCC) UpdateBalancerState(state connectivity.State, picker balancer.Picker) { - ebwcc.parent.updateBalancerState(ebwcc.priority, state, picker) + grpclog.Fatalln("not implemented") +} +func (ebwcc *edsBalancerWrapperCC) UpdateState(state balancer.State) { + ebwcc.parent.updateState(ebwcc.priority, state) } func (xdsB *EDSBalancer) newSubConn(priority priorityType, addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { @@ -427,11 +422,11 @@ func (xdsB *EDSBalancer) Close() { type dropPicker struct { drops []*dropper - p balancer.Picker + p balancer.V2Picker loadStore lrs.Store } -func newDropPicker(p balancer.Picker, drops []*dropper, loadStore lrs.Store) *dropPicker { +func newDropPicker(p balancer.V2Picker, drops []*dropper, loadStore lrs.Store) *dropPicker { return &dropPicker{ drops: drops, p: p, @@ -439,7 +434,7 @@ func newDropPicker(p balancer.Picker, drops []*dropper, loadStore lrs.Store) *dr } } -func (d *dropPicker) Pick(ctx context.Context, opts balancer.PickOptions) (conn balancer.SubConn, done func(balancer.DoneInfo), err error) { +func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { var ( drop bool category string @@ -455,9 +450,9 @@ func (d *dropPicker) Pick(ctx context.Context, opts balancer.PickOptions) (conn if d.loadStore != nil { d.loadStore.CallDropped(category) } - return nil, nil, status.Errorf(codes.Unavailable, "RPC is dropped") + return balancer.PickResult{}, status.Errorf(codes.Unavailable, "RPC is dropped") } // TODO: (eds) don't drop unless the inner picker is READY. Similar to // https://github.com/grpc/grpc-go/issues/2622. - return d.p.Pick(ctx, opts) + return d.p.Pick(info) } diff --git a/xds/internal/balancer/edsbalancer/edsbalancer_test.go b/xds/internal/balancer/edsbalancer/edsbalancer_test.go index edc4b411d246..0e30f7512fe6 100644 --- a/xds/internal/balancer/edsbalancer/edsbalancer_test.go +++ b/xds/internal/balancer/edsbalancer/edsbalancer_test.go @@ -17,7 +17,6 @@ package edsbalancer import ( - "context" "fmt" "reflect" "sort" @@ -67,9 +66,9 @@ func TestEDS_OneLocality(t *testing.T) { // Pick with only the first backend. 
p1 := <-cc.newPickerCh for i := 0; i < 5; i++ { - gotSC, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) - if !reflect.DeepEqual(gotSC, sc1) { - t.Fatalf("picker.Pick, got %v, want %v", gotSC, sc1) + gotSCSt, _ := p1.Pick(balancer.PickInfo{}) + if !reflect.DeepEqual(gotSCSt.SubConn, sc1) { + t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1) } } @@ -85,10 +84,7 @@ func TestEDS_OneLocality(t *testing.T) { // Test roundrobin with two subconns. p2 := <-cc.newPickerCh want := []balancer.SubConn{sc1, sc2} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p2.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil { t.Fatalf("want %v, got %v", want, err) } @@ -106,9 +102,9 @@ func TestEDS_OneLocality(t *testing.T) { // Test pick with only the second subconn. p3 := <-cc.newPickerCh for i := 0; i < 5; i++ { - gotSC, _, _ := p3.Pick(context.Background(), balancer.PickOptions{}) - if !reflect.DeepEqual(gotSC, sc2) { - t.Fatalf("picker.Pick, got %v, want %v", gotSC, sc2) + gotSCSt, _ := p3.Pick(balancer.PickInfo{}) + if !reflect.DeepEqual(gotSCSt.SubConn, sc2) { + t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc2) } } @@ -129,9 +125,9 @@ func TestEDS_OneLocality(t *testing.T) { // Test pick with only the third subconn. p4 := <-cc.newPickerCh for i := 0; i < 5; i++ { - gotSC, _, _ := p4.Pick(context.Background(), balancer.PickOptions{}) - if !reflect.DeepEqual(gotSC, sc3) { - t.Fatalf("picker.Pick, got %v, want %v", gotSC, sc3) + gotSCSt, _ := p4.Pick(balancer.PickInfo{}) + if !reflect.DeepEqual(gotSCSt.SubConn, sc3) { + t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc3) } } @@ -143,7 +139,7 @@ func TestEDS_OneLocality(t *testing.T) { // Picks with drops. p5 := <-cc.newPickerCh for i := 0; i < 100; i++ { - _, _, err := p5.Pick(context.Background(), balancer.PickOptions{}) + _, err := p5.Pick(balancer.PickInfo{}) // TODO: the dropping algorithm needs a design. When the dropping algorithm // is fixed, this test also needs fix. if i < 50 && err == nil { @@ -184,10 +180,7 @@ func TestEDS_TwoLocalities(t *testing.T) { // Test roundrobin with two subconns. p1 := <-cc.newPickerCh want := []balancer.SubConn{sc1, sc2} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil { t.Fatalf("want %v, got %v", want, err) } @@ -205,10 +198,7 @@ func TestEDS_TwoLocalities(t *testing.T) { // Test roundrobin with three subconns. p2 := <-cc.newPickerCh want = []balancer.SubConn{sc1, sc2, sc3} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p2.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil { t.Fatalf("want %v, got %v", want, err) } @@ -227,10 +217,7 @@ func TestEDS_TwoLocalities(t *testing.T) { // Test pick with two subconns (without the first one). 
p3 := <-cc.newPickerCh want = []balancer.SubConn{sc2, sc3} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p3.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p3)); err != nil { t.Fatalf("want %v, got %v", want, err) } @@ -250,10 +237,7 @@ func TestEDS_TwoLocalities(t *testing.T) { // Locality-1 contains only sc2, locality-2 contains sc3 and sc4. So expect // two sc2's and sc3, sc4. want = []balancer.SubConn{sc2, sc2, sc3, sc4} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p4.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p4)); err != nil { t.Fatalf("want %v, got %v", want, err) } @@ -269,10 +253,7 @@ func TestEDS_TwoLocalities(t *testing.T) { // (weight 2 and 1). Locality-1 contains only sc2, locality-2 contains sc3 and // sc4. So expect four sc2's and sc3, sc4. want = []balancer.SubConn{sc2, sc2, sc2, sc2, sc3, sc4} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p5.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p5)); err != nil { t.Fatalf("want %v, got %v", want, err) } @@ -298,10 +279,7 @@ func TestEDS_TwoLocalities(t *testing.T) { // Locality-1 will be not be picked, and locality-2 will be picked. // Locality-2 contains sc3 and sc4. So expect sc3, sc4. want = []balancer.SubConn{sc3, sc4} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p6.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p6)); err != nil { t.Fatalf("want %v, got %v", want, err) } } @@ -375,10 +353,7 @@ func TestEDS_EndpointsHealth(t *testing.T) { // Test roundrobin with the subconns. p1 := <-cc.newPickerCh want := readySCs - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil { t.Fatalf("want %v, got %v", want, err) } } @@ -411,7 +386,7 @@ type testConstBalancer struct { } func (tb *testConstBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { - tb.cc.UpdateBalancerState(connectivity.Ready, &testConstPicker{err: errTestConstPicker}) + tb.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &testConstPicker{err: errTestConstPicker}}) } func (tb *testConstBalancer) HandleResolvedAddrs(a []resolver.Address, err error) { @@ -429,11 +404,11 @@ type testConstPicker struct { sc balancer.SubConn } -func (tcp *testConstPicker) Pick(ctx context.Context, opts balancer.PickOptions) (conn balancer.SubConn, done func(balancer.DoneInfo), err error) { +func (tcp *testConstPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { if tcp.err != nil { - return nil, nil, tcp.err + return balancer.PickResult{}, tcp.err } - return tcp.sc, nil, nil + return balancer.PickResult{SubConn: tcp.sc}, nil } // Create XDS balancer, and update sub-balancer before handling eds responses. 
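// ---------------------------------------------------------------------------
// [Editor's sketch, not part of the patch] A hedged example of how a test can
// drive a V2 picker directly, in the style of subConnFromPicker and the load
// report tests above. drainPicker is a hypothetical helper assumed to live in
// the same test package (where the balancer package is already imported);
// only the balancer package identifiers are real.
// ---------------------------------------------------------------------------

// drainPicker performs n picks and collects the chosen SubConns, invoking the
// Done callback (when present) to simulate RPC completion.
func drainPicker(p balancer.V2Picker, n int) []balancer.SubConn {
	var got []balancer.SubConn
	for i := 0; i < n; i++ {
		res, err := p.Pick(balancer.PickInfo{})
		if err != nil {
			// balancer.ErrNoSubConnAvailable means "wait for a new picker";
			// transient-failure errors fail non-wait-for-ready RPCs.
			continue
		}
		got = append(got, res.SubConn)
		if res.Done != nil {
			// Done replaces the separate done func returned by the V1 Pick.
			res.Done(balancer.DoneInfo{})
		}
	}
	return got
}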
@@ -459,7 +434,7 @@ func TestEDS_UpdateSubBalancerName(t *testing.T) { p0 := <-cc.newPickerCh for i := 0; i < 5; i++ { - _, _, err := p0.Pick(context.Background(), balancer.PickOptions{}) + _, err := p0.Pick(balancer.PickInfo{}) if !reflect.DeepEqual(err, errTestConstPicker) { t.Fatalf("picker.Pick, got err %q, want err %q", err, errTestConstPicker) } @@ -482,10 +457,7 @@ func TestEDS_UpdateSubBalancerName(t *testing.T) { // Test roundrobin with two subconns. p1 := <-cc.newPickerCh want := []balancer.SubConn{sc1, sc2} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil { t.Fatalf("want %v, got %v", want, err) } @@ -507,7 +479,7 @@ func TestEDS_UpdateSubBalancerName(t *testing.T) { p2 := <-cc.newPickerCh for i := 0; i < 5; i++ { - _, _, err := p2.Pick(context.Background(), balancer.PickOptions{}) + _, err := p2.Pick(balancer.PickInfo{}) if !reflect.DeepEqual(err, errTestConstPicker) { t.Fatalf("picker.Pick, got err %q, want err %q", err, errTestConstPicker) } @@ -529,10 +501,7 @@ func TestEDS_UpdateSubBalancerName(t *testing.T) { p3 := <-cc.newPickerCh want = []balancer.SubConn{sc3, sc4} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p3.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p3)); err != nil { t.Fatalf("want %v, got %v", want, err) } } @@ -589,7 +558,7 @@ func TestDropPicker(t *testing.T) { } for i := 0; i < pickCount; i++ { - _, _, err := p.Pick(context.Background(), balancer.PickOptions{}) + _, err := p.Pick(balancer.PickInfo{}) if err == nil { scCount++ } @@ -641,12 +610,12 @@ func TestEDS_LoadReport(t *testing.T) { ) for i := 0; i < 10; i++ { - sc, done, _ := p1.Pick(context.Background(), balancer.PickOptions{}) - locality := backendToBalancerID[sc] + scst, _ := p1.Pick(balancer.PickInfo{}) + locality := backendToBalancerID[scst.SubConn] wantStart = append(wantStart, locality) - if done != nil && sc != sc1 { - done(balancer.DoneInfo{}) - wantEnd = append(wantEnd, backendToBalancerID[sc]) + if scst.Done != nil && scst.SubConn != sc1 { + scst.Done(balancer.DoneInfo{}) + wantEnd = append(wantEnd, backendToBalancerID[scst.SubConn]) } } diff --git a/xds/internal/balancer/edsbalancer/priority.go b/xds/internal/balancer/edsbalancer/priority.go index 0ea4b96c56b4..0ae465527cff 100644 --- a/xds/internal/balancer/edsbalancer/priority.go +++ b/xds/internal/balancer/edsbalancer/priority.go @@ -45,7 +45,7 @@ func (xdsB *EDSBalancer) handlePriorityChange() { // Everything was removed by EDS. if !xdsB.priorityLowest.isSet() { xdsB.priorityInUse = newPriorityTypeUnset() - xdsB.cc.UpdateBalancerState(connectivity.TransientFailure, base.NewErrPicker(balancer.ErrTransientFailure)) + xdsB.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPickerV2(balancer.ErrTransientFailure)}) return } @@ -59,7 +59,7 @@ func (xdsB *EDSBalancer) handlePriorityChange() { if _, ok := xdsB.priorityToLocalities[xdsB.priorityInUse]; !ok { xdsB.priorityInUse = xdsB.priorityLowest if s, ok := xdsB.priorityToState[xdsB.priorityLowest]; ok { - xdsB.cc.UpdateBalancerState(s.state, s.picker) + xdsB.cc.UpdateState(*s) } else { // If state for priorityLowest is not found, this means priorityLowest was // started, but never sent any update. 
The init timer fired and @@ -69,13 +69,13 @@ func (xdsB *EDSBalancer) handlePriorityChange() { // We don't have an old state to send to parent, but we also don't // want parent to keep using picker from old_priorityInUse. Send an // update to trigger block picks until a new picker is ready. - xdsB.cc.UpdateBalancerState(connectivity.Connecting, base.NewErrPicker(balancer.ErrNoSubConnAvailable)) + xdsB.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPickerV2(balancer.ErrNoSubConnAvailable)}) } return } // priorityInUse is not ready, look for next priority, and use if found. - if s, ok := xdsB.priorityToState[xdsB.priorityInUse]; ok && s.state != connectivity.Ready { + if s, ok := xdsB.priorityToState[xdsB.priorityInUse]; ok && s.ConnectivityState != connectivity.Ready { pNext := xdsB.priorityInUse.nextLower() if _, ok := xdsB.priorityToLocalities[pNext]; ok { xdsB.startPriority(pNext) @@ -119,7 +119,7 @@ func (xdsB *EDSBalancer) startPriority(priority priorityType) { // handlePriorityWithNewState start/close priorities based on the connectivity // state. It returns whether the state should be forwarded to parent ClientConn. -func (xdsB *EDSBalancer) handlePriorityWithNewState(priority priorityType, s connectivity.State, p balancer.Picker) bool { +func (xdsB *EDSBalancer) handlePriorityWithNewState(priority priorityType, s balancer.State) bool { xdsB.priorityMu.Lock() defer xdsB.priorityMu.Unlock() @@ -136,14 +136,13 @@ func (xdsB *EDSBalancer) handlePriorityWithNewState(priority priorityType, s con bState, ok := xdsB.priorityToState[priority] if !ok { - bState = &balancerState{} + bState = &balancer.State{} xdsB.priorityToState[priority] = bState } - oldState := bState.state - bState.state = s - bState.picker = p + oldState := bState.ConnectivityState + *bState = s - switch s { + switch s.ConnectivityState { case connectivity.Ready: return xdsB.handlePriorityWithNewStateReady(priority) case connectivity.TransientFailure: diff --git a/xds/internal/balancer/edsbalancer/priority_test.go b/xds/internal/balancer/edsbalancer/priority_test.go index 73d9cc633209..1279d3b16cef 100644 --- a/xds/internal/balancer/edsbalancer/priority_test.go +++ b/xds/internal/balancer/edsbalancer/priority_test.go @@ -18,7 +18,6 @@ package edsbalancer import ( - "context" "reflect" "testing" "time" @@ -55,10 +54,7 @@ func TestEDSPriority_HighPriorityReady(t *testing.T) { // Test roundrobin with only p0 subconns. p1 := <-cc.newPickerCh want := []balancer.SubConn{sc1} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil { // t.Fatalf("want %v, got %v", want, err) t.Fatalf("want %v, got %v", want, err) } @@ -124,10 +120,7 @@ func TestEDSPriority_SwitchPriority(t *testing.T) { // Test roundrobin with only p0 subconns. p0 := <-cc.newPickerCh want := []balancer.SubConn{sc0} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p0.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p0)); err != nil { // t.Fatalf("want %v, got %v", want, err) t.Fatalf("want %v, got %v", want, err) } @@ -145,9 +138,9 @@ func TestEDSPriority_SwitchPriority(t *testing.T) { // Test pick with 1. 
p1 := <-cc.newPickerCh for i := 0; i < 5; i++ { - gotSC, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) - if !reflect.DeepEqual(gotSC, sc1) { - t.Fatalf("picker.Pick, got %v, want %v", gotSC, sc1) + gotSCSt, _ := p1.Pick(balancer.PickInfo{}) + if !reflect.DeepEqual(gotSCSt.SubConn, sc1) { + t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1) } } @@ -181,9 +174,9 @@ func TestEDSPriority_SwitchPriority(t *testing.T) { // Test pick with 2. p2 := <-cc.newPickerCh for i := 0; i < 5; i++ { - gotSC, _, _ := p2.Pick(context.Background(), balancer.PickOptions{}) - if !reflect.DeepEqual(gotSC, sc2) { - t.Fatalf("picker.Pick, got %v, want %v", gotSC, sc2) + gotSCSt, _ := p2.Pick(balancer.PickInfo{}) + if !reflect.DeepEqual(gotSCSt.SubConn, sc2) { + t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc2) } } @@ -202,7 +195,7 @@ func TestEDSPriority_SwitchPriority(t *testing.T) { // Should get an update with 1's old picker, to override 2's old picker. p3 := <-cc.newPickerCh for i := 0; i < 5; i++ { - if _, _, err := p3.Pick(context.Background(), balancer.PickOptions{}); err != balancer.ErrTransientFailure { + if _, err := p3.Pick(balancer.PickInfo{}); err != balancer.ErrTransientFailure { t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err) } } @@ -240,7 +233,7 @@ func TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) { // Test pick failure. pFail := <-cc.newPickerCh for i := 0; i < 5; i++ { - if _, _, err := pFail.Pick(context.Background(), balancer.PickOptions{}); err != balancer.ErrTransientFailure { + if _, err := pFail.Pick(balancer.PickInfo{}); err != balancer.ErrTransientFailure { t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err) } } @@ -263,9 +256,9 @@ func TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) { // Test pick with 2. p2 := <-cc.newPickerCh for i := 0; i < 5; i++ { - gotSC, _, _ := p2.Pick(context.Background(), balancer.PickOptions{}) - if !reflect.DeepEqual(gotSC, sc2) { - t.Fatalf("picker.Pick, got %v, want %v", gotSC, sc2) + gotSCSt, _ := p2.Pick(balancer.PickInfo{}) + if !reflect.DeepEqual(gotSCSt.SubConn, sc2) { + t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc2) } } } @@ -312,9 +305,9 @@ func TestEDSPriority_HigherReadyCloseAllLower(t *testing.T) { // Test pick with 2. p2 := <-cc.newPickerCh for i := 0; i < 5; i++ { - gotSC, _, _ := p2.Pick(context.Background(), balancer.PickOptions{}) - if !reflect.DeepEqual(gotSC, sc2) { - t.Fatalf("picker.Pick, got %v, want %v", gotSC, sc2) + gotSCSt, _ := p2.Pick(balancer.PickInfo{}) + if !reflect.DeepEqual(gotSCSt.SubConn, sc2) { + t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc2) } } @@ -334,9 +327,9 @@ func TestEDSPriority_HigherReadyCloseAllLower(t *testing.T) { // Test pick with 0. p0 := <-cc.newPickerCh for i := 0; i < 5; i++ { - gotSC, _, _ := p0.Pick(context.Background(), balancer.PickOptions{}) - if !reflect.DeepEqual(gotSC, sc0) { - t.Fatalf("picker.Pick, got %v, want %v", gotSC, sc0) + gotSCSt, _ := p0.Pick(balancer.PickInfo{}) + if !reflect.DeepEqual(gotSCSt.SubConn, sc0) { + t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0) } } } @@ -392,9 +385,9 @@ func TestEDSPriority_InitTimeout(t *testing.T) { // Test pick with 1. 
p1 := <-cc.newPickerCh for i := 0; i < 5; i++ { - gotSC, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) - if !reflect.DeepEqual(gotSC, sc1) { - t.Fatalf("picker.Pick, got %v, want %v", gotSC, sc1) + gotSCSt, _ := p1.Pick(balancer.PickInfo{}) + if !reflect.DeepEqual(gotSCSt.SubConn, sc1) { + t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1) } } } @@ -424,10 +417,7 @@ func TestEDSPriority_MultipleLocalities(t *testing.T) { // Test roundrobin with only p0 subconns. p0 := <-cc.newPickerCh want := []balancer.SubConn{sc0} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p0.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p0)); err != nil { // t.Fatalf("want %v, got %v", want, err) t.Fatalf("want %v, got %v", want, err) } @@ -446,10 +436,7 @@ func TestEDSPriority_MultipleLocalities(t *testing.T) { // Test roundrobin with only p1 subconns. p1 := <-cc.newPickerCh want = []balancer.SubConn{sc1} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil { // t.Fatalf("want %v, got %v", want, err) t.Fatalf("want %v, got %v", want, err) } @@ -465,10 +452,7 @@ func TestEDSPriority_MultipleLocalities(t *testing.T) { // Test roundrobin with only p0 subconns. p2 := <-cc.newPickerCh want = []balancer.SubConn{sc0} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p2.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil { t.Fatalf("want %v, got %v", want, err) } @@ -491,10 +475,7 @@ func TestEDSPriority_MultipleLocalities(t *testing.T) { // Test roundrobin with only two p0 subconns. p3 := <-cc.newPickerCh want = []balancer.SubConn{sc0, sc2} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p3.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p3)); err != nil { t.Fatalf("want %v, got %v", want, err) } @@ -512,10 +493,7 @@ func TestEDSPriority_MultipleLocalities(t *testing.T) { // Test roundrobin with only p1 subconns. p4 := <-cc.newPickerCh want = []balancer.SubConn{sc3, sc4} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p4.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p4)); err != nil { t.Fatalf("want %v, got %v", want, err) } } @@ -551,11 +529,7 @@ func TestEDSPriority_RemovesAllLocalities(t *testing.T) { // Test roundrobin with only p0 subconns. p0 := <-cc.newPickerCh want := []balancer.SubConn{sc0} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p0.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { - // t.Fatalf("want %v, got %v", want, err) + if err := isRoundRobin(want, subConnFromPicker(p0)); err != nil { t.Fatalf("want %v, got %v", want, err) } @@ -572,7 +546,7 @@ func TestEDSPriority_RemovesAllLocalities(t *testing.T) { // Test pick return TransientFailure. 
pFail := <-cc.newPickerCh for i := 0; i < 5; i++ { - if _, _, err := pFail.Pick(context.Background(), balancer.PickOptions{}); err != balancer.ErrTransientFailure { + if _, err := pFail.Pick(balancer.PickInfo{}); err != balancer.ErrTransientFailure { t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err) } } @@ -605,10 +579,7 @@ func TestEDSPriority_RemovesAllLocalities(t *testing.T) { // Test roundrobin with only p1 subconns. p1 := <-cc.newPickerCh want = []balancer.SubConn{sc11} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil { t.Fatalf("want %v, got %v", want, err) } @@ -626,8 +597,8 @@ func TestEDSPriority_RemovesAllLocalities(t *testing.T) { // Test pick return TransientFailure. pFail1 := <-cc.newPickerCh for i := 0; i < 5; i++ { - if sc, _, err := pFail1.Pick(context.Background(), balancer.PickOptions{}); err != balancer.ErrNoSubConnAvailable { - t.Fatalf("want pick error _, _, %v, got %v, _ ,%v", balancer.ErrTransientFailure, sc, err) + if scst, err := pFail1.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable { + t.Fatalf("want pick error _, %v, got %v, _ ,%v", balancer.ErrTransientFailure, scst, err) } } @@ -639,10 +610,7 @@ func TestEDSPriority_RemovesAllLocalities(t *testing.T) { // Test roundrobin with only p0 subconns. p2 := <-cc.newPickerCh want = []balancer.SubConn{sc01} - if err := isRoundRobin(want, func() balancer.SubConn { - sc, _, _ := p2.Pick(context.Background(), balancer.PickOptions{}) - return sc - }); err != nil { + if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil { t.Fatalf("want %v, got %v", want, err) } diff --git a/xds/internal/balancer/edsbalancer/test_util_test.go b/xds/internal/balancer/edsbalancer/test_util_test.go index 78f8d4b09498..799fc3f7c1e0 100644 --- a/xds/internal/balancer/edsbalancer/test_util_test.go +++ b/xds/internal/balancer/edsbalancer/test_util_test.go @@ -63,7 +63,7 @@ type testClientConn struct { newSubConnCh chan balancer.SubConn // The last 10 subconn created. removeSubConnCh chan balancer.SubConn // The last 10 subconn removed. - newPickerCh chan balancer.Picker // The last picker updated. + newPickerCh chan balancer.V2Picker // The last picker updated. newStateCh chan connectivity.State // The last state. 
subConnIdx int @@ -77,7 +77,7 @@ func newTestClientConn(t *testing.T) *testClientConn { newSubConnCh: make(chan balancer.SubConn, 10), removeSubConnCh: make(chan balancer.SubConn, 10), - newPickerCh: make(chan balancer.Picker, 1), + newPickerCh: make(chan balancer.V2Picker, 1), newStateCh: make(chan connectivity.State, 1), } } @@ -109,18 +109,22 @@ func (tcc *testClientConn) RemoveSubConn(sc balancer.SubConn) { } func (tcc *testClientConn) UpdateBalancerState(s connectivity.State, p balancer.Picker) { - tcc.t.Logf("testClientConn: UpdateBalancerState(%v, %p)", s, p) + tcc.t.Fatal("not implemented") +} + +func (tcc *testClientConn) UpdateState(bs balancer.State) { + tcc.t.Logf("testClientConn: UpdateState(%v)", bs) select { case <-tcc.newStateCh: default: } - tcc.newStateCh <- s + tcc.newStateCh <- bs.ConnectivityState select { case <-tcc.newPickerCh: default: } - tcc.newPickerCh <- p + tcc.newPickerCh <- bs.Picker } func (tcc *testClientConn) ResolveNow(resolver.ResolveNowOptions) { diff --git a/xds/internal/balancer/xds.go b/xds/internal/balancer/xds.go index 788c43c743d8..5e67250cd991 100644 --- a/xds/internal/balancer/xds.go +++ b/xds/internal/balancer/xds.go @@ -365,9 +365,12 @@ type xdsClientConn struct { balancer.ClientConn } +func (w *xdsClientConn) UpdateState(s balancer.State) { + w.updateState(s.ConnectivityState) + w.ClientConn.UpdateState(s) +} func (w *xdsClientConn) UpdateBalancerState(s connectivity.State, p balancer.Picker) { - w.updateState(s) - w.ClientConn.UpdateBalancerState(s, p) + grpclog.Fatalln("not implemented") } type subConnStateUpdate struct { diff --git a/xds/internal/balancer/xds_test.go b/xds/internal/balancer/xds_test.go index 235ccd64177f..08ca16d0a6c2 100644 --- a/xds/internal/balancer/xds_test.go +++ b/xds/internal/balancer/xds_test.go @@ -166,10 +166,11 @@ func (t *testClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewS return nil, nil } -func (testClientConn) RemoveSubConn(balancer.SubConn) {} -func (testClientConn) UpdateBalancerState(s connectivity.State, p balancer.Picker) {} -func (testClientConn) ResolveNow(resolver.ResolveNowOptions) {} -func (testClientConn) Target() string { return testServiceName } +func (testClientConn) RemoveSubConn(balancer.SubConn) {} +func (testClientConn) UpdateBalancerState(connectivity.State, balancer.Picker) {} +func (testClientConn) UpdateState(balancer.State) {} +func (testClientConn) ResolveNow(resolver.ResolveNowOptions) {} +func (testClientConn) Target() string { return testServiceName } type scStateChange struct { sc balancer.SubConn