From 491a6a67254e40167b6937f6844123de704d5182 Mon Sep 17 00:00:00 2001 From: garethgeorge Date: Mon, 9 Sep 2024 00:22:33 -0700 Subject: [PATCH] fix: apply oplog migrations correctly using new storage interface --- cmd/backrest/backrest.go | 5 ++- internal/oplog/bboltstore/bboltstore.go | 9 ++--- .../serializationutil/serializationutil.go | 4 +-- internal/oplog/memstore/memstore.go | 4 --- internal/oplog/migrations.go | 2 +- internal/oplog/oplog.go | 34 ++++++++++++++----- 6 files changed, 35 insertions(+), 23 deletions(-) diff --git a/cmd/backrest/backrest.go b/cmd/backrest/backrest.go index 14f681ae..b187d0ba 100644 --- a/cmd/backrest/backrest.go +++ b/cmd/backrest/backrest.go @@ -79,7 +79,10 @@ func main() { } defer opstore.Close() - oplog := oplog.NewOpLog(opstore) + oplog, err := oplog.NewOpLog(opstore) + if err != nil { + zap.S().Fatalf("error creating oplog: %v", err) + } // Create rotating log storage logStore, err := logwriter.NewLogManager(path.Join(env.DataDir(), "rotatinglogs"), 14) // 14 days of logs diff --git a/internal/oplog/bboltstore/bboltstore.go b/internal/oplog/bboltstore/bboltstore.go index a9fe6384..1b6c0e19 100644 --- a/internal/oplog/bboltstore/bboltstore.go +++ b/internal/oplog/bboltstore/bboltstore.go @@ -83,7 +83,7 @@ func (o *BboltStore) Close() error { func (o *BboltStore) Version() (int64, error) { var version int64 - err := o.db.View(func(tx *bolt.Tx) error { + o.db.View(func(tx *bolt.Tx) error { b := tx.Bucket(SystemBucket) if b == nil { return nil @@ -92,7 +92,7 @@ func (o *BboltStore) Version() (int64, error) { version, err = serializationutil.Btoi(b.Get([]byte("version"))) return err }) - return version, err + return version, nil } func (o *BboltStore) SetVersion(version int64) error { @@ -107,11 +107,6 @@ func (o *BboltStore) SetVersion(version int64) error { // Add adds a generic operation to the operation log. func (o *BboltStore) Add(ops ...*v1.Operation) error { - for _, op := range ops { - if op.Id != 0 { - return errors.New("operation already has an ID, OpLog.Add is expected to set the ID") - } - } return o.db.Update(func(tx *bolt.Tx) error { for _, op := range ops { diff --git a/internal/oplog/bboltstore/serializationutil/serializationutil.go b/internal/oplog/bboltstore/serializationutil/serializationutil.go index d1961895..397282bb 100644 --- a/internal/oplog/bboltstore/serializationutil/serializationutil.go +++ b/internal/oplog/bboltstore/serializationutil/serializationutil.go @@ -21,7 +21,7 @@ func Btoi(b []byte) (int64, error) { } func Stob(v string) []byte { - var b []byte + b := make([]byte, 0, len(v)+8) b = append(b, Itob(int64(len(v)))...) b = append(b, []byte(v)...) return b @@ -39,7 +39,7 @@ func Btos(b []byte) (string, int64, error) { } func BytesToKey(b []byte) []byte { - var key []byte + key := make([]byte, 0, 8+len(b)) key = append(key, Itob(int64(len(b)))...) key = append(key, b...) return key diff --git a/internal/oplog/memstore/memstore.go b/internal/oplog/memstore/memstore.go index 668bf061..a26affc0 100644 --- a/internal/oplog/memstore/memstore.go +++ b/internal/oplog/memstore/memstore.go @@ -1,7 +1,6 @@ package memstore import ( - "errors" "slices" "sync" @@ -113,9 +112,6 @@ func (m *MemStore) Add(op ...*v1.Operation) error { defer m.mu.Unlock() for _, o := range op { - if o.Id != 0 { - return errors.New("operation already has an ID, OpLog.Add is expected to set the ID") - } m.nextID++ o.Id = m.nextID if o.FlowId == 0 { diff --git a/internal/oplog/migrations.go b/internal/oplog/migrations.go index b1e5e7ae..4a299e1c 100644 --- a/internal/oplog/migrations.go +++ b/internal/oplog/migrations.go @@ -11,7 +11,7 @@ import ( var migrations = []func(*OpLog) error{ migration001FlowID, migration002InstanceID, - migrationNoop, // migration003Reset Validated, + migrationNoop, migration002InstanceID, // re-run migration002InstanceID to fix improperly set instance IDs } diff --git a/internal/oplog/oplog.go b/internal/oplog/oplog.go index 8c60d3c0..bbfec897 100644 --- a/internal/oplog/oplog.go +++ b/internal/oplog/oplog.go @@ -30,10 +30,16 @@ type OpLog struct { subscribers []*Subscription } -func NewOpLog(store OpStore) *OpLog { - return &OpLog{ +func NewOpLog(store OpStore) (*OpLog, error) { + o := &OpLog{ store: store, } + + if err := ApplyMigrations(o); err != nil { + return nil, err + } + + return o, nil } func (o *OpLog) curSubscribers() []*Subscription { @@ -64,24 +70,36 @@ func (o *OpLog) Get(opID int64) (*v1.Operation, error) { return o.store.Get(opID) } -func (o *OpLog) Add(op ...*v1.Operation) error { - if err := o.store.Add(op...); err != nil { +func (o *OpLog) Add(ops ...*v1.Operation) error { + for _, o := range ops { + if o.Id != 0 { + return errors.New("operation already has an ID, OpLog.Add is expected to set the ID") + } + } + + if err := o.store.Add(ops...); err != nil { return err } for _, sub := range o.curSubscribers() { - (*sub)(op, OPERATION_ADDED) + (*sub)(ops, OPERATION_ADDED) } return nil } -func (o *OpLog) Update(op ...*v1.Operation) error { - if err := o.store.Update(op...); err != nil { +func (o *OpLog) Update(ops ...*v1.Operation) error { + for _, o := range ops { + if o.Id == 0 { + return errors.New("operation does not have an ID, OpLog.Update is expected to have an ID") + } + } + + if err := o.store.Update(ops...); err != nil { return err } for _, sub := range o.curSubscribers() { - (*sub)(op, OPERATION_UPDATED) + (*sub)(ops, OPERATION_UPDATED) } return nil }