Skip to content

Commit

Permalink
Cherry-pick #6856 to v1.60.x release branch (#6864)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars authored Dec 18, 2023
1 parent 6430548 commit 6e384cf
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 9 deletions.
22 changes: 13 additions & 9 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ type Server struct {
channelzID *channelz.Identifier
czData *channelzData

serverWorkerChannel chan func()
serverWorkerChannel chan func()
serverWorkerChannelClose func()
}

type serverOptions struct {
Expand Down Expand Up @@ -623,15 +624,14 @@ func (s *Server) serverWorker() {
// connections to reduce the time spent overall on runtime.morestack.
func (s *Server) initServerWorkers() {
s.serverWorkerChannel = make(chan func())
s.serverWorkerChannelClose = grpcsync.OnceFunc(func() {
close(s.serverWorkerChannel)
})
for i := uint32(0); i < s.opts.numServerWorkers; i++ {
go s.serverWorker()
}
}

func (s *Server) stopServerWorkers() {
close(s.serverWorkerChannel)
}

// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
Expand Down Expand Up @@ -1898,15 +1898,19 @@ func (s *Server) stop(graceful bool) {
s.closeServerTransportsLocked()
}

if s.opts.numServerWorkers > 0 {
s.stopServerWorkers()
}

for len(s.conns) != 0 {
s.cv.Wait()
}
s.conns = nil

if s.opts.numServerWorkers > 0 {
// Closing the channel (only once, via grpcsync.OnceFunc) after all the
// connections have been closed above ensures that there are no
// goroutines executing the callback passed to st.HandleStreams (where
// the channel is written to).
s.serverWorkerChannelClose()
}

if s.events != nil {
s.events.Finish()
s.events = nil
Expand Down
88 changes: 88 additions & 0 deletions server_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,20 @@ package grpc_test
import (
"context"
"io"
"runtime"
"sync"
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/status"

testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
)

// TestServer_MaxHandlers ensures that no more than MaxConcurrentStreams server
Expand Down Expand Up @@ -97,3 +103,85 @@ func (s) TestServer_MaxHandlers(t *testing.T) {
t.Fatal("Received unexpected RPC error:", err)
}
}

// Tests the case where the stream worker goroutine option is enabled, and a
// number of RPCs are initiated around the same time that Stop() is called. This
// used to result in a write to a closed channel. This test verifies that there
// is no panic.
func (s) TestStreamWorkers_RPCsAndStop(t *testing.T) {
ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU())))
// This deferred stop takes care of stopping the server when one of the
// below grpc.Dials fail, and the test exits early.
defer ss.Stop()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
const numChannels = 20
const numRPCLoops = 20

// Create a bunch of clientconns and ensure that they are READY by making an
// RPC on them.
ccs := make([]*grpc.ClientConn, numChannels)
for i := 0; i < numChannels; i++ {
var err error
ccs[i], err = grpc.Dial(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("[iteration: %d] grpc.Dial(%s) failed: %v", i, ss.Address, err)
}
defer ccs[i].Close()
client := testgrpc.NewTestServiceClient(ccs[i])
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}
}

// Make a bunch of concurrent RPCs on the above clientconns. These will
// eventually race with Stop(), and will start to fail.
var wg sync.WaitGroup
for i := 0; i < numChannels; i++ {
client := testgrpc.NewTestServiceClient(ccs[i])
for j := 0; j < numRPCLoops; j++ {
wg.Add(1)
go func(client testgrpc.TestServiceClient) {
defer wg.Done()
for {
_, err := client.EmptyCall(ctx, &testpb.Empty{})
if err == nil {
continue
}
if code := status.Code(err); code == codes.Unavailable {
// Once Stop() has been called on the server, we expect
// subsequent calls to fail with Unavailable.
return
}
t.Errorf("EmptyCall() failed: %v", err)
return
}
}(client)
}
}

// Call Stop() concurrently with the above RPC attempts.
ss.Stop()
wg.Wait()
}

// Tests the case where the stream worker goroutine option is enabled, and both
// Stop() and GracefulStop() care called. This used to result in a close of a
// closed channel. This test verifies that there is no panic.
func (s) TestStreamWorkers_GracefulStopAndStop(t *testing.T) {
ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU())))
defer ss.Stop()

if err := ss.StartClient(grpc.WithTransportCredentials(insecure.NewCredentials())); err != nil {
t.Fatalf("Failed to create client to stub server: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
client := testgrpc.NewTestServiceClient(ss.CC)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}

ss.S.GracefulStop()
}

0 comments on commit 6e384cf

Please sign in to comment.