Skip to content

Commit

Permalink
fix: apply oplog migrations correctly using new storage interface
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge committed Sep 9, 2024
1 parent 426af29 commit 491a6a6
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 23 deletions.
5 changes: 4 additions & 1 deletion cmd/backrest/backrest.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ func main() {
}
defer opstore.Close()

oplog := oplog.NewOpLog(opstore)
oplog, err := oplog.NewOpLog(opstore)
if err != nil {
zap.S().Fatalf("error creating oplog: %v", err)
}

// Create rotating log storage
logStore, err := logwriter.NewLogManager(path.Join(env.DataDir(), "rotatinglogs"), 14) // 14 days of logs
Expand Down
9 changes: 2 additions & 7 deletions internal/oplog/bboltstore/bboltstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (o *BboltStore) Close() error {

func (o *BboltStore) Version() (int64, error) {
var version int64
err := o.db.View(func(tx *bolt.Tx) error {
o.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(SystemBucket)
if b == nil {
return nil
Expand All @@ -92,7 +92,7 @@ func (o *BboltStore) Version() (int64, error) {
version, err = serializationutil.Btoi(b.Get([]byte("version")))
return err
})
return version, err
return version, nil
}

func (o *BboltStore) SetVersion(version int64) error {
Expand All @@ -107,11 +107,6 @@ func (o *BboltStore) SetVersion(version int64) error {

// Add adds a generic operation to the operation log.
func (o *BboltStore) Add(ops ...*v1.Operation) error {
for _, op := range ops {
if op.Id != 0 {
return errors.New("operation already has an ID, OpLog.Add is expected to set the ID")
}
}

return o.db.Update(func(tx *bolt.Tx) error {
for _, op := range ops {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func Btoi(b []byte) (int64, error) {
}

func Stob(v string) []byte {
var b []byte
b := make([]byte, 0, len(v)+8)
b = append(b, Itob(int64(len(v)))...)
b = append(b, []byte(v)...)
return b
Expand All @@ -39,7 +39,7 @@ func Btos(b []byte) (string, int64, error) {
}

func BytesToKey(b []byte) []byte {
var key []byte
key := make([]byte, 0, 8+len(b))
key = append(key, Itob(int64(len(b)))...)
key = append(key, b...)
return key
Expand Down
4 changes: 0 additions & 4 deletions internal/oplog/memstore/memstore.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package memstore

import (
"errors"
"slices"
"sync"

Expand Down Expand Up @@ -113,9 +112,6 @@ func (m *MemStore) Add(op ...*v1.Operation) error {
defer m.mu.Unlock()

for _, o := range op {
if o.Id != 0 {
return errors.New("operation already has an ID, OpLog.Add is expected to set the ID")
}
m.nextID++
o.Id = m.nextID
if o.FlowId == 0 {
Expand Down
2 changes: 1 addition & 1 deletion internal/oplog/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
var migrations = []func(*OpLog) error{
migration001FlowID,
migration002InstanceID,
migrationNoop, // migration003Reset Validated,
migrationNoop,
migration002InstanceID, // re-run migration002InstanceID to fix improperly set instance IDs
}

Expand Down
34 changes: 26 additions & 8 deletions internal/oplog/oplog.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,16 @@ type OpLog struct {
subscribers []*Subscription
}

func NewOpLog(store OpStore) *OpLog {
return &OpLog{
func NewOpLog(store OpStore) (*OpLog, error) {
o := &OpLog{
store: store,
}

if err := ApplyMigrations(o); err != nil {
return nil, err
}

return o, nil
}

func (o *OpLog) curSubscribers() []*Subscription {
Expand Down Expand Up @@ -64,24 +70,36 @@ func (o *OpLog) Get(opID int64) (*v1.Operation, error) {
return o.store.Get(opID)
}

func (o *OpLog) Add(op ...*v1.Operation) error {
if err := o.store.Add(op...); err != nil {
func (o *OpLog) Add(ops ...*v1.Operation) error {
for _, o := range ops {
if o.Id != 0 {
return errors.New("operation already has an ID, OpLog.Add is expected to set the ID")
}
}

if err := o.store.Add(ops...); err != nil {
return err
}

for _, sub := range o.curSubscribers() {
(*sub)(op, OPERATION_ADDED)
(*sub)(ops, OPERATION_ADDED)
}
return nil
}

func (o *OpLog) Update(op ...*v1.Operation) error {
if err := o.store.Update(op...); err != nil {
func (o *OpLog) Update(ops ...*v1.Operation) error {
for _, o := range ops {
if o.Id == 0 {
return errors.New("operation does not have an ID, OpLog.Update is expected to have an ID")
}
}

if err := o.store.Update(ops...); err != nil {
return err
}

for _, sub := range o.curSubscribers() {
(*sub)(op, OPERATION_UPDATED)
(*sub)(ops, OPERATION_UPDATED)
}
return nil
}
Expand Down

0 comments on commit 491a6a6

Please sign in to comment.