diff --git a/pkg/logql/sharding_test.go b/pkg/logql/sharding_test.go index c4ca9a5a3574a..6270711bc99d3 100644 --- a/pkg/logql/sharding_test.go +++ b/pkg/logql/sharding_test.go @@ -20,7 +20,7 @@ func TestMappingEquivalence(t *testing.T) { shards = 3 nStreams = 60 rounds = 20 - streams = randomStreams(nStreams, rounds, shards, []string{"a", "b", "c", "d"}) + streams = randomStreams(nStreams, rounds+1, shards, []string{"a", "b", "c", "d"}) start = time.Unix(0, 0) end = time.Unix(0, int64(time.Second*time.Duration(rounds))) step = time.Second diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index f4eacb9d2659a..8430355205314 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -378,7 +378,7 @@ func (t *Loki) setupModuleManager() error { QueryFrontend: {Server, Overrides, TenantConfigs}, Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs}, TableManager: {Server}, - Compactor: {Server}, + Compactor: {Server, Overrides}, IngesterQuerier: {Ring}, All: {Querier, Ingester, Distributor, TableManager, Ruler}, } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 241d521e1a7f3..344926ce560ea 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -580,8 +580,11 @@ func (t *Loki) initMemberlistKV() (services.Service, error) { } func (t *Loki) initCompactor() (services.Service, error) { - var err error - t.compactor, err = compactor.NewCompactor(t.Cfg.CompactorConfig, t.Cfg.StorageConfig.Config, prometheus.DefaultRegisterer) + err := t.Cfg.SchemaConfig.Load() + if err != nil { + return nil, err + } + t.compactor, err = compactor.NewCompactor(t.Cfg.CompactorConfig, t.Cfg.StorageConfig.Config, t.Cfg.SchemaConfig, t.overrides, prometheus.DefaultRegisterer) if err != nil { return nil, err } diff --git a/pkg/loki/runtime_config.go b/pkg/loki/runtime_config.go index 8ffa5098b9738..a79c23f6edc81 100644 --- a/pkg/loki/runtime_config.go +++ b/pkg/loki/runtime_config.go @@ -1,6 +1,7 @@ package loki import ( + "fmt" "io" 
"github.com/cortexproject/cortex/pkg/ring/kv" @@ -21,15 +22,26 @@ type runtimeConfigValues struct { Multi kv.MultiRuntimeConfig `yaml:"multi_kv_config"` } +func (r runtimeConfigValues) validate() error { + for t, c := range r.TenantLimits { + if err := c.Validate(); err != nil { + return fmt.Errorf("invalid override for tenant %s: %w", t, err) + } + } + return nil +} + func loadRuntimeConfig(r io.Reader) (interface{}, error) { - var overrides = &runtimeConfigValues{} + overrides := &runtimeConfigValues{} decoder := yaml.NewDecoder(r) decoder.SetStrict(true) if err := decoder.Decode(&overrides); err != nil { return nil, err } - + if err := overrides.validate(); err != nil { + return nil, err + } return overrides, nil } diff --git a/pkg/loki/runtime_config_test.go b/pkg/loki/runtime_config_test.go new file mode 100644 index 0000000000000..0ed35de32ffea --- /dev/null +++ b/pkg/loki/runtime_config_test.go @@ -0,0 +1,117 @@ +package loki + +import ( + "context" + "flag" + "io" + "io/ioutil" + "strings" + "testing" + "time" + + "github.com/cortexproject/cortex/pkg/util/runtimeconfig" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/validation" +) + +func Test_LoadRetentionRules(t *testing.T) { + overrides := newTestOverrides(t, + ` +overrides: + "1": + creation_grace_period: 48h + "29": + creation_grace_period: 48h + ingestion_burst_size_mb: 140 + ingestion_rate_mb: 120 + max_concurrent_tail_requests: 1000 + max_global_streams_per_user: 100000 + max_label_names_per_series: 30 + max_query_parallelism: 256 + split_queries_by_interval: 15m + retention_period: 1440h + retention_stream: + - selector: '{app="foo"}' + period: 48h + priority: 10 + - selector: '{namespace="bar", cluster=~"fo.*|b.+|[1-2]"}' + period: 24h + priority: 5 +`) + require.Equal(t, 31*24*time.Hour, overrides.RetentionPeriod("1")) // default + require.Equal(t, 2*30*24*time.Hour, 
overrides.RetentionPeriod("29")) // overrides + require.Equal(t, []validation.StreamRetention(nil), overrides.StreamRetention("1")) + require.Equal(t, []validation.StreamRetention{ + {Period: 48 * time.Hour, Priority: 10, Selector: `{app="foo"}`, Matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "app", "foo"), + }}, + {Period: 24 * time.Hour, Priority: 5, Selector: `{namespace="bar", cluster=~"fo.*|b.+|[1-2]"}`, Matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "namespace", "bar"), + labels.MustNewMatcher(labels.MatchRegexp, "cluster", "fo.*|b.+|[1-2]"), + }}, + }, overrides.StreamRetention("29")) +} + +func Test_ValidateRules(t *testing.T) { + _, err := loadRuntimeConfig(strings.NewReader( + ` +overrides: + "29": + retention_stream: + - selector: '{app=foo"}' + period: 48h + priority: 10 + - selector: '{namespace="bar", cluster=~"fo.*|b.+|[1-2]"}' + period: 24h + priority: 10 +`)) + require.Equal(t, "invalid override for tenant 29: invalid labels matchers: parse error at line 1, col 6: syntax error: unexpected IDENTIFIER, expecting STRING", err.Error()) + _, err = loadRuntimeConfig(strings.NewReader( + ` +overrides: + "29": + retention_stream: + - selector: '{app="foo"}' + period: 5h + priority: 10 +`)) + require.Equal(t, "invalid override for tenant 29: retention period must be >= 24h was 5h0m0s", err.Error()) +} + +func newTestOverrides(t *testing.T, yaml string) *validation.Overrides { + t.Helper() + f, err := ioutil.TempFile(t.TempDir(), "bar") + require.NoError(t, err) + path := f.Name() + // fake loader to load from string instead of file. 
+ loader := func(_ io.Reader) (interface{}, error) { + return loadRuntimeConfig(strings.NewReader(yaml)) + } + cfg := runtimeconfig.ManagerConfig{ + ReloadPeriod: 1 * time.Second, + Loader: loader, + LoadPath: path, + } + flagset := flag.NewFlagSet("", flag.PanicOnError) + var defaults validation.Limits + defaults.RegisterFlags(flagset) + require.NoError(t, flagset.Parse(nil)) + validation.SetDefaultLimitsForYAMLUnmarshalling(defaults) + + runtimeConfig, err := runtimeconfig.NewRuntimeConfigManager(cfg, prometheus.DefaultRegisterer) + require.NoError(t, err) + + require.NoError(t, runtimeConfig.StartAsync(context.Background())) + require.NoError(t, runtimeConfig.AwaitRunning(context.Background())) + defer func() { + runtimeConfig.StopAsync() + require.NoError(t, runtimeConfig.AwaitTerminated(context.Background())) + }() + + overrides, err := validation.NewOverrides(defaults, tenantLimitsFromRuntimeConfig(runtimeConfig)) + require.NoError(t, err) + return overrides +} diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index 94828f4ed2994..f7a7b5dd94515 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -7,6 +7,7 @@ import ( "path/filepath" "reflect" "strings" + "sync" "time" "github.com/cortexproject/cortex/pkg/chunk" @@ -17,17 +18,24 @@ import ( "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" + loki_storage "github.com/grafana/loki/pkg/storage" + "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" "github.com/grafana/loki/pkg/storage/stores/util" + errUtil "github.com/grafana/loki/pkg/util" ) const delimiter = "/" type Config struct { - WorkingDirectory string `yaml:"working_directory"` - SharedStoreType string `yaml:"shared_store"` - SharedStoreKeyPrefix string `yaml:"shared_store_key_prefix"` - 
CompactionInterval time.Duration `yaml:"compaction_interval"` + WorkingDirectory string `yaml:"working_directory"` + SharedStoreType string `yaml:"shared_store"` + SharedStoreKeyPrefix string `yaml:"shared_store_key_prefix"` + CompactionInterval time.Duration `yaml:"compaction_interval"` + RetentionEnabled bool `yaml:"retention_enabled"` + RetentionInterval time.Duration `yaml:"retention_interval"` + RetentionDeleteDelay time.Duration `yaml:"retention_delete_delay"` + RetentionDeleteWorkCount int `yaml:"retention_delete_worker_count"` } // RegisterFlags registers flags. @@ -36,6 +44,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.SharedStoreType, "boltdb.shipper.compactor.shared-store", "", "Shared store used for storing boltdb files. Supported types: gcs, s3, azure, swift, filesystem") f.StringVar(&cfg.SharedStoreKeyPrefix, "boltdb.shipper.compactor.shared-store.key-prefix", "index/", "Prefix to add to Object Keys in Shared store. Path separator(if any) should always be a '/'. 
Prefix should never start with a separator but should always end with it.") f.DurationVar(&cfg.CompactionInterval, "boltdb.shipper.compactor.compaction-interval", 2*time.Hour, "Interval at which to re-run the compaction operation.") + f.DurationVar(&cfg.RetentionInterval, "boltdb.shipper.compactor.retention-interval", 10*time.Minute, "Interval at which to re-run the retention operation.") + f.DurationVar(&cfg.RetentionDeleteDelay, "boltdb.shipper.compactor.retention-delete-delay", 2*time.Hour, "Delay after which chunks will be fully deleted during retention.") + f.BoolVar(&cfg.RetentionEnabled, "boltdb.shipper.compactor.retention-enabled", false, "(Experimental) Activate custom (per-stream,per-tenant) retention.") + f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total amount of worker to use to delete chunks.") } func (cfg *Config) IsDefaults() bool { @@ -53,11 +65,13 @@ type Compactor struct { cfg Config objectClient chunk.ObjectClient + tableMarker *retention.Marker + sweeper *retention.Sweeper metrics *metrics } -func NewCompactor(cfg Config, storageConfig storage.Config, r prometheus.Registerer) (*Compactor, error) { +func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_storage.SchemaConfig, limits retention.Limits, r prometheus.Registerer) (*Compactor, error) { if cfg.IsDefaults() { return nil, errors.New("Must specify compactor config") } @@ -71,11 +85,24 @@ func NewCompactor(cfg Config, storageConfig storage.Config, r prometheus.Registe if err != nil { return nil, err } + prefixedClient := util.NewPrefixedObjectClient(objectClient, cfg.SharedStoreKeyPrefix) + retentionWorkDir := filepath.Join(cfg.WorkingDirectory, "retention") + + sweeper, err := retention.NewSweeper(retentionWorkDir, retention.NewDeleteClient(objectClient), cfg.RetentionDeleteWorkCount, cfg.RetentionDeleteDelay, r) + if err != nil { + return nil, err + } + marker, err := 
retention.NewMarker(retentionWorkDir, schemaConfig, prefixedClient, retention.NewExpirationChecker(limits), r) + if err != nil { + return nil, err + } compactor := Compactor{ cfg: cfg, - objectClient: util.NewPrefixedObjectClient(objectClient, cfg.SharedStoreKeyPrefix), + objectClient: prefixedClient, metrics: newMetrics(r), + tableMarker: marker, + sweeper: sweeper, } compactor.Service = services.NewBasicService(nil, compactor.loop, nil) @@ -84,28 +111,68 @@ func NewCompactor(cfg Config, storageConfig storage.Config, r prometheus.Registe func (c *Compactor) loop(ctx context.Context) error { runCompaction := func() { - err := c.Run(ctx) + err := c.RunCompaction(ctx) if err != nil { level.Error(util_log.Logger).Log("msg", "failed to run compaction", "err", err) } } + runRetention := func() { + err := c.RunRetention(ctx) + if err != nil { + level.Error(util_log.Logger).Log("msg", "failed to run retention", "err", err) + } + } + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + runCompaction() - runCompaction() - - ticker := time.NewTicker(c.cfg.CompactionInterval) - defer ticker.Stop() + ticker := time.NewTicker(c.cfg.CompactionInterval) + defer ticker.Stop() - for { - select { - case <-ticker.C: - runCompaction() - case <-ctx.Done(): - return nil + for { + select { + case <-ticker.C: + runCompaction() + case <-ctx.Done(): + return + } } + }() + if c.cfg.RetentionEnabled { + wg.Add(2) + go func() { + // starts the chunk sweeper + defer func() { + c.sweeper.Stop() + wg.Done() + }() + c.sweeper.Start() + <-ctx.Done() + }() + go func() { + // start the index marker + defer wg.Done() + ticker := time.NewTicker(c.cfg.RetentionInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + runRetention() + case <-ctx.Done(): + return + } + } + }() } + + wg.Wait() + return nil } -func (c *Compactor) Run(ctx context.Context) error { +func (c *Compactor) RunCompaction(ctx context.Context) error { status := statusSuccess start := time.Now() @@ 
-152,3 +219,40 @@ func (c *Compactor) Run(ctx context.Context) error { return nil } + +func (c *Compactor) RunRetention(ctx context.Context) error { + status := statusSuccess + start := time.Now() + + defer func() { + level.Debug(util_log.Logger).Log("msg", "finished to processing retention on all tables", "status", status, "duration", time.Since(start)) + c.metrics.retentionOperationTotal.WithLabelValues(status).Inc() + if status == statusSuccess { + c.metrics.retentionOperationDurationSeconds.Set(time.Since(start).Seconds()) + c.metrics.retentionOperationLastSuccess.SetToCurrentTime() + } + }() + level.Debug(util_log.Logger).Log("msg", "starting to processing retention on all all tables") + + _, dirs, err := c.objectClient.List(ctx, "", delimiter) + if err != nil { + status = statusFailure + return err + } + + tables := make([]string, len(dirs)) + for i, dir := range dirs { + tables[i] = strings.TrimSuffix(string(dir), delimiter) + } + + var errs errUtil.MultiError + + for _, tableName := range tables { + if err := c.tableMarker.MarkForDelete(ctx, tableName); err != nil { + level.Error(util_log.Logger).Log("msg", "failed to mark table for deletes", "table", tableName, "err", err) + errs.Add(err) + status = statusFailure + } + } + return errs.Err() +} diff --git a/pkg/storage/stores/shipper/compactor/compactor_test.go b/pkg/storage/stores/shipper/compactor/compactor_test.go index f291a0fff7d21..77d79fdc2232a 100644 --- a/pkg/storage/stores/shipper/compactor/compactor_test.go +++ b/pkg/storage/stores/shipper/compactor/compactor_test.go @@ -18,8 +18,11 @@ func TestIsDefaults(t *testing.T) { }, false}, {&Config{}, false}, {&Config{ - SharedStoreKeyPrefix: "index/", - CompactionInterval: 2 * time.Hour, + SharedStoreKeyPrefix: "index/", + CompactionInterval: 2 * time.Hour, + RetentionInterval: 10 * time.Minute, + RetentionDeleteDelay: 2 * time.Hour, + RetentionDeleteWorkCount: 150, }, true}, } { t.Run(fmt.Sprint(i), func(t *testing.T) { diff --git 
a/pkg/storage/stores/shipper/compactor/metrics.go b/pkg/storage/stores/shipper/compactor/metrics.go index fdb304b7897b9..96b4a2aff7cc3 100644 --- a/pkg/storage/stores/shipper/compactor/metrics.go +++ b/pkg/storage/stores/shipper/compactor/metrics.go @@ -14,6 +14,10 @@ type metrics struct { compactTablesOperationTotal *prometheus.CounterVec compactTablesOperationDurationSeconds prometheus.Gauge compactTablesOperationLastSuccess prometheus.Gauge + + retentionOperationTotal *prometheus.CounterVec + retentionOperationDurationSeconds prometheus.Gauge + retentionOperationLastSuccess prometheus.Gauge } func newMetrics(r prometheus.Registerer) *metrics { @@ -33,6 +37,22 @@ func newMetrics(r prometheus.Registerer) *metrics { Name: "compact_tables_operation_last_successful_run_timestamp_seconds", Help: "Unix timestamp of the last successful compaction run", }), + + retentionOperationTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: "loki_boltdb_shipper", + Name: "retention_operation_total", + Help: "Total number of retention applied by status", + }, []string{"status"}), + retentionOperationDurationSeconds: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Namespace: "loki_boltdb_shipper", + Name: "retention_operation_duration_seconds", + Help: "Time (in seconds) spent in applying retention for all the tables", + }), + retentionOperationLastSuccess: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Namespace: "loki_boltdb_shipper", + Name: "retention_operation_last_successful_run_timestamp_seconds", + Help: "Unix timestamp of the last successful retention run", + }), } return &m diff --git a/pkg/storage/stores/shipper/compactor/retention/expiration.go b/pkg/storage/stores/shipper/compactor/retention/expiration.go new file mode 100644 index 0000000000000..a334de4965e43 --- /dev/null +++ b/pkg/storage/stores/shipper/compactor/retention/expiration.go @@ -0,0 +1,64 @@ +package retention + +import ( + "time" + + "github.com/prometheus/common/model" + + 
"github.com/grafana/loki/pkg/validation" +) + +type ExpirationChecker interface { + Expired(ref ChunkEntry) bool +} + +type expirationChecker struct { + limits Limits +} + +type Limits interface { + RetentionPeriod(userID string) time.Duration + StreamRetention(userID string) []validation.StreamRetention +} + +func NewExpirationChecker(limits Limits) ExpirationChecker { + return &expirationChecker{ + limits: limits, + } +} + +// Expired tells if a ref chunk is expired based on retention rules. +func (e *expirationChecker) Expired(ref ChunkEntry) bool { + userID := unsafeGetString(ref.UserID) + streamRetentions := e.limits.StreamRetention(userID) + globalRetention := e.limits.RetentionPeriod(userID) + var ( + matchedRule validation.StreamRetention + found bool + ) +Outer: + for _, streamRetention := range streamRetentions { + for _, m := range streamRetention.Matchers { + if !m.Matches(ref.Labels.Get(m.Name)) { + continue Outer + } + } + // the rule is matched. + if found { + // if the current matched rule has a higher priority we keep it. + if matchedRule.Priority > streamRetention.Priority { + continue + } + // if priority is equal we keep the lowest retention. 
+ if matchedRule.Priority == streamRetention.Priority && matchedRule.Period <= streamRetention.Period { + continue + } + } + found = true + matchedRule = streamRetention + } + if found { + return model.Now().Sub(ref.Through) > matchedRule.Period + } + return model.Now().Sub(ref.Through) > globalRetention +} diff --git a/pkg/storage/stores/shipper/compactor/retention/expiration_test.go b/pkg/storage/stores/shipper/compactor/retention/expiration_test.go new file mode 100644 index 0000000000000..cb1887a97b6ef --- /dev/null +++ b/pkg/storage/stores/shipper/compactor/retention/expiration_test.go @@ -0,0 +1,61 @@ +package retention + +import ( + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/validation" +) + +type fakeLimits struct { + perTenant map[string]time.Duration + perStream map[string][]validation.StreamRetention +} + +func (f fakeLimits) RetentionPeriod(userID string) time.Duration { + return f.perTenant[userID] +} + +func (f fakeLimits) StreamRetention(userID string) []validation.StreamRetention { + return f.perStream[userID] +} + +func Test_expirationChecker_Expired(t *testing.T) { + e := NewExpirationChecker(&fakeLimits{ + perTenant: map[string]time.Duration{ + "1": time.Hour, + "2": 24 * time.Hour, + }, + perStream: map[string][]validation.StreamRetention{ + "1": { + {Period: 2 * time.Hour, Priority: 10, Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}}, + {Period: 2 * time.Hour, Priority: 1, Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "foo", "ba.+")}}, + }, + "2": { + {Period: 1 * time.Hour, Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}}, + {Period: 2 * time.Hour, Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "foo", "ba.")}}, + }, + }, + }) + tests := []struct { + name string + ref ChunkEntry + want bool 
+ }{ + {"expired tenant", newChunkEntry("1", `{foo="buzz"}`, model.Now().Add(-3*time.Hour), model.Now().Add(-2*time.Hour)), true}, + {"just expired tenant", newChunkEntry("1", `{foo="buzz"}`, model.Now().Add(-3*time.Hour), model.Now().Add(-1*time.Hour+(10*time.Microsecond))), false}, + {"not expired tenant", newChunkEntry("1", `{foo="buzz"}`, model.Now().Add(-3*time.Hour), model.Now().Add(-30*time.Minute)), false}, + {"not expired tenant by far", newChunkEntry("2", `{foo="buzz"}`, model.Now().Add(-72*time.Hour), model.Now().Add(-3*time.Hour)), false}, + {"expired stream override", newChunkEntry("2", `{foo="bar"}`, model.Now().Add(-12*time.Hour), model.Now().Add(-10*time.Hour)), true}, + {"non expired stream override", newChunkEntry("1", `{foo="bar"}`, model.Now().Add(-3*time.Hour), model.Now().Add(-90*time.Minute)), false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, tt.want, e.Expired(tt.ref)) + }) + } +} diff --git a/pkg/storage/stores/shipper/compactor/retention/index.go b/pkg/storage/stores/shipper/compactor/retention/index.go new file mode 100644 index 0000000000000..9ff9e3cd7175e --- /dev/null +++ b/pkg/storage/stores/shipper/compactor/retention/index.go @@ -0,0 +1,298 @@ +package retention + +import ( + "errors" + "fmt" + "strconv" + "strings" + "time" + + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/prometheus/common/model" + + "github.com/grafana/loki/pkg/storage" +) + +const ( + chunkTimeRangeKeyV3 = '3' + seriesRangeKeyV1 = '7' + labelSeriesRangeKeyV1 = '8' +) + +var ErrInvalidIndexKey = errors.New("invalid index key") + +type InvalidIndexKeyError struct { + HashKey string + RangeKey string +} + +func newInvalidIndexKeyError(h, r []byte) InvalidIndexKeyError { + return InvalidIndexKeyError{ + HashKey: string(h), + RangeKey: string(r), + } +} + +func (e InvalidIndexKeyError) Error() string { + return fmt.Sprintf("%s: hash_key:%s range_key:%s", ErrInvalidIndexKey, e.HashKey, e.RangeKey) +} + +func 
(e InvalidIndexKeyError) Is(target error) bool { + return target == ErrInvalidIndexKey +} + +type ChunkRef struct { + UserID []byte + SeriesID []byte + ChunkID []byte + From model.Time + Through model.Time +} + +func (c ChunkRef) String() string { + return fmt.Sprintf("UserID: %s , SeriesID: %s , Time: [%s,%s]", c.UserID, c.SeriesID, c.From, c.Through) +} + +func parseChunkRef(hashKey, rangeKey []byte) (ChunkRef, bool, error) { + componentsRef := getComponents() + defer putComponents(componentsRef) + components := componentsRef.components + + components = decodeRangeKey(rangeKey, components) + if len(components) < 2 { // the chunk ID is read from the component before the key type, so both must exist + return ChunkRef{}, false, newInvalidIndexKeyError(hashKey, rangeKey) + } + + keyType := components[len(components)-1] + if len(keyType) == 0 || keyType[0] != chunkTimeRangeKeyV3 { + return ChunkRef{}, false, nil + } + chunkID := components[len(components)-2] + + userID, hexFrom, hexThrough, ok := parseChunkID(chunkID) + if !ok { + return ChunkRef{}, false, newInvalidIndexKeyError(hashKey, rangeKey) + } + from, err := strconv.ParseInt(unsafeGetString(hexFrom), 16, 64) + if err != nil { + return ChunkRef{}, false, err + } + through, err := strconv.ParseInt(unsafeGetString(hexThrough), 16, 64) + if err != nil { + return ChunkRef{}, false, err + } + + return ChunkRef{ + UserID: userID, + SeriesID: seriesFromHash(hashKey), + From: model.Time(from), + Through: model.Time(through), + ChunkID: chunkID, + }, true, nil +} + +func parseChunkID(chunkID []byte) (userID []byte, hexFrom, hexThrough []byte, valid bool) { + var ( + j, i int + hex []byte + ) + + for j < len(chunkID) { + if chunkID[j] != '/' { + j++ + continue + } + userID = chunkID[:j] + hex = chunkID[j+1:] + break + } + if len(userID) == 0 { + return nil, nil, nil, false + } + _, i = readOneHexPart(hex) + if i == 0 { + return nil, nil, nil, false + } + hex = hex[i+1:] + hexFrom, i = readOneHexPart(hex) + if i == 0 { + return nil, nil, nil, false + } + hex = hex[i+1:] + hexThrough, i = 
readOneHexPart(hex) + if i == 0 { + return nil, nil, nil, false + } + return userID, hexFrom, hexThrough, true +} + +func readOneHexPart(hex []byte) (part []byte, i int) { + for i < len(hex) { + if hex[i] != ':' { + i++ + continue + } + return hex[:i], i + } + return nil, 0 +} + +func parseLabelIndexSeriesID(hashKey, rangeKey []byte) ([]byte, bool, error) { + componentsRef := getComponents() + defer putComponents(componentsRef) + components := componentsRef.components + var seriesID []byte + components = decodeRangeKey(rangeKey, components) + if len(components) < 4 { + return nil, false, newInvalidIndexKeyError(hashKey, rangeKey) + } + keyType := components[len(components)-1] + if len(keyType) == 0 { + return nil, false, nil + } + switch keyType[0] { + case labelSeriesRangeKeyV1: + seriesID = components[1] + case seriesRangeKeyV1: + seriesID = components[0] + default: + return nil, false, nil + } + return seriesID, true, nil +} + +type LabelSeriesRangeKey struct { + SeriesID []byte + UserID []byte + Name []byte +} + +func (l LabelSeriesRangeKey) String() string { + return fmt.Sprintf("%s:%s:%s", l.SeriesID, l.UserID, l.Name) +} + +func parseLabelSeriesRangeKey(hashKey, rangeKey []byte) (LabelSeriesRangeKey, bool, error) { + rangeComponentsRef := getComponents() + defer putComponents(rangeComponentsRef) + rangeComponents := rangeComponentsRef.components + hashComponentsRef := getComponents() + defer putComponents(hashComponentsRef) + hashComponents := hashComponentsRef.components + + rangeComponents = decodeRangeKey(rangeKey, rangeComponents) + if len(rangeComponents) < 4 { + return LabelSeriesRangeKey{}, false, newInvalidIndexKeyError(hashKey, rangeKey) + } + keyType := rangeComponents[len(rangeComponents)-1] + if len(keyType) == 0 || keyType[0] != labelSeriesRangeKeyV1 { + return LabelSeriesRangeKey{}, false, nil + } + hashComponents = splitBytesBy(hashKey, ':', hashComponents) + // > v10 HashValue: fmt.Sprintf("%02d:%s:%s:%s", shard, bucket.hashKey , metricName, 
v.Name), + // < v10 HashValue: fmt.Sprintf("%s:%s:%s", bucket.hashKey, metricName, v.Name), + + if len(hashComponents) < 4 { + return LabelSeriesRangeKey{}, false, newInvalidIndexKeyError(hashKey, rangeKey) + } + return LabelSeriesRangeKey{ + SeriesID: rangeComponents[1], + Name: hashComponents[len(hashComponents)-1], + UserID: hashComponents[len(hashComponents)-4], + }, true, nil +} + +func validatePeriods(config storage.SchemaConfig) error { + for _, schema := range config.Configs { + if schema.IndexTables.Period != 24*time.Hour { + return fmt.Errorf("schema period must be daily, was: %s", schema.IndexTables.Period) + } + } + return nil +} + +func schemaPeriodForTable(config storage.SchemaConfig, tableName string) (chunk.PeriodConfig, bool) { + // first round removes configs that does not have the prefix. + candidates := []chunk.PeriodConfig{} + for _, schema := range config.Configs { + if strings.HasPrefix(tableName, schema.IndexTables.Prefix) { + candidates = append(candidates, schema) + } + } + // WARN we assume period is always daily. This is only true for boltdb-shipper. + var ( + matched chunk.PeriodConfig + found bool + ) + for _, schema := range candidates { + periodIndex, err := strconv.ParseInt(strings.TrimPrefix(tableName, schema.IndexTables.Prefix), 10, 64) + if err != nil { + continue + } + periodSec := int64(schema.IndexTables.Period / time.Second) + tableTs := model.TimeFromUnix(periodIndex * periodSec) + if tableTs.After(schema.From.Time) || tableTs == schema.From.Time { + matched = schema + found = true + } + } + + return matched, found +} + +func seriesFromHash(h []byte) (seriesID []byte) { + var index int + for i := range h { + if h[i] == ':' { + index++ + } + if index == 2 { + seriesID = h[i+1:] + return + } + } + return +} + +// decodeKey decodes hash and range value from a boltdb key. 
+func decodeKey(k []byte) (hashValue, rangeValue []byte) { + // hashValue + 0 + string(rangeValue) + for i := range k { + if k[i] == 0 { + hashValue = k[:i] + rangeValue = k[i+1:] + return + } + } + return +} + +func splitBytesBy(value []byte, by byte, components [][]byte) [][]byte { + components = components[:0] + i, j := 0, 0 + for j < len(value) { + if value[j] != by { + j++ + continue + } + components = append(components, value[i:j]) + j++ + i = j + } + components = append(components, value[i:]) + return components +} + +func decodeRangeKey(value []byte, components [][]byte) [][]byte { + components = components[:0] + i, j := 0, 0 + for j < len(value) { + if value[j] != 0 { + j++ + continue + } + components = append(components, value[i:j]) + j++ + i = j + } + return components +} diff --git a/pkg/storage/stores/shipper/compactor/retention/index_test.go b/pkg/storage/stores/shipper/compactor/retention/index_test.go new file mode 100644 index 0000000000000..81c39abc36e0c --- /dev/null +++ b/pkg/storage/stores/shipper/compactor/retention/index_test.go @@ -0,0 +1,38 @@ +package retention + +import ( + "fmt" + "testing" + "time" + + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/storage" +) + +func Test_schemaPeriodForTable(t *testing.T) { + indexFromTime := func(t time.Time) string { + return fmt.Sprintf("%d", t.Unix()/int64(24*time.Hour/time.Second)) + } + tests := []struct { + name string + config storage.SchemaConfig + tableName string + expected chunk.PeriodConfig + expectedFound bool + }{ + {"out of scope", schemaCfg, "index_" + indexFromTime(start.Time().Add(-24*time.Hour)), chunk.PeriodConfig{}, false}, + {"first table", schemaCfg, "index_" + indexFromTime(dayFromTime(start).Time.Time()), schemaCfg.Configs[0], true}, + {"4 hour after first table", schemaCfg, "index_" + indexFromTime(dayFromTime(start).Time.Time().Add(4*time.Hour)), schemaCfg.Configs[0], true}, + {"second schema", schemaCfg, 
"index_" + indexFromTime(dayFromTime(start.Add(28*time.Hour)).Time.Time()), schemaCfg.Configs[1], true}, + {"now", schemaCfg, "index_" + indexFromTime(time.Now()), schemaCfg.Configs[2], true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actual, actualFound := schemaPeriodForTable(tt.config, tt.tableName) + require.Equal(t, tt.expected, actual) + require.Equal(t, tt.expectedFound, actualFound) + }) + } +} diff --git a/pkg/storage/stores/shipper/compactor/retention/iterator.go b/pkg/storage/stores/shipper/compactor/retention/iterator.go new file mode 100644 index 0000000000000..b95ff5e2b68a0 --- /dev/null +++ b/pkg/storage/stores/shipper/compactor/retention/iterator.go @@ -0,0 +1,182 @@ +package retention + +import ( + "bytes" + "encoding/binary" + "fmt" + "time" + + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/prometheus/prometheus/pkg/labels" + "go.etcd.io/bbolt" +) + +var ( + _ ChunkEntryIterator = &chunkIndexIterator{} + _ SeriesCleaner = &seriesCleaner{} +) + +type ChunkEntry struct { + ChunkRef + Labels labels.Labels +} + +type ChunkEntryIterator interface { + Next() bool + Entry() ChunkEntry + // Delete deletes the current entry. 
+ Delete() error + Err() error +} + +type chunkIndexIterator struct { + cursor *bbolt.Cursor + current ChunkEntry + first bool + err error + + labelsMapper *seriesLabelsMapper +} + +func newChunkIndexIterator(bucket *bbolt.Bucket, config chunk.PeriodConfig) (*chunkIndexIterator, error) { + labelsMapper, err := newSeriesLabelsMapper(bucket, config) + if err != nil { + return nil, err + } + return &chunkIndexIterator{ + cursor: bucket.Cursor(), + first: true, + labelsMapper: labelsMapper, + current: ChunkEntry{}, + }, nil +} + +func (b *chunkIndexIterator) Err() error { + return b.err +} + +func (b *chunkIndexIterator) Entry() ChunkEntry { + return b.current +} + +func (b *chunkIndexIterator) Delete() error { + return b.cursor.Delete() +} + +func (b *chunkIndexIterator) Next() bool { + var key []byte + if b.first { + key, _ = b.cursor.First() + b.first = false + } else { + key, _ = b.cursor.Next() + } + for key != nil { + ref, ok, err := parseChunkRef(decodeKey(key)) + if err != nil { + b.err = err + return false + } + // skips anything else than chunk index entries. 
+ if !ok { + key, _ = b.cursor.Next() + continue + } + b.current.ChunkRef = ref + b.current.Labels = b.labelsMapper.Get(ref.SeriesID, ref.UserID) + return true + } + return false +} + +type SeriesCleaner interface { + Cleanup(seriesID []byte, userID []byte) error +} + +type seriesCleaner struct { + bucketTimestamps []string + shards map[uint32]string + cursor *bbolt.Cursor + config chunk.PeriodConfig + + buf []byte +} + +func newSeriesCleaner(bucket *bbolt.Bucket, config chunk.PeriodConfig) *seriesCleaner { + var ( + fromDay = config.From.Time.Unix() / int64(config.IndexTables.Period/time.Second) + throughDay = config.From.Add(config.IndexTables.Period).Unix() / int64(config.IndexTables.Period/time.Second) + bucketTimestamps = []string{} + ) + for i := fromDay; i <= throughDay; i++ { + bucketTimestamps = append(bucketTimestamps, fmt.Sprintf("d%d", i)) + } + var shards map[uint32]string + if config.RowShards != 0 { + shards = map[uint32]string{} + for s := uint32(0); s < config.RowShards; s++ { // Cleanup looks shards up by seriesID % RowShards, so valid ids are 0..RowShards-1 + shards[s] = fmt.Sprintf("%02d", s) + } + } + return &seriesCleaner{ + bucketTimestamps: bucketTimestamps, + cursor: bucket.Cursor(), + buf: make([]byte, 0, 1024), + config: config, + shards: shards, + } +} + +func (s *seriesCleaner) Cleanup(seriesID []byte, userID []byte) error { + for _, timestamp := range s.bucketTimestamps { + // build the chunk ref prefix + s.buf = s.buf[:0] + if s.config.Schema != "v9" { + shard := binary.BigEndian.Uint32(seriesID) % s.config.RowShards + s.buf = append(s.buf, unsafeGetBytes(s.shards[shard])...) + s.buf = append(s.buf, ':') + } + s.buf = append(s.buf, userID...) + s.buf = append(s.buf, ':') + s.buf = append(s.buf, unsafeGetBytes(timestamp)...) + s.buf = append(s.buf, ':') + s.buf = append(s.buf, seriesID...) 
+ + if key, _ := s.cursor.Seek(s.buf); key != nil && bytes.HasPrefix(key, s.buf) { + // this series still have chunk entries we can't cleanup + continue + } + // we don't have any chunk ref for that series let's delete all label index entries + s.buf = s.buf[:0] + if s.config.Schema != "v9" { + shard := binary.BigEndian.Uint32(seriesID) % s.config.RowShards + s.buf = append(s.buf, unsafeGetBytes(s.shards[shard])...) + s.buf = append(s.buf, ':') + } + s.buf = append(s.buf, userID...) + s.buf = append(s.buf, ':') + s.buf = append(s.buf, unsafeGetBytes(timestamp)...) + s.buf = append(s.buf, ':') + s.buf = append(s.buf, unsafeGetBytes(logMetricName)...) + + // delete all seriesRangeKeyV1 and labelSeriesRangeKeyV1 via prefix + // todo(cyriltovena) we might be able to encode index key instead of parsing all label entries for faster delete. + for key, _ := s.cursor.Seek(s.buf); key != nil && bytes.HasPrefix(key, s.buf); key, _ = s.cursor.Next() { + + parsedSeriesID, ok, err := parseLabelIndexSeriesID(decodeKey(key)) + if err != nil { + return err + } + if !ok { + continue + } + if !bytes.Equal(seriesID, parsedSeriesID) { + continue + } + if err := s.cursor.Delete(); err != nil { + return err + } + } + } + return nil +} diff --git a/pkg/storage/stores/shipper/compactor/retention/iterator_test.go b/pkg/storage/stores/shipper/compactor/retention/iterator_test.go new file mode 100644 index 0000000000000..972c7c7151657 --- /dev/null +++ b/pkg/storage/stores/shipper/compactor/retention/iterator_test.go @@ -0,0 +1,180 @@ +package retention + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/stretchr/testify/require" + "go.etcd.io/bbolt" +) + +func Test_ChunkIterator(t *testing.T) { + for _, tt := range allSchemas { + tt := tt + t.Run(tt.schema, func(t *testing.T) { + store := newTestStore(t) + c1 := createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", 
Value: "bar"}}, tt.from, tt.from.Add(1*time.Hour)) + c2 := createChunk(t, "2", labels.Labels{labels.Label{Name: "foo", Value: "buzz"}, labels.Label{Name: "bar", Value: "foo"}}, tt.from, tt.from.Add(1*time.Hour)) + + require.NoError(t, store.Put(context.TODO(), []chunk.Chunk{ + c1, c2, + })) + + store.Stop() + + tables := store.indexTables() + require.Len(t, tables, 1) + var actual []ChunkEntry + err := tables[0].DB.Update(func(tx *bbolt.Tx) error { + it, err := newChunkIndexIterator(tx.Bucket(bucketName), tt.config) + require.NoError(t, err) + for it.Next() { + require.NoError(t, it.Err()) + actual = append(actual, it.Entry()) + // delete the last entry + if len(actual) == 2 { + require.NoError(t, it.Delete()) + } + } + return nil + }) + require.NoError(t, err) + require.Equal(t, []ChunkEntry{ + entryFromChunk(c1), + entryFromChunk(c2), + }, actual) + + // second pass we delete c2 + actual = actual[:0] + err = tables[0].DB.Update(func(tx *bbolt.Tx) error { + it, err := newChunkIndexIterator(tx.Bucket(bucketName), tt.config) + require.NoError(t, err) + for it.Next() { + actual = append(actual, it.Entry()) + } + return it.Err() + }) + require.NoError(t, err) + require.Equal(t, []ChunkEntry{ + entryFromChunk(c1), + }, actual) + }) + } +} + +func Test_SeriesCleaner(t *testing.T) { + for _, tt := range allSchemas { + tt := tt + t.Run(tt.schema, func(t *testing.T) { + store := newTestStore(t) + c1 := createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, tt.from, tt.from.Add(1*time.Hour)) + c2 := createChunk(t, "2", labels.Labels{labels.Label{Name: "foo", Value: "buzz"}, labels.Label{Name: "bar", Value: "foo"}}, tt.from, tt.from.Add(1*time.Hour)) + c3 := createChunk(t, "2", labels.Labels{labels.Label{Name: "foo", Value: "buzz"}, labels.Label{Name: "bar", Value: "buzz"}}, tt.from, tt.from.Add(1*time.Hour)) + + require.NoError(t, store.Put(context.TODO(), []chunk.Chunk{ + c1, c2, c3, + })) + + store.Stop() + + tables := store.indexTables() + 
require.Len(t, tables, 1) + // remove c2 chunk + err := tables[0].DB.Update(func(tx *bbolt.Tx) error { + it, err := newChunkIndexIterator(tx.Bucket(bucketName), tt.config) + require.NoError(t, err) + for it.Next() { + require.NoError(t, it.Err()) + if it.Entry().Labels.Get("bar") == "foo" { + require.NoError(t, it.Delete()) + } + } + return nil + }) + require.NoError(t, err) + + err = tables[0].DB.Update(func(tx *bbolt.Tx) error { + cleaner := newSeriesCleaner(tx.Bucket(bucketName), tt.config) + if err := cleaner.Cleanup(entryFromChunk(c2).SeriesID, entryFromChunk(c2).UserID); err != nil { + return err + } + if err := cleaner.Cleanup(entryFromChunk(c1).SeriesID, entryFromChunk(c1).UserID); err != nil { + return err + } + return nil + }) + require.NoError(t, err) + + err = tables[0].DB.View(func(tx *bbolt.Tx) error { + return tx.Bucket(bucketName).ForEach(func(k, _ []byte) error { + expectedDeleteSeries := entryFromChunk(c2).SeriesID + series, ok, err := parseLabelIndexSeriesID(decodeKey(k)) + if !ok { + return nil + } + if err != nil { + return err + } + if string(expectedDeleteSeries) == string(series) { + require.Fail(t, "series should be deleted", expectedDeleteSeries) + } + + return nil + }) + }) + require.NoError(t, err) + }) + } +} + +func entryFromChunk(c chunk.Chunk) ChunkEntry { + return ChunkEntry{ + ChunkRef: ChunkRef{ + UserID: []byte(c.UserID), + SeriesID: labelsSeriesID(c.Metric), + ChunkID: []byte(c.ExternalKey()), + From: c.From, + Through: c.Through, + }, + Labels: c.Metric.WithoutLabels("__name__"), + } +} + +var chunkEntry ChunkEntry + +func Benchmark_ChunkIterator(b *testing.B) { + store := newTestStore(b) + for i := 0; i < 100; i++ { + require.NoError(b, store.Put(context.TODO(), + []chunk.Chunk{ + createChunk(b, "1", + labels.Labels{labels.Label{Name: "foo", Value: "bar"}, labels.Label{Name: "i", Value: fmt.Sprintf("%d", i)}}, + allSchemas[0].from, allSchemas[0].from.Add(1*time.Hour)), + }, + )) + } + store.Stop() + b.ReportAllocs() + 
b.ResetTimer() + + var total int64 + _ = store.indexTables()[0].Update(func(tx *bbolt.Tx) error { + bucket := tx.Bucket(bucketName) + for n := 0; n < b.N; n++ { + it, err := newChunkIndexIterator(bucket, allSchemas[0].config) + require.NoError(b, err) + for it.Next() { + chunkEntry = it.Entry() + require.NoError(b, it.Delete()) + total++ + } + } + return errors.New("don't commit") + }) + b.Logf("Total chunk ref:%d", total) +} diff --git a/pkg/storage/stores/shipper/compactor/retention/marker.go b/pkg/storage/stores/shipper/compactor/retention/marker.go new file mode 100644 index 0000000000000..2eb708b5e3715 --- /dev/null +++ b/pkg/storage/stores/shipper/compactor/retention/marker.go @@ -0,0 +1,348 @@ +package retention + +import ( + "bytes" + "context" + "fmt" + "io/fs" + "io/ioutil" + "os" + "path/filepath" + "sort" + "strconv" + "sync" + "time" + + chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" + util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/go-kit/kit/log/level" + "go.etcd.io/bbolt" + + shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" +) + +var minListMarkDelay = time.Minute + +type MarkerStorageWriter interface { + Put(chunkID []byte) error + Count() int64 + Close() error +} + +type markerStorageWriter struct { + db *bbolt.DB + tx *bbolt.Tx + bucket *bbolt.Bucket + + count int64 + fileName string +} + +func NewMarkerStorageWriter(workingDir string) (MarkerStorageWriter, error) { + err := chunk_util.EnsureDirectory(filepath.Join(workingDir, markersFolder)) + if err != nil { + return nil, err + } + fileName := filepath.Join(workingDir, markersFolder, fmt.Sprint(time.Now().UnixNano())) + db, err := shipper_util.SafeOpenBoltdbFile(fileName) + if err != nil { + return nil, err + } + tx, err := db.Begin(true) + if err != nil { + return nil, err + } + bucket, err := tx.CreateBucketIfNotExists(chunkBucket) + if err != nil { + return nil, err + } + return &markerStorageWriter{ + db: db, + tx: tx, + bucket: 
bucket, + count: 0, + fileName: fileName, + }, err +} + +func (m *markerStorageWriter) Put(chunkID []byte) error { + if err := m.bucket.Put(chunkID, empty); err != nil { + return err + } + m.count++ + return nil +} + +func (m *markerStorageWriter) Count() int64 { + return m.count +} + +func (m *markerStorageWriter) Close() error { + if err := m.tx.Commit(); err != nil { + return err + } + if err := m.db.Close(); err != nil { + return err + } + // The marker file is empty we can remove. + if m.count == 0 { + return os.Remove(m.fileName) + } + return nil +} + +type MarkerProcessor interface { + // Start starts parsing marks and calling deleteFunc for each. + // If deleteFunc returns no error the mark is deleted from the storage. + // Otherwise the mark will reappears in future iteration. + Start(deleteFunc func(ctx context.Context, chunkId []byte) error) + // Stop stops processing marks. + Stop() +} + +type markerProcessor struct { + folder string // folder where to find markers file. + maxParallelism int + minAgeFile time.Duration + + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + + sweeperMetrics *sweeperMetrics +} + +func newMarkerStorageReader(workingDir string, maxParallelism int, minAgeFile time.Duration, sweeperMetrics *sweeperMetrics) (*markerProcessor, error) { + folder := filepath.Join(workingDir, markersFolder) + err := chunk_util.EnsureDirectory(folder) + if err != nil { + return nil, err + } + ctx, cancel := context.WithCancel(context.Background()) + return &markerProcessor{ + folder: folder, + ctx: ctx, + cancel: cancel, + maxParallelism: maxParallelism, + minAgeFile: minAgeFile, + sweeperMetrics: sweeperMetrics, + }, nil +} + +func (r *markerProcessor) Start(deleteFunc func(ctx context.Context, chunkId []byte) error) { + level.Info(util_log.Logger).Log("msg", "mark processor started", "workers", r.maxParallelism, "delay", r.minAgeFile) + r.wg.Wait() // only one start at a time. 
+ r.wg.Add(1) + go func() { + defer r.wg.Done() + ticker := time.NewTicker(minListMarkDelay) + defer ticker.Stop() + tick := func() { + select { + case <-r.ctx.Done(): + case <-ticker.C: + } + } + // instant first tick + for ; true; tick() { + if r.ctx.Err() != nil { + // cancelled + return + } + paths, times, err := r.availablePath() + if err != nil { + level.Error(util_log.Logger).Log("msg", "failed to list marks path", "path", r.folder, "err", err) + continue + } + r.sweeperMetrics.markerFilesCurrent.Set(float64(len(paths))) + if len(paths) == 0 { + level.Info(util_log.Logger).Log("msg", "no marks file found") + } + for i, path := range paths { + level.Debug(util_log.Logger).Log("msg", "processing mark file", "path", path) + if r.ctx.Err() != nil { + return + } + r.sweeperMetrics.markerFileCurrentTime.Set(float64(times[i].UnixNano()) / 1e9) + if err := r.processPath(path, deleteFunc); err != nil { + level.Warn(util_log.Logger).Log("msg", "failed to process marks", "path", path, "err", err) + continue + } + // delete if empty. + if err := r.deleteEmptyMarks(path); err != nil { + level.Warn(util_log.Logger).Log("msg", "failed to delete marks", "path", path, "err", err) + } + } + + } + }() +} + +func (r *markerProcessor) processPath(path string, deleteFunc func(ctx context.Context, chunkId []byte) error) error { + var ( + wg sync.WaitGroup + queue = make(chan *bytes.Buffer) + ) + // we use a copy to view the file so that we can read and update at the same time. 
+ viewFile, err := ioutil.TempFile("/tmp/", "marker-view-") + if err != nil { + return err + } + if err := viewFile.Close(); err != nil { + return fmt.Errorf("failed to close view file: %w", err) + } + defer func() { + if err := os.Remove(viewFile.Name()); err != nil { + level.Warn(util_log.Logger).Log("msg", "failed to delete view file", "file", viewFile.Name(), "err", err) + } + }() + if _, err := copyFile(path, viewFile.Name()); err != nil { + return fmt.Errorf("failed to copy view file: %w", err) + } + dbView, err := shipper_util.SafeOpenBoltdbFile(viewFile.Name()) + if err != nil { + return err + } + defer func() { + if err := dbView.Close(); err != nil { + level.Warn(util_log.Logger).Log("msg", "failed to close db view", "err", err) + } + }() + dbUpdate, err := shipper_util.SafeOpenBoltdbFile(path) + if err != nil { + return err + } + defer func() { + close(queue) + wg.Wait() + if err := dbUpdate.Close(); err != nil { + level.Warn(util_log.Logger).Log("msg", "failed to close db", "err", err) + } + }() + for i := 0; i < r.maxParallelism; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for key := range queue { + if err := processKey(r.ctx, key, dbUpdate, deleteFunc); err != nil { + level.Warn(util_log.Logger).Log("msg", "failed to delete key", "key", key.String(), "err", err) + } + putKeyBuffer(key) + } + }() + } + if err := dbView.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(chunkBucket) + if b == nil { + return nil + } + + c := b.Cursor() + for k, _ := c.First(); k != nil; k, _ = c.Next() { + key, err := getKeyBuffer(k) + if err != nil { + return err + } + select { + case queue <- key: + case <-r.ctx.Done(): + return r.ctx.Err() + } + + } + return nil + }); err != nil { + return err + } + return nil +} + +func processKey(ctx context.Context, key *bytes.Buffer, db *bbolt.DB, deleteFunc func(ctx context.Context, chunkId []byte) error) error { + keyData := key.Bytes() + if err := deleteFunc(ctx, keyData); err != nil { + return err + } + return 
db.Batch(func(tx *bbolt.Tx) error { + b := tx.Bucket(chunkBucket) + if b == nil { + return nil + } + return b.Delete(keyData) + }) +} + +func (r *markerProcessor) deleteEmptyMarks(path string) error { + db, err := shipper_util.SafeOpenBoltdbFile(path) + if err != nil { + return err + } + var empty bool + err = db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(chunkBucket) + if b == nil { + empty = true + return nil + } + if k, _ := b.Cursor().First(); k == nil { + empty = true + return nil + } + + return nil + }) + db.Close() + if err != nil { + return err + } + if empty { + r.sweeperMetrics.markerFilesDeletedTotal.Inc() + return os.Remove(path) + } + return nil +} + +// availablePath returns markers path in chronological order, skipping file that are not old enough. +func (r *markerProcessor) availablePath() ([]string, []time.Time, error) { + found := []int64{} + if err := filepath.WalkDir(r.folder, func(path string, d fs.DirEntry, err error) error { + if d == nil || err != nil { + return err + } + + if d.IsDir() && d.Name() != markersFolder { + return filepath.SkipDir + } + if d.IsDir() { + return nil + } + base := filepath.Base(path) + i, err := strconv.ParseInt(base, 10, 64) + if err != nil { + level.Warn(util_log.Logger).Log("msg", "wrong file name", "path", path, "base", base, "err", err) + return nil + } + + if time.Since(time.Unix(0, i)) > r.minAgeFile { + found = append(found, i) + } + return nil + }); err != nil { + return nil, nil, err + } + if len(found) == 0 { + return nil, nil, nil + } + sort.Slice(found, func(i, j int) bool { return found[i] < found[j] }) + res := make([]string, len(found)) + resTime := make([]time.Time, len(found)) + for i, f := range found { + res[i] = filepath.Join(r.folder, fmt.Sprintf("%d", f)) + resTime[i] = time.Unix(0, f) + } + return res, resTime, nil +} + +func (r *markerProcessor) Stop() { + r.cancel() + r.wg.Wait() +} diff --git a/pkg/storage/stores/shipper/compactor/retention/marker_test.go 
b/pkg/storage/stores/shipper/compactor/retention/marker_test.go new file mode 100644 index 0000000000000..8cfeacdfd36d8 --- /dev/null +++ b/pkg/storage/stores/shipper/compactor/retention/marker_test.go @@ -0,0 +1,162 @@ +package retention + +import ( + "context" + "errors" + "fmt" + "os" + "path/filepath" + "sort" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func initAndFeedMarkerProcessor(t *testing.T, deleteWorkerCount int) *markerProcessor { + t.Helper() + minListMarkDelay = time.Second + dir := t.TempDir() + p, err := newMarkerStorageReader(dir, deleteWorkerCount, time.Second, sweepMetrics) + require.NoError(t, err) + go func() { + w, err := NewMarkerStorageWriter(dir) + require.NoError(t, err) + + require.NoError(t, w.Put([]byte("1"))) + require.NoError(t, w.Put([]byte("2"))) + require.NoError(t, w.Close()) + w, err = NewMarkerStorageWriter(dir) + require.NoError(t, err) + require.NoError(t, w.Put([]byte("3"))) + require.NoError(t, w.Put([]byte("4"))) + require.NoError(t, w.Close()) + }() + return p +} + +func Test_marlkerProcessor_Deadlock(t *testing.T) { + dir := t.TempDir() + p, err := newMarkerStorageReader(dir, 150, 0, sweepMetrics) + require.NoError(t, err) + w, err := NewMarkerStorageWriter(dir) + require.NoError(t, err) + for i := 0; i <= 2000; i++ { + require.NoError(t, w.Put([]byte(fmt.Sprintf("%d", i)))) + } + require.NoError(t, w.Close()) + paths, _, err := p.availablePath() + require.NoError(t, err) + for _, path := range paths { + require.NoError(t, p.processPath(path, func(ctx context.Context, chunkId []byte) error { return nil })) + require.NoError(t, p.deleteEmptyMarks(path)) + } + paths, _, err = p.availablePath() + require.NoError(t, err) + require.Len(t, paths, 0) +} + +func Test_markerProcessor_StartRetryKey(t *testing.T) { + p := initAndFeedMarkerProcessor(t, 5) + defer p.Stop() + counts := map[string]int{} + l := sync.Mutex{} + + p.Start(func(ctx context.Context, id 
[]byte) error { + l.Lock() + defer l.Unlock() + counts[string(id)]++ + return errors.New("don't delete") + }) + + require.Eventually(t, func() bool { + l.Lock() + defer l.Unlock() + actual := []string{} + expected := []string{"1", "2", "3", "4"} + for k, v := range counts { + if v <= 1 { // we expects value to come back more than once since we don't delete them. + return false + } + actual = append(actual, k) + } + sort.Strings(actual) + return assert.ObjectsAreEqual(expected, actual) + }, 10*time.Second, 100*time.Microsecond) +} + +func Test_markerProcessor_StartDeleteOnSuccess(t *testing.T) { + p := initAndFeedMarkerProcessor(t, 5) + defer p.Stop() + counts := map[string]int{} + l := sync.Mutex{} + + p.Start(func(ctx context.Context, id []byte) error { + l.Lock() + defer l.Unlock() + counts[string(id)]++ + return nil + }) + + require.Eventually(t, func() bool { + l.Lock() + defer l.Unlock() + actual := []string{} + expected := []string{"1", "2", "3", "4"} + for k, v := range counts { + if v > 1 { // we should see keys only once ! 
+ return false + } + actual = append(actual, k) + } + sort.Strings(actual) + return assert.ObjectsAreEqual(expected, actual) + }, 10*time.Second, 100*time.Microsecond) +} + +func Test_markerProcessor_availablePath(t *testing.T) { + now := time.Now() + for _, tt := range []struct { + name string + expected func(dir string) ([]string, []time.Time) + }{ + {"empty", func(_ string) ([]string, []time.Time) { return nil, nil }}, + {"skips bad files", func(dir string) ([]string, []time.Time) { + _, _ = os.Create(filepath.Join(dir, "foo")) + return nil, nil + }}, + {"happy path", func(dir string) ([]string, []time.Time) { + _, _ = os.Create(filepath.Join(dir, fmt.Sprintf("%d", now.UnixNano()))) + _, _ = os.Create(filepath.Join(dir, "foo")) + _, _ = os.Create(filepath.Join(dir, fmt.Sprintf("%d", now.Add(-30*time.Minute).UnixNano()))) + _, _ = os.Create(filepath.Join(dir, fmt.Sprintf("%d", now.Add(-1*time.Hour).UnixNano()))) + _, _ = os.Create(filepath.Join(dir, fmt.Sprintf("%d", now.Add(-3*time.Hour).UnixNano()))) + _, _ = os.Create(filepath.Join(dir, fmt.Sprintf("%d", now.Add(-2*time.Hour).UnixNano()))) + _, _ = os.Create(filepath.Join(dir, fmt.Sprintf("%d", now.Add(-48*time.Hour).UnixNano()))) + return []string{ + filepath.Join(dir, fmt.Sprintf("%d", now.Add(-48*time.Hour).UnixNano())), // oldest should be first + filepath.Join(dir, fmt.Sprintf("%d", now.Add(-3*time.Hour).UnixNano())), + filepath.Join(dir, fmt.Sprintf("%d", now.Add(-2*time.Hour).UnixNano())), + }, []time.Time{ + time.Unix(0, now.Add(-48*time.Hour).UnixNano()), + time.Unix(0, now.Add(-3*time.Hour).UnixNano()), + time.Unix(0, now.Add(-2*time.Hour).UnixNano()), + } + }}, + } { + t.Run("", func(t *testing.T) { + dir := t.TempDir() + p, err := newMarkerStorageReader(dir, 5, 2*time.Hour, sweepMetrics) + + expectedPath, expectedTimes := tt.expected(p.folder) + + require.NoError(t, err) + paths, times, err := p.availablePath() + require.Nil(t, err) + require.Equal(t, expectedPath, paths) + require.Equal(t, 
expectedTimes, times) + }) + } +} diff --git a/pkg/storage/stores/shipper/compactor/retention/metrics.go b/pkg/storage/stores/shipper/compactor/retention/metrics.go new file mode 100644 index 0000000000000..6734b9b24a79a --- /dev/null +++ b/pkg/storage/stores/shipper/compactor/retention/metrics.go @@ -0,0 +1,76 @@ +package retention + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const ( + statusFailure = "failure" + statusSuccess = "success" + statusNotFound = "notfound" + + tableActionModified = "modified" + tableActionDeleted = "deleted" + tableActionNone = "none" +) + +type sweeperMetrics struct { + deleteChunkDurationSeconds *prometheus.HistogramVec + markerFileCurrentTime prometheus.Gauge + markerFilesCurrent prometheus.Gauge + markerFilesDeletedTotal prometheus.Counter +} + +func newSweeperMetrics(r prometheus.Registerer) *sweeperMetrics { + return &sweeperMetrics{ + deleteChunkDurationSeconds: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "loki_boltdb_shipper", + Name: "retention_sweeper_chunk_deleted_duration_seconds", + Help: "Time (in seconds) spent in deleting chunk", + Buckets: prometheus.ExponentialBuckets(0.1, 2, 8), + }, []string{"status"}), + markerFilesCurrent: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Namespace: "loki_boltdb_shipper", + Name: "retention_sweeper_marker_files_current", + Help: "The current total of marker files valid for deletion.", + }), + markerFileCurrentTime: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Namespace: "loki_boltdb_shipper", + Name: "retention_sweeper_marker_file_processing_current_time", + Help: "The current time of creation of the marker file being processed.", + }), + markerFilesDeletedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: "loki_boltdb_shipper", + Name: "retention_sweeper_marker_files_deleted_total", + Help: "The total of marker files deleted after being fully 
// componentRef wraps a slice of key components so that the slice can be
// recycled through componentPools.
type componentRef struct {
	components [][]byte
}

