diff --git a/internal/cmd/testdbman/main.go b/internal/cmd/testdbman/main.go index f1266511..931fc8f5 100644 --- a/internal/cmd/testdbman/main.go +++ b/internal/cmd/testdbman/main.go @@ -139,11 +139,7 @@ func createTestDatabases(ctx context.Context, out io.Writer) error { return nil } - // This is the same default as pgxpool's maximum number of connections - // when not specified -- either 4 or the number of CPUs, whichever is - // greater. If changing this number, also change the similar value in - // `riverinternaltest` where it's duplicated. - dbNames := generateTestDBNames(max(4, runtime.NumCPU())) + dbNames := generateTestDBNames(runtime.GOMAXPROCS(0)) for _, dbName := range dbNames { if err := createDBAndMigrate(dbName); err != nil { diff --git a/internal/riverinternaltest/riverinternaltest.go b/internal/riverinternaltest/riverinternaltest.go index fe7808f1..a62acde3 100644 --- a/internal/riverinternaltest/riverinternaltest.go +++ b/internal/riverinternaltest/riverinternaltest.go @@ -160,7 +160,8 @@ func DrainContinuously[T any](drainChan <-chan T) func() []T { // TestDB acquires a dedicated test database for the duration of the test. If an // error occurs, the test fails. The test database will be automatically -// returned to the pool at the end of the test and the pgxpool will be closed. +// returned to the pool at the end of the test. If the pool was closed, it will +// be recreated. func TestDB(ctx context.Context, tb testing.TB) *pgxpool.Pool { tb.Helper() @@ -173,9 +174,6 @@ func TestDB(ctx context.Context, tb testing.TB) *pgxpool.Pool { } tb.Cleanup(testPool.Release) - // Also close the pool just to ensure nothing is still active on it: - tb.Cleanup(testPool.Pool().Close) - return testPool.Pool() } @@ -282,8 +280,16 @@ func TruncateRiverTables(ctx context.Context, pool *pgxpool.Pool) error { // amongst all packages. e.g. Configures a manager for test databases on setup, // and checks for no goroutine leaks on teardown. func WrapTestMain(m *testing.M) { + poolConfig := DatabaseConfig("river_test") + // Use a smaller number of conns per pool, because otherwise we could have + // NUM_CPU pools, each with NUM_CPU connections, and that's a lot of + // connections if there are many CPUs. + poolConfig.MaxConns = 4 + // Pre-initialize 1 connection per pool. + poolConfig.MinConns = 1 + var err error - dbManager, err = testdb.NewManager(DatabaseConfig("river_test"), dbPoolMaxConns, nil, TruncateRiverTables) + dbManager, err = testdb.NewManager(poolConfig, int32(runtime.GOMAXPROCS(0)), nil, TruncateRiverTables) //nolint:gosec if err != nil { log.Fatal(err) } diff --git a/internal/testdb/db_with_pool.go b/internal/testdb/db_with_pool.go index 8f720117..1a2dee5f 100644 --- a/internal/testdb/db_with_pool.go +++ b/internal/testdb/db_with_pool.go @@ -2,6 +2,7 @@ package testdb import ( "context" + "errors" "log/slog" "sync" "time" @@ -28,28 +29,36 @@ func (db *DBWithPool) Release() { } func (db *DBWithPool) release() { - // Close and recreate the connection pool for 2 reasons: - // 1. ensure tests don't hold on to connections - // 2. If a test happens to close the pool as a matter of course (i.e. as part of a defer) - // then we don't reuse a closed pool. - db.res.Value().pool.Close() + db.logger.Debug("DBWithPool: release called", "dbName", db.dbName) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - newPgxPool, err := pgxpool.NewWithConfig(ctx, db.res.Value().config) - if err != nil { - db.res.Destroy() - return + if err := db.res.Value().pool.Ping(ctx); err != nil { + // If the pgx pool is already closed, Ping returns puddle.ErrClosedPool. + // When this happens, we need to re-create the pool. + if errors.Is(err, puddle.ErrClosedPool) { + db.logger.Debug("DBWithPool: pool is closed, re-creating", "dbName", db.dbName) + + newPgxPool, err := pgxpool.NewWithConfig(ctx, db.res.Value().config) + if err != nil { + db.res.Destroy() + return + } + db.res.Value().pool = newPgxPool + } else { + // Log any other ping error but proceed with cleanup. + db.logger.Debug("DBWithPool: pool ping returned error", "dbName", db.dbName, "err", err) + } } - db.res.Value().pool = newPgxPool if db.manager.cleanup != nil { - if err := db.manager.cleanup(ctx, newPgxPool); err != nil { + db.logger.Debug("DBWithPool: release calling cleanup", "dbName", db.dbName) + if err := db.manager.cleanup(ctx, db.res.Value().pool); err != nil { db.logger.Error("testdb.DBWithPool: Error during release cleanup", "err", err) db.res.Destroy() - return } + db.logger.Debug("DBWithPool: release done with cleanup", "dbName", db.dbName) } // Finally this resource is ready to be reused: diff --git a/internal/testdb/manager.go b/internal/testdb/manager.go index c2335021..d358a84f 100644 --- a/internal/testdb/manager.go +++ b/internal/testdb/manager.go @@ -69,10 +69,12 @@ func NewManager(config *pgxpool.Config, maxPoolSize int32, prepare PrepareFunc, // Acquire returns a DBWithPool which contains a pgxpool.Pool. The DBWithPool // must be released after use. func (m *Manager) Acquire(ctx context.Context) (*DBWithPool, error) { + m.logger.Debug("DBManager: Acquire called") res, err := m.pud.Acquire(ctx) if err != nil { return nil, err } + m.logger.Debug("DBManager: Acquire returned pool", "pool", res.Value().pool, "error", err, "dbName", res.Value().dbName) return &DBWithPool{res: res, logger: m.logger, manager: m, dbName: res.Value().dbName}, nil } @@ -80,7 +82,9 @@ func (m *Manager) Acquire(ctx context.Context) (*DBWithPool, error) { // Close closes the Manager and all of its databases + pools. It blocks until // all those underlying resources are unused and closed. func (m *Manager) Close() { + m.logger.Debug("DBManager: Close called") m.pud.Close() + m.logger.Debug("DBManager: Close returned") } func (m *Manager) allocatePool(ctx context.Context) (*poolWithDBName, error) { @@ -98,17 +102,21 @@ func (m *Manager) allocatePool(ctx context.Context) (*poolWithDBName, error) { } if m.cleanup != nil { + m.logger.Debug("DBManager: allocatePool calling cleanup", "dbName", dbName) if err := m.cleanup(ctx, pgxp); err != nil { pgxp.Close() return nil, fmt.Errorf("error during cleanup: %w", err) } + m.logger.Debug("DBManager: allocatePool cleanup returned", "dbName", dbName) } if m.prepare != nil { + m.logger.Debug("DBManager: allocatePool calling prepare", "dbName", dbName) if err = m.prepare(ctx, pgxp); err != nil { pgxp.Close() return nil, fmt.Errorf("error during prepare: %w", err) } + m.logger.Debug("DBManager: allocatePool prepare returned", "dbName", dbName) } return &poolWithDBName{ @@ -120,7 +128,9 @@ func (m *Manager) allocatePool(ctx context.Context) (*poolWithDBName, error) { func (m *Manager) closePool(pwn *poolWithDBName) { // Close the pool so that there are no active connections on the database: + m.logger.Debug("DBManager: closePool called", "pool", pwn.pool, "dbName", pwn.dbName) pwn.pool.Close() + m.logger.Debug("DBManager: closePool returned") } func (m *Manager) getNextDBNum() int {