Skip to content

Commit

Permalink
add server grpc keep alive params
Browse files Browse the repository at this point in the history
  • Loading branch information
yimin committed Oct 16, 2023
1 parent 7086990 commit 3c01138
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 24 deletions.
15 changes: 6 additions & 9 deletions transport/grpc/internal/client/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,6 @@ func NewBuilder(opts *Options) *Builder {
creds = insecure.NewCredentials()
}

var kacp keepalive.ClientParameters
if opts.KeepAliveTime > 0 && opts.KeepAliveTimeout > 0 {
kacp = keepalive.ClientParameters{
Time: time.Duration(opts.KeepAliveTime) * time.Second, // send pings every opts.KeepAliveTime seconds if there is no activity
Timeout: time.Duration(opts.KeepAliveTimeout) * time.Second, // wait opts.KeepAliveTimeout second for ping ack before considering the connection dead
PermitWithoutStream: opts.KeepAlivePermitWithoutStream, // if true, send pings even without active streams
}
}

resolvers := make([]resolver.Builder, 0, 2)
resolvers = append(resolvers, direct.NewBuilder())
if opts.Discovery != nil {
Expand All @@ -62,7 +53,13 @@ func NewBuilder(opts *Options) *Builder {
b.dialOpts = make([]grpc.DialOption, 0, len(opts.DialOpts)+3)
b.dialOpts = append(b.dialOpts, grpc.WithTransportCredentials(creds))
b.dialOpts = append(b.dialOpts, grpc.WithResolvers(resolvers...))

if opts.KeepAliveTime > 0 && opts.KeepAliveTimeout > 0 {
kacp := keepalive.ClientParameters{
Time: time.Duration(opts.KeepAliveTime) * time.Second, // send pings every opts.KeepAliveTime seconds if there is no activity
Timeout: time.Duration(opts.KeepAliveTimeout) * time.Second, // wait opts.KeepAliveTimeout second for ping ack before considering the connection dead
PermitWithoutStream: opts.KeepAlivePermitWithoutStream, // if true, send pings even without active streams
}
b.dialOpts = append(b.dialOpts, grpc.WithKeepaliveParams(kacp))
}

Expand Down
38 changes: 32 additions & 6 deletions transport/grpc/internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
xnet "github.com/symsimmy/due/internal/net"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"net"
"time"
)

const scheme = "grpc"
Expand All @@ -21,11 +23,18 @@ type Server struct {
}

type Options struct {
Addr string
HostAddr string
KeyFile string
CertFile string
ServerOpts []grpc.ServerOption
Addr string
HostAddr string
KeyFile string
CertFile string
KeepAliveEnforcementPolicyMinTime int
KeepAliveEnforcementPolicyPermitWithoutStream bool
KeepAliveMaxConnectionIdle int
KeepAliveMaxConnectionAge int
KeepAliveMaxConnectionAgeGrace int
KeepAliveTime int
KeepAliveTimeout int
ServerOpts []grpc.ServerOption
}

func NewServer(opts *Options, disabledServices ...string) (*Server, error) {
Expand All @@ -42,7 +51,7 @@ func NewServer(opts *Options, disabledServices ...string) (*Server, error) {
}

isSecure := false
serverOpts := make([]grpc.ServerOption, 0, len(opts.ServerOpts)+1)
serverOpts := make([]grpc.ServerOption, 0, len(opts.ServerOpts)+3)
serverOpts = append(serverOpts, opts.ServerOpts...)
if opts.CertFile != "" && opts.KeyFile != "" {
cred, err := credentials.NewServerTLSFromFile(opts.CertFile, opts.KeyFile)
Expand All @@ -53,6 +62,23 @@ func NewServer(opts *Options, disabledServices ...string) (*Server, error) {
isSecure = true
}

if opts.KeepAliveTimeout > 0 && opts.KeepAliveTime > 0 {
serverOpts = append(serverOpts, grpc.KeepaliveEnforcementPolicy(
keepalive.EnforcementPolicy{
MinTime: time.Duration(opts.KeepAliveEnforcementPolicyMinTime) * time.Second, // If a client pings more than once every opts.KeepAliveEnforcementPolicyMinTime seconds, terminate the connection
PermitWithoutStream: opts.KeepAliveEnforcementPolicyPermitWithoutStream, // Allow pings even when there are no active streams
}))

serverOpts = append(serverOpts, grpc.KeepaliveParams(
keepalive.ServerParameters{
MaxConnectionIdle: time.Duration(opts.KeepAliveMaxConnectionIdle) * time.Second, // If a client is idle for opts.KeepAliveMaxConnectionIdle seconds, send a GOAWAY
MaxConnectionAge: time.Duration(opts.KeepAliveMaxConnectionAge) * time.Second, // If any connection is alive for more than opts.KeepAliveMaxConnectionAge seconds, send a GOAWAY
MaxConnectionAgeGrace: time.Duration(opts.KeepAliveMaxConnectionAgeGrace) * time.Second, // Allow opts.KeepAliveMaxConnectionAgeGrace seconds for pending RPCs to complete before forcibly closing connections
Time: time.Duration(opts.KeepAliveTime) * time.Second, // Ping the client if it is idle for opts.KeepAliveTime seconds to ensure the connection is still active
Timeout: time.Duration(opts.KeepAliveTimeout) * time.Second, // Wait opts.KeepAliveTimeout second for the ping ack before assuming the connection is dead
}))
}

s := &Server{}
s.listenAddr = listenAddr
s.exposeAddr = exposeAddr
Expand Down
44 changes: 35 additions & 9 deletions transport/grpc/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,33 @@ import (
)

const (
defaultServerAddr = ":0" // 默认服务器地址
defaultClientPoolSize = 10 // 默认客户端连接池大小
defaultServerAddr = ":0" // 默认服务器地址
defaultClientPoolSize = 10 // 默认客户端连接池大小
defaultServerKeepAliveEnforcementPolicyMinTime = 5 // If a client pings more than once every 5 seconds, terminate the connection
defaultServerKeepAliveEnforcementPolicyPermitWithoutStream = true // Allow pings even when there are no active streams
defaultServerKeepAliveMaxConnectionIdle = 15 // If a client is idle for 15 seconds, send a GOAWAY
defaultServerKeepAliveMaxConnectionAge = 30 // If any connection is alive for more than 30 seconds, send a GOAWAY
defaultServerKeepAliveMaxConnectionAgeGrace = 5 // Allow 5 seconds for pending RPCs to complete before forcibly closing connections
defaultServerKeepAliveTime = 5 // Ping the client if it is idle for 5 seconds to ensure the connection is still active
defaultServerKeepAliveTimeout = 1 // Wait 1 second for the ping ack before assuming the connection is dead
defaultClientKeepAliveTime = 10 // send pings every 10 seconds if there is no activity
defaultClientKeepAliveTimeout = 1 // wait 1 second for ping ack before considering the connection dead
defaultClientKeepAlivePermitWithoutStream = true // send pings even without active streams
)

const (
defaultServerAddrKey = "config.transport.grpc.server.addr"
defaultServerHostAddrKey = "config.transport.grpc.server.hostAddr"
defaultServerKeyFileKey = "config.transport.grpc.server.keyFile"
defaultServerCertFileKey = "config.transport.grpc.server.certFile"
defaultServerAddrKey = "config.transport.grpc.server.addr"
defaultServerHostAddrKey = "config.transport.grpc.server.hostAddr"
defaultServerKeyFileKey = "config.transport.grpc.server.keyFile"
defaultServerCertFileKey = "config.transport.grpc.server.certFile"
defaultServerKeepAliveEnforcementPolicyMinTimeKey = "config.transport.grpc.server.keepAlive.minTime"
defaultServerKeepAliveEnforcementPolicyPermitWithoutStreamKey = "config.transport.grpc.server.keepAlive.permitWithoutStream"
defaultServerKeepAliveMaxConnectionIdleKey = "config.transport.grpc.server.keepAlive.MaxConnectionIdle"
defaultServerKeepAliveMaxConnectionAgeKey = "config.transport.grpc.server.keepAlive.MaxConnectionAge"
defaultServerKeepAliveMaxConnectionAgeGraceKey = "config.transport.grpc.server.keepAlive.MaxConnectionAgeGrace"
defaultServerKeepAliveTimeKey = "config.transport.grpc.server.keepAlive.Time"
defaultServerKeepAliveTimeoutKey = "config.transport.grpc.server.keepAlive.Timeout"

defaultClientPoolSizeKey = "config.transport.grpc.client.poolSize"
defaultClientCertFileKey = "config.transport.grpc.client.certFile"
defaultClientServerNameKey = "config.transport.grpc.client.serverName"
Expand All @@ -39,12 +57,20 @@ func defaultOptions() *options {
opts.server.HostAddr = config.Get(defaultServerHostAddrKey).String()
opts.server.KeyFile = config.Get(defaultServerKeyFileKey).String()
opts.server.CertFile = config.Get(defaultServerCertFileKey).String()
opts.server.KeepAliveEnforcementPolicyMinTime = config.Get(defaultServerKeepAliveEnforcementPolicyMinTimeKey, defaultServerKeepAliveEnforcementPolicyMinTime).Int()
opts.server.KeepAliveEnforcementPolicyPermitWithoutStream = config.Get(defaultServerKeepAliveEnforcementPolicyPermitWithoutStreamKey, defaultServerKeepAliveEnforcementPolicyPermitWithoutStream).Bool()
opts.server.KeepAliveMaxConnectionIdle = config.Get(defaultServerKeepAliveMaxConnectionIdleKey, defaultServerKeepAliveMaxConnectionIdle).Int()
opts.server.KeepAliveMaxConnectionAge = config.Get(defaultServerKeepAliveMaxConnectionAgeKey, defaultServerKeepAliveMaxConnectionAge).Int()
opts.server.KeepAliveMaxConnectionAgeGrace = config.Get(defaultServerKeepAliveMaxConnectionAgeGraceKey, defaultServerKeepAliveMaxConnectionAgeGrace).Int()
opts.server.KeepAliveTime = config.Get(defaultServerKeepAliveTimeKey, defaultServerKeepAliveTime).Int()
opts.server.KeepAliveTimeout = config.Get(defaultServerKeepAliveTimeoutKey, defaultServerKeepAliveTimeout).Int()

opts.client.PoolSize = config.Get(defaultClientPoolSizeKey, defaultClientPoolSize).Int()
opts.client.CertFile = config.Get(defaultClientCertFileKey).String()
opts.client.ServerName = config.Get(defaultClientServerNameKey).String()
opts.client.KeepAliveTime = config.Get(defaultClientKeepAliveTimeKey).Int()
opts.client.KeepAliveTimeout = config.Get(defaultClientKeepAliveTimeoutKey).Int()
opts.client.KeepAlivePermitWithoutStream = config.Get(defaultClientKeepAlivePermitWithoutStreamKey).Bool()
opts.client.KeepAliveTime = config.Get(defaultClientKeepAliveTimeKey, defaultClientKeepAliveTime).Int()
opts.client.KeepAliveTimeout = config.Get(defaultClientKeepAliveTimeoutKey, defaultClientKeepAliveTimeout).Int()
opts.client.KeepAlivePermitWithoutStream = config.Get(defaultClientKeepAlivePermitWithoutStreamKey, defaultClientKeepAlivePermitWithoutStream).Bool()

return opts
}
Expand Down

0 comments on commit 3c01138

Please sign in to comment.