Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-20.0] pool: reopen connection closed by idle timeout (#17818) #17830

Merged
merged 1 commit into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions go/pools/smartconnpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ type ConnPool[C Connection] struct {
connect Connector[C]
// refresh is the callback to check whether the pool needs to be refreshed
refresh RefreshCheck

// maxCapacity is the maximum value to which capacity can be set; when the pool
// is re-opened, it defaults to this capacity
maxCapacity int64
Expand Down Expand Up @@ -380,6 +379,8 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {

if conn == nil {
var err error
// Using context.Background() is fine since MySQL connection already enforces
// a connect timeout via the `db_connect_timeout_ms` config param.
conn, err = pool.connNew(context.Background())
if err != nil {
pool.closedConn()
Expand All @@ -392,6 +393,8 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {
if lifetime > 0 && conn.timeCreated.elapsed() > lifetime {
pool.Metrics.maxLifetimeClosed.Add(1)
conn.Close()
// Using context.Background() is fine since MySQL connection already enforces
// a connect timeout via the `db_connect_timeout_ms` config param.
if err := pool.connReopen(context.Background(), conn, conn.timeUsed.get()); err != nil {
pool.closedConn()
return
Expand Down Expand Up @@ -455,15 +458,22 @@ func (pool *ConnPool[D]) extendedMaxLifetime() time.Duration {
return time.Duration(maxLifetime) + time.Duration(rand.Uint32N(uint32(maxLifetime)))
}

func (pool *ConnPool[C]) connReopen(ctx context.Context, dbconn *Pooled[C], now time.Duration) error {
var err error
func (pool *ConnPool[C]) connReopen(ctx context.Context, dbconn *Pooled[C], now time.Duration) (err error) {
dbconn.Conn, err = pool.config.connect(ctx)
if err != nil {
return err
}

dbconn.timeUsed.set(now)
if setting := dbconn.Conn.Setting(); setting != nil {
err = dbconn.Conn.ApplySetting(ctx, setting)
if err != nil {
dbconn.Close()
return err
}
}

dbconn.timeCreated.set(now)
dbconn.timeUsed.set(now)
return nil
}

Expand Down Expand Up @@ -720,7 +730,11 @@ func (pool *ConnPool[C]) closeIdleResources(now time.Time) {
if conn.timeUsed.expired(mono, timeout) {
pool.Metrics.idleClosed.Add(1)
conn.Close()
pool.closedConn()
// Using context.Background() is fine since MySQL connection already enforces
// a connect timeout via the `db_connect_timeout_ms` config param.
if err := pool.connReopen(context.Background(), conn, mono); err != nil {
pool.closedConn()
}
}
}
}
Expand Down
73 changes: 59 additions & 14 deletions go/pools/smartconnpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,11 @@ var (

type TestState struct {
lastID, open, close, reset atomic.Int64
waits []time.Time
mu sync.Mutex

chaos struct {
waits []time.Time
chaos struct {
delayConnect time.Duration
failConnect bool
failConnect atomic.Bool
failApply bool
}
}
Expand Down Expand Up @@ -109,7 +108,7 @@ func newConnector(state *TestState) Connector[*TestConn] {
if state.chaos.delayConnect != 0 {
time.Sleep(state.chaos.delayConnect)
}
if state.chaos.failConnect {
if state.chaos.failConnect.Load() {
return nil, fmt.Errorf("failed to connect: forced failure")
}
return &TestConn{
Expand Down Expand Up @@ -586,6 +585,45 @@ func TestUserClosing(t *testing.T) {
}
}

// TestConnReopen verifies that the pool transparently replaces connections
// that it closes itself: a connection past MaxLifetime is reopened on put(),
// a connection past IdleTimeout is reopened by the idle closer, and a failing
// connector leaves no active connections behind.
func TestConnReopen(t *testing.T) {
	var state TestState

	// Capacity 1 makes every assertion deterministic: there is exactly one
	// pooled connection whose identity (lastID) we can track across reopens.
	p := NewPool(&Config[*TestConn]{
		Capacity:    1,
		IdleTimeout: 200 * time.Millisecond,
		MaxLifetime: 10 * time.Millisecond,
		LogWait:     state.LogWait,
	}).Open(newConnector(&state), nil)

	defer p.Close()

	conn, err := p.Get(context.Background(), nil)
	require.NoError(t, err)
	assert.EqualValues(t, 1, state.lastID.Load())
	assert.EqualValues(t, 1, p.Active())

	// wait enough to reach maxlifetime.
	time.Sleep(50 * time.Millisecond)

	// put() closes the expired connection and opens a replacement (lastID
	// advances to 2), so the pool's active count stays at 1.
	p.put(conn)
	assert.EqualValues(t, 2, state.lastID.Load())
	assert.EqualValues(t, 1, p.Active())

	// wait enough to reach idle timeout.
	// The idle closer should have closed the idle connection and reopened a
	// fresh one at least once, keeping one active connection in the pool.
	time.Sleep(300 * time.Millisecond)
	assert.GreaterOrEqual(t, state.lastID.Load(), int64(3))
	assert.EqualValues(t, 1, p.Active())
	assert.GreaterOrEqual(t, p.Metrics.IdleClosed(), int64(1))

	// mark connect to fail
	state.chaos.failConnect.Store(true)
	// wait enough to reach idle timeout and connect to fail.
	// With reopen failing, the idle closer can only drop the connection.
	time.Sleep(300 * time.Millisecond)
	// no active connection should be left.
	assert.Zero(t, p.Active())

}

func TestIdleTimeout(t *testing.T) {
testTimeout := func(t *testing.T, setting *Setting) {
var state TestState
Expand All @@ -608,6 +646,7 @@ func TestIdleTimeout(t *testing.T) {

conns = append(conns, r)
}
assert.GreaterOrEqual(t, state.open.Load(), int64(5))

// wait a long while; ensure that none of the conns have been closed
time.Sleep(1 * time.Second)
Expand All @@ -628,9 +667,15 @@ func TestIdleTimeout(t *testing.T) {
t.Fatalf("Connections remain open after 1 second")
}
}
// At least 5 connections should have been closed by now.
assert.GreaterOrEqual(t, p.Metrics.IdleClosed(), int64(5), "At least 5 connections should have been closed by now.")

// At any point, at least 4 connections should be open, with 1 either in the process of opening or already opened.
// The idle connection closer shuts down one connection at a time.
assert.GreaterOrEqual(t, state.open.Load(), int64(4))

// no need to assert anything: all the connections in the pool should be idle-closed
// by now and if they're not the test will timeout and fail
// The number of available connections in the pool should remain at 5.
assert.EqualValues(t, 5, p.Available())
}

t.Run("WithoutSettings", func(t *testing.T) { testTimeout(t, nil) })
Expand All @@ -656,7 +701,7 @@ func TestIdleTimeoutCreateFail(t *testing.T) {
// Change the factory before putting back
// to prevent race with the idle closer, who will
// try to use it.
state.chaos.failConnect = true
state.chaos.failConnect.Store(true)
p.put(r)
timeout := time.After(1 * time.Second)
for p.Active() != 0 {
Expand All @@ -667,7 +712,7 @@ func TestIdleTimeoutCreateFail(t *testing.T) {
}
}
// reset factory for next run.
state.chaos.failConnect = false
state.chaos.failConnect.Store(false)
}
}

Expand Down Expand Up @@ -758,7 +803,7 @@ func TestExtendedLifetimeTimeout(t *testing.T) {

func TestCreateFail(t *testing.T) {
var state TestState
state.chaos.failConnect = true
state.chaos.failConnect.Store(true)

ctx := context.Background()
p := NewPool(&Config[*TestConn]{
Expand Down Expand Up @@ -805,12 +850,12 @@ func TestCreateFailOnPut(t *testing.T) {
require.NoError(t, err)

// change factory to fail the put.
state.chaos.failConnect = true
state.chaos.failConnect.Store(true)
p.put(nil)
assert.Zero(t, p.Active())

// change back for next iteration.
state.chaos.failConnect = false
state.chaos.failConnect.Store(false)
}
}

Expand All @@ -828,7 +873,7 @@ func TestSlowCreateFail(t *testing.T) {
LogWait: state.LogWait,
}).Open(newConnector(&state), nil)

state.chaos.failConnect = true
state.chaos.failConnect.Store(true)

for i := 0; i < 3; i++ {
go func() {
Expand All @@ -847,7 +892,7 @@ func TestSlowCreateFail(t *testing.T) {
default:
}

state.chaos.failConnect = false
state.chaos.failConnect.Store(false)
conn, err := p.Get(ctx, setting)
require.NoError(t, err)

Expand Down
Loading