Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pgxpool: Don't use idle connections with stdlib when doing pooling #4137

Merged
merged 2 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,6 @@ func NewApp(c Config, pool *pgxpool.Pool) (*App, error) {
}
}

app.db.SetMaxIdleConns(c.DBMaxIdle)
app.db.SetMaxOpenConns(c.DBMaxOpen)

app.mgr = lifecycle.NewManager(app._Run, app._Shutdown)
err = app.mgr.SetStartupFunc(app.startup)
if err != nil {
Expand Down
22 changes: 18 additions & 4 deletions app/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,24 @@ Available Flags:

var pool *pgxpool.Pool
if cfg.DBURLNext != "" {
err = migrate.VerifyIsLatest(ctx, cfg.DBURL)
if err != nil {
return errors.Wrap(err, "verify db")
}

err = doMigrations(cfg.DBURLNext)
if err != nil {
return errors.Wrap(err, "nextdb")
}

mgr, err := swo.NewManager(swo.Config{OldDBURL: cfg.DBURL, NewDBURL: cfg.DBURLNext, CanExec: !cfg.APIOnly, Logger: cfg.LegacyLogger})
mgr, err := swo.NewManager(swo.Config{
OldDBURL: cfg.DBURL,
NewDBURL: cfg.DBURLNext,
CanExec: !cfg.APIOnly,
Logger: cfg.LegacyLogger,
MaxOpen: cfg.DBMaxOpen,
MaxIdle: cfg.DBMaxIdle,
})
if err != nil {
return errors.Wrap(err, "init switchover handler")
}
Expand All @@ -162,13 +174,15 @@ Available Flags:
return errors.Wrap(err, "connect to postgres")
}

cfg, err := pgxpool.ParseConfig(appURL)
poolCfg, err := pgxpool.ParseConfig(appURL)
if err != nil {
return errors.Wrap(err, "parse db URL")
}
sqldrv.SetConfigRetries(cfg)
poolCfg.MaxConns = int32(cfg.DBMaxOpen)
poolCfg.MinConns = int32(cfg.DBMaxIdle)
sqldrv.SetConfigRetries(poolCfg)

pool, err = pgxpool.NewWithConfig(context.Background(), cfg)
pool, err = pgxpool.NewWithConfig(context.Background(), poolCfg)
if err != nil {
return errors.Wrap(err, "connect to postgres")
}
Expand Down
7 changes: 0 additions & 7 deletions app/pause.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package app

import (
"context"
"time"
)

// LogBackgroundContext returns a context.Background with the application logger configured.
Expand All @@ -19,17 +18,11 @@ func (app *App) Resume(ctx context.Context) error {
}

func (app *App) _pause(ctx context.Context) error {
app.db.SetMaxIdleConns(0)
app.db.SetConnMaxLifetime(time.Second)
app.db.SetMaxOpenConns(3)
app.events.Stop()
return nil
}

func (app *App) _resume(ctx context.Context) error {
app.db.SetMaxOpenConns(app.cfg.DBMaxOpen)
app.db.SetMaxIdleConns(app.cfg.DBMaxIdle)
app.db.SetConnMaxLifetime(0)
app.events.Start()

return nil
Expand Down
32 changes: 32 additions & 0 deletions migrate/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ func migrationIDs() []string {
return ids
}

// LatestID will return the ID of the latest migration.
func LatestID() string {
ids := migrationIDs()
if len(ids) == 0 {
return ""
}
return ids[len(ids)-1]
}

// Names will return all AssetNames without the timestamps and extensions
func Names() []string {
uniq := make(map[string]struct{})
Expand Down Expand Up @@ -74,6 +83,29 @@ func migrationID(name string) (int, string) {
return -1, ""
}

// VerifyIsLatest will verify the latest migration is the same as the latest available migration.
//
// This ensures the DB isn't newer than the currently running code.
func VerifyIsLatest(ctx context.Context, url string) error {
conn, err := getConn(ctx, url)
if err != nil {
return err
}
defer conn.Close(ctx)

var dbLatest string
err = conn.QueryRow(ctx, `select id from gorp_migrations order by id desc limit 1`).Scan(&dbLatest)
if err != nil {
return fmt.Errorf("read latest migration from DB: %w", err)
}

if dbLatest != LatestID() {
return errors.Errorf("latest migration in DB is '%s' but expected '%s'; local GoAlert version is likely older than the DB's latest migration (not allowed in SWO-mode)", dbLatest, LatestID())
}

return nil
}

// VerifyAll will verify all migrations have already been applied.
func VerifyAll(ctx context.Context, url string) error {
ids := migrationIDs()
Expand Down
5 changes: 4 additions & 1 deletion swo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ type Config struct {
OldDBURL, NewDBURL string
CanExec bool
Logger *log.Logger

MaxOpen int
MaxIdle int
}

// NewManager will create a new Manager with the given configuration.
Expand Down Expand Up @@ -97,7 +100,7 @@ func NewManager(cfg Config) (*Manager, error) {
return nil, fmt.Errorf("connect to new db: %w", err)
}

appPgx, err := NewAppPGXPool(appMainURL, appNextURL)
appPgx, err := NewAppPGXPool(appMainURL, appNextURL, cfg.MaxOpen, cfg.MaxIdle)
if err != nil {
return nil, fmt.Errorf("create pool: %w", err)
}
Expand Down
7 changes: 6 additions & 1 deletion swo/pgxpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,22 @@ import (
//
// Until the switchover is complete, the old database will be protected with a
// shared advisory lock (4369).
func NewAppPGXPool(oldURL, nextURL string) (*pgxpool.Pool, error) {
func NewAppPGXPool(oldURL, nextURL string, maxOpen, maxIdle int) (*pgxpool.Pool, error) {
cfg, err := pgxpool.ParseConfig(oldURL)
if err != nil {
return nil, fmt.Errorf("parse old URL: %w", err)
}
sqldrv.SetConfigRetries(cfg)
cfg.MaxConns = int32(maxOpen)
cfg.MinConns = int32(maxIdle)

nextCfg, err := pgxpool.ParseConfig(nextURL)
if err != nil {
return nil, fmt.Errorf("parse next URL: %w", err)
}
sqldrv.SetConfigRetries(nextCfg)
nextCfg.MaxConns = int32(maxOpen)
nextCfg.MinConns = int32(maxIdle)

var mx sync.Mutex
var isDone bool
Expand Down
Loading