Skip to content

Commit

Permalink
Add river validate command to check for unapplied migrations
Browse files Browse the repository at this point in the history
This one's a feature request from a few months from [1] in which we add
a `river validate` command which fails with a non-zero exit code in case
there are unapplied migrations in the target database. As described in
the original discussion, this could be used in a project's CI to detect
and fail in the presence of a new migration that was part of a River
version update (giving people an easy way to check programmatically so
that upgrades aren't as dependent on a user successfully seeing a new
migration in the changelog).

[1] #68
  • Loading branch information
brandur committed Jan 22, 2024
1 parent dc90985 commit d1362ba
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 33 deletions.
13 changes: 13 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,23 @@ jobs:
run: ./river migrate-up --database-url $DATABASE_URL
working-directory: ./cmd/river

- name: river validate
run: ./river validate --database-url $DATABASE_URL
working-directory: ./cmd/river

- name: river migrate-down
run: ./river migrate-down --database-url $DATABASE_URL --max-steps 100
working-directory: ./cmd/river

- name: river validate
run: |
set -e
./river validate --database-url $DATABASE_URL
exitcode="$?"
set +e
[[ "$?" != "0" ]] || echo "expected non-zero exit code" && exit 1
working-directory: ./cmd/river

golangci:
name: lint
runs-on: ubuntu-latest
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `JobCancel` and `JobCancelTx` to the `Client` to enable cancellation of jobs. [PR #141](https://github.com/riverqueue/river/pull/141) and [PR #152](https://github.com/riverqueue/river/pull/152).
- Added `ClientFromContext` and `ClientWithContextSafely` helpers to extract the `Client` from the worker's context where it is now available to workers. This simplifies making the River client available within your workers for i.e. enqueueing additional jobs. [PR #145](https://github.com/riverqueue/river/pull/145).
- Add `JobList` API for listing jobs. [PR #117](https://github.com/riverqueue/river/pull/117).
- Added `river validate` command which fails with a non-zero exit code unless all migrations are applied. [PR #170](https://github.com/riverqueue/river/pull/170).

### Changed

Expand Down
12 changes: 6 additions & 6 deletions cmd/river/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/riverqueue/river/cmd/river

go 1.21.4

// replace github.com/riverqueue/river => ../..
replace github.com/riverqueue/river => ../..

// replace github.com/riverqueue/river/riverdriver => ../../riverdriver

Expand All @@ -11,9 +11,9 @@ go 1.21.4
// replace github.com/riverqueue/river/riverdriver/riverpgxv5 => ../../riverdriver/riverpgxv5

require (
github.com/jackc/pgx/v5 v5.5.1
github.com/jackc/pgx/v5 v5.5.2
github.com/riverqueue/river v0.0.13
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.13
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.15
github.com/spf13/cobra v1.8.0
)

Expand All @@ -22,9 +22,9 @@ require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/riverqueue/river/riverdriver v0.0.13 // indirect
github.com/riverqueue/river/riverdriver v0.0.15 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/crypto v0.15.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/text v0.14.0 // indirect
)
26 changes: 12 additions & 14 deletions cmd/river/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,20 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.5.1 h1:5I9etrGkLrN+2XPCsi6XLlV5DITbSL/xBZdmAxFcXPI=
github.com/jackc/pgx/v5 v5.5.1/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA=
github.com/jackc/pgx/v5 v5.5.2 h1:iLlpgp4Cp/gC9Xuscl7lFL1PhhW+ZLtXZcrfCt4C3tA=
github.com/jackc/pgx/v5 v5.5.2/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/riverqueue/river v0.0.13 h1:tVGAtSIKG3JUjRyjW0U+pTKTCM7q+p2rBar2yhDpjpo=
github.com/riverqueue/river v0.0.13/go.mod h1:eIaJjDMFAGdRvTTBZDfIBartuQbw+U6NSMaFXHueDmY=
github.com/riverqueue/river/riverdriver v0.0.13 h1:76fJqnzcTpxIn1ULxeoxlfbIlgpwskHwQM0afDpXJac=
github.com/riverqueue/river/riverdriver v0.0.13/go.mod h1:vtgL7tRTSB6rzeVEDppehd/rPx3Is+WBYb17Zj0+KsE=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.0.13 h1:ICJJ3trVvor/CUXBjLqewZTqXd5sjJNcbTU6ogSz3+U=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.0.13/go.mod h1:MR6o1oGJajhfuFfwjdvIkiC5XbmXgdv3T7tNaRU6zms=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.13 h1:NIG4MQXP8U9z4m3876jGwua3R7KTy1rpGfSJtn91N28=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.13/go.mod h1:bq/oJmLvE1VHvdwfZZlWPcUizAihP6KLdqX8Pjyg+oM=
github.com/riverqueue/river/riverdriver v0.0.15 h1:BB26eGIB+xK4dpQ9w5WUxWHbDZbk0E+tmajGRYvI/hM=
github.com/riverqueue/river/riverdriver v0.0.15/go.mod h1:vtgL7tRTSB6rzeVEDppehd/rPx3Is+WBYb17Zj0+KsE=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.0.15 h1:vBS22g1I3gaSRnYnk9yrvn+oTk0odVTmJw1pIDAFD5w=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.0.15/go.mod h1:ERxJyW2g+1oBzTn5MRfSWi6Z83I5Dumj9J+E4rCe2kc=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.15 h1:H5z4MfUWVDGvK12DTAfArBS2OCl7PjJxBHAEd3K+PJo=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.15/go.mod h1:w365SHh6QB96Yea/SsGBdUAhGGvlWhU9+v2AVwJSjBc=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0=
github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho=
Expand All @@ -38,12 +36,12 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA=
golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 h1:k/i9J1pBpvlfR+9QsetwPyERsqu1GIbi967PQMq3Ivc=
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
89 changes: 76 additions & 13 deletions cmd/river/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ Provides command line facilities for the River job queue.

ctx := context.Background()

execHandlingError := func(f func() error) {
err := f()
execHandlingError := func(f func() (bool, error)) {
ok, err := f()
if err != nil {
fmt.Printf("failed: %s\n", err)
}
if err != nil || !ok {
os.Exit(1)
}
}
Expand All @@ -59,7 +61,7 @@ Defaults to running a single down migration. This behavior can be changed with
--max-steps or --target-version.
`,
Run: func(cmd *cobra.Command, args []string) {
execHandlingError(func() error { return migrateDown(ctx, &opts) })
execHandlingError(func() (bool, error) { return migrateDown(ctx, &opts) })
},
}
cmd.Flags().StringVar(&opts.DatabaseURL, "database-url", "", "URL of the database to migrate (should look like `postgres://...`")
Expand All @@ -83,7 +85,7 @@ Defaults to running all up migrations that aren't yet run. This behavior can be
restricted with --max-steps or --target-version.
`,
Run: func(cmd *cobra.Command, args []string) {
execHandlingError(func() error { return migrateUp(ctx, &opts) })
execHandlingError(func() (bool, error) { return migrateUp(ctx, &opts) })
},
}
cmd.Flags().StringVar(&opts.DatabaseURL, "database-url", "", "URL of the database to migrate (should look like `postgres://...`")
Expand All @@ -93,7 +95,27 @@ restricted with --max-steps or --target-version.
rootCmd.AddCommand(cmd)
}

execHandlingError(rootCmd.Execute)
// validate
{
var opts validateOpts

cmd := &cobra.Command{
Use: "validate",
Short: "Validate River schema",
Long: `
Validates the current River schema, exiting with a non-zero status in case there
are outstanding migrations that still need to be run.
`,
Run: func(cmd *cobra.Command, args []string) {
execHandlingError(func() (bool, error) { return validate(ctx, &opts) })
},
}
cmd.Flags().StringVar(&opts.DatabaseURL, "database-url", "", "URL of the database to validate (should look like `postgres://...`")
mustMarkFlagRequired(cmd, "database-url")
rootCmd.AddCommand(cmd)
}

execHandlingError(func() (bool, error) { return true, rootCmd.Execute() })
}

func openDBPool(ctx context.Context, databaseURL string) (*pgxpool.Pool, error) {
Expand Down Expand Up @@ -143,14 +165,14 @@ func (o *migrateDownOpts) validate() error {
return nil
}

func migrateDown(ctx context.Context, opts *migrateDownOpts) error {
func migrateDown(ctx context.Context, opts *migrateDownOpts) (bool, error) {
if err := opts.validate(); err != nil {
return err
return false, err
}

dbPool, err := openDBPool(ctx, opts.DatabaseURL)
if err != nil {
return err
return false, err
}
defer dbPool.Close()

Expand All @@ -160,7 +182,11 @@ func migrateDown(ctx context.Context, opts *migrateDownOpts) error {
MaxSteps: opts.MaxSteps,
TargetVersion: opts.TargetVersion,
})
return err
if err != nil {
return false, err
}

return true, nil
}

type migrateUpOpts struct {
Expand All @@ -177,14 +203,14 @@ func (o *migrateUpOpts) validate() error {
return nil
}

func migrateUp(ctx context.Context, opts *migrateUpOpts) error {
func migrateUp(ctx context.Context, opts *migrateUpOpts) (bool, error) {
if err := opts.validate(); err != nil {
return err
return false, err
}

dbPool, err := openDBPool(ctx, opts.DatabaseURL)
if err != nil {
return err
return false, err
}
defer dbPool.Close()

Expand All @@ -194,5 +220,42 @@ func migrateUp(ctx context.Context, opts *migrateUpOpts) error {
MaxSteps: opts.MaxSteps,
TargetVersion: opts.TargetVersion,
})
return err
if err != nil {
return false, err
}

return true, nil
}

type validateOpts struct {
DatabaseURL string
}

func (o *validateOpts) validate() error {
if o.DatabaseURL == "" {
return errors.New("database URL cannot be empty")
}

return nil
}

func validate(ctx context.Context, opts *validateOpts) (bool, error) {
if err := opts.validate(); err != nil {
return false, err
}

dbPool, err := openDBPool(ctx, opts.DatabaseURL)
if err != nil {
return false, err
}
defer dbPool.Close()

migrator := rivermigrate.New(riverpgxv5.New(dbPool), nil)

res, err := migrator.Validate(ctx)
if err != nil {
return false, err
}

return res.OK, nil
}
55 changes: 55 additions & 0 deletions rivermigrate/river_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,34 @@ func (m *Migrator[TTx]) MigrateTx(ctx context.Context, tx TTx, direction Directi
panic("invalid direction: " + direction)
}

// ValidateResult is the result of a validation operation.
type ValidateResult struct {
// Messages contain informational messages of what wasn't valid in case of a
// failed validation. Always empty if OK is true.
Messages []string

// OK is true if validation completed with no problems.
OK bool
}

// Validate validates the current state of migrations, returning an unsuccessful
// validation and usable message in case there are migrations that haven't yet
// been applied.
func (m *Migrator[TTx]) Validate(ctx context.Context) (*ValidateResult, error) {
return dbutil.WithExecutorTxV(ctx, m.driver.GetExecutor(), func(ctx context.Context, tx riverdriver.ExecutorTx) (*ValidateResult, error) {
return m.validate(ctx, tx)
})
}

// Validate validates the current state of migrations, returning an unsuccessful
// validation and usable message in case there are migrations that haven't yet
// been applied.
//
// This variant lets a caller validate within a transaction.
func (m *Migrator[TTx]) ValidateTx(ctx context.Context, tx TTx) (*ValidateResult, error) {
return m.validate(ctx, m.driver.UnwrapExecutor(tx))
}

// migrateDown runs down migrations.
func (m *Migrator[TTx]) migrateDown(ctx context.Context, exec riverdriver.Executor, direction Direction, opts *MigrateOpts) (*MigrateResult, error) {
existingMigrations, err := m.existingMigrations(ctx, exec)
Expand Down Expand Up @@ -285,6 +313,33 @@ func (m *Migrator[TTx]) migrateUp(ctx context.Context, exec riverdriver.Executor
return res, nil
}

// validate validates current migration state.
func (m *Migrator[TTx]) validate(ctx context.Context, exec riverdriver.Executor) (*ValidateResult, error) {
existingMigrations, err := m.existingMigrations(ctx, exec)
if err != nil {
return nil, err
}

targetMigrations := maps.Clone(m.migrations)
for _, migrateRow := range existingMigrations {
delete(targetMigrations, migrateRow.Version)
}

notOKWithMessage := func(message string) *ValidateResult {
m.Logger.InfoContext(ctx, m.Name+": "+message)
return &ValidateResult{Messages: []string{message}}
}

if len(targetMigrations) > 0 {
sortedTargetMigrations := maputil.Keys(targetMigrations)
slices.Sort(sortedTargetMigrations)

return notOKWithMessage(fmt.Sprintf("Unapplied migrations: %v", sortedTargetMigrations)), nil
}

return &ValidateResult{OK: true}, nil
}

// Common code shared between the up and down migration directions that walks
// through each target migration and applies it, logging appropriately.
func (m *Migrator[TTx]) applyMigrations(ctx context.Context, exec riverdriver.Executor, direction Direction, opts *MigrateOpts, sortedTargetMigrations []*migrationBundle) (*MigrateResult, error) {
Expand Down
27 changes: 27 additions & 0 deletions rivermigrate/river_migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rivermigrate
import (
"context"
"database/sql"
"fmt"
"log/slog"
"slices"
"testing"
Expand Down Expand Up @@ -400,6 +401,32 @@ func TestMigrator(t *testing.T) {
require.EqualError(t, err, "version 3 is not in target list of valid migrations to apply")
}
})

t.Run("ValidateSuccess", func(t *testing.T) {
t.Parallel()

migrator, bundle := setup(t)

// Migrate all the way up.
_, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{})
require.NoError(t, err)

res, err := migrator.ValidateTx(ctx, bundle.tx)
require.NoError(t, err)
require.Equal(t, &ValidateResult{OK: true}, res)
})

t.Run("ValidateUnappliedMigrations", func(t *testing.T) {
t.Parallel()

migrator, bundle := setup(t)

res, err := migrator.ValidateTx(ctx, bundle.tx)
require.NoError(t, err)
require.Equal(t, &ValidateResult{
Messages: []string{fmt.Sprintf("Unapplied migrations: [%d %d]", riverMigrationsMaxVersion+1, riverMigrationsMaxVersion+2)},
}, res)
})
}

// A command returning an error aborts the transaction. This is a shortcut to
Expand Down

0 comments on commit d1362ba

Please sign in to comment.