Skip to content

Commit

Permalink
feat(dataobj): cardinality estimation (#16233)
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d authored Feb 13, 2025
1 parent 35e2b4a commit 66889ec
Show file tree
Hide file tree
Showing 8 changed files with 346 additions and 133 deletions.
80 changes: 28 additions & 52 deletions pkg/dataobj/internal/dataset/column_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,19 @@ type BuilderOptions struct {
// CompressionOptions holds optional configuration for compression.
CompressionOptions CompressionOptions

// StatisticsOptions holds optional configuration for statistics.
Statistics StatisticsOptions
}

// StatisticsOptions customizes the collection of statistics for a column.
type StatisticsOptions struct {
// StoreRangeStats indicates whether to store value range statistics for the
// column and pages.
StoreRangeStats bool

// StoreCardinalityStats indicates whether to store cardinality estimations,
// facilitated by hyperloglog
StoreCardinalityStats bool
}

// CompressionOptions customizes the compressor used when building pages.
Expand All @@ -48,8 +58,9 @@ type ColumnBuilder struct {

rows int // Total number of rows in the column.

pages []*MemPage
builder *pageBuilder
pages []*MemPage
statsBuilder *columnStatsBuilder
pageBuilder *pageBuilder
}

// NewColumnBuilder creates a new ColumnBuilder from the optional name and
Expand All @@ -61,11 +72,17 @@ func NewColumnBuilder(name string, opts BuilderOptions) (*ColumnBuilder, error)
return nil, fmt.Errorf("creating page builder: %w", err)
}

statsBuilder, err := newColumnStatsBuilder(opts.Statistics)
if err != nil {
return nil, fmt.Errorf("creating stats builder: %w", err)
}

return &ColumnBuilder{
name: name,
opts: opts,

builder: builder,
pageBuilder: builder,
statsBuilder: statsBuilder,
}, nil
}

