Skip to content

Commit

Permalink
Add river validate command to check for unapplied migrations
Browse files Browse the repository at this point in the history
This one's a feature request from a few months from [1] in which we add
a `river validate` command which fails with a non-zero exit code in case
there are unapplied migrations in the target database. As described in
the original discussion, this could be used in a project's CI to detect
and fail in the presence of a new migration that was part of a River
version update (giving people an easy way to check programmatically so
that upgrades aren't as dependent on a user successfully seeing a new
migration in the changelog).

[1] #68
  • Loading branch information
brandur committed Jan 22, 2024
1 parent dc90985 commit 187eb95
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 14 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,20 @@ jobs:
run: ./river migrate-up --database-url $DATABASE_URL
working-directory: ./cmd/river

- name: river validate
run: ./river validate --database-url $DATABASE_URL
working-directory: ./cmd/river

- name: river migrate-down
run: ./river migrate-down --database-url $DATABASE_URL --max-steps 100
working-directory: ./cmd/river

- name: river validate
run: |
./river validate --database-url $DATABASE_URL
[[ $? != 0 ]] || echo "expected non-zero exit code!" && exit 1
working-directory: ./cmd/river

golangci:
name: lint
runs-on: ubuntu-latest
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `JobCancel` and `JobCancelTx` to the `Client` to enable cancellation of jobs. [PR #141](https://github.com/riverqueue/river/pull/141) and [PR #152](https://github.com/riverqueue/river/pull/152).
- Added `ClientFromContext` and `ClientWithContextSafely` helpers to extract the `Client` from the worker's context where it is now available to workers. This simplifies making the River client available within your workers for i.e. enqueueing additional jobs. [PR #145](https://github.com/riverqueue/river/pull/145).
- Add `JobList` API for listing jobs. [PR #117](https://github.com/riverqueue/river/pull/117).
- Added `river validate` command which fails with a non-zero exit code unless all migrations are applied. [PR #170](https://github.com/riverqueue/river/pull/170).

### Changed

Expand Down
2 changes: 1 addition & 1 deletion cmd/river/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/riverqueue/river/cmd/river

go 1.21.4

// replace github.com/riverqueue/river => ../..
replace github.com/riverqueue/river => ../..

// replace github.com/riverqueue/river/riverdriver => ../../riverdriver

Expand Down
89 changes: 76 additions & 13 deletions cmd/river/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ Provides command line facilities for the River job queue.

ctx := context.Background()

execHandlingError := func(f func() error) {
err := f()
execHandlingError := func(f func() (bool, error)) {
ok, err := f()
if err != nil {
fmt.Printf("failed: %s\n", err)
}
if err != nil || !ok {
os.Exit(1)
}
}
Expand All @@ -59,7 +61,7 @@ Defaults to running a single down migration. This behavior can be changed with
--max-steps or --target-version.
`,
Run: func(cmd *cobra.Command, args []string) {
execHandlingError(func() error { return migrateDown(ctx, &opts) })
execHandlingError(func() (bool, error) { return migrateDown(ctx, &opts) })
},
}
cmd.Flags().StringVar(&opts.DatabaseURL, "database-url", "", "URL of the database to migrate (should look like `postgres://...`")
Expand All @@ -83,7 +85,7 @@ Defaults to running all up migrations that aren't yet run. This behavior can be
restricted with --max-steps or --target-version.
`,
Run: func(cmd *cobra.Command, args []string) {
execHandlingError(func() error { return migrateUp(ctx, &opts) })
execHandlingError(func() (bool, error) { return migrateUp(ctx, &opts) })
},
}
cmd.Flags().StringVar(&opts.DatabaseURL, "database-url", "", "URL of the database to migrate (should look like `postgres://...`")
Expand All @@ -93,7 +95,27 @@ restricted with --max-steps or --target-version.
rootCmd.AddCommand(cmd)
}

execHandlingError(rootCmd.Execute)
// validate
{
var opts validateOpts

cmd := &cobra.Command{
Use: "validate",
Short: "Validate River schema",
Long: `
Validates the current River schema, exiting with a non-zero status in case there
are outstanding migrations that still need to be run.
`,
Run: func(cmd *cobra.Command, args []string) {
execHandlingError(func() (bool, error) { return validate(ctx, &opts) })
},
}
cmd.Flags().StringVar(&opts.DatabaseURL, "database-url", "", "URL of the database to validate (should look like `postgres://...`")
mustMarkFlagRequired(cmd, "database-url")
rootCmd.AddCommand(cmd)
}

execHandlingError(func() (bool, error) { return true, rootCmd.Execute() })
}

func openDBPool(ctx context.Context, databaseURL string) (*pgxpool.Pool, error) {
Expand Down Expand Up @@ -143,14 +165,14 @@ func (o *migrateDownOpts) validate() error {
return nil
}

func migrateDown(ctx context.Context, opts *migrateDownOpts) error {
func migrateDown(ctx context.Context, opts *migrateDownOpts) (bool, error) {
if err := opts.validate(); err != nil {
return err
return false, err
}

dbPool, err := openDBPool(ctx, opts.DatabaseURL)
if err != nil {
return err
return false, err
}
defer dbPool.Close()

Expand All @@ -160,7 +182,11 @@ func migrateDown(ctx context.Context, opts *migrateDownOpts) error {
MaxSteps: opts.MaxSteps,
TargetVersion: opts.TargetVersion,
})
return err
if err != nil {
return false, err
}

return true, nil
}

type migrateUpOpts struct {
Expand All @@ -177,14 +203,14 @@ func (o *migrateUpOpts) validate() error {
return nil
}

func migrateUp(ctx context.Context, opts *migrateUpOpts) error {
func migrateUp(ctx context.Context, opts *migrateUpOpts) (bool, error) {
if err := opts.validate(); err != nil {
return err
return false, err
}

dbPool, err := openDBPool(ctx, opts.DatabaseURL)
if err != nil {
return err
return false, err
}
defer dbPool.Close()

Expand All @@ -194,5 +220,42 @@ func migrateUp(ctx context.Context, opts *migrateUpOpts) error {
MaxSteps: opts.MaxSteps,
TargetVersion: opts.TargetVersion,
})
return err
if err != nil {
return false, err
}

return true, nil
}

type validateOpts struct {
DatabaseURL string
}

func (o *validateOpts) validate() error {
if o.DatabaseURL == "" {
return errors.New("database URL cannot be empty")
}

return nil
}

func validate(ctx context.Context, opts *validateOpts) (bool, error) {
if err := opts.validate(); err != nil {
return false, err
}

dbPool, err := openDBPool(ctx, opts.DatabaseURL)
if err != nil {
return false, err
}
defer dbPool.Close()

migrator := rivermigrate.New(riverpgxv5.New(dbPool), nil)

res, err := migrator.Validate(ctx)
if err != nil {
return false, err
}

return res.OK, nil
}
55 changes: 55 additions & 0 deletions rivermigrate/river_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,34 @@ func (m *Migrator[TTx]) MigrateTx(ctx context.Context, tx TTx, direction Directi
panic("invalid direction: " + direction)
}

// ValidateResult is the result of a validation operation.
type ValidateResult struct {
// Messages contain informational messages of what wasn't valid in case of a
// failed validation. Always empty if OK is true.
Messages []string

// OK is true if validation completed with no problems.
OK bool
}

// Validate validates the current state of migrations, returning an unsuccessful
// validation and usable message in case there are migrations that haven't yet
// been applied.
func (m *Migrator[TTx]) Validate(ctx context.Context) (*ValidateResult, error) {
return dbutil.WithExecutorTxV(ctx, m.driver.GetExecutor(), func(ctx context.Context, tx riverdriver.ExecutorTx) (*ValidateResult, error) {
return m.validate(ctx, tx)
})
}

// Validate validates the current state of migrations, returning an unsuccessful
// validation and usable message in case there are migrations that haven't yet
// been applied.
//
// This variant lets a caller validate within a transaction.
func (m *Migrator[TTx]) ValidateTx(ctx context.Context, tx TTx) (*ValidateResult, error) {
return m.validate(ctx, m.driver.UnwrapExecutor(tx))
}

// migrateDown runs down migrations.
func (m *Migrator[TTx]) migrateDown(ctx context.Context, exec riverdriver.Executor, direction Direction, opts *MigrateOpts) (*MigrateResult, error) {
existingMigrations, err := m.existingMigrations(ctx, exec)
Expand Down Expand Up @@ -285,6 +313,33 @@ func (m *Migrator[TTx]) migrateUp(ctx context.Context, exec riverdriver.Executor
return res, nil
}

// validate validates current migration state.
func (m *Migrator[TTx]) validate(ctx context.Context, exec riverdriver.Executor) (*ValidateResult, error) {
existingMigrations, err := m.existingMigrations(ctx, exec)
if err != nil {
return nil, err
}

targetMigrations := maps.Clone(m.migrations)
for _, migrateRow := range existingMigrations {
delete(targetMigrations, migrateRow.Version)
}

notOKWithMessage := func(message string) *ValidateResult {
m.Logger.InfoContext(ctx, m.Name+": "+message)
return &ValidateResult{Messages: []string{message}}
}

if len(targetMigrations) > 0 {
sortedTargetMigrations := maputil.Keys(targetMigrations)
slices.Sort(sortedTargetMigrations)

return notOKWithMessage(fmt.Sprintf("Unapplied migrations: %v", sortedTargetMigrations)), nil
}

return &ValidateResult{OK: true}, nil
}

// Common code shared between the up and down migration directions that walks
// through each target migration and applies it, logging appropriately.
func (m *Migrator[TTx]) applyMigrations(ctx context.Context, exec riverdriver.Executor, direction Direction, opts *MigrateOpts, sortedTargetMigrations []*migrationBundle) (*MigrateResult, error) {
Expand Down
27 changes: 27 additions & 0 deletions rivermigrate/river_migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rivermigrate
import (
"context"
"database/sql"
"fmt"
"log/slog"
"slices"
"testing"
Expand Down Expand Up @@ -400,6 +401,32 @@ func TestMigrator(t *testing.T) {
require.EqualError(t, err, "version 3 is not in target list of valid migrations to apply")
}
})

t.Run("ValidateSuccess", func(t *testing.T) {
t.Parallel()

migrator, bundle := setup(t)

// Migrate all the way up.
_, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{})
require.NoError(t, err)

res, err := migrator.ValidateTx(ctx, bundle.tx)
require.NoError(t, err)
require.Equal(t, &ValidateResult{OK: true}, res)
})

t.Run("ValidateUnappliedMigrations", func(t *testing.T) {
t.Parallel()

migrator, bundle := setup(t)

res, err := migrator.ValidateTx(ctx, bundle.tx)
require.NoError(t, err)
require.Equal(t, &ValidateResult{
Messages: []string{fmt.Sprintf("Unapplied migrations: [%d %d]", riverMigrationsMaxVersion+1, riverMigrationsMaxVersion+2)},
}, res)
})
}

// A command returning an error aborts the transaction. This is a shortcut to
Expand Down

0 comments on commit 187eb95

Please sign in to comment.