From 4d557a1146b064ee41d74c80667adcd78ed4240c Mon Sep 17 00:00:00 2001 From: Gareth Date: Sat, 12 Oct 2024 11:26:22 -0700 Subject: [PATCH] feat: use sqlite logstore (#514) --- cmd/backrest/backrest.go | 7 +- go.mod | 1 + internal/api/backresthandler.go | 80 +-- internal/api/backresthandler_test.go | 9 +- internal/logstore/logstore.go | 487 ++++++++++++++++++ internal/logstore/logstore_test.go | 297 +++++++++++ internal/logstore/shardedmutex.go | 43 ++ internal/logstore/tarmigrate.go | 101 ++++ internal/logwriter/errors.go | 7 - internal/logwriter/livelog.go | 205 -------- internal/logwriter/livelog_test.go | 108 ---- internal/logwriter/manager.go | 85 --- internal/logwriter/manager_test.go | 44 -- internal/logwriter/rotatinglog.go | 213 -------- internal/logwriter/rotatinglog_test.go | 98 ---- internal/oplog/oplog.go | 42 +- internal/orchestrator/orchestrator.go | 53 +- internal/orchestrator/taskrunnerimpl.go | 42 +- internal/orchestrator/tasks/task.go | 10 +- internal/orchestrator/tasks/taskcheck.go | 4 +- .../orchestrator/tasks/taskcollectgarbage.go | 32 +- internal/orchestrator/tasks/taskprune.go | 4 +- 22 files changed, 1078 insertions(+), 894 deletions(-) create mode 100644 internal/logstore/logstore.go create mode 100644 internal/logstore/logstore_test.go create mode 100644 internal/logstore/shardedmutex.go create mode 100644 internal/logstore/tarmigrate.go delete mode 100644 internal/logwriter/errors.go delete mode 100644 internal/logwriter/livelog.go delete mode 100644 internal/logwriter/livelog_test.go delete mode 100644 internal/logwriter/manager.go delete mode 100644 internal/logwriter/manager_test.go delete mode 100644 internal/logwriter/rotatinglog.go delete mode 100644 internal/logwriter/rotatinglog_test.go diff --git a/cmd/backrest/backrest.go b/cmd/backrest/backrest.go index afa7acb2..51d7974e 100644 --- a/cmd/backrest/backrest.go +++ b/cmd/backrest/backrest.go @@ -21,7 +21,7 @@ import ( "github.com/garethgeorge/backrest/internal/auth" "github.com/garethgeorge/backrest/internal/config" "github.com/garethgeorge/backrest/internal/env" - "github.com/garethgeorge/backrest/internal/logwriter" + "github.com/garethgeorge/backrest/internal/logstore" "github.com/garethgeorge/backrest/internal/metric" "github.com/garethgeorge/backrest/internal/oplog" "github.com/garethgeorge/backrest/internal/oplog/bboltstore" @@ -82,10 +82,11 @@ func main() { migrateBboltOplog(opstore) // Create rotating log storage - logStore, err := logwriter.NewLogManager(path.Join(env.DataDir(), "rotatinglogs"), 14) // 14 days of logs + logStore, err := logstore.NewLogStore(filepath.Join(env.DataDir(), "tasklogs")) if err != nil { - zap.S().Fatalf("error creating rotating log storage: %v", err) + zap.S().Fatalf("error creating task log store: %v", err) } + logstore.MigrateTarLogsInDir(logStore, filepath.Join(env.DataDir(), "rotatinglogs")) // Create orchestrator and start task loop. orchestrator, err := orchestrator.NewOrchestrator(resticPath, cfg, log, logStore) diff --git a/go.mod b/go.mod index 90641668..f1d96115 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/getlantern/systray v1.2.2 github.com/gitploy-io/cronexpr v0.2.2 github.com/golang-jwt/jwt/v5 v5.2.1 + github.com/google/go-cmp v0.6.0 github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 github.com/hashicorp/go-multierror v1.1.1 github.com/hectane/go-acl v0.0.0-20230122075934-ca0b05cb1adb diff --git a/internal/api/backresthandler.go b/internal/api/backresthandler.go index 9f2243a1..690702bf 100644 --- a/internal/api/backresthandler.go +++ b/internal/api/backresthandler.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "errors" "fmt" + "io" "os" "path" "reflect" @@ -18,7 +19,7 @@ import ( v1 "github.com/garethgeorge/backrest/gen/go/v1" "github.com/garethgeorge/backrest/gen/go/v1/v1connect" "github.com/garethgeorge/backrest/internal/config" - "github.com/garethgeorge/backrest/internal/logwriter" + "github.com/garethgeorge/backrest/internal/logstore" "github.com/garethgeorge/backrest/internal/oplog" "github.com/garethgeorge/backrest/internal/orchestrator" "github.com/garethgeorge/backrest/internal/orchestrator/repo" @@ -36,12 +37,12 @@ type BackrestHandler struct { config config.ConfigStore orchestrator *orchestrator.Orchestrator oplog *oplog.OpLog - logStore *logwriter.LogManager + logStore *logstore.LogStore } var _ v1connect.BackrestHandler = &BackrestHandler{} -func NewBackrestHandler(config config.ConfigStore, orchestrator *orchestrator.Orchestrator, oplog *oplog.OpLog, logStore *logwriter.LogManager) *BackrestHandler { +func NewBackrestHandler(config config.ConfigStore, orchestrator *orchestrator.Orchestrator, oplog *oplog.OpLog, logStore *logstore.LogStore) *BackrestHandler { s := &BackrestHandler{ config: config, orchestrator: orchestrator, @@ -532,57 +533,72 @@ func (s *BackrestHandler) ClearHistory(ctx context.Context, req *connect.Request } func (s *BackrestHandler) GetLogs(ctx context.Context, req *connect.Request[v1.LogDataRequest], resp *connect.ServerStream[types.BytesValue]) error { - ch, err := s.logStore.Subscribe(req.Msg.Ref) + r, err := s.logStore.Open(req.Msg.Ref) if err != nil { - if errors.Is(err, logwriter.ErrFileNotFound) { + if errors.Is(err, logstore.ErrLogNotFound) { resp.Send(&types.BytesValue{ - Value: []byte(fmt.Sprintf("file associated with log %v not found, it may have rotated out of the log history", req.Msg.GetRef())), + Value: []byte(fmt.Sprintf("file associated with log %v not found, it may have expired.", req.Msg.GetRef())), }) + return nil } return fmt.Errorf("get log data %v: %w", req.Msg.GetRef(), err) } - defer s.logStore.Unsubscribe(req.Msg.Ref, ch) - - doneCh := make(chan struct{}) - - var mu sync.Mutex - var buf bytes.Buffer - interval := time.NewTicker(250 * time.Millisecond) - defer interval.Stop() + go func() { + <-ctx.Done() + r.Close() + }() + var bufferMu sync.Mutex + var buffer bytes.Buffer + var errChan = make(chan error, 1) go func() { - for data := range ch { - mu.Lock() - buf.Write(data) - mu.Unlock() + data := make([]byte, 4*1024) + for { + n, err := r.Read(data) + if n == 0 { + close(errChan) + break + } else if err != nil && err != io.EOF { + errChan <- fmt.Errorf("failed to read log data: %w", err) + close(errChan) + return + } + bufferMu.Lock() + buffer.Write(data[:n]) + bufferMu.Unlock() } - close(doneCh) }() - flushHelper := func() error { - mu.Lock() - defer mu.Unlock() - if buf.Len() > 0 { - if err := resp.Send(&types.BytesValue{Value: buf.Bytes()}); err != nil { - return err + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + flush := func() error { + bufferMu.Lock() + if buffer.Len() > 0 { + if err := resp.Send(&types.BytesValue{Value: buffer.Bytes()}); err != nil { + bufferMu.Unlock() + return fmt.Errorf("failed to send log data: %w", err) } - buf.Reset() + buffer.Reset() } + bufferMu.Unlock() return nil } for { select { - case <-interval.C: - if err := flushHelper(); err != nil { + case <-ctx.Done(): + return flush() + case err := <-errChan: + _ = flush() + return err + case <-ticker.C: + if err := flush(); err != nil { return err } - case <-ctx.Done(): - return ctx.Err() - case <-doneCh: - return flushHelper() } } + } func (s *BackrestHandler) GetDownloadURL(ctx context.Context, req *connect.Request[types.Int64Value]) (*connect.Response[types.StringValue], error) { diff --git a/internal/api/backresthandler_test.go b/internal/api/backresthandler_test.go index 88de3b26..ce25d509 100644 --- a/internal/api/backresthandler_test.go +++ b/internal/api/backresthandler_test.go @@ -18,7 +18,7 @@ import ( "github.com/garethgeorge/backrest/gen/go/types" v1 "github.com/garethgeorge/backrest/gen/go/v1" "github.com/garethgeorge/backrest/internal/config" - "github.com/garethgeorge/backrest/internal/logwriter" + "github.com/garethgeorge/backrest/internal/logstore" "github.com/garethgeorge/backrest/internal/oplog" "github.com/garethgeorge/backrest/internal/oplog/bboltstore" "github.com/garethgeorge/backrest/internal/orchestrator" @@ -769,7 +769,7 @@ type systemUnderTest struct { oplog *oplog.OpLog opstore *bboltstore.BboltStore orch *orchestrator.Orchestrator - logStore *logwriter.LogManager + logStore *logstore.LogStore config *v1.Config } @@ -785,7 +785,7 @@ func createSystemUnderTest(t *testing.T, config config.ConfigStore) systemUnderT if err != nil { t.Fatalf("Failed to find or install restic binary: %v", err) } - opstore, err := bboltstore.NewBboltStore(dir + "/oplog.boltdb") + opstore, err := bboltstore.NewBboltStore(filepath.Join(dir, "oplog.bbolt")) if err != nil { t.Fatalf("Failed to create oplog store: %v", err) } @@ -794,10 +794,11 @@ func createSystemUnderTest(t *testing.T, config config.ConfigStore) systemUnderT if err != nil { t.Fatalf("Failed to create oplog: %v", err) } - logStore, err := logwriter.NewLogManager(dir+"/log", 10) + logStore, err := logstore.NewLogStore(filepath.Join(dir, "tasklogs")) if err != nil { t.Fatalf("Failed to create log store: %v", err) } + t.Cleanup(func() { logStore.Close() }) orch, err := orchestrator.NewOrchestrator( resticBin, cfg, oplog, logStore, ) diff --git a/internal/logstore/logstore.go b/internal/logstore/logstore.go new file mode 100644 index 00000000..8be8e39d --- /dev/null +++ b/internal/logstore/logstore.go @@ -0,0 +1,487 @@ +package logstore + +import ( + "bytes" + "compress/gzip" + "context" + "crypto/rand" + "encoding/hex" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "sync" + "time" + + "github.com/hashicorp/go-multierror" + "go.uber.org/zap" + "zombiezen.com/go/sqlite" + "zombiezen.com/go/sqlite/sqlitex" +) + +var ( + ErrLogNotFound = fmt.Errorf("log not found") +) + +type LogStore struct { + dir string + inprogressDir string + mu shardedRWMutex + dbpool *sqlitex.Pool + + trackingMu sync.Mutex // guards refcount and subscribers + refcount map[string]int // id : refcount + subscribers map[string][]chan struct{} +} + +func NewLogStore(dir string) (*LogStore, error) { + if err := os.MkdirAll(dir, 0755); err != nil { + return nil, fmt.Errorf("create dir: %v", err) + } + + dbpath := filepath.Join(dir, "logs.sqlite") + dbpool, err := sqlitex.NewPool(dbpath, sqlitex.PoolOptions{ + PoolSize: 16, + Flags: sqlite.OpenReadWrite | sqlite.OpenCreate | sqlite.OpenWAL, + }) + if err != nil { + return nil, fmt.Errorf("open sqlite pool: %v", err) + } + + ls := &LogStore{ + dir: dir, + inprogressDir: filepath.Join(dir, ".inprogress"), + mu: newShardedRWMutex(64), // 64 shards should be enough to avoid much contention + dbpool: dbpool, + subscribers: make(map[string][]chan struct{}), + refcount: make(map[string]int), + } + if err := ls.init(); err != nil { + return nil, fmt.Errorf("init log store: %v", err) + } + + return ls, nil +} + +func (ls *LogStore) init() error { + if err := os.MkdirAll(ls.inprogressDir, 0755); err != nil { + return fmt.Errorf("create inprogress dir: %v", err) + } + + conn, err := ls.dbpool.Take(context.Background()) + if err != nil { + return fmt.Errorf("take connection: %v", err) + } + defer ls.dbpool.Put(conn) + + if err := sqlitex.ExecuteScript(conn, ` + PRAGMA auto_vacuum = 1; + PRAGMA journal_mode=WAL; + + CREATE TABLE IF NOT EXISTS logs ( + id TEXT PRIMARY KEY, + expiration_ts_unix INTEGER DEFAULT 0, -- unix timestamp of when the log will expire + owner_opid INTEGER DEFAULT 0, -- id of the operation that owns this log; will be used for cleanup. + data_fname TEXT, -- relative path to the file containing the log data + data_gz BLOB -- compressed log data as an alternative to data_fname + ); + + CREATE INDEX IF NOT EXISTS logs_data_fname_idx ON logs (data_fname); + CREATE INDEX IF NOT EXISTS logs_expiration_ts_unix_idx ON logs (expiration_ts_unix); + + CREATE TABLE IF NOT EXISTS version_info ( + version INTEGER NOT NULL + ); + + -- Create a table to store the schema version, will be used for migrations in the future + INSERT INTO version_info (version) + SELECT 0 WHERE NOT EXISTS (SELECT 1 FROM version_info); + `, nil); err != nil { + return fmt.Errorf("execute init script: %v", err) + } + + // loop through all inprogress files and finalize them if they are in the database + files, err := os.ReadDir(ls.inprogressDir) + if err != nil { + return fmt.Errorf("read inprogress dir: %v", err) + } + + for _, file := range files { + if file.IsDir() { + continue + } + + fname := file.Name() + var id string + if err := sqlitex.ExecuteTransient(conn, "SELECT id FROM logs WHERE data_fname = ?", &sqlitex.ExecOptions{ + Args: []any{fname}, + ResultFunc: func(stmt *sqlite.Stmt) error { + id = stmt.ColumnText(0) + return nil + }, + }); err != nil { + return fmt.Errorf("select log: %v", err) + } + + if id != "" { + err := ls.finalizeLogFile(id, fname) + if err != nil { + zap.S().Warnf("sqlite log writer couldn't finalize dangling inprogress log file %v: %v", fname, err) + continue + } + } + if err := os.Remove(filepath.Join(ls.inprogressDir, fname)); err != nil { + zap.S().Warnf("sqlite log writer couldn't remove dangling inprogress log file %v: %v", fname, err) + } + } + + return nil +} + +func (ls *LogStore) Close() error { + return ls.dbpool.Close() +} + +func (ls *LogStore) Create(id string, parentOpID int64, ttl time.Duration) (io.WriteCloser, error) { + ls.mu.Lock(id) + defer ls.mu.Unlock(id) + + conn, err := ls.dbpool.Take(context.Background()) + if err != nil { + return nil, fmt.Errorf("take connection: %v", err) + } + defer ls.dbpool.Put(conn) + + // potentially prune any expired logs + if err := sqlitex.ExecuteTransient(conn, "DELETE FROM logs WHERE expiration_ts_unix < ? AND expiration_ts_unix != 0", &sqlitex.ExecOptions{ + Args: []any{time.Now().Unix()}, + }); err != nil { + return nil, fmt.Errorf("prune expired logs: %v", err) + } + + // create a random file for the log while it's being written + randBytes := make([]byte, 16) + if _, err := rand.Read(randBytes); err != nil { + return nil, fmt.Errorf("generate random bytes: %v", err) + } + fname := hex.EncodeToString(randBytes) + ".log" + f, err := os.Create(filepath.Join(ls.inprogressDir, fname)) + if err != nil { + return nil, fmt.Errorf("create temp file: %v", err) + } + + expire_ts_unix := time.Unix(0, 0) + if ttl != 0 { + expire_ts_unix = time.Now().Add(ttl) + } + + // fmt.Printf("INSERT INTO logs (id, expiration_ts_unix, owner_opid, data_fname) VALUES (%v, %v, %v, %v)\n", id, expire_ts_unix.Unix(), parentOpID, fname) + + if err := sqlitex.ExecuteTransient(conn, "INSERT INTO logs (id, expiration_ts_unix, owner_opid, data_fname) VALUES (?, ?, ?, ?)", &sqlitex.ExecOptions{ + Args: []any{id, expire_ts_unix.Unix(), parentOpID, fname}, + }); err != nil { + return nil, fmt.Errorf("insert log: %v", err) + } + + ls.trackingMu.Lock() + ls.subscribers[id] = make([]chan struct{}, 0) + ls.refcount[id] = 1 + ls.trackingMu.Unlock() + + return &writer{ + ls: ls, + f: f, + fname: fname, + id: id, + }, nil +} + +func (ls *LogStore) Open(id string) (io.ReadCloser, error) { + ls.mu.Lock(id) + defer ls.mu.Unlock(id) + + conn, err := ls.dbpool.Take(context.Background()) + if err != nil { + return nil, fmt.Errorf("take connection: %v", err) + } + defer ls.dbpool.Put(conn) + + var found bool + var fname string + var dataGz []byte + if err := sqlitex.ExecuteTransient(conn, "SELECT data_fname, data_gz FROM logs WHERE id = ?", &sqlitex.ExecOptions{ + Args: []any{id}, + ResultFunc: func(stmt *sqlite.Stmt) error { + found = true + if !stmt.ColumnIsNull(0) { + fname = stmt.ColumnText(0) + } + if !stmt.ColumnIsNull(1) { + dataGz = make([]byte, stmt.ColumnLen(1)) + stmt.ColumnBytes(1, dataGz) + } + return nil + }, + }); err != nil { + return nil, fmt.Errorf("select log: %v", err) + } else if !found { + return nil, ErrLogNotFound + } + + if fname != "" { + f, err := os.Open(filepath.Join(ls.inprogressDir, fname)) + if err != nil { + return nil, fmt.Errorf("open data file: %v", err) + } + ls.trackingMu.Lock() + ls.refcount[id]++ + ls.trackingMu.Unlock() + + return &reader{ + ls: ls, + f: f, + id: id, + fname: fname, + closed: make(chan struct{}), + }, nil + } else if dataGz != nil { + gzr, err := gzip.NewReader(bytes.NewReader(dataGz)) + if err != nil { + return nil, fmt.Errorf("create gzip reader: %v", err) + } + + return gzr, nil + } else { + return nil, errors.New("log has no associated data. This shouldn't be possible") + } +} + +func (ls *LogStore) Delete(id string) error { + ls.mu.Lock(id) + defer ls.mu.Unlock(id) + + conn, err := ls.dbpool.Take(context.Background()) + if err != nil { + return fmt.Errorf("take connection: %v", err) + } + defer ls.dbpool.Put(conn) + + if err := sqlitex.ExecuteTransient(conn, "DELETE FROM logs WHERE id = ?", &sqlitex.ExecOptions{ + Args: []any{id}, + }); err != nil { + return fmt.Errorf("delete log: %v", err) + } + + if conn.Changes() == 0 { + return ErrLogNotFound + } + return nil +} + +func (ls *LogStore) DeleteWithParent(parentOpID int64) error { + conn, err := ls.dbpool.Take(context.Background()) + if err != nil { + return fmt.Errorf("take connection: %v", err) + } + defer ls.dbpool.Put(conn) + + if err := sqlitex.ExecuteTransient(conn, "DELETE FROM logs WHERE owner_opid = ?", &sqlitex.ExecOptions{ + Args: []any{parentOpID}, + }); err != nil { + return fmt.Errorf("delete log: %v", err) + } + + return nil +} + +func (ls *LogStore) SelectAll(f func(id string, parentID int64)) error { + conn, err := ls.dbpool.Take(context.Background()) + if err != nil { + return fmt.Errorf("take connection: %v", err) + } + defer ls.dbpool.Put(conn) + + return sqlitex.ExecuteTransient(conn, "SELECT id, owner_opid FROM logs ORDER BY owner_opid", &sqlitex.ExecOptions{ + ResultFunc: func(stmt *sqlite.Stmt) error { + f(stmt.ColumnText(0), stmt.ColumnInt64(1)) + return nil + }, + }) +} + +func (ls *LogStore) subscribe(id string) chan struct{} { + ls.trackingMu.Lock() + defer ls.trackingMu.Unlock() + + subs, ok := ls.subscribers[id] + if !ok { + return nil + } + + ch := make(chan struct{}) + ls.subscribers[id] = append(subs, ch) + return ch +} + +func (ls *LogStore) notify(id string) { + ls.trackingMu.Lock() + defer ls.trackingMu.Unlock() + subs, ok := ls.subscribers[id] + if !ok { + return + } + for _, ch := range subs { + close(ch) + } + ls.subscribers[id] = subs[:0] +} + +func (ls *LogStore) finalizeLogFile(id string, fname string) error { + conn, err := ls.dbpool.Take(context.Background()) + if err != nil { + return fmt.Errorf("take connection: %v", err) + } + defer ls.dbpool.Put(conn) + + f, err := os.Open(filepath.Join(ls.inprogressDir, fname)) + if err != nil { + return err + } + defer f.Close() + + var dataGz bytes.Buffer + gzw := gzip.NewWriter(&dataGz) + if _, e := io.Copy(gzw, f); e != nil { + return fmt.Errorf("compress log: %v", e) + } + if e := gzw.Close(); e != nil { + return fmt.Errorf("close gzip writer: %v", err) + } + + if e := sqlitex.ExecuteTransient(conn, "UPDATE logs SET data_fname = NULL, data_gz = ? WHERE id = ?", &sqlitex.ExecOptions{ + Args: []any{dataGz.Bytes(), id}, + }); e != nil { + return fmt.Errorf("update log: %v", e) + } else if conn.Changes() != 1 { + return fmt.Errorf("expected 1 row to be updated, got %d", conn.Changes()) + } + + return nil +} + +func (ls *LogStore) maybeReleaseTempFile(fname string) error { + ls.trackingMu.Lock() + defer ls.trackingMu.Unlock() + + _, ok := ls.refcount[fname] + if ok { + return nil + } + return os.Remove(filepath.Join(ls.inprogressDir, fname)) +} + +type writer struct { + ls *LogStore + id string + fname string + f *os.File + onClose sync.Once +} + +var _ io.WriteCloser = (*writer)(nil) + +func (w *writer) Write(p []byte) (n int, err error) { + w.ls.mu.Lock(w.id) + defer w.ls.mu.Unlock(w.id) + n, err = w.f.Write(p) + if n != 0 { + w.ls.notify(w.id) + } + return +} + +func (w *writer) Close() error { + err := w.f.Close() + + w.onClose.Do(func() { + w.ls.mu.Lock(w.id) + defer w.ls.mu.Unlock(w.id) + defer w.ls.notify(w.id) + + // manually close all subscribers and delete the subscriber entry from the map; there are no more writes coming. + w.ls.trackingMu.Lock() + subs := w.ls.subscribers[w.id] + for _, ch := range subs { + close(ch) + } + delete(w.ls.subscribers, w.id) + + // try to finalize the log file + if e := w.ls.finalizeLogFile(w.id, w.fname); e != nil { + err = multierror.Append(err, fmt.Errorf("finalize %v: %w", w.fname, e)) + } else { + w.ls.refcount[w.id]-- + if w.ls.refcount[w.id] == 0 { + delete(w.ls.refcount, w.id) + } + } + w.ls.trackingMu.Unlock() + + w.ls.maybeReleaseTempFile(w.fname) + }) + + return err +} + +type reader struct { + ls *LogStore + id string + fname string + f *os.File + onClose sync.Once + closed chan struct{} // unblocks any read calls e.g. can be used for early cancellation +} + +var _ io.ReadCloser = (*reader)(nil) + +func (r *reader) Read(p []byte) (n int, err error) { + r.ls.mu.RLock(r.id) + n, err = r.f.Read(p) + if err == io.EOF { + waiter := r.ls.subscribe(r.id) + r.ls.mu.RUnlock(r.id) + if waiter != nil { + select { + case <-waiter: + case <-r.closed: + return 0, io.EOF + } + } + r.ls.mu.RLock(r.id) + n, err = r.f.Read(p) + } + r.ls.mu.RUnlock(r.id) + + return +} + +func (r *reader) Close() error { + r.ls.mu.Lock(r.id) + defer r.ls.mu.Unlock(r.id) + + err := r.f.Close() + + r.onClose.Do(func() { + r.ls.trackingMu.Lock() + r.ls.refcount[r.id]-- + if r.ls.refcount[r.id] == 0 { + delete(r.ls.refcount, r.id) + } + r.ls.trackingMu.Unlock() + r.ls.maybeReleaseTempFile(r.fname) + close(r.closed) + }) + + return err +} diff --git a/internal/logstore/logstore_test.go b/internal/logstore/logstore_test.go new file mode 100644 index 00000000..3e82eed3 --- /dev/null +++ b/internal/logstore/logstore_test.go @@ -0,0 +1,297 @@ +package logstore + +import ( + "bytes" + "fmt" + "io" + "os" + "slices" + "sync" + "testing" + "time" +) + +func TestReadWrite(t *testing.T) { + t.Parallel() + + ls, err := NewLogStore(t.TempDir()) + if err != nil { + t.Fatalf("new log writer failed: %v", err) + } + defer ls.Close() + + w, err := ls.Create("test", 0, 0) + if err != nil { + t.Fatalf("create failed: %v", err) + } + if _, err := w.Write([]byte("hello, world")); err != nil { + t.Fatalf("write failed: %v", err) + } + + // assert that the file is on disk at this point + entries := getInprogressEntries(t, ls) + if len(entries) != 1 { + t.Fatalf("unexpected number of inprogress entries: %d", len(entries)) + } + + if err := w.Close(); err != nil { + t.Fatalf("close writer failed: %v", err) + } + + r, err := ls.Open("test") + if err != nil { + t.Fatalf("open failed: %v", err) + } + + data, err := io.ReadAll(r) + if err != nil { + t.Fatalf("read failed: %v", err) + } + if string(data) != "hello, world" { + t.Fatalf("unexpected content: %s", data) + } + + entries = getInprogressEntries(t, ls) + if len(entries) != 0 { + t.Fatalf("unexpected number of inprogress entries: %d", len(entries)) + } +} + +func TestHugeReadWrite(t *testing.T) { + t.Parallel() + + ls, err := NewLogStore(t.TempDir()) + if err != nil { + t.Fatalf("new log writer failed: %v", err) + } + defer ls.Close() + + w, err := ls.Create("test", 0, 0) + if err != nil { + t.Fatalf("create failed: %v", err) + } + + data := bytes.Repeat([]byte("hello, world\n"), 1<<15) + if _, err := w.Write(data); err != nil { + t.Fatalf("write failed: %v", err) + } + if err := w.Close(); err != nil { + t.Fatalf("close writer failed: %v", err) + } + + r, err := ls.Open("test") + if err != nil { + t.Fatalf("open failed: %v", err) + } + + readData := bytes.NewBuffer(nil) + if _, err := io.Copy(readData, r); err != nil { + t.Fatalf("read failed: %v", err) + } + if !bytes.Equal(readData.Bytes(), data) { + t.Fatalf("unexpected content") + } + + if err := r.Close(); err != nil { + t.Fatalf("close reader failed: %v", err) + } +} + +func TestReadWhileWrite(t *testing.T) { + t.Parallel() + + ls, err := NewLogStore(t.TempDir()) + if err != nil { + t.Fatalf("new log writer failed: %v", err) + } + defer ls.Close() + + w, err := ls.Create("test", 0, 0) + if err != nil { + t.Fatalf("create failed: %v", err) + } + r, err := ls.Open("test") + if err != nil { + t.Fatalf("open failed: %v", err) + } + data := bytes.NewBuffer(nil) + wantData := bytes.NewBuffer(nil) + + var wg sync.WaitGroup + var readn int64 + var readerr error + wg.Add(1) + go func() { + defer r.Close() + readn, readerr = io.Copy(data, r) + wg.Done() + }() + + for i := 0; i < 100; i++ { + str := fmt.Sprintf("hello, world %d\n", i) + wantData.WriteString(str) + + if _, err := w.Write([]byte(str)); err != nil { + t.Fatalf("write failed: %v", err) + } + + if i%2 == 0 { + time.Sleep(2 * time.Millisecond) + } + } + + fmt.Printf("trying to close writer from test...") + if err := w.Close(); err != nil { + t.Fatalf("close writer failed: %v", err) + } + + wg.Wait() + + // check that the asynchronous read completed successfully + if readerr != nil { + t.Fatalf("read failed: %v", readerr) + } + if readn == 0 || readn != int64(wantData.Len()) { + t.Fatalf("unexpected read length: %d", readn) + } + if !bytes.Equal(data.Bytes(), wantData.Bytes()) { + t.Fatalf("unexpected content: %s", data.Bytes()) + } + + // check that the finalized data matches expectations + var finalizedData bytes.Buffer + r2, err := ls.Open("test") + if err != nil { + t.Fatalf("open failed: %v", err) + } + if _, err := io.Copy(&finalizedData, r2); err != nil { + t.Fatalf("read failed: %v", err) + } + if !bytes.Equal(finalizedData.Bytes(), wantData.Bytes()) { + t.Fatalf("unexpected content: %s", finalizedData.Bytes()) + } + + if err := r2.Close(); err != nil { + t.Fatalf("close reader failed: %v", err) + } +} + +func TestCreateMany(t *testing.T) { + t.Parallel() + + ls, err := NewLogStore(t.TempDir()) + if err != nil { + t.Fatalf("new log writer failed: %v", err) + } + defer ls.Close() + + const n = 10 + for i := 0; i < n; i++ { + name := fmt.Sprintf("test%d", i) + w, err := ls.Create(name, 0, 0) + if err != nil { + t.Fatalf("create %q failed: %v", name, err) + } + if _, err := w.Write([]byte(fmt.Sprintf("hello, world %d", i))); err != nil { + t.Fatalf("write failed: %v", err) + } + if err := w.Close(); err != nil { + t.Fatalf("close writer failed: %v", err) + } + } + + entries := getInprogressEntries(t, ls) + if len(entries) != 0 { + t.Fatalf("unexpected number of inprogress entries: %d", len(entries)) + } + + for i := 0; i < n; i++ { + name := fmt.Sprintf("test%d", i) + r, err := ls.Open(name) + if err != nil { + t.Fatalf("open %q failed: %v", name, err) + } + data, err := io.ReadAll(r) + if err != nil { + t.Fatalf("read failed: %v", err) + } + if string(data) != fmt.Sprintf("hello, world %d", i) { + t.Fatalf("unexpected content: %s", data) + } + if err := r.Close(); err != nil { + t.Fatalf("close reader failed: %v", err) + } + } +} + +func TestReopenStore(t *testing.T) { + d := t.TempDir() + { + ls, err := NewLogStore(d) + if err != nil { + t.Fatalf("new log writer failed: %v", err) + } + + w, err := ls.Create("test", 0, 0) + if err != nil { + t.Fatalf("create failed: %v", err) + } + + if _, err := w.Write([]byte("hello, world")); err != nil { + t.Fatalf("write failed: %v", err) + } + + if err := w.Close(); err != nil { + t.Fatalf("close writer failed: %v", err) + } + + // confirm that the file is on disk + r, err := ls.Open("test") + if err != nil { + t.Fatalf("open first store failed: %v", err) + } + r.Close() + + if err := ls.Close(); err != nil { + t.Fatalf("close log store failed: %v", err) + } + + } + + { + ls, err := NewLogStore(d) + if err != nil { + t.Fatalf("new log writer failed: %v", err) + } + + r, err := ls.Open("test") + if err != nil { + t.Fatalf("open failed: %v", err) + } + + data, err := io.ReadAll(r) + if err != nil { + t.Fatalf("read failed: %v", err) + } + if string(data) != "hello, world" { + t.Fatalf("unexpected content: %s", data) + } + if err := r.Close(); err != nil { + t.Fatalf("close reader failed: %v", err) + } + + if err := ls.Close(); err != nil { + t.Fatalf("close log store failed: %v", err) + } + } +} + +func getInprogressEntries(t *testing.T, ls *LogStore) []os.DirEntry { + entries, err := os.ReadDir(ls.inprogressDir) + if err != nil { + t.Fatalf("read dir failed: %v", err) + } + + entries = slices.DeleteFunc(entries, func(e os.DirEntry) bool { return e.IsDir() }) + return entries +} diff --git a/internal/logstore/shardedmutex.go b/internal/logstore/shardedmutex.go new file mode 100644 index 00000000..239e233b --- /dev/null +++ b/internal/logstore/shardedmutex.go @@ -0,0 +1,43 @@ +package logstore + +import ( + "hash/fnv" + "sync" +) + +type shardedRWMutex struct { + mu []sync.RWMutex +} + +func newShardedRWMutex(n int) shardedRWMutex { + mu := make([]sync.RWMutex, n) + return shardedRWMutex{ + mu: mu, + } +} + +func (sm *shardedRWMutex) Lock(key string) { + idx := hash(key) % uint32(len(sm.mu)) + sm.mu[idx].Lock() +} + +func (sm *shardedRWMutex) Unlock(key string) { + idx := hash(key) % uint32(len(sm.mu)) + sm.mu[idx].Unlock() +} + +func (sm *shardedRWMutex) RLock(key string) { + idx := hash(key) % uint32(len(sm.mu)) + sm.mu[idx].RLock() +} + +func (sm *shardedRWMutex) RUnlock(key string) { + idx := hash(key) % uint32(len(sm.mu)) + sm.mu[idx].RUnlock() +} + +func hash(s string) uint32 { + h := fnv.New32a() + h.Write([]byte(s)) + return h.Sum32() +} diff --git a/internal/logstore/tarmigrate.go b/internal/logstore/tarmigrate.go new file mode 100644 index 00000000..31b819a3 --- /dev/null +++ b/internal/logstore/tarmigrate.go @@ -0,0 +1,101 @@ +package logstore + +import ( + "archive/tar" + "compress/gzip" + "fmt" + "io" + "os" + "path/filepath" + "strings" + "time" + + "go.uber.org/zap" +) + +func MigrateTarLogsInDir(ls *LogStore, dir string) { + files, err := os.ReadDir(dir) + if err != nil { + zap.L().Fatal("failed to read directory", zap.String("dir", dir), zap.Error(err)) + } + + for _, file := range files { + if file.IsDir() { + continue + } + + if filepath.Ext(file.Name()) != ".tar" { + continue + } + + if err := MigrateTarLog(ls, filepath.Join(dir, file.Name())); err != nil { + zap.S().Warnf("failed to migrate tar log %q: %v", file.Name(), err) + } else { + if err := os.Remove(filepath.Join(dir, file.Name())); err != nil { + zap.S().Warnf("failed to remove fully migrated tar log %q: %v", file.Name(), err) + } + } + } +} + +func MigrateTarLog(ls *LogStore, logTar string) error { + baseName := filepath.Base(logTar) + + f, err := os.Open(logTar) + if err != nil { + return fmt.Errorf("failed to open tar file: %v", err) + } + + tarReader := tar.NewReader(f) + + var count int64 + var bytes int64 + for { + header, err := tarReader.Next() + if err != nil { + if err == io.EOF { + break + } + return fmt.Errorf("failed to read tar header: %v", err) + } + + if header.Typeflag != tar.TypeReg { + continue + } + + w, err := ls.Create(baseName+"/"+strings.TrimSuffix(header.Name, ".gz"), 0, 14*24*time.Hour) + if err != nil { + return fmt.Errorf("failed to create log writer: %v", err) + } + + var r io.ReadCloser = io.NopCloser(tarReader) + if strings.HasSuffix(header.Name, ".gz") { + r, err = gzip.NewReader(tarReader) + if err != nil { + return fmt.Errorf("failed to create gzip reader: %v", err) + } + } + + if n, err := io.Copy(w, r); err != nil { + return fmt.Errorf("failed to copy tar entry: %v", err) + } else { + bytes += n + count++ + } + + if err := r.Close(); err != nil { + return fmt.Errorf("failed to close tar entry reader: %v", err) + } + if err := w.Close(); err != nil { + return fmt.Errorf("failed to close log writer: %v", err) + } + } + + if err := f.Close(); err != nil { + return fmt.Errorf("failed to close tar file: %v", err) + } + + zap.L().Info("migrated tar log", zap.String("log", logTar), zap.Int64("entriesCopied", count), zap.Int64("bytesCopied", bytes)) + + return nil +} diff --git a/internal/logwriter/errors.go b/internal/logwriter/errors.go deleted file mode 100644 index 4e810238..00000000 --- a/internal/logwriter/errors.go +++ /dev/null @@ -1,7 +0,0 @@ -package logwriter - -import "errors" - -var ErrFileNotFound = errors.New("file not found") -var ErrNotFound = errors.New("entry not found") -var ErrBadName = errors.New("bad name") diff --git a/internal/logwriter/livelog.go b/internal/logwriter/livelog.go deleted file mode 100644 index 244fbdfb..00000000 --- a/internal/logwriter/livelog.go +++ /dev/null @@ -1,205 +0,0 @@ -package logwriter - -import ( - "bytes" - "errors" - "io" - "os" - "path" - "slices" - "sync" -) - -var ErrAlreadyExists = errors.New("already exists") - -type LiveLog struct { - mu sync.Mutex - dir string - writers map[string]*LiveLogWriter -} - -func NewLiveLogger(dir string) (*LiveLog, error) { - if err := os.MkdirAll(dir, 0755); err != nil { - return nil, err - } - return &LiveLog{dir: dir, writers: make(map[string]*LiveLogWriter)}, nil -} - -func (t *LiveLog) ListIDs() []string { - t.mu.Lock() - defer t.mu.Unlock() - - files, err := os.ReadDir(t.dir) - if err != nil { - return nil - } - - ids := make([]string, 0, len(files)) - for _, f := range files { - if !f.IsDir() { - ids = append(ids, f.Name()) - } - } - return ids -} - -func (t *LiveLog) NewWriter(id string) (*LiveLogWriter, error) { - t.mu.Lock() - defer t.mu.Unlock() - if _, ok := t.writers[id]; ok { - return nil, ErrAlreadyExists - } - fh, err := os.Create(path.Join(t.dir, id)) - if err != nil { - return nil, err - } - w := &LiveLogWriter{ - fh: fh, - id: id, - ll: t, - path: path.Join(t.dir, id), - } - t.writers[id] = w - return w, nil -} - -func (t *LiveLog) Unsubscribe(id string, ch chan []byte) { - t.mu.Lock() - defer t.mu.Unlock() - - if w, ok := t.writers[id]; ok { - w.mu.Lock() - defer w.mu.Unlock() - - idx := slices.IndexFunc(w.subscribers, func(c chan []byte) bool { - return c == ch - }) - if idx >= 0 { - close(ch) - w.subscribers = append(w.subscribers[:idx], w.subscribers[idx+1:]...) - } - } -} - -func (t *LiveLog) Subscribe(id string) (chan []byte, error) { - t.mu.Lock() - defer t.mu.Unlock() - - if w, ok := t.writers[id]; ok { - // If there is a writer, block writes until we are done opening the file - w.mu.Lock() - defer w.mu.Unlock() - } - - fh, err := os.Open(path.Join(t.dir, id)) - if err != nil { - if os.IsNotExist(err) { - return nil, ErrFileNotFound - } - return nil, err - } - - ch := make(chan []byte, 1) - go func() { - buf := make([]byte, 4096) - for { - n, err := fh.Read(buf) - if err == io.EOF { - break - } else if err != nil { - return - } - ch <- bytes.Clone(buf[:n]) - } - - // Lock the writer to prevent writes while we switch subscription modes - t.mu.Lock() - if w, ok := t.writers[id]; ok { - w.mu.Lock() - defer w.mu.Unlock() - } - t.mu.Unlock() - - // Read anything written while we were acquiring the lock - for { - n, err := fh.Read(buf) - if err == io.EOF { - break - } - if err != nil { - close(ch) - fh.Close() - return - } - ch <- bytes.Clone(buf[:n]) - } - fh.Close() - - // Install subscription in the writer OR close the channel if the writer is gone - t.mu.Lock() - if w, ok := t.writers[id]; ok { - w.subscribers = append(w.subscribers, ch) - } else { - close(ch) - } - t.mu.Unlock() - }() - - return ch, nil -} - -func (t *LiveLog) Remove(id string) error { - t.mu.Lock() - defer t.mu.Unlock() - delete(t.writers, id) - return os.Remove(path.Join(t.dir, id)) -} - -func (t *LiveLog) IsAlive(id string) bool { - t.mu.Lock() - defer t.mu.Unlock() - _, ok := t.writers[id] - return ok -} - -type LiveLogWriter struct { - mu sync.Mutex - ll *LiveLog - fh *os.File - id string - path string - subscribers []chan []byte -} - -func (t *LiveLogWriter) Write(data []byte) (int, error) { - t.mu.Lock() - defer t.mu.Unlock() - - n, err := t.fh.Write(data) - if err != nil { - return 0, err - } - if n != len(data) { - return n, errors.New("short write") - } - for _, ch := range t.subscribers { - ch <- bytes.Clone(data) - } - return n, nil -} - -func (t *LiveLogWriter) Close() error { - t.mu.Lock() - defer t.mu.Unlock() - - t.ll.mu.Lock() - defer t.ll.mu.Unlock() - delete(t.ll.writers, t.id) - - for _, ch := range t.subscribers { - close(ch) - } - t.subscribers = nil - - return t.fh.Close() -} diff --git a/internal/logwriter/livelog_test.go b/internal/logwriter/livelog_test.go deleted file mode 100644 index 9d136c10..00000000 --- a/internal/logwriter/livelog_test.go +++ /dev/null @@ -1,108 +0,0 @@ -package logwriter - -import ( - "bytes" - "testing" -) - -func TestWriteThenRead(t *testing.T) { - t.TempDir() - - logger, err := NewLiveLogger(t.TempDir()) - if err != nil { - t.Fatalf("NewLiveLogger failed: %v", err) - } - - writer, err := logger.NewWriter("test") - if err != nil { - t.Fatalf("NewWriter failed: %v", err) - } - - data := []byte("test") - if _, err := writer.Write(data); err != nil { - t.Fatalf("Write failed: %v", err) - } - writer.Close() - - ch, err := logger.Subscribe("test") - if err != nil { - t.Fatalf("Subscribe failed: %v", err) - } - d := <-ch - if string(d) != "test" { - t.Fatalf("Read failed: expected test, got %s", string(d)) - } -} - -func TestBigWriteThenRead(t *testing.T) { - bigtext := genbytes(32 * 1000) - logger, err := NewLiveLogger(t.TempDir()) - if err != nil { - t.Fatalf("NewLiveLogger failed: %v", err) - } - - writer, err := logger.NewWriter("test") - if err != nil { - t.Fatalf("NewWriter failed: %v", err) - } - - if _, err := writer.Write([]byte(bigtext)); err != nil { - t.Fatalf("Write failed: %v", err) - } - writer.Close() - - ch, err := logger.Subscribe("test") - if err != nil { - t.Fatalf("Subscribe failed: %v", err) - } - - data := make([]byte, 0) - for d := range ch { - data = append(data, d...) - } - if !bytes.Equal(data, bigtext) { - t.Fatalf("Read failed: expected %d bytes, got %d", len(bigtext), len(data)) - } -} - -func TestWritingWhileReading(t *testing.T) { - logger, err := NewLiveLogger(t.TempDir()) - if err != nil { - t.Fatalf("NewLiveLogger failed: %v", err) - } - - writer, err := logger.NewWriter("test") - if err != nil { - t.Fatalf("NewWriter failed: %v", err) - } - - if _, err := writer.Write([]byte("test")); err != nil { - t.Fatalf("Write failed: %v", err) - } - - ch, err := logger.Subscribe("test") - if err != nil { - t.Fatalf("Subscribe failed: %v", err) - } - - if r1 := <-ch; string(r1) != "test" { - t.Fatalf("Read failed: expected test, got %s", string(r1)) - } - - go func() { - writer.Write([]byte("test2")) - writer.Close() - }() - - if r2 := <-ch; string(r2) != "test2" { - t.Fatalf("Read failed: expected test2, got %s", string(r2)) - } -} - -func genbytes(length int) []byte { - data := make([]byte, length) - for i := 0; i < length; i++ { - data[i] = 'a' - } - return data -} diff --git a/internal/logwriter/manager.go b/internal/logwriter/manager.go deleted file mode 100644 index 56bf90d4..00000000 --- a/internal/logwriter/manager.go +++ /dev/null @@ -1,85 +0,0 @@ -package logwriter - -import ( - "errors" - "fmt" - "io" - "path" - "strings" -) - -type LogManager struct { - llm *LiveLog - rlm *RotatingLog -} - -func NewLogManager(dir string, maxLogFiles int) (*LogManager, error) { - ll, err := NewLiveLogger(path.Join(dir, ".live")) - if err != nil { - return nil, err - } - - rl := NewRotatingLog(path.Join(dir), maxLogFiles) - if err != nil { - return nil, err - } - - return &LogManager{ - llm: ll, - rlm: rl, - }, nil -} - -// NewLiveWriter creates a new live log writer. The ID is the base name of the log file, a transformed ID is returned. -func (lm *LogManager) NewLiveWriter(idbase string) (string, io.WriteCloser, error) { - id := fmt.Sprintf("%s.livelog", idbase) - w, err := lm.llm.NewWriter(id) - return id, w, err -} - -func (lm *LogManager) Subscribe(id string) (chan []byte, error) { - if strings.HasSuffix(id, ".livelog") { - return lm.llm.Subscribe(id) - } else { - // TODO: implement streaming from rotating log storage - ch := make(chan []byte, 1) - data, err := lm.rlm.Read(id) - if err != nil { - return nil, err - } - ch <- data - close(ch) - return ch, nil - } -} - -func (lm *LogManager) Unsubscribe(id string, ch chan []byte) { - lm.llm.Unsubscribe(id, ch) -} - -// LiveLogIDs returns the list of IDs of live logs e.g. with writes in progress. -func (lm *LogManager) LiveLogIDs() []string { - return lm.llm.ListIDs() -} - -func (lm *LogManager) Finalize(id string) (frozenID string, err error) { - if lm.llm.IsAlive(id) { - return "", errors.New("live log still being written") - } - - ch, err := lm.llm.Subscribe(id) - if err != nil { - return "", err - } - - bytes := make([]byte, 0) - for data := range ch { - bytes = append(bytes, data...) - } - - if err := lm.llm.Remove(id); err != nil { - return "", err - } - - return lm.rlm.Write(bytes) -} diff --git a/internal/logwriter/manager_test.go b/internal/logwriter/manager_test.go deleted file mode 100644 index 6043931a..00000000 --- a/internal/logwriter/manager_test.go +++ /dev/null @@ -1,44 +0,0 @@ -package logwriter - -import "testing" - -func TestLogLifecycle(t *testing.T) { - mgr, err := NewLogManager(t.TempDir(), 10) - if err != nil { - t.Fatalf("NewLogManager failed: %v", err) - } - - id, w, err := mgr.NewLiveWriter("test") - if err != nil { - t.Fatalf("NewLiveWriter failed: %v", err) - } - - ch, err := mgr.Subscribe(id) - if err != nil { - t.Fatalf("Subscribe to live log %q failed: %v", id, err) - } - - contents := "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum." - if _, err := w.Write([]byte(contents)); err != nil { - t.Fatalf("Write failed: %v", err) - } - w.Close() - - if data := <-ch; string(data) != contents { - t.Fatalf("Read failed: expected %q, got %q", contents, string(data)) - } - - finalID, err := mgr.Finalize(id) - if err != nil { - t.Fatalf("Finalize failed: %v", err) - } - - finalCh, err := mgr.Subscribe(finalID) - if err != nil { - t.Fatalf("Subscribe to finalized log %q failed: %v", finalID, err) - } - - if data := <-finalCh; string(data) != contents { - t.Fatalf("Read failed: expected %q, got %q", contents, string(data)) - } -} diff --git a/internal/logwriter/rotatinglog.go b/internal/logwriter/rotatinglog.go deleted file mode 100644 index 098f9f6f..00000000 --- a/internal/logwriter/rotatinglog.go +++ /dev/null @@ -1,213 +0,0 @@ -package logwriter - -import ( - "archive/tar" - "bytes" - "compress/gzip" - "fmt" - "io" - "io/fs" - "os" - "path" - "slices" - "sort" - "strconv" - "strings" - "sync" - "time" - - "go.uber.org/zap" -) - -type RotatingLog struct { - mu sync.Mutex - dir string - lastFile string - maxLogFiles int - now func() time.Time -} - -func NewRotatingLog(dir string, maxLogFiles int) *RotatingLog { - return &RotatingLog{dir: dir, maxLogFiles: maxLogFiles} -} - -func (r *RotatingLog) curfile() string { - t := time.Now() - if r.now != nil { - t = r.now() // for testing - } - return path.Join(r.dir, t.Format("2006-01-02-logs.tar")) -} - -func (r *RotatingLog) removeExpiredFiles() error { - if r.maxLogFiles < 0 { - return nil - } - files, err := r.files() - if err != nil { - return fmt.Errorf("list files: %w", err) - } - if len(files) >= r.maxLogFiles { - for i := 0; i < len(files)-r.maxLogFiles+1; i++ { - if err := os.Remove(path.Join(r.dir, files[i])); err != nil { - return err - } - } - } - return nil -} - -func (r *RotatingLog) Write(data []byte) (string, error) { - r.mu.Lock() - defer r.mu.Unlock() - - data, err := compress(data) - if err != nil { - return "", err - } - - file := r.curfile() - if file != r.lastFile { - if err := os.MkdirAll(r.dir, os.ModePerm); err != nil { - return "", err - } - r.lastFile = file - if err := r.removeExpiredFiles(); err != nil { - zap.L().Error("failed to remove expired files for rotatinglog", zap.Error(err), zap.String("dir", r.dir)) - } - } - f, err := os.OpenFile(file, os.O_RDWR|os.O_CREATE, os.ModePerm) - if err != nil { - return "", err - } - defer f.Close() - - size, err := f.Seek(0, io.SeekEnd) - if err != nil { - return "", err - } - pos := int64(0) - if size != 0 { - pos, err = f.Seek(-1024, io.SeekEnd) - if err != nil { - return "", err - } - } - tw := tar.NewWriter(f) - defer tw.Close() - - tw.WriteHeader(&tar.Header{ - Name: fmt.Sprintf("%d.gz", pos), - Size: int64(len(data)), - Mode: 0600, - Typeflag: tar.TypeReg, - ModTime: time.Now(), - }) - - _, err = tw.Write(data) - if err != nil { - return "", err - } - - return fmt.Sprintf("%s/%d", path.Base(file), pos), nil -} - -func (r *RotatingLog) Read(name string) ([]byte, error) { - r.mu.Lock() - defer r.mu.Unlock() - - // parse name e.g. of the form "2006-01-02-15-04-05.tar/1234" - splitAt := strings.Index(name, "/") - if splitAt == -1 { - return nil, ErrBadName - } - - offset, err := strconv.Atoi(name[splitAt+1:]) - if err != nil { - return nil, ErrBadName - } - - // open file and seek to the offset where the tarball segment should start - f, err := os.Open(path.Join(r.dir, name[:splitAt])) - if err != nil { - if os.IsNotExist(err) { - return nil, ErrFileNotFound - } - return nil, fmt.Errorf("open failed: %w", err) - } - defer f.Close() - f.Seek(int64(offset), io.SeekStart) - - // search for the tarball segment in the tarball and read + decompress it if found - seekName := fmt.Sprintf("%d.gz", offset) - tr := tar.NewReader(f) - for { - hdr, err := tr.Next() - if err == io.EOF { - break - } - if err != nil { - return nil, fmt.Errorf("next failed: %v", err) - } - if hdr.Name == seekName { - buf := make([]byte, hdr.Size) - _, err := io.ReadFull(tr, buf) - if err != nil { - return nil, fmt.Errorf("read failed: %v", err) - } - return decompress(buf) - } - } - return nil, ErrNotFound -} - -func (r *RotatingLog) files() ([]string, error) { - files, err := os.ReadDir(r.dir) - if err != nil { - return nil, err - } - files = slices.DeleteFunc(files, func(f fs.DirEntry) bool { - return f.IsDir() || !strings.HasSuffix(f.Name(), "-logs.tar") - }) - sort.Slice(files, func(i, j int) bool { - return files[i].Name() < files[j].Name() - }) - var result []string - for _, f := range files { - result = append(result, f.Name()) - } - return result, nil -} - -func compress(data []byte) ([]byte, error) { - var buf bytes.Buffer - zw := gzip.NewWriter(&buf) - - if _, err := zw.Write(data); err != nil { - return nil, err - } - - if err := zw.Close(); err != nil { - return nil, err - } - - return buf.Bytes(), nil -} - -func decompress(compressedData []byte) ([]byte, error) { - var buf bytes.Buffer - zr, err := gzip.NewReader(bytes.NewReader(compressedData)) - if err != nil { - return nil, err - } - - if _, err := io.Copy(&buf, zr); err != nil { - return nil, err - } - - if err := zr.Close(); err != nil { - return nil, err - } - - return buf.Bytes(), nil -} diff --git a/internal/logwriter/rotatinglog_test.go b/internal/logwriter/rotatinglog_test.go deleted file mode 100644 index 12c3e7f8..00000000 --- a/internal/logwriter/rotatinglog_test.go +++ /dev/null @@ -1,98 +0,0 @@ -package logwriter - -import ( - "fmt" - "strings" - "testing" - "time" -) - -func TestRotatingLog(t *testing.T) { - log := NewRotatingLog(t.TempDir()+"/rotatinglog", 10) - name, err := log.Write([]byte("test")) - if err != nil { - t.Fatalf("Write failed: %v", err) - } - data, err := log.Read(name) - if err != nil { - t.Fatalf("Read failed: %v", err) - } - if string(data) != "test" { - t.Fatalf("Read failed: expected test, got %s", string(data)) - } -} - -func TestRotatingLogMultipleEntries(t *testing.T) { - log := NewRotatingLog(t.TempDir()+"/rotatinglog", 10) - refs := make([]string, 10) - for i := 0; i < 10; i++ { - name, err := log.Write([]byte(fmt.Sprintf("%d", i))) - if err != nil { - t.Fatalf("Write failed: %v", err) - } - data, err := log.Read(name) - if err != nil { - t.Fatalf("Read failed: %v", err) - } - if fmt.Sprintf("%d", i) != string(data) { - t.Fatalf("Read failed: expected %d, got %s", i, string(data)) - } - refs[i] = name - } - - for i := 0; i < 10; i++ { - data, err := log.Read(refs[i]) - if err != nil { - t.Fatalf("Read failed: %v", err) - } - if fmt.Sprintf("%d", i) != string(data) { - t.Fatalf("Read failed: expected %d, got %s", i, string(data)) - } - } -} - -func TestBigEntries(t *testing.T) { - log := NewRotatingLog(t.TempDir()+"/rotatinglog", 10) - for size := range []int{10, 100, 1234, 5938, 1023, 1025} { - data := genstr(size) - name, err := log.Write([]byte(data)) - if err != nil { - t.Fatalf("Write failed: %v", err) - } - read, err := log.Read(name) - if err != nil { - t.Fatalf("Read failed: %v", err) - } - if string(read) != data { - t.Fatalf("Read failed: expected %s, got %s", data, string(read)) - } - } -} - -func TestLogRotate(t *testing.T) { - curTime := time.Unix(0, 0) - curTime = curTime.Add(time.Hour * 24) - - log := NewRotatingLog(t.TempDir()+"/rotatinglog", 3) - log.now = func() time.Time { return curTime } - - for i := 0; i < 10; i++ { - _, err := log.Write([]byte(fmt.Sprintf("%d", i))) - if err != nil { - t.Fatalf("Write failed: %v", err) - } - curTime = curTime.Add(time.Hour * 24) - } - - files, err := log.files() - if err != nil { - t.Fatalf("files failed: %v", err) - } - if len(files) != 3 { - t.Fatalf("files failed: expected 3, got %d", len(files)) - } -} - -func genstr(size int) string { - return strings.Repeat("a", size) -} diff --git a/internal/oplog/oplog.go b/internal/oplog/oplog.go index e8abce23..05c4280c 100644 --- a/internal/oplog/oplog.go +++ b/internal/oplog/oplog.go @@ -20,15 +20,22 @@ var ( ErrStopIteration = errors.New("stop iteration") ErrNotExist = errors.New("operation does not exist") ErrExist = errors.New("operation already exists") + + NullOPID = int64(0) ) type Subscription = func(ops []*v1.Operation, event OperationEvent) +type subAndQuery struct { + f *Subscription + q Query +} + type OpLog struct { store OpStore subscribersMu sync.Mutex - subscribers []*Subscription + subscribers []subAndQuery } func NewOpLog(store OpStore) (*OpLog, error) { @@ -43,23 +50,37 @@ func NewOpLog(store OpStore) (*OpLog, error) { return o, nil } -func (o *OpLog) curSubscribers() []*Subscription { +func (o *OpLog) curSubscribers() []subAndQuery { o.subscribersMu.Lock() defer o.subscribersMu.Unlock() return slices.Clone(o.subscribers) } +func (o *OpLog) notify(ops []*v1.Operation, event OperationEvent) { + for _, sub := range o.curSubscribers() { + notifyOps := make([]*v1.Operation, 0, len(ops)) + for _, op := range ops { + if sub.q.Match(op) { + notifyOps = append(notifyOps, op) + } + } + if len(notifyOps) > 0 { + (*sub.f)(notifyOps, event) + } + } +} + func (o *OpLog) Query(q Query, f func(*v1.Operation) error) error { return o.store.Query(q, f) } func (o *OpLog) Subscribe(q Query, f *Subscription) { - o.subscribers = append(o.subscribers, f) + o.subscribers = append(o.subscribers, subAndQuery{f: f, q: q}) } func (o *OpLog) Unsubscribe(f *Subscription) error { for i, sub := range o.subscribers { - if sub == f { + if sub.f == f { o.subscribers = append(o.subscribers[:i], o.subscribers[i+1:]...) return nil } @@ -82,9 +103,7 @@ func (o *OpLog) Add(ops ...*v1.Operation) error { return err } - for _, sub := range o.curSubscribers() { - (*sub)(ops, OPERATION_ADDED) - } + o.notify(ops, OPERATION_ADDED) return nil } @@ -99,9 +118,7 @@ func (o *OpLog) Update(ops ...*v1.Operation) error { return err } - for _, sub := range o.curSubscribers() { - (*sub)(ops, OPERATION_UPDATED) - } + o.notify(ops, OPERATION_UPDATED) return nil } @@ -111,10 +128,7 @@ func (o *OpLog) Delete(opID ...int64) error { return err } - for _, sub := range o.curSubscribers() { - (*sub)(removedOps, OPERATION_DELETED) - } - + o.notify(removedOps, OPERATION_DELETED) return nil } diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index da36e19f..acf97460 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -2,6 +2,7 @@ package orchestrator import ( "context" + "crypto/rand" "errors" "fmt" "io" @@ -11,7 +12,7 @@ import ( v1 "github.com/garethgeorge/backrest/gen/go/v1" "github.com/garethgeorge/backrest/internal/config" - "github.com/garethgeorge/backrest/internal/logwriter" + "github.com/garethgeorge/backrest/internal/logstore" "github.com/garethgeorge/backrest/internal/metric" "github.com/garethgeorge/backrest/internal/oplog" "github.com/garethgeorge/backrest/internal/orchestrator/logging" @@ -26,6 +27,8 @@ var ErrRepoNotFound = errors.New("repo not found") var ErrRepoInitializationFailed = errors.New("repo initialization failed") var ErrPlanNotFound = errors.New("plan not found") +const defaultTaskLogDuration = 14 * 24 * time.Hour + // Orchestrator is responsible for managing repos and backups. type Orchestrator struct { mu sync.Mutex @@ -33,7 +36,7 @@ type Orchestrator struct { OpLog *oplog.OpLog repoPool *resticRepoPool taskQueue *queue.TimePriorityQueue[stContainer] - logStore *logwriter.LogManager + logStore *logstore.LogStore // cancelNotify is a list of channels that are notified when a task should be cancelled. cancelNotify []chan int64 @@ -59,7 +62,7 @@ func (st stContainer) Less(other stContainer) bool { return st.ScheduledTask.Less(other.ScheduledTask) } -func NewOrchestrator(resticBin string, cfg *v1.Config, log *oplog.OpLog, logStore *logwriter.LogManager) (*Orchestrator, error) { +func NewOrchestrator(resticBin string, cfg *v1.Config, log *oplog.OpLog, logStore *logstore.LogStore) (*Orchestrator, error) { cfg = proto.Clone(cfg).(*v1.Config) // create the orchestrator. @@ -96,14 +99,6 @@ func NewOrchestrator(resticBin string, cfg *v1.Config, log *oplog.OpLog, logStor } for _, op := range incompleteOps { - // check for logs to finalize - if op.Logref != "" { - if frozenID, err := logStore.Finalize(op.Logref); err != nil { - zap.L().Warn("failed to finalize livelog ref for incomplete operation", zap.String("logref", op.Logref), zap.Error(err)) - } else { - op.Logref = frozenID - } - } op.Status = v1.OperationStatus_STATUS_ERROR op.DisplayMessage = "Operation was incomplete when orchestrator was restarted." op.UnixTimeEndMs = op.UnixTimeStartMs @@ -130,12 +125,6 @@ func NewOrchestrator(resticBin string, cfg *v1.Config, log *oplog.OpLog, logStor } } - for _, id := range logStore.LiveLogIDs() { - if _, err := logStore.Finalize(id); err != nil { - zap.L().Warn("failed to finalize unassociated live log", zap.String("id", id), zap.Error(err)) - } - } - zap.L().Info("scrubbed operation log for incomplete operations", zap.Duration("duration", time.Since(startTime)), zap.Int("incomplete_ops", len(incompleteOps)), @@ -187,7 +176,7 @@ func (o *Orchestrator) ScheduleDefaultTasks(config *v1.Config) error { zap.L().Info("reset task queue, scheduling new task set", zap.String("timezone", time.Now().Location().String())) // Requeue tasks that are affected by the config change. - if err := o.ScheduleTask(tasks.NewCollectGarbageTask(), tasks.TaskPriorityDefault); err != nil { + if err := o.ScheduleTask(tasks.NewCollectGarbageTask(o.logStore), tasks.TaskPriorityDefault); err != nil { return fmt.Errorf("schedule collect garbage task: %w", err) } @@ -393,18 +382,23 @@ func (o *Orchestrator) Run(ctx context.Context) { func (o *Orchestrator) RunTask(ctx context.Context, st tasks.ScheduledTask) error { zap.L().Info("running task", zap.String("task", st.Task.Name()), zap.String("runAt", st.RunAt.Format(time.RFC3339))) - var liveLogID string var logWriter io.WriteCloser op := st.Op if op != nil { var err error - liveLogID, logWriter, err = o.logStore.NewLiveWriter(fmt.Sprintf("%x", op.GetId())) + + randBytes := make([]byte, 8) + if _, err := rand.Read(randBytes); err != nil { + panic(err) + } + logID := fmt.Sprintf("op%d-tasklog-%x", op.Id, randBytes) + logWriter, err = o.logStore.Create(logID, op.Id, defaultTaskLogDuration) if err != nil { zap.S().Errorf("failed to create live log writer: %v", err) } ctx = logging.ContextWithWriter(ctx, logWriter) - op.Logref = liveLogID // set the logref to the live log. + op.Logref = logID op.UnixTimeStartMs = time.Now().UnixMilli() if op.Status == v1.OperationStatus_STATUS_PENDING || op.Status == v1.OperationStatus_STATUS_UNKNOWN { op.Status = v1.OperationStatus_STATUS_INPROGRESS @@ -430,19 +424,14 @@ func (o *Orchestrator) RunTask(ctx context.Context, st tasks.ScheduledTask) erro metric.GetRegistry().RecordTaskRun(st.Task.RepoID(), st.Task.PlanID(), st.Task.Type(), time.Since(start).Seconds(), "success") } - if op != nil { - // write logs to log storage for this task. - if logWriter != nil { - if err := logWriter.Close(); err != nil { - zap.S().Errorf("failed to close live log writer: %v", err) - } - if finalID, err := o.logStore.Finalize(liveLogID); err != nil { - zap.S().Errorf("failed to finalize live log: %v", err) - } else { - op.Logref = finalID - } + // write logs to log storage for this task. + if logWriter != nil { + if err := logWriter.Close(); err != nil { + zap.S().Warnf("failed to close log writer for %q, logs may be partial: %v", st.Task.Name(), err) } + } + if op != nil { if err != nil { var taskCancelledError *tasks.TaskCancelledError var taskRetryError *tasks.TaskRetryError diff --git a/internal/orchestrator/taskrunnerimpl.go b/internal/orchestrator/taskrunnerimpl.go index 87fb73e7..8788cc69 100644 --- a/internal/orchestrator/taskrunnerimpl.go +++ b/internal/orchestrator/taskrunnerimpl.go @@ -3,7 +3,6 @@ package orchestrator import ( "context" "crypto/rand" - "encoding/hex" "errors" "fmt" "io" @@ -11,7 +10,6 @@ import ( v1 "github.com/garethgeorge/backrest/gen/go/v1" "github.com/garethgeorge/backrest/internal/hook" - "github.com/garethgeorge/backrest/internal/logwriter" "github.com/garethgeorge/backrest/internal/oplog" "github.com/garethgeorge/backrest/internal/orchestrator/logging" "github.com/garethgeorge/backrest/internal/orchestrator/repo" @@ -162,38 +160,12 @@ func (t *taskRunnerImpl) Logger(ctx context.Context) *zap.Logger { return logging.Logger(ctx, "[tasklog] ").Named(t.t.Name()) } -func (t *taskRunnerImpl) LogrefWriter() (string, tasks.LogrefWriter, error) { - id := make([]byte, 16) - if _, err := rand.Read(id); err != nil { - return "", nil, fmt.Errorf("read random: %w", err) +func (t *taskRunnerImpl) LogrefWriter() (string, io.WriteCloser, error) { + randBytes := make([]byte, 8) + if _, err := rand.Read(randBytes); err != nil { + return "", nil, err } - idStr := hex.EncodeToString(id) - liveID, writer, err := t.orchestrator.logStore.NewLiveWriter(idStr) - if err != nil { - return "", nil, fmt.Errorf("new log writer: %w", err) - } - return liveID, &logrefWriter{ - logmgr: t.orchestrator.logStore, - id: liveID, - writer: writer, - }, nil -} - -type logrefWriter struct { - logmgr *logwriter.LogManager - id string - writer io.WriteCloser -} - -var _ tasks.LogrefWriter = &logrefWriter{} - -func (l *logrefWriter) Write(p []byte) (n int, err error) { - return l.writer.Write(p) -} - -func (l *logrefWriter) Close() (string, error) { - if err := l.writer.Close(); err != nil { - return "", err - } - return l.logmgr.Finalize(l.id) + id := fmt.Sprintf("op%d-logref-%x", t.op.Id, randBytes) + writer, err := t.orchestrator.logStore.Create(id, t.op.GetId(), time.Duration(0)) + return id, writer, err } diff --git a/internal/orchestrator/tasks/task.go b/internal/orchestrator/tasks/task.go index b3488639..4020c66b 100644 --- a/internal/orchestrator/tasks/task.go +++ b/internal/orchestrator/tasks/task.go @@ -3,6 +3,7 @@ package tasks import ( "context" "errors" + "io" "testing" "time" @@ -53,12 +54,7 @@ type TaskRunner interface { // Logger returns the logger. Logger(ctx context.Context) *zap.Logger // LogrefWriter returns a writer that can be used to track streaming operation output. - LogrefWriter() (liveID string, w LogrefWriter, err error) -} - -type LogrefWriter interface { - Write(data []byte) (int, error) - Close() (frozenID string, err error) + LogrefWriter() (id string, w io.WriteCloser, err error) } type TaskExecutor interface { @@ -225,6 +221,6 @@ func (t *testTaskRunner) Logger(ctx context.Context) *zap.Logger { return zap.L() } -func (t *testTaskRunner) LogrefWriter() (liveID string, w LogrefWriter, err error) { +func (t *testTaskRunner) LogrefWriter() (id string, w io.WriteCloser, err error) { panic("not implemented") } diff --git a/internal/orchestrator/tasks/taskcheck.go b/internal/orchestrator/tasks/taskcheck.go index 753f6aee..a0e5752e 100644 --- a/internal/orchestrator/tasks/taskcheck.go +++ b/internal/orchestrator/tasks/taskcheck.go @@ -136,11 +136,9 @@ func (t *CheckTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunner return fmt.Errorf("check: %w", err) } - frozenID, err := writer.Close() - if err != nil { + if err := writer.Close(); err != nil { return fmt.Errorf("close logref writer: %w", err) } - opCheck.OperationCheck.OutputLogref = frozenID if err := runner.ExecuteHooks(ctx, []v1.Hook_Condition{ v1.Hook_CONDITION_CHECK_SUCCESS, diff --git a/internal/orchestrator/tasks/taskcollectgarbage.go b/internal/orchestrator/tasks/taskcollectgarbage.go index 29e6573d..ff7af267 100644 --- a/internal/orchestrator/tasks/taskcollectgarbage.go +++ b/internal/orchestrator/tasks/taskcollectgarbage.go @@ -6,6 +6,7 @@ import ( "time" v1 "github.com/garethgeorge/backrest/gen/go/v1" + "github.com/garethgeorge/backrest/internal/logstore" "github.com/garethgeorge/backrest/internal/oplog" "go.uber.org/zap" ) @@ -30,14 +31,16 @@ func gcAgeForOperation(op *v1.Operation) time.Duration { type CollectGarbageTask struct { BaseTask firstRun bool + logstore *logstore.LogStore } -func NewCollectGarbageTask() *CollectGarbageTask { +func NewCollectGarbageTask(logstore *logstore.LogStore) *CollectGarbageTask { return &CollectGarbageTask{ BaseTask: BaseTask{ TaskType: "collect_garbage", TaskName: "collect garbage", }, + logstore: logstore, } } @@ -82,9 +85,12 @@ func (t *CollectGarbageTask) gcOperations(log *oplog.OpLog) error { return fmt.Errorf("identifying forgotten snapshots: %w", err) } + validIDs := make(map[int64]struct{}) forgetIDs := []int64{} curTime := curTimeMillis() if err := log.Query(oplog.SelectAll, func(op *v1.Operation) error { + validIDs[op.Id] = struct{}{} + forgot, ok := snapshotForgottenForFlow[op.FlowId] if !ok { // no snapshot associated with this flow; check if it's old enough to be gc'd @@ -103,9 +109,33 @@ func (t *CollectGarbageTask) gcOperations(log *oplog.OpLog) error { if err := log.Delete(forgetIDs...); err != nil { return fmt.Errorf("removing gc eligible operations: %w", err) + } else if len(forgetIDs) > 0 { + for _, id := range forgetIDs { + delete(validIDs, id) + } } zap.L().Info("collecting garbage", zap.Any("operations_removed", len(forgetIDs))) + + // cleaning up logstore + toDelete := []string{} + if err := t.logstore.SelectAll(func(id string, parentID int64) { + if parentID == 0 { + return + } + if _, ok := validIDs[parentID]; !ok { + toDelete = append(toDelete, id) + } + }); err != nil { + return fmt.Errorf("selecting all logstore entries: %w", err) + } + for _, id := range toDelete { + if err := t.logstore.Delete(id); err != nil { + zap.L().Error("deleting logstore entry", zap.String("id", id), zap.Error(err)) + } + } + zap.L().Info("collecting garbage logs", zap.Any("logs_removed", len(toDelete))) + return nil } diff --git a/internal/orchestrator/tasks/taskprune.go b/internal/orchestrator/tasks/taskprune.go index 63eba4cd..b4d7d1ce 100644 --- a/internal/orchestrator/tasks/taskprune.go +++ b/internal/orchestrator/tasks/taskprune.go @@ -136,11 +136,9 @@ func (t *PruneTask) Run(ctx context.Context, st ScheduledTask, runner TaskRunner return fmt.Errorf("prune: %w", err) } - frozenID, err := writer.Close() - if err != nil { + if err := writer.Close(); err != nil { return fmt.Errorf("close logref writer: %w", err) } - opPrune.OperationPrune.OutputLogref = frozenID // Run a stats task after a successful prune if err := runner.ScheduleTask(NewStatsTask(t.RepoID(), PlanForSystemTasks, false), TaskPriorityStats); err != nil {