Update fork with changes from master #1

Merged — 7 commits, Apr 8, 2019
5 changes: 5 additions & 0 deletions balancer/balancer.go
@@ -183,6 +183,11 @@ type DoneInfo struct {
BytesSent bool
// BytesReceived indicates if any byte has been received from the server.
BytesReceived bool
// ServerLoad is the load received from the server. It's usually sent as part of
// trailing metadata.
//
// The only supported type now is *orca_v1.LoadReport.
ServerLoad interface{}
}

var (
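The new ServerLoad field is how a per-RPC load report reaches the balancer after an RPC finishes. Below is a minimal sketch of a done callback (the kind a custom Picker returns from Pick) that inspects it; the callback name and logging are illustrative, and the *orca_v1.LoadReport assertion is left out because that proto is not part of this diff:

```go
package main

import (
	"fmt"

	"google.golang.org/grpc/balancer"
)

// handleDone is a hypothetical done callback a custom Picker could return from
// Pick. gRPC invokes it when the RPC completes; ServerLoad is populated from
// the trailing metadata when the server attached a load report.
func handleDone(info balancer.DoneInfo) {
	if info.ServerLoad == nil {
		return // the server attached no load report to this RPC
	}
	// A real balancer would type-assert to *orca_v1.LoadReport here and update
	// its per-backend load accounting; this sketch just logs the value.
	fmt.Printf("sent=%v received=%v server load: %+v\n",
		info.BytesSent, info.BytesReceived, info.ServerLoad)
}

func main() {
	// Simulate gRPC calling the callback after an RPC completes.
	handleDone(balancer.DoneInfo{
		BytesSent:     true,
		BytesReceived: true,
		ServerLoad:    "cpu_utilization: 0.42", // stand-in for a *orca_v1.LoadReport
	})
}
```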
95 changes: 70 additions & 25 deletions balancer/grpclb/grpclb.go
@@ -172,7 +172,6 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal
doneCh: make(chan struct{}),

manualResolver: r,
csEvltr: &balancer.ConnectivityStateEvaluator{},
subConns: make(map[resolver.Address]balancer.SubConn),
scStates: make(map[balancer.SubConn]connectivity.State),
picker: &errPicker{err: balancer.ErrNoSubConnAvailable},
@@ -238,15 +237,15 @@ type lbBalancer struct {
// but with only READY SCs will be generated.
backendAddrs []resolver.Address
// Roundrobin functionalities.
csEvltr *balancer.ConnectivityStateEvaluator
state connectivity.State
subConns map[resolver.Address]balancer.SubConn // Used to new/remove SubConn.
scStates map[balancer.SubConn]connectivity.State // Used to filter READY SubConns.
picker balancer.Picker
// Support fallback to resolved backend addresses if there's no response
// from remote balancer within fallbackTimeout.
fallbackTimerExpired bool
serverListReceived bool
remoteBalancerConnected bool
serverListReceived bool
inFallback bool
// resolvedBackendAddrs is resolvedAddrs minus remote balancers. It's set
// when resolved address updates are received, and read in the goroutine
// handling fallback.
@@ -264,13 +263,16 @@ func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
return
}

if lb.state == connectivity.Connecting {
lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
return
}

var readySCs []balancer.SubConn
if lb.usePickFirst {
if lb.state == connectivity.Ready || lb.state == connectivity.Idle {
for _, sc := range lb.subConns {
readySCs = append(readySCs, sc)
break
}
for _, sc := range lb.subConns {
readySCs = append(readySCs, sc)
break
}
} else {
for _, a := range lb.backendAddrs {
@@ -286,10 +288,13 @@ func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
// If there are no ready SubConns, always re-pick. This is to avoid drops
// unless at least one SubConn is ready. Otherwise we may drop more
// often than wanted because of drops + re-picks (which become re-drops).
//
// This doesn't seem to be necessary after the connecting check above.
// Kept for safety.
lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
return
}
if len(lb.fullServerList) <= 0 {
if lb.inFallback {
lb.picker = newRRPicker(readySCs)
return
}
@@ -305,6 +310,34 @@ func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
prevLBPicker.updateReadySCs(readySCs)
}

// aggregateSubConnStates calculates the aggregated state of the SubConns in
// lb.subConns. These are the SubConns currently in use (when switching between
// fallback and grpclb). lb.scStates contains states for all SubConns, including
// those in the cache (SubConns are cached for 10 seconds after being removed).
//
// The aggregated state is:
// - If at least one SubConn is Ready, the aggregated state is Ready;
// - Else if at least one SubConn is Connecting, the aggregated state is Connecting;
// - Else the aggregated state is TransientFailure.
func (lb *lbBalancer) aggregateSubConnStates() connectivity.State {
var numConnecting uint64

for _, sc := range lb.subConns {
if state, ok := lb.scStates[sc]; ok {
switch state {
case connectivity.Ready:
return connectivity.Ready
case connectivity.Connecting:
numConnecting++
}
}
}
if numConnecting > 0 {
return connectivity.Connecting
}
return connectivity.TransientFailure
}

func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
if grpclog.V(2) {
grpclog.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s)
@@ -328,18 +361,33 @@ func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivi
// kept the sc's state in scStates. Remove state for this sc here.
delete(lb.scStates, sc)
}
// Force regenerating the picker if:
// - this sc became ready from not-ready
// - this sc became not-ready from ready
lb.updateStateAndPicker((oldS == connectivity.Ready) != (s == connectivity.Ready), false)

// Enter fallback when the aggregated state is not Ready and the connection
// to the remote balancer is lost.
if lb.state != connectivity.Ready {
if !lb.inFallback && !lb.remoteBalancerConnected {
// Enter fallback.
lb.refreshSubConns(lb.resolvedBackendAddrs, false)
}
}
}

// updateStateAndPicker re-calculates the aggregated state, and regenerates the
// picker if the overall state has changed.
//
// If forceRegeneratePicker is true, the picker will always be regenerated.
func (lb *lbBalancer) updateStateAndPicker(forceRegeneratePicker bool, resetDrop bool) {
oldAggrState := lb.state
lb.state = lb.csEvltr.RecordTransition(oldS, s)

lb.state = lb.aggregateSubConnStates()
// Regenerate picker when one of the following happens:
// - this sc became ready from not-ready
// - this sc became not-ready from ready
// - the aggregated state of balancer became TransientFailure from non-TransientFailure
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
if (oldS == connectivity.Ready) != (s == connectivity.Ready) ||
(lb.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
lb.regeneratePicker(false)
// - caller wants to regenerate
// - the aggregated state changed
if forceRegeneratePicker || (lb.state != oldAggrState) {
lb.regeneratePicker(resetDrop)
}

lb.cc.UpdateBalancerState(lb.state, lb.picker)
@@ -357,11 +405,11 @@ func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) {
return
}
lb.mu.Lock()
if lb.serverListReceived {
if lb.inFallback || lb.serverListReceived {
lb.mu.Unlock()
return
}
lb.fallbackTimerExpired = true
// Enter fallback.
lb.refreshSubConns(lb.resolvedBackendAddrs, false)
lb.mu.Unlock()
}
@@ -405,10 +453,7 @@ func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {

lb.mu.Lock()
lb.resolvedBackendAddrs = backendAddrs
// If serverListReceived is true, connection to remote balancer was
// successful and there's no need to do fallback anymore.
// If fallbackTimerExpired is false, fallback hasn't happened yet.
if !lb.serverListReceived && lb.fallbackTimerExpired {
if lb.inFallback {
// This means we received a new list of resolved backends, and we are
// still in fallback mode. Need to update the list of backends we are
// using to the new list of backends.
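Taken together, the grpclb.go changes gate fallback entry on a small set of conditions: not already in fallback, plus either (a) the remote balancer connection is gone and no SubConn is Ready, or (b) the fallback timer fired before any server list arrived. A compressed restatement of that logic, with the lbBalancer fields passed as plain parameters for illustration:

```go
package main

import (
	"fmt"

	"google.golang.org/grpc/connectivity"
)

// enterFallbackOnSubConnChange mirrors the check added to
// HandleSubConnStateChange: fall back only when we are not already in
// fallback, the remote balancer connection is lost, and the aggregated
// SubConn state is not Ready. (The real logic lives on lbBalancer and is
// guarded by lb.mu; the free-function form here is only for illustration.)
func enterFallbackOnSubConnChange(inFallback, remoteBalancerConnected bool, agg connectivity.State) bool {
	return agg != connectivity.Ready && !inFallback && !remoteBalancerConnected
}

// enterFallbackOnTimer mirrors fallbackToBackendsAfter: when the fallback
// timer fires, fall back unless a server list already arrived or we are
// already in fallback.
func enterFallbackOnTimer(inFallback, serverListReceived bool) bool {
	return !inFallback && !serverListReceived
}

func main() {
	fmt.Println(enterFallbackOnSubConnChange(false, false, connectivity.Connecting)) // true
	fmt.Println(enterFallbackOnTimer(false, true))                                   // false: a server list was already received
}
```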
42 changes: 29 additions & 13 deletions balancer/grpclb/grpclb_remote_balancer.go
@@ -85,24 +85,26 @@ func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
backendAddrs = append(backendAddrs, addr)
}

// Call refreshSubConns to create/remove SubConns.
// Call refreshSubConns to create/remove SubConns. If we are in fallback,
// this is also exiting fallback.
lb.refreshSubConns(backendAddrs, true)
// Regenerate and update the picker whether or not the backends changed (i.e.
// whether any SubConn will be newed/removed). Since the full serverList was
// different, there might be updates in drops or pick weights (a different
// number of duplicates). We need to update the picker with the full list.
//
// Now with cache, even if SubConn was newed/removed, there might be no
// state changes.
lb.regeneratePicker(true)
lb.cc.UpdateBalancerState(lb.state, lb.picker)
}

// refreshSubConns creates/removes SubConns with backendAddrs. It returns a bool
// indicating whether the backendAddrs are different from the cached
// backendAddrs (whether any SubConn was newed/removed).
// refreshSubConns creates/removes SubConns with backendAddrs, and refreshes
// balancer state and picker.
//
// Caller must hold lb.mu.
func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fromGRPCLBServer bool) {
defer func() {
// Regenerate and update picker after refreshing subconns because with
// cache, even if SubConn was newed/removed, there might be no state
// changes (the subconn will be kept in cache, not actually
// newed/removed).
lb.updateStateAndPicker(true, true)
}()

lb.inFallback = !fromGRPCLBServer

opts := balancer.NewSubConnOptions{}
if fromGRPCLBServer {
opts.CredsBundle = lb.grpclbBackendCreds
@@ -218,6 +220,9 @@ func (lb *lbBalancer) callRemoteBalancer() (backoff bool, _ error) {
if err != nil {
return true, fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
}
lb.mu.Lock()
lb.remoteBalancerConnected = true
lb.mu.Unlock()

// grpclb handshake on the stream.
initReq := &lbpb.LoadBalanceRequest{
@@ -270,6 +275,17 @@ func (lb *lbBalancer) watchRemoteBalancer() {
// Trigger a re-resolve when the stream errors.
lb.cc.cc.ResolveNow(resolver.ResolveNowOption{})

lb.mu.Lock()
lb.remoteBalancerConnected = false
lb.fullServerList = nil
// Enter fallback when the connection to the remote balancer is lost, and the
// aggregated state is not Ready.
if !lb.inFallback && lb.state != connectivity.Ready {
// Entering fallback.
lb.refreshSubConns(lb.resolvedBackendAddrs, false)
}
lb.mu.Unlock()

if !doBackoff {
retryCount = 0
continue
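The processServerList comment above notes that pick weights are expressed through duplicates in the server list: a backend that appears twice in the list is picked roughly twice as often by the round-robin picker. A tiny, self-contained illustration of that idea (the helper name is made up, not part of this change):

```go
package main

import "fmt"

// rrPickCounts round-robins over a server list that may contain duplicates.
// A backend listed twice receives roughly twice the share of picks, which is
// how the grpclb server list expresses weights without a separate weight
// field. (Illustrative helper; not code from this PR.)
func rrPickCounts(servers []string, picks int) map[string]int {
	counts := make(map[string]int)
	for i := 0; i < picks; i++ {
		counts[servers[i%len(servers)]]++
	}
	return counts
}

func main() {
	// "10.0.0.1" is listed twice, so it should get ~2/3 of the picks.
	servers := []string{"10.0.0.1", "10.0.0.2", "10.0.0.1"}
	fmt.Println(rrPickCounts(servers, 300)) // map[10.0.0.1:200 10.0.0.2:100]
}
```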
80 changes: 68 additions & 12 deletions balancer/grpclb/grpclb_test.go
@@ -230,18 +230,21 @@ func (b *remoteBalancer) BalanceLoad(stream lbgrpc.LoadBalancer_BalanceLoadServe
b.stats.merge(req.GetClientStats())
}
}()
for v := range b.sls {
resp = &lbpb.LoadBalanceResponse{
LoadBalanceResponseType: &lbpb.LoadBalanceResponse_ServerList{
ServerList: v,
},
for {
select {
case v := <-b.sls:
resp = &lbpb.LoadBalanceResponse{
LoadBalanceResponseType: &lbpb.LoadBalanceResponse_ServerList{
ServerList: v,
},
}
case <-stream.Context().Done():
return stream.Context().Err()
}
if err := stream.Send(resp); err != nil {
return err
}
}
<-b.done
return nil
}

type testServer struct {
@@ -297,6 +300,9 @@ type testServers struct {
backends []*grpc.Server
beIPs []net.IP
bePorts []int

lbListener net.Listener
beListeners []net.Listener
}

func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), err error) {
@@ -317,7 +323,7 @@ func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), er
beIPs = append(beIPs, beLis.Addr().(*net.TCPAddr).IP)
bePorts = append(bePorts, beLis.Addr().(*net.TCPAddr).Port)

beListeners = append(beListeners, beLis)
beListeners = append(beListeners, newRestartableListener(beLis))
}
backends := startBackends(beServerName, false, beListeners...)

@@ -327,6 +333,7 @@ func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), er
err = fmt.Errorf("failed to create the listener for the load balancer %v", err)
return
}
lbLis = newRestartableListener(lbLis)
lbCreds := &serverNameCheckCreds{
sn: lbServerName,
}
@@ -344,6 +351,9 @@ func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), er
backends: backends,
beIPs: beIPs,
bePorts: bePorts,

lbListener: lbLis,
beListeners: beListeners,
}
cleanup = func() {
defer stopBackends(backends)
@@ -712,7 +722,7 @@ func TestFallback(t *testing.T) {
testC := testpb.NewTestServiceClient(cc)

r.UpdateState(resolver.State{Addresses: []resolver.Address{{
Addr: "",
Addr: "invalid.address",
Type: resolver.GRPCLB,
ServerName: lbServerName,
}, {
@@ -723,7 +733,7 @@ func TestFallback(t *testing.T) {

var p peer.Peer
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, <nil>", err)
}
if p.Addr.String() != beLis.Addr().String() {
t.Fatalf("got peer: %v, want peer: %v", p.Addr, beLis.Addr())
@@ -739,16 +749,62 @@ func TestFallback(t *testing.T) {
ServerName: beServerName,
}}})

var backendUsed bool
for i := 0; i < 1000; i++ {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
if p.Addr.(*net.TCPAddr).Port == tss.bePorts[0] {
return
backendUsed = true
break
}
time.Sleep(time.Millisecond)
}
t.Fatalf("No RPC sent to backend behind remote balancer after 1 second")
if !backendUsed {
t.Fatalf("No RPC sent to backend behind remote balancer after 1 second")
}

// Close backend and remote balancer connections, should use fallback.
tss.beListeners[0].(*restartableListener).stopPreviousConns()
tss.lbListener.(*restartableListener).stopPreviousConns()
time.Sleep(time.Second)

var fallbackUsed bool
for i := 0; i < 1000; i++ {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
if p.Addr.String() == beLis.Addr().String() {
fallbackUsed = true
break
}
time.Sleep(time.Millisecond)
}
if !fallbackUsed {
t.Fatalf("No RPC sent to fallback after 1 second")
}

// Restart backend and remote balancer, should not use backends.
tss.beListeners[0].(*restartableListener).restart()
tss.lbListener.(*restartableListener).restart()
tss.ls.sls <- sl

time.Sleep(time.Second)

var backendUsed2 bool
for i := 0; i < 1000; i++ {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
if p.Addr.(*net.TCPAddr).Port == tss.bePorts[0] {
backendUsed2 = true
break
}
time.Sleep(time.Millisecond)
}
if !backendUsed2 {
t.Fatalf("No RPC sent to backend behind remote balancer after 1 second")
}
}

// The remote balancer sends response with duplicates to grpclb client.
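The updated TestFallback drives fallback by cutting and restoring connections through a restartableListener helper (newRestartableListener, stopPreviousConns, restart) that is not shown in this diff. A rough sketch of what such a wrapper could look like, purely as an assumption about its shape; the real helper elsewhere in the test file may differ:

```go
package main

import (
	"net"
	"sync"
)

// restartableListenerSketch is a guess at the test helper: it wraps a
// net.Listener, remembers accepted connections, and lets a test kill all
// existing connections (stopPreviousConns) and later accept new ones again
// (restart). Names mirror the calls made in TestFallback.
type restartableListenerSketch struct {
	net.Listener

	mu     sync.Mutex
	closed bool
	conns  []net.Conn
}

func newRestartableListenerSketch(l net.Listener) *restartableListenerSketch {
	return &restartableListenerSketch{Listener: l}
}

func (l *restartableListenerSketch) Accept() (net.Conn, error) {
	for {
		c, err := l.Listener.Accept()
		if err != nil {
			return nil, err
		}
		l.mu.Lock()
		if l.closed {
			// While "stopped", drop incoming connections immediately so RPCs
			// to this server fail and the client falls back.
			l.mu.Unlock()
			c.Close()
			continue
		}
		l.conns = append(l.conns, c)
		l.mu.Unlock()
		return c, nil
	}
}

// stopPreviousConns closes every accepted connection and refuses new ones.
func (l *restartableListenerSketch) stopPreviousConns() {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.closed = true
	for _, c := range l.conns {
		c.Close()
	}
	l.conns = nil
}

// restart lets the listener hand out connections again.
func (l *restartableListenerSketch) restart() {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.closed = false
}

func main() {
	inner, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		panic(err)
	}
	defer inner.Close()
	lis := newRestartableListenerSketch(inner)
	_ = lis // a test would pass lis to grpc.Server.Serve and toggle it mid-test
}
```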