Skip to content

Commit

Permalink
feat: use sqlite logstore (garethgeorge#514)
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge authored Oct 12, 2024
1 parent 5948c67 commit 4d557a1
Show file tree
Hide file tree
Showing 22 changed files with 1,078 additions and 894 deletions.
7 changes: 4 additions & 3 deletions cmd/backrest/backrest.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/garethgeorge/backrest/internal/auth"
"github.com/garethgeorge/backrest/internal/config"
"github.com/garethgeorge/backrest/internal/env"
"github.com/garethgeorge/backrest/internal/logwriter"
"github.com/garethgeorge/backrest/internal/logstore"
"github.com/garethgeorge/backrest/internal/metric"
"github.com/garethgeorge/backrest/internal/oplog"
"github.com/garethgeorge/backrest/internal/oplog/bboltstore"
Expand Down Expand Up @@ -82,10 +82,11 @@ func main() {
migrateBboltOplog(opstore)

// Create rotating log storage
logStore, err := logwriter.NewLogManager(path.Join(env.DataDir(), "rotatinglogs"), 14) // 14 days of logs
logStore, err := logstore.NewLogStore(filepath.Join(env.DataDir(), "tasklogs"))
if err != nil {
zap.S().Fatalf("error creating rotating log storage: %v", err)
zap.S().Fatalf("error creating task log store: %v", err)
}
logstore.MigrateTarLogsInDir(logStore, filepath.Join(env.DataDir(), "rotatinglogs"))

// Create orchestrator and start task loop.
orchestrator, err := orchestrator.NewOrchestrator(resticPath, cfg, log, logStore)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/getlantern/systray v1.2.2
github.com/gitploy-io/cronexpr v0.2.2
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/google/go-cmp v0.6.0
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/hashicorp/go-multierror v1.1.1
github.com/hectane/go-acl v0.0.0-20230122075934-ca0b05cb1adb
Expand Down
80 changes: 48 additions & 32 deletions internal/api/backresthandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"io"
"os"
"path"
"reflect"
Expand All @@ -18,7 +19,7 @@ import (
v1 "github.com/garethgeorge/backrest/gen/go/v1"
"github.com/garethgeorge/backrest/gen/go/v1/v1connect"
"github.com/garethgeorge/backrest/internal/config"
"github.com/garethgeorge/backrest/internal/logwriter"
"github.com/garethgeorge/backrest/internal/logstore"
"github.com/garethgeorge/backrest/internal/oplog"
"github.com/garethgeorge/backrest/internal/orchestrator"
"github.com/garethgeorge/backrest/internal/orchestrator/repo"
Expand All @@ -36,12 +37,12 @@ type BackrestHandler struct {
config config.ConfigStore
orchestrator *orchestrator.Orchestrator
oplog *oplog.OpLog
logStore *logwriter.LogManager
logStore *logstore.LogStore
}

var _ v1connect.BackrestHandler = &BackrestHandler{}

func NewBackrestHandler(config config.ConfigStore, orchestrator *orchestrator.Orchestrator, oplog *oplog.OpLog, logStore *logwriter.LogManager) *BackrestHandler {
func NewBackrestHandler(config config.ConfigStore, orchestrator *orchestrator.Orchestrator, oplog *oplog.OpLog, logStore *logstore.LogStore) *BackrestHandler {
s := &BackrestHandler{
config: config,
orchestrator: orchestrator,
Expand Down Expand Up @@ -532,57 +533,72 @@ func (s *BackrestHandler) ClearHistory(ctx context.Context, req *connect.Request
}

func (s *BackrestHandler) GetLogs(ctx context.Context, req *connect.Request[v1.LogDataRequest], resp *connect.ServerStream[types.BytesValue]) error {
ch, err := s.logStore.Subscribe(req.Msg.Ref)
r, err := s.logStore.Open(req.Msg.Ref)
if err != nil {
if errors.Is(err, logwriter.ErrFileNotFound) {
if errors.Is(err, logstore.ErrLogNotFound) {
resp.Send(&types.BytesValue{
Value: []byte(fmt.Sprintf("file associated with log %v not found, it may have rotated out of the log history", req.Msg.GetRef())),
Value: []byte(fmt.Sprintf("file associated with log %v not found, it may have expired.", req.Msg.GetRef())),
})
return nil
}
return fmt.Errorf("get log data %v: %w", req.Msg.GetRef(), err)
}
defer s.logStore.Unsubscribe(req.Msg.Ref, ch)

doneCh := make(chan struct{})

var mu sync.Mutex
var buf bytes.Buffer
interval := time.NewTicker(250 * time.Millisecond)
defer interval.Stop()
go func() {
<-ctx.Done()
r.Close()
}()

var bufferMu sync.Mutex
var buffer bytes.Buffer
var errChan = make(chan error, 1)
go func() {
for data := range ch {
mu.Lock()
buf.Write(data)
mu.Unlock()
data := make([]byte, 4*1024)
for {
n, err := r.Read(data)
if n == 0 {
close(errChan)
break
} else if err != nil && err != io.EOF {
errChan <- fmt.Errorf("failed to read log data: %w", err)
close(errChan)
return
}
bufferMu.Lock()
buffer.Write(data[:n])
bufferMu.Unlock()
}
close(doneCh)
}()

flushHelper := func() error {
mu.Lock()
defer mu.Unlock()
if buf.Len() > 0 {
if err := resp.Send(&types.BytesValue{Value: buf.Bytes()}); err != nil {
return err
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

flush := func() error {
bufferMu.Lock()
if buffer.Len() > 0 {
if err := resp.Send(&types.BytesValue{Value: buffer.Bytes()}); err != nil {
bufferMu.Unlock()
return fmt.Errorf("failed to send log data: %w", err)
}
buf.Reset()
buffer.Reset()
}
bufferMu.Unlock()
return nil
}

for {
select {
case <-interval.C:
if err := flushHelper(); err != nil {
case <-ctx.Done():
return flush()
case err := <-errChan:
_ = flush()
return err
case <-ticker.C:
if err := flush(); err != nil {
return err
}
case <-ctx.Done():
return ctx.Err()
case <-doneCh:
return flushHelper()
}
}

}

func (s *BackrestHandler) GetDownloadURL(ctx context.Context, req *connect.Request[types.Int64Value]) (*connect.Response[types.StringValue], error) {
Expand Down
9 changes: 5 additions & 4 deletions internal/api/backresthandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/garethgeorge/backrest/gen/go/types"
v1 "github.com/garethgeorge/backrest/gen/go/v1"
"github.com/garethgeorge/backrest/internal/config"
"github.com/garethgeorge/backrest/internal/logwriter"
"github.com/garethgeorge/backrest/internal/logstore"
"github.com/garethgeorge/backrest/internal/oplog"
"github.com/garethgeorge/backrest/internal/oplog/bboltstore"
"github.com/garethgeorge/backrest/internal/orchestrator"
Expand Down Expand Up @@ -769,7 +769,7 @@ type systemUnderTest struct {
oplog *oplog.OpLog
opstore *bboltstore.BboltStore
orch *orchestrator.Orchestrator
logStore *logwriter.LogManager
logStore *logstore.LogStore
config *v1.Config
}

Expand All @@ -785,7 +785,7 @@ func createSystemUnderTest(t *testing.T, config config.ConfigStore) systemUnderT
if err != nil {
t.Fatalf("Failed to find or install restic binary: %v", err)
}
opstore, err := bboltstore.NewBboltStore(dir + "/oplog.boltdb")
opstore, err := bboltstore.NewBboltStore(filepath.Join(dir, "oplog.bbolt"))
if err != nil {
t.Fatalf("Failed to create oplog store: %v", err)
}
Expand All @@ -794,10 +794,11 @@ func createSystemUnderTest(t *testing.T, config config.ConfigStore) systemUnderT
if err != nil {
t.Fatalf("Failed to create oplog: %v", err)
}
logStore, err := logwriter.NewLogManager(dir+"/log", 10)
logStore, err := logstore.NewLogStore(filepath.Join(dir, "tasklogs"))
if err != nil {
t.Fatalf("Failed to create log store: %v", err)
}
t.Cleanup(func() { logStore.Close() })
orch, err := orchestrator.NewOrchestrator(
resticBin, cfg, oplog, logStore,
)
Expand Down
Loading

0 comments on commit 4d557a1

Please sign in to comment.