var (
	// componentPools recycles componentRef values; fresh ones have room for
	// 5 components.
	componentPools = sync.Pool{
		New: func() interface{} {
			ref := new(componentRef)
			ref.components = make([][]byte, 0, 5)
			return ref
		},
	}
	// keyPool recycles key buffers; fresh ones have a capacity of 512 bytes.
	keyPool = sync.Pool{
		New: func() interface{} {
			backing := make([]byte, 0, 512)
			return bytes.NewBuffer(backing)
		},
	}
)

// getComponents takes a componentRef from the pool and truncates its slice to
// length zero, keeping the capacity.
func getComponents() *componentRef {
	pooled := componentPools.Get().(*componentRef)
	pooled.components = pooled.components[:0]
	return pooled
}

// putComponents hands ref back to the pool.
func putComponents(ref *componentRef) {
	componentPools.Put(ref)
}

// getKeyBuffer takes a buffer from the pool and writes key into it. If the
// write fails the buffer goes back to the pool and the error is returned.
func getKeyBuffer(key []byte) (*bytes.Buffer, error) {
	keyBuf := keyPool.Get().(*bytes.Buffer)
	_, err := keyBuf.Write(key)
	if err == nil {
		return keyBuf, nil
	}
	putKeyBuffer(keyBuf)
	return nil, err
}

