diff --git a/transport/grpc/internal/client/builder.go b/transport/grpc/internal/client/builder.go index d7ab6d75..be9406cf 100644 --- a/transport/grpc/internal/client/builder.go +++ b/transport/grpc/internal/client/builder.go @@ -7,8 +7,10 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/resolver" "sync" + "time" ) type Builder struct { @@ -19,11 +21,14 @@ type Builder struct { } type Options struct { - PoolSize int - CertFile string - ServerName string - Discovery registry.Discovery - DialOpts []grpc.DialOption + PoolSize int + CertFile string + ServerName string + Discovery registry.Discovery + DialOpts []grpc.DialOption + KeepAliveTime int + KeepAliveTimeout int + KeepAlivePermitWithoutStream bool } func NewBuilder(opts *Options) *Builder { @@ -39,15 +44,27 @@ func NewBuilder(opts *Options) *Builder { creds = insecure.NewCredentials() } + var kacp keepalive.ClientParameters + if opts.KeepAliveTime > 0 && opts.KeepAliveTimeout > 0 { + kacp = keepalive.ClientParameters{ + Time: time.Duration(opts.KeepAliveTime) * time.Second, // send pings every opts.KeepAliveTime seconds if there is no activity + Timeout: time.Duration(opts.KeepAliveTimeout) * time.Second, // wait opts.KeepAliveTimeout second for ping ack before considering the connection dead + PermitWithoutStream: opts.KeepAlivePermitWithoutStream, // if true, send pings even without active streams + } + } + resolvers := make([]resolver.Builder, 0, 2) resolvers = append(resolvers, direct.NewBuilder()) if opts.Discovery != nil { resolvers = append(resolvers, discovery.NewBuilder(opts.Discovery)) } - b.dialOpts = make([]grpc.DialOption, 0, len(opts.DialOpts)+2) + b.dialOpts = make([]grpc.DialOption, 0, len(opts.DialOpts)+3) b.dialOpts = append(b.dialOpts, grpc.WithTransportCredentials(creds)) b.dialOpts = append(b.dialOpts, grpc.WithResolvers(resolvers...)) + if opts.KeepAliveTime > 0 && opts.KeepAliveTimeout > 0 { + b.dialOpts = append(b.dialOpts, grpc.WithKeepaliveParams(kacp)) + } return b } diff --git a/transport/grpc/options.go b/transport/grpc/options.go index 34a6b853..fd6c6431 100644 --- a/transport/grpc/options.go +++ b/transport/grpc/options.go @@ -14,13 +14,16 @@ const ( ) const ( - defaultServerAddrKey = "config.transport.grpc.server.addr" - defaultServerHostAddrKey = "config.transport.grpc.server.hostAddr" - defaultServerKeyFileKey = "config.transport.grpc.server.keyFile" - defaultServerCertFileKey = "config.transport.grpc.server.certFile" - defaultClientPoolSizeKey = "config.transport.grpc.client.poolSize" - defaultClientCertFileKey = "config.transport.grpc.client.certFile" - defaultClientServerNameKey = "config.transport.grpc.client.serverName" + defaultServerAddrKey = "config.transport.grpc.server.addr" + defaultServerHostAddrKey = "config.transport.grpc.server.hostAddr" + defaultServerKeyFileKey = "config.transport.grpc.server.keyFile" + defaultServerCertFileKey = "config.transport.grpc.server.certFile" + defaultClientPoolSizeKey = "config.transport.grpc.client.poolSize" + defaultClientCertFileKey = "config.transport.grpc.client.certFile" + defaultClientServerNameKey = "config.transport.grpc.client.serverName" + defaultClientKeepAliveTimeKey = "config.transport.grpc.client.keepAlive.time" + defaultClientKeepAliveTimeoutKey = "config.transport.grpc.client.keepAlive.timeout" + defaultClientKeepAlivePermitWithoutStreamKey = "config.transport.grpc.client.keepAlive.permitWithoutStream" ) type Option func(o *options) @@ -39,6 +42,9 @@ func defaultOptions() *options { opts.client.PoolSize = config.Get(defaultClientPoolSizeKey, defaultClientPoolSize).Int() opts.client.CertFile = config.Get(defaultClientCertFileKey).String() opts.client.ServerName = config.Get(defaultClientServerNameKey).String() + opts.client.KeepAliveTime = config.Get(defaultClientKeepAliveTimeKey).Int() + opts.client.KeepAliveTimeout = config.Get(defaultClientKeepAliveTimeoutKey).Int() + opts.client.KeepAlivePermitWithoutStream = config.Get(defaultClientKeepAlivePermitWithoutStreamKey).Bool() return opts } @@ -73,6 +79,13 @@ func WithClientDiscovery(discovery registry.Discovery) Option { return func(o *options) { o.client.Discovery = discovery } } +// WithClientKeepAliveParams 设置客户端keep alive参数 +func WithClientKeepAliveParams(time int, timeout int, permitWithoutStream bool) Option { + return func(o *options) { + o.client.KeepAliveTime, o.client.KeepAliveTimeout, o.client.KeepAlivePermitWithoutStream = time, timeout, permitWithoutStream + } +} + // WithClientDialOptions 设置客户端拨号选项 func WithClientDialOptions(opts ...grpc.DialOption) Option { return func(o *options) { o.client.DialOpts = opts }