allow applying retention at different interval than compaction with a config (#4736)

* allow applying retention at different interval than compaction with a config

* add changelog entry
sandeepsukhani authored Nov 12, 2021
1 parent 11071d4 commit 02d8846
Showing 4 changed files with 34 additions and 6 deletions.
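Before the per-file diff, a quick sketch of what the change enables. Assuming the usual compactor block of a Loki configuration (the key names are the yaml tags visible in the diff below; the values and block placement are illustrative), retention can now run on a slower cadence than compaction:

```yaml
compactor:
  shared_store: filesystem
  retention_enabled: true
  compaction_interval: 10m      # compaction still runs on every tick
  apply_retention_interval: 1h  # retention piggybacks on every 6th compaction; 0 keeps the old behavior
```

The same pair is exposed as the -boltdb.shipper.compactor.compaction-interval and -boltdb.shipper.compactor.apply-retention-interval flags registered below; Validate rejects any non-zero value that is not an exact multiple of the compaction interval.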
CHANGELOG.md (1 addition, 0 deletions)
@@ -9,6 +9,7 @@ Release notes for 2.4.1 can be found on the [release notes page](https://grafana
 * [4687](https://github.com/grafana/loki/pull/4687) **owen-d**: overrides checks for nil tenant limits on AllByUserID
 * [4683](https://github.com/grafana/loki/pull/4683) **owen-d**: Adds replication_factor doc to common config
 * [4681](https://github.com/grafana/loki/pull/4681) **slim-bean**: Loki: check new Read target when initializing boltdb-shipper store
+* [4736](https://github.com/grafana/loki/pull/4736) **sandeepsukhani**: allow applying retention at different interval than compaction
 
 # 2.4.0 (2021/11/05)
 
pkg/storage/stores/shipper/compactor/compactor.go (26 additions, 5 deletions)
@@ -57,6 +57,7 @@ type Config struct {
 	SharedStoreType          string        `yaml:"shared_store"`
 	SharedStoreKeyPrefix     string        `yaml:"shared_store_key_prefix"`
 	CompactionInterval       time.Duration `yaml:"compaction_interval"`
+	ApplyRetentionInterval   time.Duration `yaml:"apply_retention_interval"`
 	RetentionEnabled         bool          `yaml:"retention_enabled"`
 	RetentionDeleteDelay     time.Duration `yaml:"retention_delete_delay"`
 	RetentionDeleteWorkCount int           `yaml:"retention_delete_worker_count"`
@@ -71,6 +72,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.StringVar(&cfg.SharedStoreType, "boltdb.shipper.compactor.shared-store", "", "Shared store used for storing boltdb files. Supported types: gcs, s3, azure, swift, filesystem")
 	f.StringVar(&cfg.SharedStoreKeyPrefix, "boltdb.shipper.compactor.shared-store.key-prefix", "index/", "Prefix to add to Object Keys in Shared store. Path separator(if any) should always be a '/'. Prefix should never start with a separator but should always end with it.")
 	f.DurationVar(&cfg.CompactionInterval, "boltdb.shipper.compactor.compaction-interval", 10*time.Minute, "Interval at which to re-run the compaction operation.")
+	f.DurationVar(&cfg.ApplyRetentionInterval, "boltdb.shipper.compactor.apply-retention-interval", 0, "Interval at which to apply/enforce retention. 0 means run at same interval as compaction. If non-zero, it should always be a multiple of compaction interval.")
 	f.DurationVar(&cfg.RetentionDeleteDelay, "boltdb.shipper.compactor.retention-delete-delay", 2*time.Hour, "Delay after which chunks will be fully deleted during retention.")
 	f.BoolVar(&cfg.RetentionEnabled, "boltdb.shipper.compactor.retention-enabled", false, "(Experimental) Activate custom (per-stream,per-tenant) retention.")
 	f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total amount of worker to use to delete chunks.")
@@ -84,6 +86,10 @@ func (cfg *Config) Validate() error {
 	if cfg.MaxCompactionParallelism < 1 {
 		return errors.New("max compaction parallelism must be >= 1")
 	}
+	if cfg.RetentionEnabled && cfg.ApplyRetentionInterval != 0 && cfg.ApplyRetentionInterval%cfg.CompactionInterval != 0 {
+		return errors.New("interval for applying retention should either be set to a 0 or a multiple of compaction interval")
+	}
+
 	return shipper_util.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix)
 }
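The modulo check above is the whole contract: retention can only fire on a compaction tick, so a non-zero interval has to be an exact multiple of the compaction interval. A minimal, self-contained sketch of the rule (a standalone reproduction for illustration, not an import of the Loki package):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// validateApplyRetentionInterval mirrors the check added to Config.Validate:
// a non-zero interval must be an exact multiple of the compaction interval,
// because retention only ever piggybacks on a compaction run.
func validateApplyRetentionInterval(retentionEnabled bool, applyRetention, compaction time.Duration) error {
	if retentionEnabled && applyRetention != 0 && applyRetention%compaction != 0 {
		return errors.New("interval for applying retention should either be set to a 0 or a multiple of compaction interval")
	}
	return nil
}

func main() {
	fmt.Println(validateApplyRetentionInterval(true, time.Hour, 10*time.Minute))      // <nil>: 1h = 6 * 10m
	fmt.Println(validateApplyRetentionInterval(true, 25*time.Minute, 10*time.Minute)) // error: not a multiple
	fmt.Println(validateApplyRetentionInterval(true, 0, 10*time.Minute))              // <nil>: 0 follows the compaction schedule
}
```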

@@ -338,12 +344,24 @@ func (c *Compactor) runCompactions(ctx context.Context) {
 		break
 	}
 
+	lastRetentionRunAt := time.Unix(0, 0)
 	runCompaction := func() {
-		err := c.RunCompaction(ctx)
+		applyRetention := false
+		if c.cfg.RetentionEnabled && time.Since(lastRetentionRunAt) >= c.cfg.ApplyRetentionInterval {
+			level.Info(util_log.Logger).Log("msg", "applying retention with compaction")
+			applyRetention = true
+		}
+
+		err := c.RunCompaction(ctx, applyRetention)
 		if err != nil {
 			level.Error(util_log.Logger).Log("msg", "failed to run compaction", "err", err)
 		}
+
+		if applyRetention {
+			lastRetentionRunAt = time.Now()
+		}
 	}
 
 	c.wg.Add(1)
 	go func() {
 		defer c.wg.Done()
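Note that retention is never scheduled on its own: every tick runs compaction, and a retention pass is folded in once enough time has elapsed since the previous one. Seeding lastRetentionRunAt with time.Unix(0, 0) makes the very first compaction apply retention as well. A compressed, runnable sketch of the pattern (intervals shrunk from the minutes-scale defaults so the demo finishes quickly; all names are illustrative):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	compactionInterval := 100 * time.Millisecond     // stand-in for the 10m default
	applyRetentionInterval := 300 * time.Millisecond // stand-in for a 30m setting

	lastRetentionRunAt := time.Unix(0, 0) // far in the past, so tick 0 applies retention
	ticker := time.NewTicker(compactionInterval)
	defer ticker.Stop()

	for tick := 0; tick < 7; tick++ {
		<-ticker.C
		applyRetention := time.Since(lastRetentionRunAt) >= applyRetentionInterval
		fmt.Printf("tick %d: compacting, applyRetention=%v\n", tick, applyRetention)
		if applyRetention {
			lastRetentionRunAt = time.Now() // only reset after a retention pass
		}
	}
}
```

With these numbers retention rides along on roughly every third compaction; in the real compactor that ratio is ApplyRetentionInterval divided by CompactionInterval, which is exactly why Validate insists on a multiple.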
@@ -380,7 +398,7 @@ func (c *Compactor) stopping(_ error) error {
 	return services.StopManagerAndAwaitStopped(context.Background(), c.subservices)
 }
 
-func (c *Compactor) CompactTable(ctx context.Context, tableName string) error {
+func (c *Compactor) CompactTable(ctx context.Context, tableName string, applyRetention bool) error {
 	table, err := newTable(ctx, filepath.Join(c.cfg.WorkingDirectory, tableName), c.indexStorageClient, c.cfg.RetentionEnabled, c.tableMarker)
 	if err != nil {
 		level.Error(util_log.Logger).Log("msg", "failed to initialize table for compaction", "table", tableName, "err", err)
@@ -389,7 +407,7 @@ func (c *Compactor) CompactTable(ctx context.Context, tableName string) error {
 
 	interval := retention.ExtractIntervalFromTableName(tableName)
 	intervalMayHaveExpiredChunks := false
-	if c.cfg.RetentionEnabled {
+	if c.cfg.RetentionEnabled && applyRetention {
 		intervalMayHaveExpiredChunks = c.expirationChecker.IntervalMayHaveExpiredChunks(interval)
 	}
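CompactTable now receives the run-level decision and combines it with a per-table check, so tables whose time range cannot contain expired chunks are never marked for retention. A toy sketch of this two-level gate (the interval type and checker below are illustrative stand-ins for retention.ExtractIntervalFromTableName and expirationChecker.IntervalMayHaveExpiredChunks, whose internals are not part of this diff):

```go
package main

import (
	"fmt"
	"time"
)

// interval is a toy stand-in for a table's time range.
type interval struct{ start, end time.Time }

// mayHaveExpiredChunks is an illustrative stand-in for
// IntervalMayHaveExpiredChunks: anything ending before the retention
// cutoff may contain expired chunks.
func mayHaveExpiredChunks(in interval, retentionPeriod time.Duration) bool {
	return time.Since(in.end) > retentionPeriod
}

func main() {
	retentionEnabled, applyRetention := true, true
	oldTable := interval{time.Now().Add(-72 * time.Hour), time.Now().Add(-48 * time.Hour)}

	// The table is only marked when the run-level decision (applyRetention)
	// and the table-level check both say yes.
	intervalMayHaveExpiredChunks := false
	if retentionEnabled && applyRetention {
		intervalMayHaveExpiredChunks = mayHaveExpiredChunks(oldTable, 24*time.Hour)
	}
	fmt.Println("mark table for retention:", intervalMayHaveExpiredChunks) // true
}
```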

@@ -401,7 +419,7 @@ func (c *Compactor) CompactTable(ctx context.Context, tableName string) error {
 	return nil
 }
 
-func (c *Compactor) RunCompaction(ctx context.Context) error {
+func (c *Compactor) RunCompaction(ctx context.Context, applyRetention bool) error {
 	status := statusSuccess
 	start := time.Now()
@@ -415,6 +433,9 @@ func (c *Compactor) RunCompaction(ctx context.Context) error {
 	if status == statusSuccess {
 		c.metrics.compactTablesOperationDurationSeconds.Set(runtime.Seconds())
 		c.metrics.compactTablesOperationLastSuccess.SetToCurrentTime()
+		if applyRetention {
+			c.metrics.applyRetentionLastSuccess.SetToCurrentTime()
+		}
 	}
 
 	if c.cfg.RetentionEnabled {
@@ -453,7 +474,7 @@ func (c *Compactor) RunCompaction(ctx context.Context) error {
 		}
 
 		level.Info(util_log.Logger).Log("msg", "compacting table", "table-name", tableName)
-		err = c.CompactTable(ctx, tableName)
+		err = c.CompactTable(ctx, tableName, applyRetention)
 		if err != nil {
 			return
 		}
pkg/storage/stores/shipper/compactor/compactor_test.go (1 addition, 1 deletion)
@@ -95,7 +95,7 @@ func TestCompactor_RunCompaction(t *testing.T) {
 	}
 
 	compactor := setupTestCompactor(t, tempDir)
-	err = compactor.RunCompaction(context.Background())
+	err = compactor.RunCompaction(context.Background(), true)
 	require.NoError(t, err)
 
 	for name := range tables {
pkg/storage/stores/shipper/compactor/metrics.go (6 additions, 0 deletions)
@@ -14,6 +14,7 @@ type metrics struct {
 	compactTablesOperationTotal           *prometheus.CounterVec
 	compactTablesOperationDurationSeconds prometheus.Gauge
 	compactTablesOperationLastSuccess     prometheus.Gauge
+	applyRetentionLastSuccess             prometheus.Gauge
 	compactorRunning                      prometheus.Gauge
 }

@@ -34,6 +35,11 @@ func newMetrics(r prometheus.Registerer) *metrics {
 			Name:      "compact_tables_operation_last_successful_run_timestamp_seconds",
 			Help:      "Unix timestamp of the last successful compaction run",
 		}),
+		applyRetentionLastSuccess: promauto.With(r).NewGauge(prometheus.GaugeOpts{
+			Namespace: "loki_boltdb_shipper",
+			Name:      "apply_retention_last_successful_run_timestamp_seconds",
+			Help:      "Unix timestamp of the last successful retention run",
+		}),
 		compactorRunning: promauto.With(r).NewGauge(prometheus.GaugeOpts{
 			Namespace: "loki_boltdb_shipper",
 			Name:      "compactor_running",
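The new gauge follows the same last-success-timestamp convention as the compaction metric above, which makes staleness easy to alert on (for example time() - loki_boltdb_shipper_apply_retention_last_successful_run_timestamp_seconds in PromQL). A small standalone sketch of that pattern with client_golang (the registry and print are for illustration only):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	reg := prometheus.NewRegistry()
	applyRetentionLastSuccess := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
		Namespace: "loki_boltdb_shipper",
		Name:      "apply_retention_last_successful_run_timestamp_seconds",
		Help:      "Unix timestamp of the last successful retention run",
	})

	// RunCompaction only touches this gauge when the run both succeeded and
	// actually applied retention, so the metric never advances on
	// compaction-only ticks.
	applyRetentionLastSuccess.SetToCurrentTime()

	fmt.Println(testutil.ToFloat64(applyRetentionLastSuccess)) // current Unix time, in seconds
}
```

Storing a Unix timestamp rather than a counter means one gauge answers both "did retention ever run" and "how long ago", at the cost of needing time() arithmetic in queries.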
