Skip to content

Commit

Permalink
review comments #1
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Apr 26, 2021
1 parent 2a0dd12 commit c3a9ea1
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 52 deletions.
2 changes: 1 addition & 1 deletion internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ var (
// GetServerCredentials returns the transport credentials configured on a
// gRPC server. An xDS-enabled server needs to know what type of credentials
// is configured on the underlying gRPC server. This is set by server.go.
GetServerCredentials interface{} // func (*grpc.Server) credentials.TransportCredentialsa
GetServerCredentials interface{} // func (*grpc.Server) credentials.TransportCredentials
// DrainServerTransports initiates a graceful close of existing connections
// on a gRPC server accepted on the provided listener address. An
// xDS-enabled server invokes this method on a grpc.Server when a particular
Expand Down
20 changes: 10 additions & 10 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,7 @@ func init() {
return srv.opts.creds
}
internal.DrainServerTransports = func(srv *Server, addr string) {
srv.mu.Lock()
for a, conns := range srv.conns {
if a != addr {
continue
}
for st := range conns {
st.Drain()
}
}
srv.mu.Unlock()
srv.drainServerTransports(addr)
}
}

Expand Down Expand Up @@ -845,6 +836,15 @@ func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
}()
}

func (s *Server) drainServerTransports(addr string) {
s.mu.Lock()
conns := s.conns[addr]
for st := range conns {
st.Drain()
}
s.mu.Unlock()
}

// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
Expand Down
6 changes: 3 additions & 3 deletions xds/internal/test/xds_server_serving_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {

// Create a server option to get notified about serving mode changes.
modeTracker := newModeTracker()
modeChangeOpt := xds.WithServingModeCallback(func(addr net.Addr, mode xds.ServingMode, err error) {
t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), mode, err)
modeTracker.updateMode(addr, mode)
modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
modeTracker.updateMode(addr, args.Mode)
})

