Skip to content

Commit

Permalink
ensure that sqlite store applies migrations successfully
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge committed Oct 13, 2024
1 parent d0fbf41 commit ccc8aca
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 33 deletions.
6 changes: 5 additions & 1 deletion internal/config/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"fmt"

v1 "github.com/garethgeorge/backrest/gen/go/v1"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)

var migrations = []*func(*v1.Config){
&noop, // migration001PrunePolicy
&noop, // migration001PrunePolicy is deprecated
&noop, // migration002Schedules is deprecated
&migration003RelativeScheduling,
}
Expand All @@ -27,6 +28,9 @@ func ApplyMigrations(config *v1.Config) error {
startMigration := int(config.Version)
if startMigration < 0 {
startMigration = 0
} else if startMigration > int(CurrentVersion) {
zap.S().Warnf("config version %d is greater than the latest known spec %d. Were you previously running a newer version of backrest? Ensure that your install is up to date.", startMigration, CurrentVersion)
return fmt.Errorf("config version %d is greater than the latest known config format %d", startMigration, CurrentVersion)
}

for idx := startMigration; idx < len(migrations); idx += 1 {
Expand Down
3 changes: 3 additions & 0 deletions internal/oplog/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ func ApplyMigrations(oplog *OpLog) error {
}
if startMigration < 0 {
startMigration = 0
} else if startMigration > CurrentVersion {
zap.S().Warnf("oplog spec %d is greater than the latest known spec %d. Were you previously running a newer version of backrest? Ensure that your install is up to date.", startMigration, CurrentVersion)
return fmt.Errorf("oplog spec %d is greater than the latest known spec %d", startMigration, CurrentVersion)
}

for idx := startMigration; idx < int64(len(migrations)); idx += 1 {
Expand Down
4 changes: 4 additions & 0 deletions internal/oplog/oplog.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ func (q *Query) Match(op *v1.Operation) bool {
}
}

if q.InstanceID != "" && op.InstanceId != q.InstanceID {
return false
}

if q.PlanID != "" && op.PlanId != q.PlanID {
return false
}
Expand Down
60 changes: 28 additions & 32 deletions internal/oplog/sqlitestore/sqlitestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (m *SqliteStore) Query(q oplog.Query, f func(*v1.Operation) error) error {
Args: args,
ResultFunc: func(stmt *sqlite.Stmt) error {
opBytes := make([]byte, stmt.ColumnLen(0))
n := stmt.GetBytes("operation", opBytes)
n := stmt.ColumnBytes(0, opBytes)
opBytes = opBytes[:n]

var op v1.Operation
Expand All @@ -236,14 +236,14 @@ func (m *SqliteStore) Transform(q oplog.Query, f func(*v1.Operation) (*v1.Operat
}
defer m.dbpool.Put(conn)

query, args := m.buildQuery(q, false)
query, args := m.buildQuery(q, true)

return withSqliteTransaction(conn, func() error {
return sqlitex.ExecuteTransient(conn, query, &sqlitex.ExecOptions{
Args: args,
ResultFunc: func(stmt *sqlite.Stmt) error {
opBytes := make([]byte, stmt.ColumnLen(0))
n := stmt.GetBytes("operation", opBytes)
n := stmt.ColumnBytes(0, opBytes)
opBytes = opBytes[:n]

var op v1.Operation
Expand All @@ -254,19 +254,11 @@ func (m *SqliteStore) Transform(q oplog.Query, f func(*v1.Operation) (*v1.Operat
newOp, err := f(&op)
if err != nil {
return err
} else if newOp == nil {
return nil
}

newOpBytes, err := proto.Marshal(newOp)
if err != nil {
return fmt.Errorf("marshal operation: %v", err)
}

if err := sqlitex.Execute(conn, "UPDATE operations SET operation = ? WHERE id = ?", &sqlitex.ExecOptions{
Args: []any{newOpBytes, stmt.GetInt64("id")},
}); err != nil {
return fmt.Errorf("update operation: %v", err)
}
return nil
return m.updateInternal(conn, newOp)
},
})
})
Expand Down Expand Up @@ -317,27 +309,31 @@ func (m *SqliteStore) Update(op ...*v1.Operation) error {
defer m.dbpool.Put(conn)

return withSqliteTransaction(conn, func() error {
for _, o := range op {
if err := protoutil.ValidateOperation(o); err != nil {
return err
}
bytes, err := proto.Marshal(o)
if err != nil {
return fmt.Errorf("marshal operation: %v", err)
}
if err := sqlitex.Execute(conn, "UPDATE operations SET operation = ?, flow_id = ?, instance_id = ?, plan_id = ?, repo_id = ?, snapshot_id = ? WHERE id = ?", &sqlitex.ExecOptions{
Args: []any{bytes, o.FlowId, o.InstanceId, o.PlanId, o.RepoId, o.SnapshotId, o.Id},
}); err != nil {
return fmt.Errorf("update operation: %v", err)
}
if conn.Changes() == 0 {
return fmt.Errorf("couldn't update %d: %w", o.Id, oplog.ErrNotExist)
}
}
return nil
return m.updateInternal(conn, op...)
})
}

func (m *SqliteStore) updateInternal(conn *sqlite.Conn, op ...*v1.Operation) error {
for _, o := range op {
if err := protoutil.ValidateOperation(o); err != nil {
return err
}
bytes, err := proto.Marshal(o)
if err != nil {
return fmt.Errorf("marshal operation: %v", err)
}
if err := sqlitex.Execute(conn, "UPDATE operations SET operation = ?, flow_id = ?, instance_id = ?, plan_id = ?, repo_id = ?, snapshot_id = ? WHERE id = ?", &sqlitex.ExecOptions{
Args: []any{bytes, o.FlowId, o.InstanceId, o.PlanId, o.RepoId, o.SnapshotId, o.Id},
}); err != nil {
return fmt.Errorf("update operation: %v", err)
}
if conn.Changes() == 0 {
return fmt.Errorf("couldn't update %d: %w", o.Id, oplog.ErrNotExist)
}
}
return nil
}

func (m *SqliteStore) Get(opID int64) (*v1.Operation, error) {
conn, err := m.dbpool.Take(context.Background())
if err != nil {
Expand Down
131 changes: 131 additions & 0 deletions internal/oplog/storetests/storecontract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,137 @@ func TestUpdateOperation(t *testing.T) {
}
}

func TestTransform(t *testing.T) {
ops := []*v1.Operation{
{
InstanceId: "foo",
PlanId: "plan1",
RepoId: "repo1",
UnixTimeStartMs: 1234,
UnixTimeEndMs: 5678,
},
{
InstanceId: "bar",
PlanId: "plan1",
RepoId: "repo1",
UnixTimeStartMs: 1234,
UnixTimeEndMs: 5678,
},
}

tcs := []struct {
name string
f func(*v1.Operation) (*v1.Operation, error)
ops []*v1.Operation
want []*v1.Operation
query oplog.Query
}{
{
name: "no change",
f: func(op *v1.Operation) (*v1.Operation, error) {
return nil, nil
},
ops: ops,
want: ops,
},
{
name: "no change by copy",
f: func(op *v1.Operation) (*v1.Operation, error) {
return proto.Clone(op).(*v1.Operation), nil
},
ops: ops,
want: ops,
},
{
name: "change plan",
f: func(op *v1.Operation) (*v1.Operation, error) {
op.PlanId = "newplan"
return op, nil
},
ops: []*v1.Operation{
{
InstanceId: "foo",
PlanId: "oldplan",
RepoId: "repo1",
UnixTimeStartMs: 1234,
UnixTimeEndMs: 5678,
},
},
want: []*v1.Operation{
{
InstanceId: "foo",
PlanId: "newplan",
RepoId: "repo1",
UnixTimeStartMs: 1234,
UnixTimeEndMs: 5678,
},
},
},
{
name: "change plan with query",
f: func(op *v1.Operation) (*v1.Operation, error) {
op.PlanId = "newplan"
return op, nil
},
ops: ops,
want: []*v1.Operation{
{
InstanceId: "foo",
PlanId: "newplan",
RepoId: "repo1",
UnixTimeStartMs: 1234,
UnixTimeEndMs: 5678,
},
ops[1],
},
query: oplog.Query{InstanceID: "foo"},
},
}
for _, tc := range tcs {
tc := tc
t.Run(tc.name, func(t *testing.T) {
for name, store := range StoresForTest(t) {
store := store
t.Run(name, func(t *testing.T) {
log, err := oplog.NewOpLog(store)
if err != nil {
t.Fatalf("error creating oplog: %v", err)
}
for _, op := range tc.ops {
copy := proto.Clone(op).(*v1.Operation)
if err := log.Add(copy); err != nil {
t.Fatalf("error adding operation: %s", err)
}
}

if err := log.Transform(tc.query, tc.f); err != nil {
t.Fatalf("error transforming operations: %s", err)
}

var got []*v1.Operation
if err := log.Query(oplog.Query{}, func(op *v1.Operation) error {
op.Id = 0
op.FlowId = 0
got = append(got, op)
return nil
}); err != nil {
t.Fatalf("error listing operations: %s", err)
}

if slices.CompareFunc(got, tc.want, func(a, b *v1.Operation) int {
if proto.Equal(a, b) {
return 0
}
return 1
}) != 0 {
t.Errorf("want operations: %v, got unexpected operations: %v", tc.want, got)
}
})
}
})
}
}

func collectMessages(ops []*v1.Operation) []string {
var messages []string
for _, op := range ops {
Expand Down

0 comments on commit ccc8aca

Please sign in to comment.