From 8b5afc3e38fc10db0ea4074c9d5d53cbb787906d Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Mon, 17 Feb 2025 20:51:16 -0600 Subject: [PATCH 1/5] limit number of conns in TestDB pools --- internal/riverinternaltest/riverinternaltest.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/internal/riverinternaltest/riverinternaltest.go b/internal/riverinternaltest/riverinternaltest.go index fe7808f1..3f43f1c0 100644 --- a/internal/riverinternaltest/riverinternaltest.go +++ b/internal/riverinternaltest/riverinternaltest.go @@ -282,8 +282,14 @@ func TruncateRiverTables(ctx context.Context, pool *pgxpool.Pool) error { // amongst all packages. e.g. Configures a manager for test databases on setup, // and checks for no goroutine leaks on teardown. func WrapTestMain(m *testing.M) { + poolConfig := DatabaseConfig("river_test") + // Use a smaller number of conns per pool, because otherwise we could have + // NUM_CPU pools, each with NUM_CPU connections, and that's a lot of + // connections if there are many CPUs. + poolConfig.MaxConns = 4 + var err error - dbManager, err = testdb.NewManager(DatabaseConfig("river_test"), dbPoolMaxConns, nil, TruncateRiverTables) + dbManager, err = testdb.NewManager(poolConfig, int32(runtime.GOMAXPROCS(0)), nil, TruncateRiverTables) //nolint:gosec if err != nil { log.Fatal(err) } From 4153a4e6d3edef7c94b6196bf65b9d3c10a432a4 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Mon, 17 Feb 2025 21:04:29 -0600 Subject: [PATCH 2/5] max of GOMAXPROCS dbs from testdbman --- internal/cmd/testdbman/main.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/internal/cmd/testdbman/main.go b/internal/cmd/testdbman/main.go index f1266511..931fc8f5 100644 --- a/internal/cmd/testdbman/main.go +++ b/internal/cmd/testdbman/main.go @@ -139,11 +139,7 @@ func createTestDatabases(ctx context.Context, out io.Writer) error { return nil } - // This is the same default as pgxpool's maximum number of connections - // when not specified -- either 4 or the number of CPUs, whichever is - // greater. If changing this number, also change the similar value in - // `riverinternaltest` where it's duplicated. - dbNames := generateTestDBNames(max(4, runtime.NumCPU())) + dbNames := generateTestDBNames(runtime.GOMAXPROCS(0)) for _, dbName := range dbNames { if err := createDBAndMigrate(dbName); err != nil { From 8372cbc418d6883ca83215e72af447170b5e8a46 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Mon, 17 Feb 2025 21:04:44 -0600 Subject: [PATCH 3/5] minimum of 1 conn per testdb pool --- internal/riverinternaltest/riverinternaltest.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/riverinternaltest/riverinternaltest.go b/internal/riverinternaltest/riverinternaltest.go index 3f43f1c0..9ac00ebe 100644 --- a/internal/riverinternaltest/riverinternaltest.go +++ b/internal/riverinternaltest/riverinternaltest.go @@ -287,6 +287,8 @@ func WrapTestMain(m *testing.M) { // NUM_CPU pools, each with NUM_CPU connections, and that's a lot of // connections if there are many CPUs. poolConfig.MaxConns = 4 + // Pre-initialize 1 connection per pool. + poolConfig.MinConns = 1 var err error dbManager, err = testdb.NewManager(poolConfig, int32(runtime.GOMAXPROCS(0)), nil, TruncateRiverTables) //nolint:gosec From b7a71693b57357ebba847b0b9ac5d32a05989531 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Sat, 15 Feb 2025 13:57:04 -0600 Subject: [PATCH 4/5] debug logging for testdb.Manager --- internal/testdb/db_with_pool.go | 3 +++ internal/testdb/manager.go | 10 ++++++++++ 2 files changed, 13 insertions(+) diff --git a/internal/testdb/db_with_pool.go b/internal/testdb/db_with_pool.go index 8f720117..709a1dbc 100644 --- a/internal/testdb/db_with_pool.go +++ b/internal/testdb/db_with_pool.go @@ -28,6 +28,7 @@ func (db *DBWithPool) Release() { } func (db *DBWithPool) release() { + db.logger.Debug("DBWithPool: release called", "dbName", db.dbName) // Close and recreate the connection pool for 2 reasons: // 1. ensure tests don't hold on to connections // 2. If a test happens to close the pool as a matter of course (i.e. as part of a defer) @@ -45,11 +46,13 @@ func (db *DBWithPool) release() { db.res.Value().pool = newPgxPool if db.manager.cleanup != nil { + db.logger.Debug("DBWithPool: release calling cleanup", "dbName", db.dbName) if err := db.manager.cleanup(ctx, newPgxPool); err != nil { db.logger.Error("testdb.DBWithPool: Error during release cleanup", "err", err) db.res.Destroy() return } + db.logger.Debug("DBWithPool: release done with cleanup", "dbName", db.dbName) } // Finally this resource is ready to be reused: diff --git a/internal/testdb/manager.go b/internal/testdb/manager.go index c2335021..d358a84f 100644 --- a/internal/testdb/manager.go +++ b/internal/testdb/manager.go @@ -69,10 +69,12 @@ func NewManager(config *pgxpool.Config, maxPoolSize int32, prepare PrepareFunc, // Acquire returns a DBWithPool which contains a pgxpool.Pool. The DBWithPool // must be released after use. func (m *Manager) Acquire(ctx context.Context) (*DBWithPool, error) { + m.logger.Debug("DBManager: Acquire called") res, err := m.pud.Acquire(ctx) if err != nil { return nil, err } + m.logger.Debug("DBManager: Acquire returned pool", "pool", res.Value().pool, "error", err, "dbName", res.Value().dbName) return &DBWithPool{res: res, logger: m.logger, manager: m, dbName: res.Value().dbName}, nil } @@ -80,7 +82,9 @@ func (m *Manager) Acquire(ctx context.Context) (*DBWithPool, error) { // Close closes the Manager and all of its databases + pools. It blocks until // all those underlying resources are unused and closed. func (m *Manager) Close() { + m.logger.Debug("DBManager: Close called") m.pud.Close() + m.logger.Debug("DBManager: Close returned") } func (m *Manager) allocatePool(ctx context.Context) (*poolWithDBName, error) { @@ -98,17 +102,21 @@ func (m *Manager) allocatePool(ctx context.Context) (*poolWithDBName, error) { } if m.cleanup != nil { + m.logger.Debug("DBManager: allocatePool calling cleanup", "dbName", dbName) if err := m.cleanup(ctx, pgxp); err != nil { pgxp.Close() return nil, fmt.Errorf("error during cleanup: %w", err) } + m.logger.Debug("DBManager: allocatePool cleanup returned", "dbName", dbName) } if m.prepare != nil { + m.logger.Debug("DBManager: allocatePool calling prepare", "dbName", dbName) if err = m.prepare(ctx, pgxp); err != nil { pgxp.Close() return nil, fmt.Errorf("error during prepare: %w", err) } + m.logger.Debug("DBManager: allocatePool prepare returned", "dbName", dbName) } return &poolWithDBName{ @@ -120,7 +128,9 @@ func (m *Manager) allocatePool(ctx context.Context) (*poolWithDBName, error) { func (m *Manager) closePool(pwn *poolWithDBName) { // Close the pool so that there are no active connections on the database: + m.logger.Debug("DBManager: closePool called", "pool", pwn.pool, "dbName", pwn.dbName) pwn.pool.Close() + m.logger.Debug("DBManager: closePool returned") } func (m *Manager) getNextDBNum() int { From c1f5b53bdd5e7fb7d65e0893485da70f2149e6bb Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Sat, 15 Feb 2025 14:49:59 -0600 Subject: [PATCH 5/5] only re-create db pool if it's already closed --- .../riverinternaltest/riverinternaltest.go | 6 ++-- internal/testdb/db_with_pool.go | 30 +++++++++++-------- 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/internal/riverinternaltest/riverinternaltest.go b/internal/riverinternaltest/riverinternaltest.go index 9ac00ebe..a62acde3 100644 --- a/internal/riverinternaltest/riverinternaltest.go +++ b/internal/riverinternaltest/riverinternaltest.go @@ -160,7 +160,8 @@ func DrainContinuously[T any](drainChan <-chan T) func() []T { // TestDB acquires a dedicated test database for the duration of the test. If an // error occurs, the test fails. The test database will be automatically -// returned to the pool at the end of the test and the pgxpool will be closed. +// returned to the pool at the end of the test. If the pool was closed, it will +// be recreated. func TestDB(ctx context.Context, tb testing.TB) *pgxpool.Pool { tb.Helper() @@ -173,9 +174,6 @@ func TestDB(ctx context.Context, tb testing.TB) *pgxpool.Pool { } tb.Cleanup(testPool.Release) - // Also close the pool just to ensure nothing is still active on it: - tb.Cleanup(testPool.Pool().Close) - return testPool.Pool() } diff --git a/internal/testdb/db_with_pool.go b/internal/testdb/db_with_pool.go index 709a1dbc..1a2dee5f 100644 --- a/internal/testdb/db_with_pool.go +++ b/internal/testdb/db_with_pool.go @@ -2,6 +2,7 @@ package testdb import ( "context" + "errors" "log/slog" "sync" "time" @@ -29,28 +30,33 @@ func (db *DBWithPool) Release() { func (db *DBWithPool) release() { db.logger.Debug("DBWithPool: release called", "dbName", db.dbName) - // Close and recreate the connection pool for 2 reasons: - // 1. ensure tests don't hold on to connections - // 2. If a test happens to close the pool as a matter of course (i.e. as part of a defer) - // then we don't reuse a closed pool. - db.res.Value().pool.Close() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - newPgxPool, err := pgxpool.NewWithConfig(ctx, db.res.Value().config) - if err != nil { - db.res.Destroy() - return + if err := db.res.Value().pool.Ping(ctx); err != nil { + // If the pgx pool is already closed, Ping returns puddle.ErrClosedPool. + // When this happens, we need to re-create the pool. + if errors.Is(err, puddle.ErrClosedPool) { + db.logger.Debug("DBWithPool: pool is closed, re-creating", "dbName", db.dbName) + + newPgxPool, err := pgxpool.NewWithConfig(ctx, db.res.Value().config) + if err != nil { + db.res.Destroy() + return + } + db.res.Value().pool = newPgxPool + } else { + // Log any other ping error but proceed with cleanup. + db.logger.Debug("DBWithPool: pool ping returned error", "dbName", db.dbName, "err", err) + } } - db.res.Value().pool = newPgxPool if db.manager.cleanup != nil { db.logger.Debug("DBWithPool: release calling cleanup", "dbName", db.dbName) - if err := db.manager.cleanup(ctx, newPgxPool); err != nil { + if err := db.manager.cleanup(ctx, db.res.Value().pool); err != nil { db.logger.Error("testdb.DBWithPool: Error during release cleanup", "err", err) db.res.Destroy() - return } db.logger.Debug("DBWithPool: release done with cleanup", "dbName", db.dbName) }