// Initialize an xDS-enabled gRPC server and register the stubServer on it.
Expand Down
33 changes: 15 additions & 18 deletions xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,15 @@ type GRPCServer struct {
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
// release.
func NewGRPCServer(opts ...grpc.ServerOption) *GRPCServer {
so, grpcOpts := handleServerOptions(opts)
newOpts := []grpc.ServerOption{
grpc.ChainUnaryInterceptor(xdsUnaryInterceptor),
grpc.ChainStreamInterceptor(xdsStreamInterceptor),
}
newOpts = append(newOpts, grpcOpts...)
newOpts = append(newOpts, opts...)
s := &GRPCServer{
gs: newGRPCServer(newOpts...),
quit: grpcsync.NewEvent(),
opts: so,
opts: handleServerOptions(opts),
}
s.logger = prefixLogger(s)
s.logger.Infof("Created xds.GRPCServer")
Expand All @@ -138,20 +137,15 @@ func NewGRPCServer(opts ...grpc.ServerOption) *GRPCServer {
}

// handleServerOptions iterates through the list of server options passed in by
// the user, and handles the xDS server specific options and returns the gRPC
// specific options which are to be passed to grpc.NewServer.
func handleServerOptions(opts []grpc.ServerOption) (*serverOptions, []grpc.ServerOption) {
// the user, and handles the xDS server specific options.
func handleServerOptions(opts []grpc.ServerOption) *serverOptions {
so := &serverOptions{}
var grpcOpts []grpc.ServerOption
for _, opt := range opts {
switch o := opt.(type) {
case *smcOption:
so.modeCallback = o.cb
default:
grpcOpts = append(grpcOpts, opt)
if o, ok := opt.(serverOption); ok {
o.applyServerOption(so)
}
}
return so, grpcOpts
return so
}

// RegisterService registers a service and its implementation to the underlying
Expand Down Expand Up @@ -281,14 +275,14 @@ func (s *GRPCServer) handleServingModeChanges(updateCh *buffer.Unbounded) {
case u := <-updateCh.Get():
updateCh.Load()
args := u.(*modeChangeArgs)
var ss ServingMode
var mode ServingMode
switch args.mode {
case server.ServingModeNotServing:
ss = ServingModeNotServing
mode = ServingModeNotServing
case server.ServingModeServing:
ss = ServingModeServing
mode = ServingModeServing
}
if ss == ServingModeNotServing {
if mode == ServingModeNotServing {
// We type assert our underlying gRPC server to the real
// grpc.Server here before trying to initiate the drain
// operation. This approach avoids performing the same type
Expand All @@ -300,7 +294,10 @@ func (s *GRPCServer) handleServingModeChanges(updateCh *buffer.Unbounded) {
}
}
if s.opts.modeCallback != nil {
s.opts.modeCallback(args.addr, ss, args.err)
s.opts.modeCallback(args.addr, ServingModeChangeArgs{
Mode: mode,
Err: args.err,
})
}
}
}
Expand Down
60 changes: 43 additions & 17 deletions xds/server_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,44 @@ import (
"google.golang.org/grpc"
)

// Experimental
//
// Notice: All APIs in this file are EXPERIMENTAL and may be changed or removed
// in a later release.

// WithServingModeCallback returns a grpc.ServerOption which allows users to
// ServingModeCallback returns a grpc.ServerOption which allows users to
// register a callback to get notified about serving mode changes.
//
// NOTE: The returned ServerOption must *only* be used in a call to
// xds.NewGRPCServer() and must not be used in a call to grpc.NewServer().
func WithServingModeCallback(cb ServingModeCallback) grpc.ServerOption {
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
// release.
func ServingModeCallback(cb ServingModeCallbackFunc) grpc.ServerOption {
return &smcOption{cb: cb}
}

type serverOption interface {
applyServerOption(*serverOptions)
}

// smcOption is a server option containing a callback to be invoked when the
// serving mode changes.
type smcOption struct {
grpc.ServerOption
cb ServingModeCallback
// Embedding the empty server option makes it safe to pass it to
// grpc.NewServer().
grpc.EmptyServerOption
cb ServingModeCallbackFunc
}

func (s *smcOption) applyServerOption(o *serverOptions) {
o.modeCallback = s.cb
}

type serverOptions struct {
modeCallback ServingModeCallback
modeCallback ServingModeCallbackFunc
}

// ServingMode indicates the current mode of operation of the server.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
// release.
type ServingMode int

const (
Expand All @@ -76,10 +88,24 @@ func (s ServingMode) String() string {
}
}

// ServingModeCallback is the callback that users can register to get notified
// about the server's serving mode changes. The callback is invoked with the
// address of the listener and its new mode. The err parameter is set to a
// non-nil error if the server has transitioned into not-serving mode.
// ServingModeCallbackFunc is the callback that users can register to get
// notified about the server's serving mode changes. The callback is invoked
// with the address of the listener and its new mode.
//
// Users must not perform any blocking operations in this callback.
type ServingModeCallback func(addr net.Addr, mode ServingMode, err error)
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
// release.
type ServingModeCallbackFunc func(addr net.Addr, args ServingModeChangeArgs)

// ServingModeChangeArgs wraps the arguments passed to the serving mode callback
// function.
type ServingModeChangeArgs struct {
// Mode is the new serving mode of the server listener.
Mode ServingMode
// Err is set to a non-nil error if the server has transitioned into
// not-serving mode.
Err error
}
6 changes: 3 additions & 3 deletions xds/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,9 @@ func (s) TestServeSuccess(t *testing.T) {
// Create a new xDS-enabled gRPC server and pass it a server option to get
// notified about serving mode changes.
modeChangeCh := testutils.NewChannel()
modeChangeOption := WithServingModeCallback(func(addr net.Addr, mode ServingMode, err error) {
t.Logf("server mode change callback invoked for listener %q with mode %q and error %v", addr.String(), mode, err)
modeChangeCh.Send(mode)
modeChangeOption := ServingModeCallback(func(addr net.Addr, args ServingModeChangeArgs) {
t.Logf("server mode change callback invoked for listener %q with mode %q and error %v", addr.String(), args.Mode, args.Err)
modeChangeCh.Send(args.Mode)
})
server := NewGRPCServer(modeChangeOption)
defer server.Stop()
Expand Down

0 comments on commit c3a9ea1

Please sign in to comment.