// putKeyBuffer empties buf and hands it back to the pool.
func putKeyBuffer(buf *bytes.Buffer) {
	buf.Reset()
	keyPool.Put(buf)
}
+} + +// MarkForDelete marks all chunks expired for a given table. +func (t *Marker) MarkForDelete(ctx context.Context, tableName string) error { + start := time.Now() + status := statusSuccess + defer func() { + t.markerMetrics.tableProcessedDurationSeconds.WithLabelValues(tableName, status).Observe(time.Since(start).Seconds()) + level.Debug(util_log.Logger).Log("msg", "finished to process table", "table", tableName, "duration", time.Since(start)) + }() + level.Debug(util_log.Logger).Log("msg", "starting to process table", "table", tableName) + + if err := t.markTable(ctx, tableName); err != nil { + status = statusFailure + return err + } + return nil +} + +func (t *Marker) markTable(ctx context.Context, tableName string) error { + objects, err := util.ListDirectory(ctx, tableName, t.objectClient) + if err != nil { + return err + } + + if len(objects) != 1 { + // todo(1): in the future we would want to support more tables so that we can apply retention below 1d. + // for simplicity and to avoid conflict with compactor we'll support only compacted db file. + // Possibly we should apply retention right before the compactor upload compacted db. + + // todo(2): Depending on the retention rules we should be able to skip tables. + // For instance if there isn't a retention rules below 1 week, then we can skip the first 7 tables. 
+ level.Debug(util_log.Logger).Log("msg", "skipping retention for non-compacted table", "name", tableName) + return nil + } + objectKey := objects[0].Key + + if shipper_util.IsDirectory(objectKey) { + level.Debug(util_log.Logger).Log("msg", "skipping retention no table file found", "objectKey", objectKey) + return nil + } + + tableDirectory := path.Join(t.workingDirectory, tableName) + err = chunk_util.EnsureDirectory(tableDirectory) + if err != nil { + return err + } + defer func() { + if err := os.RemoveAll(tableDirectory); err != nil { + level.Warn(util_log.Logger).Log("msg", "failed to remove temporary table directory", "err", err, "path", tableDirectory) + } + }() + + downloadAt := filepath.Join(tableDirectory, fmt.Sprintf("retention-%d", time.Now().UnixNano())) + + err = shipper_util.GetFileFromStorage(ctx, t.objectClient, objectKey, downloadAt) + if err != nil { + level.Warn(util_log.Logger).Log("msg", "failed to download table", "err", err, "path", downloadAt, "objectKey", objectKey) + return err + } + + db, err := shipper_util.SafeOpenBoltdbFile(downloadAt) + if err != nil { + level.Warn(util_log.Logger).Log("msg", "failed to open db", "err", err, "path", downloadAt) + return err + } + + defer func() { + if err := db.Close(); err != nil { + level.Warn(util_log.Logger).Log("msg", "failed to close local db", "err", err) + } + }() + + schemaCfg, ok := schemaPeriodForTable(t.config, tableName) + if !ok { + return fmt.Errorf("could not find schema for table: %s", tableName) + } + + markerWriter, err := NewMarkerStorageWriter(t.workingDirectory) + if err != nil { + return fmt.Errorf("failed to create marker writer: %w", err) + } + + var empty bool + err = db.Update(func(tx *bbolt.Tx) error { + bucket := tx.Bucket(bucketName) + if bucket == nil { + return nil + } + + chunkIt, err := newChunkIndexIterator(bucket, schemaCfg) + if err != nil { + return fmt.Errorf("failed to create chunk index iterator: %w", err) + } + + empty, err = markforDelete(markerWriter, 
chunkIt, newSeriesCleaner(bucket, schemaCfg), t.expiration) + if err != nil { + return err + } + t.markerMetrics.tableMarksCreatedTotal.WithLabelValues(tableName).Add(float64(markerWriter.Count())) + if err := markerWriter.Close(); err != nil { + return fmt.Errorf("failed to close marker writer: %w", err) + } + return nil + }) + if err != nil { + return err + } + // if the index is empty we can delete the index table. + if empty { + t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, tableActionDeleted).Inc() + return t.objectClient.DeleteObject(ctx, objectKey) + } + // No chunks to delete means no changes to the remote index, we don't need to upload it. + if markerWriter.Count() == 0 { + t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, tableActionNone).Inc() + return nil + } + t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, tableActionModified).Inc() + return t.uploadDB(ctx, db, objectKey) +} + +func (t *Marker) uploadDB(ctx context.Context, db *bbolt.DB, objectKey string) error { + sourcePath := db.Path() + if strings.HasSuffix(objectKey, ".gz") { + compressedPath := fmt.Sprintf("%s.gz", sourcePath) + err := shipper_util.CompressFile(sourcePath, compressedPath) + if err != nil { + return err + } + defer func() { + os.Remove(compressedPath) + }() + sourcePath = compressedPath + } + sourceFile, err := os.Open(sourcePath) + if err != nil { + return err + } + defer func() { + if err := sourceFile.Close(); err != nil { + level.Error(util_log.Logger).Log("msg", "failed to close file", "path", sourceFile, "err", err) + } + }() + return t.objectClient.PutObject(ctx, objectKey, sourceFile) +} + +func markforDelete(marker MarkerStorageWriter, chunkIt ChunkEntryIterator, seriesCleaner SeriesCleaner, expiration ExpirationChecker) (bool, error) { + seriesMap := newUserSeriesMap() + empty := true + for chunkIt.Next() { + if chunkIt.Err() != nil { + return false, chunkIt.Err() + } + c := chunkIt.Entry() + if expiration.Expired(c) { + 
seriesMap.Add(c.SeriesID, c.UserID) + if err := chunkIt.Delete(); err != nil { + return false, err + } + if err := marker.Put(c.ChunkID); err != nil { + return false, err + } + continue + } + empty = false + } + if empty { + return true, nil + } + return false, seriesMap.ForEach(func(seriesID, userID []byte) error { + return seriesCleaner.Cleanup(seriesID, userID) + }) +} + +type DeleteClient interface { + DeleteObject(ctx context.Context, objectKey string) error +} + +type DeleteClientFunc func(ctx context.Context, objectKey string) error + +func (d DeleteClientFunc) DeleteObject(ctx context.Context, objectKey string) error { + return d(ctx, objectKey) +} + +func NewDeleteClient(objectClient chunk.ObjectClient) DeleteClient { + // filesystem encode64 keys on disk. useful for testing. + if fs, ok := objectClient.(*local.FSObjectClient); ok { + return DeleteClientFunc(func(ctx context.Context, objectKey string) error { + return fs.DeleteObject(ctx, base64.StdEncoding.EncodeToString([]byte(objectKey))) + }) + } + return objectClient +} + +type Sweeper struct { + markerProcessor MarkerProcessor + deleteClient DeleteClient + sweeperMetrics *sweeperMetrics +} + +func NewSweeper(workingDir string, deleteClient DeleteClient, deleteWorkerCount int, minAgeDelete time.Duration, r prometheus.Registerer) (*Sweeper, error) { + m := newSweeperMetrics(r) + p, err := newMarkerStorageReader(workingDir, deleteWorkerCount, minAgeDelete, m) + if err != nil { + return nil, err + } + return &Sweeper{ + markerProcessor: p, + deleteClient: deleteClient, + sweeperMetrics: m, + }, nil +} + +func (s *Sweeper) Start() { + s.markerProcessor.Start(func(ctx context.Context, chunkId []byte) error { + status := statusSuccess + start := time.Now() + defer func() { + s.sweeperMetrics.deleteChunkDurationSeconds.WithLabelValues(status).Observe(time.Since(start).Seconds()) + }() + chunkIDString := unsafeGetString(chunkId) + err := s.deleteClient.DeleteObject(ctx, chunkIDString) + if err == 
chunk.ErrStorageObjectNotFound { + status = statusNotFound + level.Debug(util_log.Logger).Log("msg", "delete on not found chunk", "chunkID", chunkIDString) + return nil + } + if err != nil { + level.Error(util_log.Logger).Log("msg", "error deleting chunk", "chunkID", chunkIDString, "err", err) + status = statusFailure + } + return err + }) +} + +func (s *Sweeper) Stop() { + s.markerProcessor.Stop() +} diff --git a/pkg/storage/stores/shipper/compactor/retention/retention_test.go b/pkg/storage/stores/shipper/compactor/retention/retention_test.go new file mode 100644 index 0000000000000..f13f4f9d639e9 --- /dev/null +++ b/pkg/storage/stores/shipper/compactor/retention/retention_test.go @@ -0,0 +1,256 @@ +package retention + +import ( + "context" + "crypto/sha256" + "encoding/base64" + "path/filepath" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.etcd.io/bbolt" + + "github.com/grafana/loki/pkg/chunkenc" + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/storage/stores/util" + "github.com/grafana/loki/pkg/validation" +) + +func Test_Retention(t *testing.T) { + minListMarkDelay = 1 * time.Second + for _, tt := range []struct { + name string + limits Limits + chunks []chunk.Chunk + alive []bool + }{ + { + "nothing is expiring", + fakeLimits{ + perTenant: map[string]time.Duration{ + "1": 1000 * time.Hour, + "2": 1000 * time.Hour, + }, + perStream: map[string][]validation.StreamRetention{}, + }, + []chunk.Chunk{ + createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, start, start.Add(1*time.Hour)), + createChunk(t, "2", labels.Labels{labels.Label{Name: "foo", Value: "buzz"}}, start.Add(26*time.Hour), 
start.Add(27*time.Hour)), + }, + []bool{ + true, + true, + }, + }, + { + "one global expiration", + fakeLimits{ + perTenant: map[string]time.Duration{ + "1": 10 * time.Hour, + "2": 1000 * time.Hour, + }, + perStream: map[string][]validation.StreamRetention{}, + }, + []chunk.Chunk{ + createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, start, start.Add(1*time.Hour)), + createChunk(t, "2", labels.Labels{labels.Label{Name: "foo", Value: "buzz"}}, start.Add(26*time.Hour), start.Add(27*time.Hour)), + }, + []bool{ + false, + true, + }, + }, + { + "one global expiration and stream", + fakeLimits{ + perTenant: map[string]time.Duration{ + "1": 10 * time.Hour, + "2": 1000 * time.Hour, + }, + perStream: map[string][]validation.StreamRetention{ + "1": { + {Period: 5 * time.Hour, Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "buzz")}}, + }, + }, + }, + []chunk.Chunk{ + createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, start, start.Add(1*time.Hour)), + createChunk(t, "2", labels.Labels{labels.Label{Name: "foo", Value: "fuzz"}}, start.Add(26*time.Hour), start.Add(27*time.Hour)), + createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "buzz"}}, model.Now().Add(-2*time.Hour), model.Now().Add(-1*time.Hour)), + createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "buzz"}}, model.Now().Add(-7*time.Hour), model.Now().Add(-6*time.Hour)), + }, + []bool{ + false, + true, + true, + false, + }, + }, + } { + tt := tt + t.Run(tt.name, func(t *testing.T) { + // insert in the store. 
+ var ( + store = newTestStore(t) + expectDeleted = []string{} + actualDeleted = []string{} + lock sync.Mutex + ) + for _, c := range tt.chunks { + require.NoError(t, store.Put(context.TODO(), []chunk.Chunk{c})) + } + store.Stop() + + // marks and sweep + expiration := NewExpirationChecker(tt.limits) + workDir := filepath.Join(t.TempDir(), "retention") + sweep, err := NewSweeper(workDir, DeleteClientFunc(func(ctx context.Context, objectKey string) error { + lock.Lock() + defer lock.Unlock() + key := string([]byte(objectKey)) // forces a copy, because this string is only valid within the delete fn. + actualDeleted = append(actualDeleted, key) + return nil + }), 10, 0, nil) + require.NoError(t, err) + sweep.Start() + defer sweep.Stop() + + marker, err := NewMarker(workDir, store.schemaCfg, util.NewPrefixedObjectClient(store.objectClient, "index/"), expiration, prometheus.NewRegistry()) + require.NoError(t, err) + for _, table := range store.indexTables() { + table.Close() + require.NoError(t, marker.MarkForDelete(context.Background(), table.name)) + } + + // assert using the store again. 
+ store.open() + + for i, e := range tt.alive { + require.Equal(t, e, store.HasChunk(tt.chunks[i]), "chunk %d should be %t", i, e) + if !e { + expectDeleted = append(expectDeleted, tt.chunks[i].ExternalKey()) + } + } + store.Stop() + if len(expectDeleted) != 0 { + require.Eventually(t, func() bool { + lock.Lock() + defer lock.Unlock() + return assert.ObjectsAreEqual(expectDeleted, actualDeleted) + }, 10*time.Second, 1*time.Second) + } + }) + } +} + +type noopWriter struct{} + +func (noopWriter) Put(chunkID []byte) error { return nil } +func (noopWriter) Count() int64 { return 0 } +func (noopWriter) Close() error { return nil } + +type noopCleaner struct{} + +func (noopCleaner) Cleanup(seriesID []byte, userID []byte) error { return nil } + +func Test_EmptyTable(t *testing.T) { + schema := allSchemas[0] + store := newTestStore(t) + c1 := createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, schema.from, schema.from.Add(1*time.Hour)) + c2 := createChunk(t, "2", labels.Labels{labels.Label{Name: "foo", Value: "buzz"}, labels.Label{Name: "bar", Value: "foo"}}, schema.from, schema.from.Add(1*time.Hour)) + c3 := createChunk(t, "2", labels.Labels{labels.Label{Name: "foo", Value: "buzz"}, labels.Label{Name: "bar", Value: "buzz"}}, schema.from, schema.from.Add(1*time.Hour)) + + require.NoError(t, store.Put(context.TODO(), []chunk.Chunk{ + c1, c2, c3, + })) + + store.Stop() + + tables := store.indexTables() + require.Len(t, tables, 1) + err := tables[0].DB.Update(func(tx *bbolt.Tx) error { + it, err := newChunkIndexIterator(tx.Bucket(bucketName), schema.config) + require.NoError(t, err) + empty, err := markforDelete(noopWriter{}, it, noopCleaner{}, NewExpirationChecker(&fakeLimits{perTenant: map[string]time.Duration{"1": 0, "2": 0}})) + require.NoError(t, err) + require.True(t, empty) + return nil + }) + require.NoError(t, err) +} + +func createChunk(t testing.TB, userID string, lbs labels.Labels, from model.Time, through model.Time) chunk.Chunk { + 
t.Helper() + const ( + targetSize = 1500 * 1024 + blockSize = 256 * 1024 + ) + labelsBuilder := labels.NewBuilder(lbs) + labelsBuilder.Set(labels.MetricName, "logs") + metric := labelsBuilder.Labels() + fp := client.Fingerprint(lbs) + chunkEnc := chunkenc.NewMemChunk(chunkenc.EncSnappy, blockSize, targetSize) + + for ts := from; ts.Before(through); ts = ts.Add(1 * time.Minute) { + require.NoError(t, chunkEnc.Append(&logproto.Entry{ + Timestamp: ts.Time(), + Line: ts.String(), + })) + } + c := chunk.NewChunk(userID, fp, metric, chunkenc.NewFacade(chunkEnc, blockSize, targetSize), from, through) + require.NoError(t, c.Encode()) + return c +} + +func labelsSeriesID(ls labels.Labels) []byte { + h := sha256.Sum256([]byte(labelsString(ls))) + return encodeBase64Bytes(h[:]) +} + +func encodeBase64Bytes(bytes []byte) []byte { + encodedLen := base64.RawStdEncoding.EncodedLen(len(bytes)) + encoded := make([]byte, encodedLen) + base64.RawStdEncoding.Encode(encoded, bytes) + return encoded +} + +// Backwards-compatible with model.Metric.String() +func labelsString(ls labels.Labels) string { + metricName := ls.Get(labels.MetricName) + if metricName != "" && len(ls) == 1 { + return metricName + } + var b strings.Builder + b.Grow(1000) + + b.WriteString(metricName) + b.WriteByte('{') + i := 0 + for _, l := range ls { + if l.Name == labels.MetricName { + continue + } + if i > 0 { + b.WriteByte(',') + b.WriteByte(' ') + } + b.WriteString(l.Name) + b.WriteByte('=') + var buf [1000]byte + b.Write(strconv.AppendQuote(buf[:0], l.Value)) + i++ + } + b.WriteByte('}') + + return b.String() +} diff --git a/pkg/storage/stores/shipper/compactor/retention/series.go b/pkg/storage/stores/shipper/compactor/retention/series.go new file mode 100644 index 0000000000000..f33dbb7975985 --- /dev/null +++ b/pkg/storage/stores/shipper/compactor/retention/series.go @@ -0,0 +1,130 @@ +package retention + +import ( + "github.com/cortexproject/cortex/pkg/chunk" + 
"github.com/prometheus/prometheus/pkg/labels" + "go.etcd.io/bbolt" +) + +type userSeries struct { + key []byte + seriesIDLen int +} + +func newUserSeries(seriesID []byte, userID []byte) userSeries { + key := make([]byte, 0, len(seriesID)+len(userID)) + key = append(key, seriesID...) + key = append(key, userID...) + return userSeries{ + key: key, + seriesIDLen: len(seriesID), + } +} + +func (us userSeries) Key() string { + return unsafeGetString(us.key) +} + +func (us userSeries) SeriesID() []byte { + return us.key[:us.seriesIDLen] +} + +func (us userSeries) UserID() []byte { + return us.key[us.seriesIDLen:] +} + +func (us *userSeries) Reset(seriesID []byte, userID []byte) { + if us.key == nil { + us.key = make([]byte, 0, len(seriesID)+len(userID)) + } + us.key = us.key[:0] + us.key = append(us.key, seriesID...) + us.key = append(us.key, userID...) + us.seriesIDLen = len(seriesID) +} + +type userSeriesMap map[string]userSeries + +func newUserSeriesMap() userSeriesMap { + return make(userSeriesMap) +} + +func (u userSeriesMap) Add(seriesID []byte, userID []byte) { + us := newUserSeries(seriesID, userID) + u[us.Key()] = us +} + +func (u userSeriesMap) ForEach(callback func(seriesID []byte, userID []byte) error) error { + for _, v := range u { + if err := callback(v.SeriesID(), v.UserID()); err != nil { + return err + } + } + return nil +} + +type seriesLabels struct { + userSeries + lbs labels.Labels +} + +type seriesLabelsMapper struct { + cursor *bbolt.Cursor + config chunk.PeriodConfig + + bufKey userSeries + mapping map[string]*seriesLabels +} + +func newSeriesLabelsMapper(bucket *bbolt.Bucket, config chunk.PeriodConfig) (*seriesLabelsMapper, error) { + sm := &seriesLabelsMapper{ + cursor: bucket.Cursor(), + mapping: map[string]*seriesLabels{}, + config: config, + bufKey: newUserSeries(nil, nil), + } + if err := sm.build(); err != nil { + return nil, err + } + return sm, nil +} + +func (sm *seriesLabelsMapper) Get(seriesID []byte, userID []byte) labels.Labels { + 
sm.bufKey.Reset(seriesID, userID) + lbs, ok := sm.mapping[sm.bufKey.Key()] + if ok { + return lbs.lbs + } + return labels.Labels{} +} + +func (sm *seriesLabelsMapper) build() error { +Outer: + for k, v := sm.cursor.First(); k != nil; k, v = sm.cursor.Next() { + ref, ok, err := parseLabelSeriesRangeKey(decodeKey(k)) + if err != nil { + return err + } + if !ok { + continue + } + sm.bufKey.Reset(ref.SeriesID, ref.UserID) + lbs, ok := sm.mapping[sm.bufKey.Key()] + if !ok { + k := newUserSeries(ref.SeriesID, ref.UserID) + lbs = &seriesLabels{ + userSeries: k, + lbs: make(labels.Labels, 0, 15), + } + sm.mapping[k.Key()] = lbs + } + // add the labels if it doesn't exist. + for _, l := range lbs.lbs { + if l.Name == unsafeGetString(ref.Name) { + continue Outer + } + } + lbs.lbs = append(lbs.lbs, labels.Label{Name: string(ref.Name), Value: string(v)}) + } + return nil +} diff --git a/pkg/storage/stores/shipper/compactor/retention/series_test.go b/pkg/storage/stores/shipper/compactor/retention/series_test.go new file mode 100644 index 0000000000000..dc978055c4f3d --- /dev/null +++ b/pkg/storage/stores/shipper/compactor/retention/series_test.go @@ -0,0 +1,35 @@ +package retention + +import ( + "sort" + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_UserSeries(t *testing.T) { + m := newUserSeriesMap() + + m.Add([]byte(`series1`), []byte(`user1`)) + m.Add([]byte(`series1`), []byte(`user1`)) + m.Add([]byte(`series1`), []byte(`user2`)) + m.Add([]byte(`series2`), []byte(`user1`)) + m.Add([]byte(`series2`), []byte(`user1`)) + m.Add([]byte(`series2`), []byte(`user2`)) + + keys := []string{} + + err := m.ForEach(func(seriesID, userID []byte) error { + keys = append(keys, string(seriesID)+":"+string(userID)) + return nil + }) + require.NoError(t, err) + require.Len(t, keys, 4) + sort.Strings(keys) + require.Equal(t, []string{ + "series1:user1", + "series1:user2", + "series2:user1", + "series2:user2", + }, keys) +} diff --git 
// copyFile copies the regular file at src to dst and returns the number of
// bytes copied. dst is created if missing and truncated if it already exists.
//
// An error is returned when src cannot be stat'ed or opened, when src is not
// a regular file, when dst cannot be created, when the copy fails, or when
// closing dst fails.
func copyFile(src, dst string) (written int64, err error) {
	info, err := os.Stat(src)
	if err != nil {
		return 0, err
	}
	if !info.Mode().IsRegular() {
		return 0, fmt.Errorf("%s is not a regular file", src)
	}

	source, err := os.Open(src)
	if err != nil {
		return 0, err
	}
	// Read-only handle: a close failure cannot lose data.
	defer source.Close()

	destination, err := os.Create(dst)
	if err != nil {
		return 0, err
	}
	defer func() {
		// A write error may only surface at close time, so report it unless
		// an earlier error is already being returned.
		if closeErr := destination.Close(); closeErr != nil && err == nil {
			err = closeErr
		}
	}()

	return io.Copy(destination, source)
}
"github.com/stretchr/testify/require" + ww "github.com/weaveworks/common/server" + "github.com/weaveworks/common/user" + "go.etcd.io/bbolt" + + "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/storage" + "github.com/grafana/loki/pkg/storage/stores/shipper" + shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" + "github.com/grafana/loki/pkg/validation" +) + +func dayFromTime(t model.Time) chunk.DayTime { + parsed, err := time.Parse("2006-01-02", t.Time().Format("2006-01-02")) + if err != nil { + panic(err) + } + return chunk.DayTime{ + Time: model.TimeFromUnix(parsed.Unix()), + } +} + +var ( + start = model.Now().Add(-30 * 24 * time.Hour) + schemaCfg = storage.SchemaConfig{ + SchemaConfig: chunk.SchemaConfig{ + // we want to test over all supported schema. + Configs: []chunk.PeriodConfig{ + { + From: dayFromTime(start), + IndexType: "boltdb", + ObjectType: "filesystem", + Schema: "v9", + IndexTables: chunk.PeriodicTableConfig{ + Prefix: "index_", + Period: time.Hour * 24, + }, + RowShards: 16, + }, + { + From: dayFromTime(start.Add(25 * time.Hour)), + IndexType: "boltdb", + ObjectType: "filesystem", + Schema: "v10", + IndexTables: chunk.PeriodicTableConfig{ + Prefix: "index_", + Period: time.Hour * 24, + }, + RowShards: 16, + }, + { + From: dayFromTime(start.Add(49 * time.Hour)), + IndexType: "boltdb", + ObjectType: "filesystem", + Schema: "v11", + IndexTables: chunk.PeriodicTableConfig{ + Prefix: "index_", + Period: time.Hour * 24, + }, + RowShards: 16, + }, + }, + }, + } + allSchemas = []struct { + schema string + from model.Time + config chunk.PeriodConfig + }{ + {"v9", schemaCfg.Configs[0].From.Time, schemaCfg.Configs[0]}, + {"v10", schemaCfg.Configs[1].From.Time, schemaCfg.Configs[1]}, + {"v11", schemaCfg.Configs[2].From.Time, schemaCfg.Configs[2]}, + } + + sweepMetrics = newSweeperMetrics(prometheus.DefaultRegisterer) +) + +func newChunkEntry(userID, labels string, from, through model.Time) ChunkEntry { + lbs, err := 
logql.ParseLabels(labels) + if err != nil { + panic(err) + } + return ChunkEntry{ + ChunkRef: ChunkRef{ + UserID: []byte(userID), + SeriesID: labelsSeriesID(lbs), + From: from, + Through: through, + }, + Labels: lbs, + } +} + +type testStore struct { + storage.Store + cfg storage.Config + objectClient chunk.ObjectClient + indexDir, chunkDir string + schemaCfg storage.SchemaConfig + t testing.TB + limits cortex_storage.StoreLimits +} + +// testObjectClient is a testing object client +type testObjectClient struct { + chunk.ObjectClient + path string +} + +func newTestObjectClient(path string) chunk.ObjectClient { + c, err := cortex_storage.NewObjectClient("filesystem", cortex_storage.Config{ + FSConfig: local.FSConfig{ + Directory: path, + }, + }) + if err != nil { + panic(err) + } + return &testObjectClient{ + ObjectClient: c, + path: path, + } +} + +type table struct { + name string + *bbolt.DB +} + +func (t *testStore) indexTables() []table { + t.t.Helper() + res := []table{} + indexFilesInfo, err := ioutil.ReadDir(t.indexDir) + require.NoError(t.t, err) + for _, indexFileInfo := range indexFilesInfo { + db, err := shipper_util.SafeOpenBoltdbFile(filepath.Join(t.indexDir, indexFileInfo.Name())) + require.NoError(t.t, err) + res = append(res, table{name: indexFileInfo.Name(), DB: db}) + } + return res +} + +func (t *testStore) HasChunk(c chunk.Chunk) bool { + t.t.Helper() + var matchers []*labels.Matcher + for _, l := range c.Metric { + matchers = append(matchers, labels.MustNewMatcher(labels.MatchEqual, l.Name, l.Value)) + } + chunks, err := t.Store.Get(user.InjectOrgID(context.Background(), c.UserID), + c.UserID, c.From, c.Through, matchers...) 
+ require.NoError(t.t, err) + return len(chunks) == 1 && c.ExternalKey() == chunks[0].ExternalKey() +} + +func (t *testStore) open() { + chunkStore, err := cortex_storage.NewStore( + t.cfg.Config, + chunk.StoreConfig{}, + schemaCfg.SchemaConfig, + t.limits, + nil, + nil, + util_log.Logger, + ) + require.NoError(t.t, err) + + store, err := storage.NewStore(t.cfg, schemaCfg, chunkStore, nil) + require.NoError(t.t, err) + t.Store = store +} + +func newTestStore(t testing.TB) *testStore { + t.Helper() + cfg := &ww.Config{} + require.Nil(t, cfg.LogLevel.Set("debug")) + util_log.InitLogger(cfg) + workdir := t.TempDir() + filepath.Join(workdir, "index") + indexDir := filepath.Join(workdir, "index") + err := chunk_util.EnsureDirectory(indexDir) + require.Nil(t, err) + + chunkDir := filepath.Join(workdir, "chunk_test") + err = chunk_util.EnsureDirectory(indexDir) + require.Nil(t, err) + require.Nil(t, err) + + defer func() { + }() + limits, err := validation.NewOverrides(validation.Limits{}, nil) + require.NoError(t, err) + + require.NoError(t, schemaCfg.SchemaConfig.Validate()) + + config := storage.Config{ + Config: cortex_storage.Config{ + BoltDBConfig: local.BoltDBConfig{ + Directory: indexDir, + }, + FSConfig: local.FSConfig{ + Directory: chunkDir, + }, + }, + BoltDBShipperConfig: shipper.Config{ + ActiveIndexDirectory: indexDir, + SharedStoreType: "filesystem", + SharedStoreKeyPrefix: "index", + ResyncInterval: 1 * time.Millisecond, + IngesterName: "foo", + Mode: shipper.ModeReadWrite, + }, + } + chunkStore, err := cortex_storage.NewStore( + config.Config, + chunk.StoreConfig{}, + schemaCfg.SchemaConfig, + limits, + nil, + nil, + util_log.Logger, + ) + require.NoError(t, err) + + store, err := storage.NewStore(config, schemaCfg, chunkStore, nil) + require.NoError(t, err) + return &testStore{ + indexDir: indexDir, + chunkDir: chunkDir, + t: t, + Store: store, + schemaCfg: schemaCfg, + objectClient: newTestObjectClient(workdir), + cfg: config, + limits: limits, + } +} 
diff --git a/pkg/storage/stores/shipper/compactor/table.go b/pkg/storage/stores/shipper/compactor/table.go index ca2b845e71700..a1752c12b7dce 100644 --- a/pkg/storage/stores/shipper/compactor/table.go +++ b/pkg/storage/stores/shipper/compactor/table.go @@ -15,6 +15,7 @@ import ( "github.com/go-kit/kit/log/level" "go.etcd.io/bbolt" + "github.com/grafana/loki/pkg/storage/stores/shipper/util" shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" ) @@ -61,9 +62,7 @@ func newTable(ctx context.Context, workingDirectory string, objectClient chunk.O } func (t *table) compact() error { - // The forward slash here needs to stay because we are trying to list contents of a directory without it we will get the name of the same directory back with hosted object stores. - // This is due to the object stores not having a concept of directories. - objects, _, err := t.storageClient.List(t.ctx, t.name+delimiter, delimiter) + objects, err := util.ListDirectory(t.ctx, t.name, t.storageClient) if err != nil { return err } @@ -82,6 +81,7 @@ func (t *table) compact() error { } }() + // create a new compacted db t.compactedDB, err = shipper_util.SafeOpenBoltdbFile(filepath.Join(t.workingDirectory, fmt.Sprint(time.Now().Unix()))) if err != nil { return err @@ -93,7 +93,7 @@ func (t *table) compact() error { readObjectChan := make(chan string) n := util_math.Min(len(objects), readDBsParallelism) - // read files parallelly + // read files in parallel for i := 0; i < n; i++ { go func() { var err error @@ -137,7 +137,6 @@ func (t *table) compact() error { return } } - }() } @@ -268,7 +267,7 @@ func (t *table) readFile(path string) error { if err != nil { return err } - + // todo(cyriltovena) we should just re-slice to avoid allocations writeBatch = make([]indexEntry, 0, batchSize) } diff --git a/pkg/storage/stores/shipper/util/util.go b/pkg/storage/stores/shipper/util/util.go index 48bedcdf75a0d..e030522d29de9 100644 --- a/pkg/storage/stores/shipper/util/util.go +++ 
b/pkg/storage/stores/shipper/util/util.go @@ -191,3 +191,13 @@ func ValidateSharedStoreKeyPrefix(prefix string) error { return nil } + +func ListDirectory(ctx context.Context, dirName string, objectClient chunk.ObjectClient) ([]chunk.StorageObject, error) { + // The forward slash here needs to stay because we are trying to list contents of a directory without it we will get the name of the same directory back with hosted object stores. + // This is due to the object stores not having a concept of directories. + objects, _, err := objectClient.List(ctx, dirName+delimiter, delimiter) + if err != nil { + return nil, err + } + return objects, nil +} diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 78454f58063cf..01dfada213b22 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -2,11 +2,14 @@ package validation import ( "flag" + "fmt" "time" - "github.com/grafana/loki/pkg/util/flagext" - "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + + "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/util/flagext" ) const ( @@ -61,11 +64,22 @@ type Limits struct { RulerMaxRulesPerRuleGroup int `yaml:"ruler_max_rules_per_rule_group" json:"ruler_max_rules_per_rule_group"` RulerMaxRuleGroupsPerTenant int `yaml:"ruler_max_rule_groups_per_tenant" json:"ruler_max_rule_groups_per_tenant"` + // Global and per tenant retention + RetentionPeriod time.Duration `yaml:"retention_period" json:"retention_period"` + StreamRetention []StreamRetention `yaml:"retention_stream" json:"retention_stream"` + // Config for overrides, convenient if it goes here. 
PerTenantOverrideConfig string `yaml:"per_tenant_override_config" json:"per_tenant_override_config"` PerTenantOverridePeriod model.Duration `yaml:"per_tenant_override_period" json:"per_tenant_override_period"` } +type StreamRetention struct { + Period time.Duration `yaml:"period"` + Priority int `yaml:"priority"` + Selector string `yaml:"selector"` + Matchers []*labels.Matcher `yaml:"-"` // populated during validation. +} + // RegisterFlags adds the flags required to config this to the given FlagSet func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.StringVar(&l.IngestionRateStrategy, "distributor.ingestion-rate-limit-strategy", "local", "Whether the ingestion rate limit should be applied individually to each distributor instance (local), or evenly shared across the cluster (global).") @@ -110,6 +124,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.RulerMaxRuleGroupsPerTenant, "ruler.max-rule-groups-per-tenant", 0, "Maximum number of rule groups per-tenant. 0 to disable.") f.StringVar(&l.PerTenantOverrideConfig, "limits.per-user-override-config", "", "File name of per-user overrides.") + f.DurationVar(&l.RetentionPeriod, "store.retention", 31*24*time.Hour, "How long before chunks will be deleted from the store. (requires compactor retention enabled).") _ = l.PerTenantOverridePeriod.Set("10s") f.Var(&l.PerTenantOverridePeriod, "limits.per-user-override-period", "Period with this to reload the overrides.") @@ -129,6 +144,30 @@ func (l *Limits) UnmarshalYAML(unmarshal func(interface{}) error) error { return unmarshal((*plain)(l)) } +// Validate validates that this limits config is valid. 
// StreamRetention returns the per-stream retention rules for a given user,
// taken from the limits that getOverridesForUser resolves for that user.
func (o *Overrides) StreamRetention(userID string) []StreamRetention {
	return o.getOverridesForUser(userID).StreamRetention
}