Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dataobj): cardinality estimation #16233

Merged
merged 11 commits into from
Feb 13, 2025
80 changes: 28 additions & 52 deletions pkg/dataobj/internal/dataset/column_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,19 @@ type BuilderOptions struct {
// CompressionOptions holds optional configuration for compression.
CompressionOptions CompressionOptions

// StatisticsOptions holds optional configuration for statistics.
Statistics StatisticsOptions
}

// StatisticsOptions customizes the collection of statistics for a column.
type StatisticsOptions struct {
	// StoreRangeStats indicates whether to store value range statistics for the
	// column and pages.
	StoreRangeStats bool

	// StoreCardinalityStats indicates whether to store cardinality estimations
	// for the column, facilitated by a HyperLogLog sketch.
	StoreCardinalityStats bool
}

// CompressionOptions customizes the compressor used when building pages.
Expand All @@ -48,8 +58,9 @@ type ColumnBuilder struct {

rows int // Total number of rows in the column.

pages []*MemPage
builder *pageBuilder
pages []*MemPage
statsBuilder *columnStatsBuilder
pageBuilder *pageBuilder
}

// NewColumnBuilder creates a new ColumnBuilder from the optional name and
Expand All @@ -61,11 +72,17 @@ func NewColumnBuilder(name string, opts BuilderOptions) (*ColumnBuilder, error)
return nil, fmt.Errorf("creating page builder: %w", err)
}

statsBuilder, err := newColumnStatsBuilder(opts.Statistics)
if err != nil {
return nil, fmt.Errorf("creating stats builder: %w", err)
}

return &ColumnBuilder{
name: name,
opts: opts,

builder: builder,
pageBuilder: builder,
statsBuilder: statsBuilder,
}, nil
}