Expand All @@ -87,6 +104,7 @@ func (cb *ColumnBuilder) Append(row int, value Value) error {
for range 2 {
if cb.append(row, value) {
cb.rows = row + 1
cb.statsBuilder.Append(value)
return nil
}

Expand All @@ -107,7 +125,7 @@ func (cb *ColumnBuilder) EstimatedSize() int {
for _, p := range cb.pages {
size += p.Info.CompressedSize
}
size += cb.builder.EstimatedSize()
size += cb.pageBuilder.EstimatedSize()
return size
}

Expand All @@ -132,7 +150,7 @@ func (cb *ColumnBuilder) Backfill(row int) {

func (cb *ColumnBuilder) backfill(row int) bool {
for row > cb.rows {
if !cb.builder.AppendNull() {
if !cb.pageBuilder.AppendNull() {
return false
}
cb.rows++
Expand All @@ -146,7 +164,7 @@ func (cb *ColumnBuilder) append(row int, value Value) bool {
if !cb.backfill(row) {
return false
}
return cb.builder.Append(value)
return cb.pageBuilder.Append(value)
}

// Flush converts data in cb into a [MemColumn]. Afterwards, cb is reset to a
Expand All @@ -159,7 +177,7 @@ func (cb *ColumnBuilder) Flush() (*MemColumn, error) {
Type: cb.opts.Value,

Compression: cb.opts.Compression,
Statistics: cb.buildStats(),
Statistics: cb.statsBuilder.Flush(cb.pages),
}

for _, page := range cb.pages {
Expand All @@ -178,54 +196,12 @@ func (cb *ColumnBuilder) Flush() (*MemColumn, error) {
return column, nil
}

func (cb *ColumnBuilder) buildStats() *datasetmd.Statistics {
if !cb.opts.StoreRangeStats {
return nil
}

var stats datasetmd.Statistics

var minValue, maxValue Value

for i, page := range cb.pages {
if page.Info.Stats == nil {
// This should never hit; if cb.opts.StoreRangeStats is true, then
// page.Info.Stats will be populated.
panic("ColumnBuilder.buildStats: page missing stats")
}

var pageMin, pageMax Value

if err := pageMin.UnmarshalBinary(page.Info.Stats.MinValue); err != nil {
panic(fmt.Sprintf("ColumnBuilder.buildStats: failed to unmarshal min value: %s", err))
} else if err := pageMax.UnmarshalBinary(page.Info.Stats.MaxValue); err != nil {
panic(fmt.Sprintf("ColumnBuilder.buildStats: failed to unmarshal max value: %s", err))
}

if i == 0 || CompareValues(pageMin, minValue) < 0 {
minValue = pageMin
}
if i == 0 || CompareValues(pageMax, maxValue) > 0 {
maxValue = pageMax
}
}

var err error
if stats.MinValue, err = minValue.MarshalBinary(); err != nil {
panic(fmt.Sprintf("ColumnBuilder.buildStats: failed to marshal min value: %s", err))
}
if stats.MaxValue, err = maxValue.MarshalBinary(); err != nil {
panic(fmt.Sprintf("ColumnBuilder.buildStats: failed to marshal max value: %s", err))
}
return &stats
}

func (cb *ColumnBuilder) flushPage() {
if cb.builder.Rows() == 0 {
if cb.pageBuilder.Rows() == 0 {
return
}

page, err := cb.builder.Flush()
page, err := cb.pageBuilder.Flush()
if err != nil {
// Flush should only return an error when it's empty, which we already
// ensure it's not in the lines above.
Expand All @@ -238,5 +214,5 @@ func (cb *ColumnBuilder) flushPage() {
func (cb *ColumnBuilder) Reset() {
cb.rows = 0
cb.pages = nil
cb.builder.Reset()
cb.pageBuilder.Reset()
}
115 changes: 115 additions & 0 deletions pkg/dataobj/internal/dataset/column_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package dataset

import (
"fmt"

"github.com/axiomhq/hyperloglog"

"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
)

// NB: https://engineering.fb.com/2018/12/13/data-infrastructure/hyperloglog/
// Standard error (SE) = 1.04/sqrt(2^n_registers)
// so
//
// 65% of estimates will be within ±1SE of true value
// 95% of estimates will be within ±2SE of true value
// 99% of estimates will be within ±3SE of true value
//
// e.g. given 2^12 registers
// SE = 1.04/sqrt(2^12) = 0.01625
// 65% of estimates will be within ±1SE of true value (1.625%)
// 95% of estimates will be within ±2SE of true value (3.25%)
// 99% of estimates will be within ±3SE of true value (4.875%)
// and with 8-bit registers, this is 2^12 = 4KB size.
func newHyperLogLog() (*hyperloglog.Sketch, error) {
return hyperloglog.NewSketch(12, true)
}

type columnStatsBuilder struct {
opts StatisticsOptions

// for cardinality
hll *hyperloglog.Sketch
}

// ColumnStatsBuilder is for column-level statistics
func newColumnStatsBuilder(opts StatisticsOptions) (*columnStatsBuilder, error) {
result := &columnStatsBuilder{
opts: opts,
}

if opts.StoreCardinalityStats {
var err error
if result.hll, err = newHyperLogLog(); err != nil {
return nil, fmt.Errorf("failed to create hll: %w", err)
}
}

return result, nil
}

func (csb *columnStatsBuilder) Append(value Value) {
if csb.opts.StoreCardinalityStats && !value.IsNil() && !value.IsZero() {
buf, err := value.MarshalBinary()
if err != nil {
panic(fmt.Sprintf(
"failed to marshal value for cardinality stats of type %s: %s",
value.Type(), err,
))
}

// TODO(owen-d): improve efficiency, ideally we don't need to marshal
// into an intermediate buffer.
csb.hll.Insert(buf)
}
}

// Flush builds the column-level stats both from the given pages and any internal
// state
func (csb *columnStatsBuilder) Flush(pages []*MemPage) *datasetmd.Statistics {
var dst datasetmd.Statistics
if csb.opts.StoreCardinalityStats {
dst.CardinalityCount = csb.hll.Estimate()
}
if csb.opts.StoreRangeStats {
csb.buildRangeStats(pages, &dst)
}

return &dst
}

func (csb *columnStatsBuilder) buildRangeStats(pages []*MemPage, dst *datasetmd.Statistics) {
var minValue, maxValue Value

for i, page := range pages {
if page.Info.Stats == nil {
// This should never hit; if cb.opts.StoreRangeStats is true, then
// page.Info.Stats will be populated.
panic("ColumnStatsBuilder.buildStats: page missing stats")
}

var pageMin, pageMax Value

if err := pageMin.UnmarshalBinary(page.Info.Stats.MinValue); err != nil {
panic(fmt.Sprintf("ColumnStatsBuilder.buildStats: failed to unmarshal min value: %s", err))
} else if err := pageMax.UnmarshalBinary(page.Info.Stats.MaxValue); err != nil {
panic(fmt.Sprintf("ColumnStatsBuilder.buildStats: failed to unmarshal max value: %s", err))
}

if i == 0 || CompareValues(pageMin, minValue) < 0 {
minValue = pageMin
}
if i == 0 || CompareValues(pageMax, maxValue) > 0 {
maxValue = pageMax
}
}

var err error
if dst.MinValue, err = minValue.MarshalBinary(); err != nil {
panic(fmt.Sprintf("ColumnStatsBuilder.buildStats: failed to marshal min value: %s", err))
}
if dst.MaxValue, err = maxValue.MarshalBinary(); err != nil {
panic(fmt.Sprintf("ColumnStatsBuilder.buildStats: failed to marshal max value: %s", err))
}
}
54 changes: 53 additions & 1 deletion pkg/dataobj/internal/dataset/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ func TestColumnBuilder_MinMax(t *testing.T) {
Compression: datasetmd.COMPRESSION_TYPE_NONE,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,

StoreRangeStats: true,
Statistics: StatisticsOptions{
StoreRangeStats: true,
},
}
b, err := NewColumnBuilder("", opts)
require.NoError(t, err)
Expand Down Expand Up @@ -132,6 +134,56 @@ func TestColumnBuilder_MinMax(t *testing.T) {
require.Equal(t, fString, page1Max.String())
}

func TestColumnBuilder_Cardinality(t *testing.T) {
var (
// We include the null string in the test to ensure that it's never
// considered in min/max ranges.
nullString = ""

aString = strings.Repeat("a", 100)
bString = strings.Repeat("b", 100)
cString = strings.Repeat("c", 100)
)

// We store nulls and duplicates (should not be counted in cardinality count)
in := []string{
nullString,

aString,

bString,
bString,
bString,

cString,
}

opts := BuilderOptions{
PageSizeHint: 301, // Slightly larger than the string length of 3 strings per page.
Value: datasetmd.VALUE_TYPE_STRING,
Compression: datasetmd.COMPRESSION_TYPE_NONE,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,

Statistics: StatisticsOptions{
StoreCardinalityStats: true,
},
}
b, err := NewColumnBuilder("", opts)
require.NoError(t, err)

for i, s := range in {
require.NoError(t, b.Append(i, StringValue(s)))
}

col, err := b.Flush()
require.NoError(t, err)
require.Equal(t, datasetmd.VALUE_TYPE_STRING, col.Info.Type)
require.NotNil(t, col.Info.Statistics)
// we use sparse hyperloglog reprs until a certain cardinality is reached,
// so this should not be approximate at low counts.
require.Equal(t, uint64(3), col.Info.Statistics.CardinalityCount)
}

func getMinMax(t *testing.T, stats *datasetmd.Statistics) (min, max Value) {
t.Helper()
require.NotNil(t, stats)
Expand Down
Loading

0 comments on commit 66889ec

Please sign in to comment.