Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: migrate oplog history from bbolt to sqlite store #515

Merged
merged 2 commits into from
Oct 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 70 additions & 11 deletions cmd/backrest/backrest.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sync/atomic"
"syscall"

v1 "github.com/garethgeorge/backrest/gen/go/v1"
"github.com/garethgeorge/backrest/gen/go/v1/v1connect"
"github.com/garethgeorge/backrest/internal/api"
"github.com/garethgeorge/backrest/internal/auth"
Expand All @@ -24,11 +25,11 @@ import (
"github.com/garethgeorge/backrest/internal/metric"
"github.com/garethgeorge/backrest/internal/oplog"
"github.com/garethgeorge/backrest/internal/oplog/bboltstore"
"github.com/garethgeorge/backrest/internal/oplog/sqlitestore"
"github.com/garethgeorge/backrest/internal/orchestrator"
"github.com/garethgeorge/backrest/internal/resticinstaller"
"github.com/garethgeorge/backrest/webui"
"github.com/mattn/go-colorable"
"go.etcd.io/bbolt"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/net/http2"
Expand Down Expand Up @@ -66,21 +67,19 @@ func main() {
var wg sync.WaitGroup

// Create / load the operation log
oplogFile := path.Join(env.DataDir(), "oplog.boltdb")
opstore, err := bboltstore.NewBboltStore(oplogFile)
oplogFile := path.Join(env.DataDir(), "oplog.sqlite")
opstore, err := sqlitestore.NewSqliteStore(oplogFile)
if err != nil {
if !errors.Is(err, bbolt.ErrTimeout) {
zap.S().Fatalf("timeout while waiting to open database, is the database open elsewhere?")
}
zap.S().Warnf("operation log may be corrupted, if errors recur delete the file %q and restart. Your backups stored in your repos are safe.", oplogFile)
zap.S().Fatalf("error creating oplog : %v", err)
zap.S().Fatalf("error creating oplog: %v", err)
}
defer opstore.Close()

oplog, err := oplog.NewOpLog(opstore)
log, err := oplog.NewOpLog(opstore)
if err != nil {
zap.S().Fatalf("error creating oplog: %v", err)
}
migrateBboltOplog(opstore)

// Create rotating log storage
logStore, err := logwriter.NewLogManager(path.Join(env.DataDir(), "rotatinglogs"), 14) // 14 days of logs
Expand All @@ -89,7 +88,7 @@ func main() {
}

// Create orchestrator and start task loop.
orchestrator, err := orchestrator.NewOrchestrator(resticPath, cfg, oplog, logStore)
orchestrator, err := orchestrator.NewOrchestrator(resticPath, cfg, log, logStore)
if err != nil {
zap.S().Fatalf("error creating orchestrator: %v", err)
}
Expand All @@ -104,7 +103,7 @@ func main() {
apiBackrestHandler := api.NewBackrestHandler(
configStore,
orchestrator,
oplog,
log,
logStore,
)

Expand All @@ -116,7 +115,7 @@ func main() {
backrestHandlerPath, backrestHandler := v1connect.NewBackrestHandler(apiBackrestHandler)
mux.Handle(backrestHandlerPath, auth.RequireAuthentication(backrestHandler, authenticator))
mux.Handle("/", webui.Handler())
mux.Handle("/download/", http.StripPrefix("/download", api.NewDownloadHandler(oplog)))
mux.Handle("/download/", http.StripPrefix("/download", api.NewDownloadHandler(log)))
mux.Handle("/metrics", auth.RequireAuthentication(metric.GetRegistry().Handler(), authenticator))

// Serve the HTTP gateway
Expand Down Expand Up @@ -225,3 +224,63 @@ func installLoggers() {
zap.ReplaceGlobals(zap.New(zapcore.NewTee(pretty, ugly)))
zap.S().Infof("writing logs to: %v", logsDir)
}

func migrateBboltOplog(logstore oplog.OpStore) {
oldBboltOplogFile := path.Join(env.DataDir(), "oplog.boltdb")
if _, err := os.Stat(oldBboltOplogFile); err == nil {
zap.S().Warnf("found old bbolt oplog file %q, migrating to sqlite", oldBboltOplogFile)
oldOpstore, err := bboltstore.NewBboltStore(oldBboltOplogFile)
if err != nil {
zap.S().Fatalf("error opening old bbolt oplog: %v", err)
}

oldOplog, err := oplog.NewOpLog(oldOpstore)
if err != nil {
zap.S().Fatalf("error creating old bbolt oplog: %v", err)
}

batch := make([]*v1.Operation, 0, 32)

var errs []error

if err := oldOplog.Query(oplog.Query{}, func(op *v1.Operation) error {
batch = append(batch, op)
if len(batch) == 256 {
if err := logstore.Add(batch...); err != nil {
errs = append(errs, err)
zap.S().Warnf("error migrating %d operations: %v", len(batch), err)
} else {
zap.S().Debugf("migrated %d oplog operations from bbolt to sqlite store", len(batch))
}
batch = batch[:0]
}
return nil
}); err != nil {
zap.S().Warnf("couldn't migrate all operations from the old bbolt oplog, if this recurs delete the file %q and restart", oldBboltOplogFile)
zap.S().Fatalf("error migrating old bbolt oplog: %v", err)
}

if len(batch) > 0 {
if err := logstore.Add(batch...); err != nil {
errs = append(errs, err)
zap.S().Warnf("error migrating %d operations: %v", len(batch), err)
} else {
zap.S().Debugf("migrated %d oplog operations from bbolt to sqlite store", len(batch))
}
zap.S().Debugf("migrated %d oplog operations from bbolt to sqlite store", len(batch))
}

if len(errs) > 0 {
zap.S().Fatalf("encountered %d errors migrating old bbolt oplog, see logs for details. If this probelem recurs delete the file %q and restart", len(errs), oldBboltOplogFile)
}

if err := oldOpstore.Close(); err != nil {
zap.S().Warnf("error closing old bbolt oplog: %v", err)
}
if err := os.Remove(oldBboltOplogFile); err != nil {
zap.S().Warnf("error removing old bbolt oplog: %v", err)
}

zap.S().Info("migrated old bbolt oplog to sqlite")
}
}
32 changes: 17 additions & 15 deletions internal/oplog/sqlitestore/sqlitestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package sqlitestore

import (
"context"
"crypto/rand"
"errors"
"fmt"
"math/big"
"strings"
"sync/atomic"
"time"

v1 "github.com/garethgeorge/backrest/gen/go/v1"
"github.com/garethgeorge/backrest/internal/oplog"
Expand All @@ -26,7 +29,7 @@ var _ oplog.OpStore = (*SqliteStore)(nil)
func NewSqliteStore(db string) (*SqliteStore, error) {
dbpool, err := sqlitex.NewPool(db, sqlitex.PoolOptions{
PoolSize: 16,
Flags: sqlite.OpenReadWrite | sqlite.OpenCreate | sqlite.OpenWAL | sqlite.OpenSharedCache,
Flags: sqlite.OpenReadWrite | sqlite.OpenCreate | sqlite.OpenWAL,
})
if err != nil {
return nil, fmt.Errorf("open sqlite pool: %v", err)
Expand Down Expand Up @@ -72,22 +75,18 @@ SELECT 0 WHERE NOT EXISTS (SELECT 1 FROM system_info);
return fmt.Errorf("init sqlite: %v", err)
}

// find the next id value
if err := sqlitex.ExecuteTransient(conn, "SELECT MAX(id) FROM operations", &sqlitex.ExecOptions{
ResultFunc: func(stmt *sqlite.Stmt) error {
m.nextIDVal.Store(stmt.GetInt64("MAX(id)") + 1)
return nil
},
}); err != nil {
return fmt.Errorf("get max ID: %v", err)
}
if m.nextIDVal.Load() == 0 {
m.nextIDVal.Store(1)
}
// rand init value
n, _ := rand.Int(rand.Reader, big.NewInt(1<<20))
m.nextIDVal.Store(n.Int64())

return nil
}

func (o *SqliteStore) nextID(unixTimeMs int64) (int64, error) {
seq := o.nextIDVal.Add(1)
return int64(unixTimeMs<<20) | int64(seq&((1<<20)-1)), nil
}

func (m *SqliteStore) Version() (int64, error) {
conn, err := m.dbpool.Take(context.Background())
if err != nil {
Expand Down Expand Up @@ -262,7 +261,10 @@ func (m *SqliteStore) Add(op ...*v1.Operation) error {

return withSqliteTransaction(conn, func() error {
for _, o := range op {
o.Id = m.nextIDVal.Add(1)
o.Id, err = m.nextID(time.Now().UnixMilli())
if err != nil {
return fmt.Errorf("generate operation id: %v", err)
}
if o.FlowId == 0 {
o.FlowId = o.Id
}
Expand All @@ -281,7 +283,7 @@ func (m *SqliteStore) Add(op ...*v1.Operation) error {
Args: []any{o.Id, o.FlowId, o.InstanceId, o.PlanId, o.RepoId, o.SnapshotId, bytes},
}); err != nil {
if sqlite.ErrCode(err) == sqlite.ResultConstraintUnique {
return fmt.Errorf("operation already exists: %w", oplog.ErrExist)
return fmt.Errorf("operation already exists %v: %w", o.Id, oplog.ErrExist)
}
return fmt.Errorf("add operation: %v", err)
}
Expand Down
Loading