Skip to content

Commit

Permalink
Test DB pool fixes (#769)
Browse files Browse the repository at this point in the history
* limit number of conns in TestDB pools

* max of GOMAXPROCS dbs from testdbman

* minimum of 1 conn per testdb pool

* debug logging for testdb.Manager

* only re-create db pool if it's already closed
  • Loading branch information
bgentry authored Feb 18, 2025
1 parent ded1d06 commit fb6b3e9
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 22 deletions.
6 changes: 1 addition & 5 deletions internal/cmd/testdbman/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,7 @@ func createTestDatabases(ctx context.Context, out io.Writer) error {
return nil
}

// This is the same default as pgxpool's maximum number of connections
// when not specified -- either 4 or the number of CPUs, whichever is
// greater. If changing this number, also change the similar value in
// `riverinternaltest` where it's duplicated.
dbNames := generateTestDBNames(max(4, runtime.NumCPU()))
dbNames := generateTestDBNames(runtime.GOMAXPROCS(0))

for _, dbName := range dbNames {
if err := createDBAndMigrate(dbName); err != nil {
Expand Down
16 changes: 11 additions & 5 deletions internal/riverinternaltest/riverinternaltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ func DrainContinuously[T any](drainChan <-chan T) func() []T {

// TestDB acquires a dedicated test database for the duration of the test. If an
// error occurs, the test fails. The test database will be automatically
// returned to the pool at the end of the test and the pgxpool will be closed.
// returned to the pool at the end of the test. If the pool was closed, it will
// be recreated.
func TestDB(ctx context.Context, tb testing.TB) *pgxpool.Pool {
tb.Helper()

Expand All @@ -173,9 +174,6 @@ func TestDB(ctx context.Context, tb testing.TB) *pgxpool.Pool {
}
tb.Cleanup(testPool.Release)

// Also close the pool just to ensure nothing is still active on it:
tb.Cleanup(testPool.Pool().Close)

return testPool.Pool()
}

Expand Down Expand Up @@ -282,8 +280,16 @@ func TruncateRiverTables(ctx context.Context, pool *pgxpool.Pool) error {
// amongst all packages. e.g. Configures a manager for test databases on setup,
// and checks for no goroutine leaks on teardown.
func WrapTestMain(m *testing.M) {
poolConfig := DatabaseConfig("river_test")
// Use a smaller number of conns per pool, because otherwise we could have
// NUM_CPU pools, each with NUM_CPU connections, and that's a lot of
// connections if there are many CPUs.
poolConfig.MaxConns = 4
// Pre-initialize 1 connection per pool.
poolConfig.MinConns = 1

var err error
dbManager, err = testdb.NewManager(DatabaseConfig("river_test"), dbPoolMaxConns, nil, TruncateRiverTables)
dbManager, err = testdb.NewManager(poolConfig, int32(runtime.GOMAXPROCS(0)), nil, TruncateRiverTables) //nolint:gosec
if err != nil {
log.Fatal(err)
}
Expand Down
33 changes: 21 additions & 12 deletions internal/testdb/db_with_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package testdb

import (
"context"
"errors"
"log/slog"
"sync"
"time"
Expand All @@ -28,28 +29,36 @@ func (db *DBWithPool) Release() {
}

func (db *DBWithPool) release() {
// Close and recreate the connection pool for 2 reasons:
// 1. ensure tests don't hold on to connections
// 2. If a test happens to close the pool as a matter of course (i.e. as part of a defer)
// then we don't reuse a closed pool.
db.res.Value().pool.Close()
db.logger.Debug("DBWithPool: release called", "dbName", db.dbName)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

newPgxPool, err := pgxpool.NewWithConfig(ctx, db.res.Value().config)
if err != nil {
db.res.Destroy()
return
if err := db.res.Value().pool.Ping(ctx); err != nil {
// If the pgx pool is already closed, Ping returns puddle.ErrClosedPool.
// When this happens, we need to re-create the pool.
if errors.Is(err, puddle.ErrClosedPool) {
db.logger.Debug("DBWithPool: pool is closed, re-creating", "dbName", db.dbName)

newPgxPool, err := pgxpool.NewWithConfig(ctx, db.res.Value().config)
if err != nil {
db.res.Destroy()
return
}
db.res.Value().pool = newPgxPool
} else {
// Log any other ping error but proceed with cleanup.
db.logger.Debug("DBWithPool: pool ping returned error", "dbName", db.dbName, "err", err)
}
}
db.res.Value().pool = newPgxPool

if db.manager.cleanup != nil {
if err := db.manager.cleanup(ctx, newPgxPool); err != nil {
db.logger.Debug("DBWithPool: release calling cleanup", "dbName", db.dbName)
if err := db.manager.cleanup(ctx, db.res.Value().pool); err != nil {
db.logger.Error("testdb.DBWithPool: Error during release cleanup", "err", err)
db.res.Destroy()
return
}
db.logger.Debug("DBWithPool: release done with cleanup", "dbName", db.dbName)
}

// Finally this resource is ready to be reused:
Expand Down
10 changes: 10 additions & 0 deletions internal/testdb/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,22 @@ func NewManager(config *pgxpool.Config, maxPoolSize int32, prepare PrepareFunc,
// Acquire returns a DBWithPool which contains a pgxpool.Pool. The DBWithPool
// must be released after use.
func (m *Manager) Acquire(ctx context.Context) (*DBWithPool, error) {
m.logger.Debug("DBManager: Acquire called")
res, err := m.pud.Acquire(ctx)
if err != nil {
return nil, err
}
m.logger.Debug("DBManager: Acquire returned pool", "pool", res.Value().pool, "error", err, "dbName", res.Value().dbName)

return &DBWithPool{res: res, logger: m.logger, manager: m, dbName: res.Value().dbName}, nil
}

// Close closes the Manager and all of its databases + pools. It blocks until
// all those underlying resources are unused and closed.
func (m *Manager) Close() {
m.logger.Debug("DBManager: Close called")
m.pud.Close()
m.logger.Debug("DBManager: Close returned")
}

func (m *Manager) allocatePool(ctx context.Context) (*poolWithDBName, error) {
Expand All @@ -98,17 +102,21 @@ func (m *Manager) allocatePool(ctx context.Context) (*poolWithDBName, error) {
}

if m.cleanup != nil {
m.logger.Debug("DBManager: allocatePool calling cleanup", "dbName", dbName)
if err := m.cleanup(ctx, pgxp); err != nil {
pgxp.Close()
return nil, fmt.Errorf("error during cleanup: %w", err)
}
m.logger.Debug("DBManager: allocatePool cleanup returned", "dbName", dbName)
}

if m.prepare != nil {
m.logger.Debug("DBManager: allocatePool calling prepare", "dbName", dbName)
if err = m.prepare(ctx, pgxp); err != nil {
pgxp.Close()
return nil, fmt.Errorf("error during prepare: %w", err)
}
m.logger.Debug("DBManager: allocatePool prepare returned", "dbName", dbName)
}

return &poolWithDBName{
Expand All @@ -120,7 +128,9 @@ func (m *Manager) allocatePool(ctx context.Context) (*poolWithDBName, error) {

func (m *Manager) closePool(pwn *poolWithDBName) {
// Close the pool so that there are no active connections on the database:
m.logger.Debug("DBManager: closePool called", "pool", pwn.pool, "dbName", pwn.dbName)
pwn.pool.Close()
m.logger.Debug("DBManager: closePool returned")
}

func (m *Manager) getNextDBNum() int {
Expand Down

0 comments on commit fb6b3e9

Please sign in to comment.