Skip to content

Commit

Permalink
pgxpool: Don't use idle connections with stdlib when doing pooling (#…
Browse files Browse the repository at this point in the history
…4137)

* manage idle connections in the pgx pool

* migrate: add verification for latest migration consistency
  • Loading branch information
mastercactapus authored Nov 11, 2024
1 parent a0c680e commit 2d74e28
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 16 deletions.
3 changes: 0 additions & 3 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,6 @@ func NewApp(c Config, pool *pgxpool.Pool) (*App, error) {
}
}

app.db.SetMaxIdleConns(c.DBMaxIdle)
app.db.SetMaxOpenConns(c.DBMaxOpen)

app.mgr = lifecycle.NewManager(app._Run, app._Shutdown)
err = app.mgr.SetStartupFunc(app.startup)
if err != nil {
Expand Down
22 changes: 18 additions & 4 deletions app/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,24 @@ Available Flags:

var pool *pgxpool.Pool
if cfg.DBURLNext != "" {
err = migrate.VerifyIsLatest(ctx, cfg.DBURL)
if err != nil {
return errors.Wrap(err, "verify db")
}

err = doMigrations(cfg.DBURLNext)
if err != nil {
return errors.Wrap(err, "nextdb")
}

mgr, err := swo.NewManager(swo.Config{OldDBURL: cfg.DBURL, NewDBURL: cfg.DBURLNext, CanExec: !cfg.APIOnly, Logger: cfg.LegacyLogger})
mgr, err := swo.NewManager(swo.Config{
OldDBURL: cfg.DBURL,
NewDBURL: cfg.DBURLNext,
CanExec: !cfg.APIOnly,
Logger: cfg.LegacyLogger,
MaxOpen: cfg.DBMaxOpen,
MaxIdle: cfg.DBMaxIdle,
})
if err != nil {
return errors.Wrap(err, "init switchover handler")
}
Expand All @@ -162,13 +174,15 @@ Available Flags:
return errors.Wrap(err, "connect to postgres")
}

cfg, err := pgxpool.ParseConfig(appURL)
poolCfg, err := pgxpool.ParseConfig(appURL)
if err != nil {
return errors.Wrap(err, "parse db URL")
}
sqldrv.SetConfigRetries(cfg)
poolCfg.MaxConns = int32(cfg.DBMaxOpen)
poolCfg.MinConns = int32(cfg.DBMaxIdle)
sqldrv.SetConfigRetries(poolCfg)

pool, err = pgxpool.NewWithConfig(context.Background(), cfg)
pool, err = pgxpool.NewWithConfig(context.Background(), poolCfg)
if err != nil {
return errors.Wrap(err, "connect to postgres")
}
Expand Down
7 changes: 0 additions & 7 deletions app/pause.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package app

import (
"context"
"time"
)

// LogBackgroundContext returns a context.Background with the application logger configured.
Expand All @@ -19,17 +18,11 @@ func (app *App) Resume(ctx context.Context) error {
}

func (app *App) _pause(ctx context.Context) error {
app.db.SetMaxIdleConns(0)
app.db.SetConnMaxLifetime(time.Second)
app.db.SetMaxOpenConns(3)
app.events.Stop()
return nil
}

func (app *App) _resume(ctx context.Context) error {
app.db.SetMaxOpenConns(app.cfg.DBMaxOpen)
app.db.SetMaxIdleConns(app.cfg.DBMaxIdle)
app.db.SetConnMaxLifetime(0)
app.events.Start()

return nil
Expand Down
32 changes: 32 additions & 0 deletions migrate/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ func migrationIDs() []string {
return ids
}

// LatestID will return the ID of the latest migration.
func LatestID() string {
ids := migrationIDs()
if len(ids) == 0 {
return ""
}
return ids[len(ids)-1]
}

// Names will return all AssetNames without the timestamps and extensions
func Names() []string {
uniq := make(map[string]struct{})
Expand Down Expand Up @@ -74,6 +83,29 @@ func migrationID(name string) (int, string) {
return -1, ""
}

// VerifyIsLatest will verify the latest migration is the same as the latest available migration.
//
// This ensures the DB isn't newer than the currently running code.
func VerifyIsLatest(ctx context.Context, url string) error {
conn, err := getConn(ctx, url)
if err != nil {
return err
}
defer conn.Close(ctx)

var dbLatest string
err = conn.QueryRow(ctx, `select id from gorp_migrations order by id desc limit 1`).Scan(&dbLatest)
if err != nil {
return fmt.Errorf("read latest migration from DB: %w", err)
}

if dbLatest != LatestID() {
return errors.Errorf("latest migration in DB is '%s' but expected '%s'; local GoAlert version is likely older than the DB's latest migration (not allowed in SWO-mode)", dbLatest, LatestID())
}

return nil
}

// VerifyAll will verify all migrations have already been applied.
func VerifyAll(ctx context.Context, url string) error {
ids := migrationIDs()
Expand Down
5 changes: 4 additions & 1 deletion swo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ type Config struct {
OldDBURL, NewDBURL string
CanExec bool
Logger *log.Logger

MaxOpen int
MaxIdle int
}

// NewManager will create a new Manager with the given configuration.
Expand Down Expand Up @@ -97,7 +100,7 @@ func NewManager(cfg Config) (*Manager, error) {
return nil, fmt.Errorf("connect to new db: %w", err)
}

appPgx, err := NewAppPGXPool(appMainURL, appNextURL)
appPgx, err := NewAppPGXPool(appMainURL, appNextURL, cfg.MaxOpen, cfg.MaxIdle)
if err != nil {
return nil, fmt.Errorf("create pool: %w", err)
}
Expand Down
7 changes: 6 additions & 1 deletion swo/pgxpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,22 @@ import (
//
// Until the switchover is complete, the old database will be protected with a
// shared advisory lock (4369).
func NewAppPGXPool(oldURL, nextURL string) (*pgxpool.Pool, error) {
func NewAppPGXPool(oldURL, nextURL string, maxOpen, maxIdle int) (*pgxpool.Pool, error) {
cfg, err := pgxpool.ParseConfig(oldURL)
if err != nil {
return nil, fmt.Errorf("parse old URL: %w", err)
}
sqldrv.SetConfigRetries(cfg)
cfg.MaxConns = int32(maxOpen)
cfg.MinConns = int32(maxIdle)

nextCfg, err := pgxpool.ParseConfig(nextURL)
if err != nil {
return nil, fmt.Errorf("parse next URL: %w", err)
}
sqldrv.SetConfigRetries(nextCfg)
nextCfg.MaxConns = int32(maxOpen)
nextCfg.MinConns = int32(maxIdle)

var mx sync.Mutex
var isDone bool
Expand Down

0 comments on commit 2d74e28

Please sign in to comment.