Expand All @@ -87,6 +104,7 @@ func (cb *ColumnBuilder) Append(row int, value Value) error {
for range 2 {
if cb.append(row, value) {
cb.rows = row + 1
cb.statsBuilder.Append(value)
return nil
}

Expand All @@ -107,7 +125,7 @@ func (cb *ColumnBuilder) EstimatedSize() int {
for _, p := range cb.pages {
size += p.Info.CompressedSize
}
size += cb.builder.EstimatedSize()
size += cb.pageBuilder.EstimatedSize()
return size
}

Expand All @@ -132,7 +150,7 @@ func (cb *ColumnBuilder) Backfill(row int) {

func (cb *ColumnBuilder) backfill(row int) bool {
for row > cb.rows {
if !cb.builder.AppendNull() {
if !cb.pageBuilder.AppendNull() {
return false
}
cb.rows++
Expand All @@ -146,7 +164,7 @@ func (cb *ColumnBuilder) append(row int, value Value) bool {
if !cb.backfill(row) {
return false
}
return cb.builder.Append(value)
return cb.pageBuilder.Append(value)
}

// Flush converts data in cb into a [MemColumn]. Afterwards, cb is reset to a
Expand All @@ -159,7 +177,7 @@ func (cb *ColumnBuilder) Flush() (*MemColumn, error) {
Type: cb.opts.Value,

Compression: cb.opts.Compression,
Statistics: cb.buildStats(),
Statistics: cb.statsBuilder.Flush(cb.pages),
}

for _, page := range cb.pages {
Expand All @@ -178,54 +196,12 @@ func (cb *ColumnBuilder) Flush() (*MemColumn, error) {
return column, nil
}

func (cb *ColumnBuilder) buildStats() *datasetmd.Statistics {
if !cb.opts.StoreRangeStats {
return nil
}

var stats datasetmd.Statistics

var minValue, maxValue Value

for i, page := range cb.pages {
if page.Info.Stats == nil {
// This should never hit; if cb.opts.StoreRangeStats is true, then
// page.Info.Stats will be populated.
panic("ColumnBuilder.buildStats: page missing stats")
}

var pageMin, pageMax Value

if err := pageMin.UnmarshalBinary(page.Info.Stats.MinValue); err != nil {
panic(fmt.Sprintf("ColumnBuilder.buildStats: failed to unmarshal min value: %s", err))
} else if err := pageMax.UnmarshalBinary(page.Info.Stats.MaxValue); err != nil {
panic(fmt.Sprintf("ColumnBuilder.buildStats: failed to unmarshal max value: %s", err))
}

if i == 0 || CompareValues(pageMin, minValue) < 0 {
minValue = pageMin
}
if i == 0 || CompareValues(pageMax, maxValue) > 0 {
maxValue = pageMax
}
}

var err error
if stats.MinValue, err = minValue.MarshalBinary(); err != nil {
panic(fmt.Sprintf("ColumnBuilder.buildStats: failed to marshal min value: %s", err))
}
if stats.MaxValue, err = maxValue.MarshalBinary(); err != nil {
panic(fmt.Sprintf("ColumnBuilder.buildStats: failed to marshal max value: %s", err))
}
return &stats
}

func (cb *ColumnBuilder) flushPage() {
if cb.builder.Rows() == 0 {
if cb.pageBuilder.Rows() == 0 {
return
}

page, err := cb.builder.Flush()
page, err := cb.pageBuilder.Flush()
if err != nil {
// Flush should only return an error when it's empty, which we already
// ensure it's not in the lines above.
Expand All @@ -238,5 +214,5 @@ func (cb *ColumnBuilder) flushPage() {
func (cb *ColumnBuilder) Reset() {
cb.rows = 0
cb.pages = nil
cb.builder.Reset()
cb.pageBuilder.Reset()
}
115 changes: 115 additions & 0 deletions pkg/dataobj/internal/dataset/column_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package dataset

import (
"fmt"

"github.com/axiomhq/hyperloglog"

"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
)

// newHyperLogLog constructs the HyperLogLog sketch used for cardinality
// estimation.
//
// NB: https://engineering.fb.com/2018/12/13/data-infrastructure/hyperloglog/
// Standard error (SE) = 1.04/sqrt(2^n_registers), so:
//
//	65% of estimates will be within ±1SE of the true value
//	95% of estimates will be within ±2SE of the true value
//	99% of estimates will be within ±3SE of the true value
//
// e.g. given 2^12 registers:
//
//	SE = 1.04/sqrt(2^12) = 0.01625
//	65% of estimates within ±1.625%, 95% within ±3.25%, 99% within ±4.875%
//
// and with 8-bit registers this is 2^12 bytes = 4KB per sketch.
func newHyperLogLog() (*hyperloglog.Sketch, error) {
	// precision 12 → 2^12 registers; sparse=true keeps low cardinalities exact.
	const precision = 12
	return hyperloglog.NewSketch(precision, true)
}

// columnStatsBuilder accumulates column-level statistics as values are
// appended to a column.
type columnStatsBuilder struct {
	opts StatisticsOptions

	// hll estimates cardinality; nil unless opts.StoreCardinalityStats is set.
	hll *hyperloglog.Sketch
}

// newColumnStatsBuilder creates a builder for column-level statistics,
// allocating a HyperLogLog sketch only when cardinality stats are requested.
func newColumnStatsBuilder(opts StatisticsOptions) (*columnStatsBuilder, error) {
	csb := &columnStatsBuilder{opts: opts}

	if !opts.StoreCardinalityStats {
		return csb, nil
	}

	hll, err := newHyperLogLog()
	if err != nil {
		return nil, fmt.Errorf("failed to create hll: %w", err)
	}
	csb.hll = hll

	return csb, nil
}

// Append folds value into the column-level statistics. Nil and zero values
// never contribute to the cardinality estimate.
func (csb *columnStatsBuilder) Append(value Value) {
	if !csb.opts.StoreCardinalityStats {
		return
	}
	if value.IsNil() || value.IsZero() {
		return
	}

	// TODO(owen-d): improve efficiency, ideally we don't need to marshal
	// into an intermediate buffer.
	buf, err := value.MarshalBinary()
	if err != nil {
		panic(fmt.Sprintf(
			"failed to marshal value for cardinality stats of type %s: %s",
			value.Type(), err,
		))
	}
	csb.hll.Insert(buf)
}

// Flush builds the column-level stats both from the given pages and any
// internal state. It returns nil when no statistics collection is enabled,
// preserving the previous contract where Statistics was omitted entirely
// unless requested.
func (csb *columnStatsBuilder) Flush(pages []*MemPage) *datasetmd.Statistics {
	// Avoid allocating an empty Statistics message when nothing was collected;
	// the pre-existing buildStats path returned nil in this case, and callers
	// may rely on a nil check to detect "no stats available".
	if !csb.opts.StoreCardinalityStats && !csb.opts.StoreRangeStats {
		return nil
	}

	var dst datasetmd.Statistics
	if csb.opts.StoreCardinalityStats {
		dst.CardinalityCount = csb.hll.Estimate()
	}
	if csb.opts.StoreRangeStats {
		csb.buildRangeStats(pages, &dst)
	}

	return &dst
}

// buildRangeStats writes column-wide min/max range statistics into dst,
// derived from the per-page statistics of pages.
func (csb *columnStatsBuilder) buildRangeStats(pages []*MemPage, dst *datasetmd.Statistics) {
	var (
		columnMin, columnMax Value
		initialized          bool
	)

	for _, page := range pages {
		if page.Info.Stats == nil {
			// This should never hit; if cb.opts.StoreRangeStats is true, then
			// page.Info.Stats will be populated.
			panic("ColumnStatsBuilder.buildStats: page missing stats")
		}

		var pageMin, pageMax Value
		if err := pageMin.UnmarshalBinary(page.Info.Stats.MinValue); err != nil {
			panic(fmt.Sprintf("ColumnStatsBuilder.buildStats: failed to unmarshal min value: %s", err))
		}
		if err := pageMax.UnmarshalBinary(page.Info.Stats.MaxValue); err != nil {
			panic(fmt.Sprintf("ColumnStatsBuilder.buildStats: failed to unmarshal max value: %s", err))
		}

		// Seed the running extremes from the first page, then widen.
		if !initialized || CompareValues(pageMin, columnMin) < 0 {
			columnMin = pageMin
		}
		if !initialized || CompareValues(pageMax, columnMax) > 0 {
			columnMax = pageMax
		}
		initialized = true
	}

	var err error
	if dst.MinValue, err = columnMin.MarshalBinary(); err != nil {
		panic(fmt.Sprintf("ColumnStatsBuilder.buildStats: failed to marshal min value: %s", err))
	}
	if dst.MaxValue, err = columnMax.MarshalBinary(); err != nil {
		panic(fmt.Sprintf("ColumnStatsBuilder.buildStats: failed to marshal max value: %s", err))
	}
}
54 changes: 53 additions & 1 deletion pkg/dataobj/internal/dataset/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ func TestColumnBuilder_MinMax(t *testing.T) {
Compression: datasetmd.COMPRESSION_TYPE_NONE,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,

StoreRangeStats: true,
Statistics: StatisticsOptions{
StoreRangeStats: true,
},
}
b, err := NewColumnBuilder("", opts)
require.NoError(t, err)
Expand Down Expand Up @@ -132,6 +134,56 @@ func TestColumnBuilder_MinMax(t *testing.T) {
require.Equal(t, fString, page1Max.String())
}

// TestColumnBuilder_Cardinality checks that the cardinality estimate counts
// each distinct non-null value exactly once at low cardinalities.
func TestColumnBuilder_Cardinality(t *testing.T) {
	var (
		// We include the null (empty) string in the test to ensure that it is
		// never counted toward the cardinality estimate.
		nullString = ""

		aString = strings.Repeat("a", 100)
		bString = strings.Repeat("b", 100)
		cString = strings.Repeat("c", 100)
	)

	// We store nulls and duplicates (should not be counted in cardinality count)
	in := []string{
		nullString,

		aString,

		bString,
		bString,
		bString,

		cString,
	}

	opts := BuilderOptions{
		PageSizeHint: 301, // Slightly larger than the string length of 3 strings per page.
		Value:        datasetmd.VALUE_TYPE_STRING,
		Compression:  datasetmd.COMPRESSION_TYPE_NONE,
		Encoding:     datasetmd.ENCODING_TYPE_PLAIN,

		Statistics: StatisticsOptions{
			StoreCardinalityStats: true,
		},
	}
	b, err := NewColumnBuilder("", opts)
	require.NoError(t, err)

	for i, s := range in {
		require.NoError(t, b.Append(i, StringValue(s)))
	}

	col, err := b.Flush()
	require.NoError(t, err)
	require.Equal(t, datasetmd.VALUE_TYPE_STRING, col.Info.Type)
	require.NotNil(t, col.Info.Statistics)
	// we use sparse hyperloglog reprs until a certain cardinality is reached,
	// so this should not be approximate at low counts.
	require.Equal(t, uint64(3), col.Info.Statistics.CardinalityCount)
}

func getMinMax(t *testing.T, stats *datasetmd.Statistics) (min, max Value) {
t.Helper()
require.NotNil(t, stats)
Expand Down
Loading
Loading