From 3c01138e234950af13647c19cb758a5eaaadddb2 Mon Sep 17 00:00:00 2001 From: yimin Date: Tue, 17 Oct 2023 00:38:00 +0800 Subject: [PATCH] add server grpc keep alive params --- transport/grpc/internal/client/builder.go | 15 ++++---- transport/grpc/internal/server/server.go | 38 ++++++++++++++++---- transport/grpc/options.go | 44 ++++++++++++++++++----- 3 files changed, 73 insertions(+), 24 deletions(-) diff --git a/transport/grpc/internal/client/builder.go b/transport/grpc/internal/client/builder.go index be9406cf..87bf6d40 100644 --- a/transport/grpc/internal/client/builder.go +++ b/transport/grpc/internal/client/builder.go @@ -44,15 +44,6 @@ func NewBuilder(opts *Options) *Builder { creds = insecure.NewCredentials() } - var kacp keepalive.ClientParameters - if opts.KeepAliveTime > 0 && opts.KeepAliveTimeout > 0 { - kacp = keepalive.ClientParameters{ - Time: time.Duration(opts.KeepAliveTime) * time.Second, // send pings every opts.KeepAliveTime seconds if there is no activity - Timeout: time.Duration(opts.KeepAliveTimeout) * time.Second, // wait opts.KeepAliveTimeout second for ping ack before considering the connection dead - PermitWithoutStream: opts.KeepAlivePermitWithoutStream, // if true, send pings even without active streams - } - } - resolvers := make([]resolver.Builder, 0, 2) resolvers = append(resolvers, direct.NewBuilder()) if opts.Discovery != nil { @@ -62,7 +53,13 @@ func NewBuilder(opts *Options) *Builder { b.dialOpts = make([]grpc.DialOption, 0, len(opts.DialOpts)+3) b.dialOpts = append(b.dialOpts, grpc.WithTransportCredentials(creds)) b.dialOpts = append(b.dialOpts, grpc.WithResolvers(resolvers...)) + if opts.KeepAliveTime > 0 && opts.KeepAliveTimeout > 0 { + kacp := keepalive.ClientParameters{ + Time: time.Duration(opts.KeepAliveTime) * time.Second, // send pings every opts.KeepAliveTime seconds if there is no activity + Timeout: time.Duration(opts.KeepAliveTimeout) * time.Second, // wait opts.KeepAliveTimeout second for ping ack before considering the connection dead + PermitWithoutStream: opts.KeepAlivePermitWithoutStream, // if true, send pings even without active streams + } b.dialOpts = append(b.dialOpts, grpc.WithKeepaliveParams(kacp)) } diff --git a/transport/grpc/internal/server/server.go b/transport/grpc/internal/server/server.go index f16c7c73..900057fd 100644 --- a/transport/grpc/internal/server/server.go +++ b/transport/grpc/internal/server/server.go @@ -7,7 +7,9 @@ import ( xnet "github.com/symsimmy/due/internal/net" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/keepalive" "net" + "time" ) const scheme = "grpc" @@ -21,11 +23,18 @@ type Server struct { } type Options struct { - Addr string - HostAddr string - KeyFile string - CertFile string - ServerOpts []grpc.ServerOption + Addr string + HostAddr string + KeyFile string + CertFile string + KeepAliveEnforcementPolicyMinTime int + KeepAliveEnforcementPolicyPermitWithoutStream bool + KeepAliveMaxConnectionIdle int + KeepAliveMaxConnectionAge int + KeepAliveMaxConnectionAgeGrace int + KeepAliveTime int + KeepAliveTimeout int + ServerOpts []grpc.ServerOption } func NewServer(opts *Options, disabledServices ...string) (*Server, error) { @@ -42,7 +51,7 @@ func NewServer(opts *Options, disabledServices ...string) (*Server, error) { } isSecure := false - serverOpts := make([]grpc.ServerOption, 0, len(opts.ServerOpts)+1) + serverOpts := make([]grpc.ServerOption, 0, len(opts.ServerOpts)+3) serverOpts = append(serverOpts, opts.ServerOpts...) if opts.CertFile != "" && opts.KeyFile != "" { cred, err := credentials.NewServerTLSFromFile(opts.CertFile, opts.KeyFile) @@ -53,6 +62,23 @@ func NewServer(opts *Options, disabledServices ...string) (*Server, error) { isSecure = true } + if opts.KeepAliveTimeout > 0 && opts.KeepAliveTime > 0 { + serverOpts = append(serverOpts, grpc.KeepaliveEnforcementPolicy( + keepalive.EnforcementPolicy{ + MinTime: time.Duration(opts.KeepAliveEnforcementPolicyMinTime) * time.Second, // If a client pings more than once every opts.KeepAliveEnforcementPolicyMinTime seconds, terminate the connection + PermitWithoutStream: opts.KeepAliveEnforcementPolicyPermitWithoutStream, // Allow pings even when there are no active streams + })) + + serverOpts = append(serverOpts, grpc.KeepaliveParams( + keepalive.ServerParameters{ + MaxConnectionIdle: time.Duration(opts.KeepAliveMaxConnectionIdle) * time.Second, // If a client is idle for opts.KeepAliveMaxConnectionIdle seconds, send a GOAWAY + MaxConnectionAge: time.Duration(opts.KeepAliveMaxConnectionAge) * time.Second, // If any connection is alive for more than opts.KeepAliveMaxConnectionAge seconds, send a GOAWAY + MaxConnectionAgeGrace: time.Duration(opts.KeepAliveMaxConnectionAgeGrace) * time.Second, // Allow opts.KeepAliveMaxConnectionAgeGrace seconds for pending RPCs to complete before forcibly closing connections + Time: time.Duration(opts.KeepAliveTime) * time.Second, // Ping the client if it is idle for opts.KeepAliveTime seconds to ensure the connection is still active + Timeout: time.Duration(opts.KeepAliveTimeout) * time.Second, // Wait opts.KeepAliveTimeout second for the ping ack before assuming the connection is dead + })) + } + s := &Server{} s.listenAddr = listenAddr s.exposeAddr = exposeAddr diff --git a/transport/grpc/options.go b/transport/grpc/options.go index fd6c6431..b09fdc02 100644 --- a/transport/grpc/options.go +++ b/transport/grpc/options.go @@ -9,15 +9,33 @@ import ( ) const ( - defaultServerAddr = ":0" // 默认服务器地址 - defaultClientPoolSize = 10 // 默认客户端连接池大小 + defaultServerAddr = ":0" // 默认服务器地址 + defaultClientPoolSize = 10 // 默认客户端连接池大小 + defaultServerKeepAliveEnforcementPolicyMinTime = 5 // If a client pings more than once every 5 seconds, terminate the connection + defaultServerKeepAliveEnforcementPolicyPermitWithoutStream = true // Allow pings even when there are no active streams + defaultServerKeepAliveMaxConnectionIdle = 15 // If a client is idle for 15 seconds, send a GOAWAY + defaultServerKeepAliveMaxConnectionAge = 30 // If any connection is alive for more than 30 seconds, send a GOAWAY + defaultServerKeepAliveMaxConnectionAgeGrace = 5 // Allow 5 seconds for pending RPCs to complete before forcibly closing connections + defaultServerKeepAliveTime = 5 // Ping the client if it is idle for 5 seconds to ensure the connection is still active + defaultServerKeepAliveTimeout = 1 // Wait 1 second for the ping ack before assuming the connection is dead + defaultClientKeepAliveTime = 10 // send pings every 10 seconds if there is no activity + defaultClientKeepAliveTimeout = 1 // wait 1 second for ping ack before considering the connection dead + defaultClientKeepAlivePermitWithoutStream = true // send pings even without active streams ) const ( - defaultServerAddrKey = "config.transport.grpc.server.addr" - defaultServerHostAddrKey = "config.transport.grpc.server.hostAddr" - defaultServerKeyFileKey = "config.transport.grpc.server.keyFile" - defaultServerCertFileKey = "config.transport.grpc.server.certFile" + defaultServerAddrKey = "config.transport.grpc.server.addr" + defaultServerHostAddrKey = "config.transport.grpc.server.hostAddr" + defaultServerKeyFileKey = "config.transport.grpc.server.keyFile" + defaultServerCertFileKey = "config.transport.grpc.server.certFile" + defaultServerKeepAliveEnforcementPolicyMinTimeKey = "config.transport.grpc.server.keepAlive.minTime" + defaultServerKeepAliveEnforcementPolicyPermitWithoutStreamKey = "config.transport.grpc.server.keepAlive.permitWithoutStream" + defaultServerKeepAliveMaxConnectionIdleKey = "config.transport.grpc.server.keepAlive.MaxConnectionIdle" + defaultServerKeepAliveMaxConnectionAgeKey = "config.transport.grpc.server.keepAlive.MaxConnectionAge" + defaultServerKeepAliveMaxConnectionAgeGraceKey = "config.transport.grpc.server.keepAlive.MaxConnectionAgeGrace" + defaultServerKeepAliveTimeKey = "config.transport.grpc.server.keepAlive.Time" + defaultServerKeepAliveTimeoutKey = "config.transport.grpc.server.keepAlive.Timeout" + defaultClientPoolSizeKey = "config.transport.grpc.client.poolSize" defaultClientCertFileKey = "config.transport.grpc.client.certFile" defaultClientServerNameKey = "config.transport.grpc.client.serverName" @@ -39,12 +57,20 @@ func defaultOptions() *options { opts.server.HostAddr = config.Get(defaultServerHostAddrKey).String() opts.server.KeyFile = config.Get(defaultServerKeyFileKey).String() opts.server.CertFile = config.Get(defaultServerCertFileKey).String() + opts.server.KeepAliveEnforcementPolicyMinTime = config.Get(defaultServerKeepAliveEnforcementPolicyMinTimeKey, defaultServerKeepAliveEnforcementPolicyMinTime).Int() + opts.server.KeepAliveEnforcementPolicyPermitWithoutStream = config.Get(defaultServerKeepAliveEnforcementPolicyPermitWithoutStreamKey, defaultServerKeepAliveEnforcementPolicyPermitWithoutStream).Bool() + opts.server.KeepAliveMaxConnectionIdle = config.Get(defaultServerKeepAliveMaxConnectionIdleKey, defaultServerKeepAliveMaxConnectionIdle).Int() + opts.server.KeepAliveMaxConnectionAge = config.Get(defaultServerKeepAliveMaxConnectionAgeKey, defaultServerKeepAliveMaxConnectionAge).Int() + opts.server.KeepAliveMaxConnectionAgeGrace = config.Get(defaultServerKeepAliveMaxConnectionAgeGraceKey, defaultServerKeepAliveMaxConnectionAgeGrace).Int() + opts.server.KeepAliveTime = config.Get(defaultServerKeepAliveTimeKey, defaultServerKeepAliveTime).Int() + opts.server.KeepAliveTimeout = config.Get(defaultServerKeepAliveTimeoutKey, defaultServerKeepAliveTimeout).Int() + opts.client.PoolSize = config.Get(defaultClientPoolSizeKey, defaultClientPoolSize).Int() opts.client.CertFile = config.Get(defaultClientCertFileKey).String() opts.client.ServerName = config.Get(defaultClientServerNameKey).String() - opts.client.KeepAliveTime = config.Get(defaultClientKeepAliveTimeKey).Int() - opts.client.KeepAliveTimeout = config.Get(defaultClientKeepAliveTimeoutKey).Int() - opts.client.KeepAlivePermitWithoutStream = config.Get(defaultClientKeepAlivePermitWithoutStreamKey).Bool() + opts.client.KeepAliveTime = config.Get(defaultClientKeepAliveTimeKey, defaultClientKeepAliveTime).Int() + opts.client.KeepAliveTimeout = config.Get(defaultClientKeepAliveTimeoutKey, defaultClientKeepAliveTimeout).Int() + opts.client.KeepAlivePermitWithoutStream = config.Get(defaultClientKeepAlivePermitWithoutStreamKey, defaultClientKeepAlivePermitWithoutStream).Bool() return opts }