feat(metastore): Implement listing data object for a given tenant and timerange (#16180)

cyriltovena authored Feb 11, 2025
1 parent 9a356a1 commit fa790e1
Showing 2 changed files with 259 additions and 2 deletions.
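
For orientation before the diff: the change adds a ListDataObjects helper that resolves which data objects a tenant query needs by reading the per-window metastore objects. A minimal, hypothetical usage sketch follows; the bucket construction, tenant ID, and time range are illustrative and not part of this commit.

    package main

    import (
        "context"
        "fmt"
        "time"

        "github.com/thanos-io/objstore"

        "github.com/grafana/loki/v3/pkg/dataobj/metastore"
    )

    func main() {
        ctx := context.Background()
        bucket := objstore.NewInMemBucket() // stand-in for a real object store client

        // List data object paths for one tenant over the last 24 hours.
        paths, err := metastore.ListDataObjects(ctx, bucket, "tenant-a",
            time.Now().Add(-24*time.Hour), time.Now())
        if err != nil {
            panic(err)
        }
        for _, p := range paths {
            fmt.Println(p) // storage path of a data object within the queried range
        }
    }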
138 changes: 136 additions & 2 deletions pkg/dataobj/metastore/metastore.go
@@ -6,6 +6,8 @@ import (
    "fmt"
    "io"
    "iter"
    "sort"
    "strconv"
    "sync"
    "time"

@@ -14,7 +16,9 @@ import (
    "github.com/grafana/dskit/backoff"
    "github.com/pkg/errors"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/prometheus/model/labels"
    "github.com/thanos-io/objstore"
    "golang.org/x/sync/errgroup"

    "github.com/grafana/loki/v3/pkg/dataobj"
    "github.com/grafana/loki/v3/pkg/logproto"
@@ -192,8 +196,8 @@ func metastorePath(tenantID string, window time.Time) string {
}

func Iter(tenantID string, start, end time.Time) iter.Seq[string] {
-   minMetastoreWindow := start.Truncate(metastoreWindowSize)
-   maxMetastoreWindow := end.Truncate(metastoreWindowSize)
+   minMetastoreWindow := start.Truncate(metastoreWindowSize).UTC()
+   maxMetastoreWindow := end.Truncate(metastoreWindowSize).UTC()

    return func(yield func(t string) bool) {
        for metastoreWindow := minMetastoreWindow; !metastoreWindow.After(maxMetastoreWindow); metastoreWindow = metastoreWindow.Add(metastoreWindowSize) {
@@ -203,3 +207,133 @@ func Iter(tenantID string, start, end time.Time) iter.Seq[string] {
        }
    }
}

// ListDataObjects returns a list of all dataobj paths for the given tenant and time range.
func ListDataObjects(ctx context.Context, bucket objstore.Bucket, tenantID string, start, end time.Time) ([]string, error) {
    // Get all metastore paths for the time range
    var storePaths []string
    for path := range Iter(tenantID, start, end) {
        storePaths = append(storePaths, path)
    }

    // List objects from all stores concurrently
    paths, err := listObjectsFromStores(ctx, bucket, storePaths, start, end)
    if err != nil {
        return nil, err
    }

    return paths, nil
}

// listObjectsFromStores concurrently lists objects from multiple metastore files
func listObjectsFromStores(ctx context.Context, bucket objstore.Bucket, storePaths []string, start, end time.Time) ([]string, error) {
    objects := make([][]string, len(storePaths))
    g, ctx := errgroup.WithContext(ctx)

    for i, path := range storePaths {
        g.Go(func() error {
            var err error
            objects[i], err = listObjects(ctx, bucket, path, start, end)
            if err != nil {
                return fmt.Errorf("listing objects from metastore %s: %w", path, err)
            }
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return nil, err
    }

    return dedupeAndSort(objects), nil
}

func listObjects(ctx context.Context, bucket objstore.Bucket, path string, start, end time.Time) ([]string, error) {
    var buf bytes.Buffer
    objectReader, err := bucket.Get(ctx, path)
    if err != nil {
        return nil, fmt.Errorf("getting metastore object: %w", err)
    }
    n, err := buf.ReadFrom(objectReader)
    if err != nil {
        return nil, fmt.Errorf("reading metastore object: %w", err)
    }
    object := dataobj.FromReaderAt(bytes.NewReader(buf.Bytes()), n)
    si, err := object.Metadata(ctx)
    if err != nil {
        return nil, fmt.Errorf("resolving object metadata: %w", err)
    }

    var objectPaths []string
    streams := make([]dataobj.Stream, 1024)
    for i := 0; i < si.StreamsSections; i++ {
        streamsReader := dataobj.NewStreamsReader(object, i)
        for {
            n, err := streamsReader.Read(ctx, streams)
            if err != nil && err != io.EOF {
                return nil, fmt.Errorf("reading streams: %w", err)
            }
            if n == 0 {
                break
            }
            for _, stream := range streams[:n] {
                ok, objPath := objectOverlapsRange(stream.Labels, start, end)
                if ok {
                    objectPaths = append(objectPaths, objPath)
                }
            }
        }
    }
    return objectPaths, nil
}

// dedupeAndSort takes a slice of string slices and returns a sorted slice of unique strings
func dedupeAndSort(objects [][]string) []string {
    uniquePaths := make(map[string]struct{})
    for _, batch := range objects {
        for _, path := range batch {
            uniquePaths[path] = struct{}{}
        }
    }

    paths := make([]string, 0, len(uniquePaths))
    for path := range uniquePaths {
        paths = append(paths, path)
    }
    sort.Strings(paths)
    return paths
}

// objectOverlapsRange reports whether an object's time range, read from its
// stream labels, falls entirely within the query range, and returns the
// object's path when it does.
func objectOverlapsRange(lbs labels.Labels, start, end time.Time) (bool, string) {
    var (
        objStart, objEnd time.Time
        objPath          string
    )
    for _, lb := range lbs {
        if lb.Name == "__start__" {
            tsNano, err := strconv.ParseInt(lb.Value, 10, 64)
            if err != nil {
                panic(err)
            }
            objStart = time.Unix(0, tsNano).UTC()
        }
        if lb.Name == "__end__" {
            tsNano, err := strconv.ParseInt(lb.Value, 10, 64)
            if err != nil {
                panic(err)
            }
            objEnd = time.Unix(0, tsNano).UTC()
        }
        if lb.Name == "__path__" {
            objPath = lb.Value
        }
    }
    if objStart.IsZero() || objEnd.IsZero() {
        return false, ""
    }
    if objStart.Before(start) || objEnd.After(end) {
        return false, ""
    }
    return true, objPath
}
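
A note on how the time range is fanned out: ListDataObjects uses Iter to enumerate one metastore object per window between start and end, then reads each metastore concurrently. The sketch below assumes metastoreWindowSize is 12 hours (not shown in this diff, but implied by the tests that follow); the tenant and times are illustrative.

    // A 05:00–15:00 UTC query crosses two 12h windows, so Iter yields the
    // metastore paths for the 00:00 and 12:00 windows of that day.
    start := time.Date(2025, 1, 1, 5, 0, 0, 0, time.UTC)
    end := time.Date(2025, 1, 1, 15, 0, 0, 0, time.UTC)
    for path := range metastore.Iter("tenant-a", start, end) {
        fmt.Println(path) // one metastore object path per 12h window
    }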
123 changes: 123 additions & 0 deletions pkg/dataobj/metastore/metastore_test.go
@@ -179,3 +179,126 @@ func TestIter(t *testing.T) {
        })
    }
}

func TestDataObjectsPaths(t *testing.T) {
    ctx := context.Background()
    bucket := objstore.NewInMemBucket()
    tenantID := "test-tenant"

    m := NewManager(bucket, tenantID, log.NewNopLogger())

    // Set limits for the test
    m.backoff = backoff.New(context.TODO(), backoff.Config{
        MinBackoff: 10 * time.Millisecond,
        MaxBackoff: 100 * time.Millisecond,
        MaxRetries: 3,
    })

    // Create test data spanning multiple metastore windows
    now := time.Date(2025, 1, 1, 15, 0, 0, 0, time.UTC)

    // Add files in different time windows spanning multiple 12h periods
    testCases := []struct {
        path      string
        startTime time.Time
        endTime   time.Time
    }{
        {
            path:      "path1",
            startTime: now.Add(-1 * time.Hour),
            endTime:   now,
        },
        {
            path:      "path2",
            startTime: now.Add(-30 * time.Minute),
            endTime:   now,
        },
        {
            path:      "path3",
            startTime: now.Add(-13 * time.Hour), // Previous 12h window
            endTime:   now.Add(-12 * time.Hour),
        },
        {
            path:      "path4",
            startTime: now.Add(-14 * time.Hour), // Previous 12h window
            endTime:   now.Add(-13 * time.Hour),
        },
        {
            path:      "path5",
            startTime: now.Add(-25 * time.Hour), // Two windows back
            endTime:   now.Add(-24 * time.Hour),
        },
        {
            path:      "path6",
            startTime: now.Add(-36 * time.Hour), // Three windows back
            endTime:   now.Add(-35 * time.Hour),
        },
    }

    for _, tc := range testCases {
        err := m.UpdateMetastore(ctx, tc.path, dataobj.FlushStats{
            MinTimestamp: tc.startTime,
            MaxTimestamp: tc.endTime,
        })
        require.NoError(t, err)
    }

    t.Run("finds objects within current window", func(t *testing.T) {
        paths, err := ListDataObjects(ctx, bucket, tenantID, now.Add(-1*time.Hour), now)
        require.NoError(t, err)
        require.Len(t, paths, 2)
        require.Contains(t, paths, "path1")
        require.Contains(t, paths, "path2")
    })

    t.Run("finds objects across two 12h windows", func(t *testing.T) {
        paths, err := ListDataObjects(ctx, bucket, tenantID, now.Add(-14*time.Hour), now)
        require.NoError(t, err)
        require.Len(t, paths, 4)
        require.Contains(t, paths, "path1")
        require.Contains(t, paths, "path2")
        require.Contains(t, paths, "path3")
        require.Contains(t, paths, "path4")
    })

    t.Run("finds objects across three 12h windows", func(t *testing.T) {
        paths, err := ListDataObjects(ctx, bucket, tenantID, now.Add(-25*time.Hour), now)
        require.NoError(t, err)
        require.Len(t, paths, 5)
        require.Contains(t, paths, "path1")
        require.Contains(t, paths, "path2")
        require.Contains(t, paths, "path3")
        require.Contains(t, paths, "path4")
        require.Contains(t, paths, "path5")
    })

    t.Run("finds all objects across all windows", func(t *testing.T) {
        paths, err := ListDataObjects(ctx, bucket, tenantID, now.Add(-36*time.Hour), now)
        require.NoError(t, err)
        require.Len(t, paths, 6)
        require.Contains(t, paths, "path1")
        require.Contains(t, paths, "path2")
        require.Contains(t, paths, "path3")
        require.Contains(t, paths, "path4")
        require.Contains(t, paths, "path5")
        require.Contains(t, paths, "path6")
    })

    t.Run("returns empty list when no objects in range", func(t *testing.T) {
        paths, err := ListDataObjects(ctx, bucket, tenantID, now.Add(1*time.Hour), now.Add(2*time.Hour))
        require.NoError(t, err)
        require.Empty(t, paths)
    })

t.Run("finds half of objects with partial window overlap", func(t *testing.T) {
// Query starting from middle of first window to current time
paths, err := ListDataObjects(ctx, bucket, tenantID, now.Add(-30*time.Hour), now)
require.NoError(t, err)
require.Len(t, paths, 5) // Should exclude path6 which is before -30h
require.Contains(t, paths, "path1")
require.Contains(t, paths, "path2")
require.Contains(t, paths, "path3")
require.Contains(t, paths, "path4")
require.Contains(t, paths, "path5")
})
}
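
One behaviour worth calling out: objectOverlapsRange only returns an object when its [__start__, __end__] range is fully contained in the query range, so an object straddling a query boundary is skipped. A package-internal sketch with made-up timestamps (not part of the commit) illustrates this:

    // An object spanning 09:00–13:00 queried with [10:00, 14:00] is not listed,
    // because its start precedes the query start.
    ok, _ := objectOverlapsRange(labels.FromStrings(
        "__start__", strconv.FormatInt(time.Date(2025, 1, 1, 9, 0, 0, 0, time.UTC).UnixNano(), 10),
        "__end__", strconv.FormatInt(time.Date(2025, 1, 1, 13, 0, 0, 0, time.UTC).UnixNano(), 10),
        "__path__", "obj1",
    ), time.Date(2025, 1, 1, 10, 0, 0, 0, time.UTC), time.Date(2025, 1, 1, 14, 0, 0, 0, time.UTC))
    // ok == false here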
