Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-11668][SDK] Add max life time support for the connections in the conn pool of Golang SDK #11669

Merged
merged 1 commit into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ type Options struct {
BlockIfQueueIsFull bool // whether Send and SendAsync block if producer's message queue is full, default: false
AddColumns map[string]string // addition columns to add to the message, for example: __addcol1__worldid=xxx&__addcol2__ip=yyy, all the message will be added 2 more columns with worldid=xxx and ip=yyy
addColumnStr string // the string format of the AddColumns, just a cache, used internal
Auth Auth // dataproxy authentication interface
MaxConnLifetime time.Duration // connection max lifetime, default: 0, set to 5m/10m when the servers provide service through CLBs (Cloud Load Balancers)
}
```

Expand Down
191 changes: 163 additions & 28 deletions inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,31 +45,50 @@ var (

// Dialer is the interface of a dialer that returns a gnet.Conn.
type Dialer interface {
// NOTE(review): this text looks like a rendered diff, so the one-argument Dial on the next line is
// presumably the removed (old) signature and only the two-argument Dial below is current — confirm
// against the real file before relying on it.
Dial(addr string) (gnet.Conn, error)
// Dial dials addr and binds ctx to the returned connection; which network (TCP/UDP) to use is determined by the Dialer.
// Dial should use gnet.Client.DialContext() to get a connection that can be driven by a gnet event engine.
Dial(addr string, ctx any) (gnet.Conn, error)
}

// ConnContext holds the additional attributes attached to a gnet.Conn.
// The pool passes a ConnContext value as the ctx argument of Dialer.Dial when it dials
// (see dialNewConn and recover) and reads it back through conn.Context() in expired and getRemoteAddr.
type ConnContext struct {
CreatedAt time.Time // the creation time of the connection, set to time.Now() when dialing; compared with the pool's max lifetime
Endpoint string // the address of the remote endpoint that was dialed; used as a fallback when the conn has no remote address
}

// EndpointRestrictedConnPool is the interface of a simple endpoint restricted connection pool:
// a connection's remote address must be in an endpoint list, if not, it will be closed and can
// not be used anymore. It is useful for holding the connections to a service whose endpoints can
// be changed at runtime.
// Best practice:
// gnet is a high-performance networking package, the best way to use this pool is:
// 1. call Get() to get a gnet.Conn;
// 2. use the conn to read/write for a duration, 1m, for example, and then put the conn back to the pool and get a new one for load balancing, avoid putting/getting frequently;
// 3. do not switch (put and get) to a new conn in the callback of gnet.Conn.AsyncWrite(buf []byte, callback AsyncCallback) or gnet.Conn.AsyncWritev(bs [][]byte, callback AsyncCallback), it may be blocked;
// 4. if you use TCP conns and can not update endpoints by service discovery directly, for example, your endpoints are behind an LB, it is better to set a max lifetime
// for your pool, so that you can restart your endpoints (RS) without data loss by:
// 1). set the weight of your endpoint (RS) to 0, so that no new connections come in;
// 2). wait for the existing connections to be closed by the lifetime timeout;
// 3). restart your endpoint.
type EndpointRestrictedConnPool interface {
// Get gets a connection, it's concurrency-safe, but you can not call it in the callback of gnet.Conn.AsyncWrite() or gnet.Conn.AsyncWritev().
Get() (gnet.Conn, error)
// Put puts a connection back to the pool, if err is not nil, the connection will be closed by the pool, it's concurrency-safe,
// but you can not call it in the callback of gnet.Conn.AsyncWrite() or gnet.Conn.AsyncWritev().
Put(conn gnet.Conn, err error)
// UpdateEndpoints updates the endpoints the pool dials to, it's not concurrency-safe.
UpdateEndpoints(all, add, del []string)
// NumPooled returns the number of connections in the pool, not the number of all the connections that the pool created, it's concurrency-safe.
NumPooled() int
// OnConnClosed is used to notify that a connection is closed, the connection will be removed from the pool, if err is not nil, the remote endpoint will be marked as unavailable, it's concurrency-safe.
OnConnClosed(conn gnet.Conn, err error)
// Close closes the pool
Close()
}

// NewConnPool news a EndpointRestrictedConnPool
func NewConnPool(initEndpoints []string, connsPerEndpoint, size int,
dialer Dialer, log logger.Logger) (EndpointRestrictedConnPool, error) {
dialer Dialer, log logger.Logger, maxConnLifetime time.Duration) (EndpointRestrictedConnPool, error) {
if len(initEndpoints) == 0 {
return nil, ErrInitEndpointEmpty
}
Expand Down Expand Up @@ -107,7 +126,8 @@ func NewConnPool(initEndpoints []string, connsPerEndpoint, size int,
Multiplier: 2,
Randomization: 0.5,
},
closeCh: make(chan struct{}),
closeCh: make(chan struct{}),
maxConnLifetime: maxConnLifetime,
}

// store endpoints
Expand Down Expand Up @@ -143,7 +163,25 @@ type connPool struct {
backoff util.ExponentialBackoff
closeCh chan struct{}
closeOnce sync.Once
endpointConnCounts sync.Map // store the conn count of each endpoint
endpointConnCounts sync.Map // store the conn count of each endpoint
maxConnLifetime time.Duration // the max lifetime of a connection
}

// expired reports whether conn has lived longer than the pool's maxConnLifetime.
//
// It returns false when conn is nil, when no positive max lifetime is configured,
// or when the context bound to conn is not a ConnContext value (so its creation
// time is unknown).
func (p *connPool) expired(conn gnet.Conn) bool {
	// lifetime checking is disabled unless a positive max lifetime is set
	if p.maxConnLifetime <= 0 || conn == nil {
		return false
	}

	// a nil context fails the assertion as well, so one check covers both cases
	attrs, isConnCtx := conn.Context().(ConnContext)
	if !isConnCtx {
		return false
	}

	deadline := attrs.CreatedAt.Add(p.maxConnLifetime)
	return time.Now().After(deadline)
}

func (p *connPool) Get() (gnet.Conn, error) {
Expand Down Expand Up @@ -204,7 +242,7 @@ func (p *connPool) newConn() (gnet.Conn, error) {

func (p *connPool) dialNewConn(ep string) (gnet.Conn, error) {
p.log.Debug("dialNewConn()")
conn, err := p.dialer.Dial(ep)
conn, err := p.dialer.Dial(ep, ConnContext{CreatedAt: time.Now(), Endpoint: ep})
if err != nil {
p.markUnavailable(ep)
return nil, err
Expand Down Expand Up @@ -278,6 +316,15 @@ func (p *connPool) put(conn gnet.Conn, err error, isNewConn bool) {
return
}

// if conn is expired, close it
if p.expired(conn) {
p.log.Debug("connection expired, close it, addr:", addr, ", err:", err)
CloseConn(conn, defaultConnCloseDelay)
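// after closing the connection, there are fewer available connections and the connection count of the endpoint at addr may become unbalanced; although this will call the current function recursively, we still append a new connection here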
_ = p.appendNewConn(addr)
return
}

select {
case p.connChan <- conn:
// update the conn count
Expand Down Expand Up @@ -481,6 +528,17 @@ func (p *connPool) recoverAndRebalance() {
reBalanceTicker := time.NewTicker(defaultConnCloseDelay + 30*time.Second)
defer reBalanceTicker.Stop()

// clean expired conn every minute
var cleanExpiredConnTicker *time.Ticker
if p.maxConnLifetime > 0 {
cleanExpiredConnTicker = time.NewTicker(1 * time.Minute)
}
defer func() {
if cleanExpiredConnTicker != nil {
cleanExpiredConnTicker.Stop()
}
}()

for {
select {
case <-recoverTicker.C:
Expand All @@ -495,10 +553,79 @@ func (p *connPool) recoverAndRebalance() {
p.rebalance()
case <-p.closeCh:
return
default:
if cleanExpiredConnTicker != nil {
select {
case <-cleanExpiredConnTicker.C:
p.cleanExpiredConns()
default:
time.Sleep(time.Second)
}
}
}
}
}

// getRemoteAddr returns the remote address of conn as a string.
//
// The address reported by conn.RemoteAddr() is preferred; when it is nil, the
// Endpoint stored in the ConnContext bound to conn is used instead. An empty
// string is returned when conn is nil or neither source is available.
func getRemoteAddr(conn gnet.Conn) string {
	if conn == nil {
		return ""
	}

	if remote := conn.RemoteAddr(); remote != nil {
		return remote.String()
	}

	// fall back to the endpoint recorded when the connection was dialed;
	// a nil or foreign context fails the assertion
	if attrs, isConnCtx := conn.Context().(ConnContext); isConnCtx {
		return attrs.Endpoint
	}
	return ""
}

// cleanExpiredConns takes the pooled connections out of connChan, puts the ones
// that are still within their lifetime back, and closes the expired ones, dialing
// a replacement to the same address for each of them.
func (p *connPool) cleanExpiredConns() {
	p.log.Debug("cleanExpiredConns()")

	var (
		alive   []gnet.Conn
		overdue []gnet.Conn
	)

	// receive at most cap(connChan) connections without blocking, stop early once the chan is empty
	drained := false
	for remaining := cap(p.connChan); remaining > 0 && !drained; remaining-- {
		select {
		case c := <-p.connChan:
			if p.expired(c) {
				overdue = append(overdue, c)
			} else {
				// not expired, it goes back to the chan below
				alive = append(alive, c)
			}
		default:
			// no more conn in the chan
			drained = true
		}
	}

	// return the live connections to the chan; if the chan is full, close the conn instead of blocking
	for _, c := range alive {
		select {
		case p.connChan <- c:
		default:
			CloseConn(c, defaultConnCloseDelay)
		}
	}

	// close every expired connection and append a new one for the same address
	for _, c := range overdue {
		addr := getRemoteAddr(c)
		p.log.Debug("connection expired, close it, addr:", addr, ", err:", nil)
		CloseConn(c, defaultConnCloseDelay)
		_ = p.appendNewConn(addr)
	}
}

func (p *connPool) dump() {
p.log.Debug("all endpoints:")
eps := p.endpoints.Load()
Expand Down Expand Up @@ -535,7 +662,7 @@ func (p *connPool) recover() bool {
}
if time.Since(lastUnavailable) > p.backoff.Next(retries) {
// try to create new conn
conn, err := p.dialer.Dial(key.(string))
conn, err := p.dialer.Dial(key.(string), ConnContext{CreatedAt: time.Now(), Endpoint: key.(string)})
if err == nil {
p.log.Debug("endpoint recovered, addr: ", key)
p.put(conn, nil, true)
Expand Down Expand Up @@ -658,15 +785,11 @@ func (p *connPool) rebalance() {

// add new conn
for i := currentCount; i < expectedConnPerEndpoint; i++ {
conn, err := p.dialNewConn(addr)
if err == nil {
p.log.Debug("adding connection for addr: ", addr)
p.put(conn, nil, true)
rebalanced = true
} else {
p.log.Warn("failed to add connection during rebalancing, addr: ", addr, ", err: ", err)
break
err := p.appendNewConn(addr)
if err != nil {
continue
}
rebalanced = true
}
} else if currentCount > expectedConnPerEndpoint {
rebalanced = true
Expand All @@ -682,15 +805,11 @@ func (p *connPool) rebalance() {
return true
}
for i := 0; i < expectedConnPerEndpoint; i++ {
conn, err := p.dialNewConn(addr)
if err == nil {
p.log.Debug("adding connection for addr: ", addr)
p.put(conn, nil, true)
rebalanced = true
} else {
p.log.Warn("failed to add connection during rebalancing, addr: ", addr, ", err: ", err)
break
err := p.appendNewConn(addr)
if err != nil {
continue
}
rebalanced = true
}
return true
})
Expand All @@ -700,6 +819,22 @@ func (p *connPool) rebalance() {
}
}

// appendNewConn dials a new connection to addr and puts it into the pool as a new connection.
//
// It returns an error when addr is empty or when dialing fails; a dial failure is
// also logged at Warn level.
func (p *connPool) appendNewConn(addr string) error {
	if len(addr) == 0 {
		return errors.New("addr is empty")
	}

	newConn, dialErr := p.dialNewConn(addr)
	if dialErr != nil {
		p.log.Warn("failed to add connection, addr: ", addr, ", err: ", dialErr)
		return dialErr
	}

	p.log.Debug("adding connection for addr: ", addr)
	// the last argument marks the conn as newly created
	p.put(newConn, nil, true)
	return nil
}

func (p *connPool) removeEndpointConn(addr string, count int) {
var leftConns []gnet.Conn
var removed int
Expand All @@ -713,7 +848,7 @@ loop:
}

if remoteAddr.String() == addr {
p.log.Info("reducing connection for addr: ", addr)
p.log.Debug("reducing connection for addr: ", addr)
// we do not decrease the conn count here, if the frequency of rebalancing is less than defaultConnCloseDelay, it may lead to an inaccurate expected conn count per endpoint
CloseConn(conn, defaultConnCloseDelay)
removed++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (c *client) initConns() error {

// minimum connection number per endpoint is 1
connsPerEndpoint := int(math.Ceil(float64(c.options.WorkerNum) * 1.2 / float64(epLen)))
pool, err := connpool.NewConnPool(endpoints, connsPerEndpoint, 512, c, c.log)
pool, err := connpool.NewConnPool(endpoints, connsPerEndpoint, 512, c, c.log, c.options.MaxConnLifetime)
if err != nil {
return err
}
Expand All @@ -174,7 +174,7 @@ func (c *client) initConns() error {
}

func (c *client) initFramer() error {
framer, err := framer.NewLengthField(framer.LengthFieldCfg{
fr, err := framer.NewLengthField(framer.LengthFieldCfg{
MaxFrameLen: 64 * 1024,
FieldOffset: 0,
FieldLength: 4,
Expand All @@ -184,7 +184,7 @@ func (c *client) initFramer() error {
if err != nil {
return err
}
c.framer = framer
c.framer = fr
return nil
}

Expand All @@ -209,8 +209,8 @@ func (c *client) initWorkers() error {
return nil
}

func (c *client) Dial(addr string) (gnet.Conn, error) {
return c.netClient.Dial("tcp", addr)
func (c *client) Dial(addr string, ctx any) (gnet.Conn, error) {
return c.netClient.DialContext("tcp", addr, ctx)
}

func (c *client) Send(ctx context.Context, msg Message) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import (

"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/util"

"github.com/prometheus/client_golang/prometheus"

"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/bufferpool"
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/logger"
"github.com/prometheus/client_golang/prometheus"
)

const (
Expand Down Expand Up @@ -78,6 +79,7 @@ type Options struct {
AddColumns map[string]string // addition columns to add to the message, for example: __addcol1__worldid=xxx&__addcol2__ip=yyy, all the message will be added 2 more columns with worldid=xxx and ip=yyy
addColumnStr string // the string format of the AddColumns, just a cache, used internal
Auth Auth // dataproxy authentication interface
MaxConnLifetime time.Duration // connection max lifetime, default: 0, set to 5m/10m when the servers provide service though CLBs (Cloud Load Balancers)
}

// ValidateAndSetDefault validates an options and set up the default values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,10 @@ func WithAuth(auth Auth) Option {
o.Auth = auth
}
}

// WithMaxConnLifetime returns an Option that sets Options.MaxConnLifetime,
// the max lifetime of a connection, to lifetime.
func WithMaxConnLifetime(lifetime time.Duration) Option {
	setLifetime := func(opts *Options) {
		opts.MaxConnLifetime = lifetime
	}
	return setLifetime
}
Loading