Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test DB pool fixes #769

Merged
merged 5 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions internal/cmd/testdbman/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,7 @@ func createTestDatabases(ctx context.Context, out io.Writer) error {
return nil
}

// This is the same default as pgxpool's maximum number of connections
// when not specified -- either 4 or the number of CPUs, whichever is
// greater. If changing this number, also change the similar value in
// `riverinternaltest` where it's duplicated.
dbNames := generateTestDBNames(max(4, runtime.NumCPU()))
dbNames := generateTestDBNames(runtime.GOMAXPROCS(0))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we actually want to rely on this setting since it could be removed someday. However it's at least somewhat dynamic and can be overridden, whereas runtime.NumCPU() is fixed. My thought was this option is just slightly more reliable if somebody tweaks the GOMAXPROCS and still wants things to work here.

Not set on it though.


for _, dbName := range dbNames {
if err := createDBAndMigrate(dbName); err != nil {
Expand Down
16 changes: 11 additions & 5 deletions internal/riverinternaltest/riverinternaltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ func DrainContinuously[T any](drainChan <-chan T) func() []T {

// TestDB acquires a dedicated test database for the duration of the test. If an
// error occurs, the test fails. The test database will be automatically
// returned to the pool at the end of the test and the pgxpool will be closed.
// returned to the pool at the end of the test. If the pool was closed, it will
// be recreated.
func TestDB(ctx context.Context, tb testing.TB) *pgxpool.Pool {
tb.Helper()

Expand All @@ -173,9 +174,6 @@ func TestDB(ctx context.Context, tb testing.TB) *pgxpool.Pool {
}
tb.Cleanup(testPool.Release)

// Also close the pool just to ensure nothing is still active on it:
tb.Cleanup(testPool.Pool().Close)

return testPool.Pool()
}

Expand Down Expand Up @@ -282,8 +280,16 @@ func TruncateRiverTables(ctx context.Context, pool *pgxpool.Pool) error {
// amongst all packages. e.g. Configures a manager for test databases on setup,
// and checks for no goroutine leaks on teardown.
func WrapTestMain(m *testing.M) {
poolConfig := DatabaseConfig("river_test")
// Use a smaller number of conns per pool, because otherwise we could have
// NUM_CPU pools, each with NUM_CPU connections, and that's a lot of
// connections if there are many CPUs.
Comment on lines +284 to +286
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually realized this issue because in some of my tests I was hitting the 100 conn limit on my local Postgres, at least once I stopped closing every pool after every test. 14 cores * 14 conns per pool = 196 potential conns.

I don't think any given test ever needs to use more than 4 in the pool.

poolConfig.MaxConns = 4
// Pre-initialize 1 connection per pool.
poolConfig.MinConns = 1

var err error
dbManager, err = testdb.NewManager(DatabaseConfig("river_test"), dbPoolMaxConns, nil, TruncateRiverTables)
dbManager, err = testdb.NewManager(poolConfig, int32(runtime.GOMAXPROCS(0)), nil, TruncateRiverTables) //nolint:gosec
if err != nil {
log.Fatal(err)
}
Expand Down
33 changes: 21 additions & 12 deletions internal/testdb/db_with_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package testdb

import (
"context"
"errors"
"log/slog"
"sync"
"time"
Expand All @@ -28,28 +29,36 @@ func (db *DBWithPool) Release() {
}

func (db *DBWithPool) release() {
// Close and recreate the connection pool for 2 reasons:
// 1. ensure tests don't hold on to connections
// 2. If a test happens to close the pool as a matter of course (i.e. as part of a defer)
// then we don't reuse a closed pool.
db.res.Value().pool.Close()
db.logger.Debug("DBWithPool: release called", "dbName", db.dbName)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

newPgxPool, err := pgxpool.NewWithConfig(ctx, db.res.Value().config)
if err != nil {
db.res.Destroy()
return
if err := db.res.Value().pool.Ping(ctx); err != nil {
// If the pgx pool is already closed, Ping returns puddle.ErrClosedPool.
// When this happens, we need to re-create the pool.
if errors.Is(err, puddle.ErrClosedPool) {
db.logger.Debug("DBWithPool: pool is closed, re-creating", "dbName", db.dbName)

newPgxPool, err := pgxpool.NewWithConfig(ctx, db.res.Value().config)
if err != nil {
db.res.Destroy()
return
}
db.res.Value().pool = newPgxPool
} else {
// Log any other ping error but proceed with cleanup.
db.logger.Debug("DBWithPool: pool ping returned error", "dbName", db.dbName, "err", err)
}
}
db.res.Value().pool = newPgxPool

if db.manager.cleanup != nil {
if err := db.manager.cleanup(ctx, newPgxPool); err != nil {
db.logger.Debug("DBWithPool: release calling cleanup", "dbName", db.dbName)
if err := db.manager.cleanup(ctx, db.res.Value().pool); err != nil {
db.logger.Error("testdb.DBWithPool: Error during release cleanup", "err", err)
db.res.Destroy()
return
}
db.logger.Debug("DBWithPool: release done with cleanup", "dbName", db.dbName)
}

// Finally this resource is ready to be reused:
Expand Down
10 changes: 10 additions & 0 deletions internal/testdb/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,22 @@ func NewManager(config *pgxpool.Config, maxPoolSize int32, prepare PrepareFunc,
// Acquire returns a DBWithPool which contains a pgxpool.Pool. The DBWithPool
// must be released after use.
func (m *Manager) Acquire(ctx context.Context) (*DBWithPool, error) {
m.logger.Debug("DBManager: Acquire called")
res, err := m.pud.Acquire(ctx)
if err != nil {
return nil, err
}
m.logger.Debug("DBManager: Acquire returned pool", "pool", res.Value().pool, "error", err, "dbName", res.Value().dbName)

return &DBWithPool{res: res, logger: m.logger, manager: m, dbName: res.Value().dbName}, nil
}

// Close closes the Manager and all of its databases + pools. It blocks until
// all those underlying resources are unused and closed.
func (m *Manager) Close() {
m.logger.Debug("DBManager: Close called")
m.pud.Close()
m.logger.Debug("DBManager: Close returned")
}

func (m *Manager) allocatePool(ctx context.Context) (*poolWithDBName, error) {
Expand All @@ -98,17 +102,21 @@ func (m *Manager) allocatePool(ctx context.Context) (*poolWithDBName, error) {
}

if m.cleanup != nil {
m.logger.Debug("DBManager: allocatePool calling cleanup", "dbName", dbName)
if err := m.cleanup(ctx, pgxp); err != nil {
pgxp.Close()
return nil, fmt.Errorf("error during cleanup: %w", err)
}
m.logger.Debug("DBManager: allocatePool cleanup returned", "dbName", dbName)
}

if m.prepare != nil {
m.logger.Debug("DBManager: allocatePool calling prepare", "dbName", dbName)
if err = m.prepare(ctx, pgxp); err != nil {
pgxp.Close()
return nil, fmt.Errorf("error during prepare: %w", err)
}
m.logger.Debug("DBManager: allocatePool prepare returned", "dbName", dbName)
}

return &poolWithDBName{
Expand All @@ -120,7 +128,9 @@ func (m *Manager) allocatePool(ctx context.Context) (*poolWithDBName, error) {

func (m *Manager) closePool(pwn *poolWithDBName) {
// Close the pool so that there are no active connections on the database:
m.logger.Debug("DBManager: closePool called", "pool", pwn.pool, "dbName", pwn.dbName)
pwn.pool.Close()
m.logger.Debug("DBManager: closePool returned")
}

func (m *Manager) getNextDBNum() int {
Expand Down
Loading