Custom Retention #3642
Changes from all commits
@@ -0,0 +1,117 @@
package loki

import (
    "context"
    "flag"
    "io"
    "io/ioutil"
    "strings"
    "testing"
    "time"

    "github.com/cortexproject/cortex/pkg/util/runtimeconfig"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/prometheus/pkg/labels"
    "github.com/stretchr/testify/require"

    "github.com/grafana/loki/pkg/validation"
)

func Test_LoadRetentionRules(t *testing.T) {
    overrides := newTestOverrides(t,
        `
overrides:
  "1":
    creation_grace_period: 48h
  "29":
    creation_grace_period: 48h
    ingestion_burst_size_mb: 140
    ingestion_rate_mb: 120
    max_concurrent_tail_requests: 1000
    max_global_streams_per_user: 100000
    max_label_names_per_series: 30
    max_query_parallelism: 256
    split_queries_by_interval: 15m
    retention_period: 1440h
    retention_stream:
    - selector: '{app="foo"}'
      period: 48h
      priority: 10
    - selector: '{namespace="bar", cluster=~"fo.*|b.+|[1-2]"}'
      period: 24h
      priority: 5
`)
    require.Equal(t, 31*24*time.Hour, overrides.RetentionPeriod("1"))    // default
    require.Equal(t, 2*30*24*time.Hour, overrides.RetentionPeriod("29")) // overrides
    require.Equal(t, []validation.StreamRetention(nil), overrides.StreamRetention("1"))
    require.Equal(t, []validation.StreamRetention{
        {Period: 48 * time.Hour, Priority: 10, Selector: `{app="foo"}`, Matchers: []*labels.Matcher{
            labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
        }},
        {Period: 24 * time.Hour, Priority: 5, Selector: `{namespace="bar", cluster=~"fo.*|b.+|[1-2]"}`, Matchers: []*labels.Matcher{
            labels.MustNewMatcher(labels.MatchEqual, "namespace", "bar"),
            labels.MustNewMatcher(labels.MatchRegexp, "cluster", "fo.*|b.+|[1-2]"),
        }},
    }, overrides.StreamRetention("29"))
}

func Test_ValidateRules(t *testing.T) {
    _, err := loadRuntimeConfig(strings.NewReader(
        `
overrides:
  "29":
    retention_stream:
    - selector: '{app=foo"}'
      period: 48h
      priority: 10
    - selector: '{namespace="bar", cluster=~"fo.*|b.+|[1-2]"}'
      period: 24h
      priority: 10
`))
    require.Equal(t, "invalid override for tenant 29: invalid labels matchers: parse error at line 1, col 6: syntax error: unexpected IDENTIFIER, expecting STRING", err.Error())
    _, err = loadRuntimeConfig(strings.NewReader(
        `
overrides:
  "29":
    retention_stream:
    - selector: '{app="foo"}'
      period: 5h
      priority: 10
`))
    require.Equal(t, "invalid override for tenant 29: retention period must be >= 24h was 5h0m0s", err.Error())
}

func newTestOverrides(t *testing.T, yaml string) *validation.Overrides {
    t.Helper()
    f, err := ioutil.TempFile(t.TempDir(), "bar")
    require.NoError(t, err)
    path := f.Name()
    // fake loader to load from string instead of file.
    loader := func(_ io.Reader) (interface{}, error) {
        return loadRuntimeConfig(strings.NewReader(yaml))
    }
    cfg := runtimeconfig.ManagerConfig{
        ReloadPeriod: 1 * time.Second,
        Loader:       loader,
        LoadPath:     path,
    }
    flagset := flag.NewFlagSet("", flag.PanicOnError)
    var defaults validation.Limits
    defaults.RegisterFlags(flagset)
    require.NoError(t, flagset.Parse(nil))
    validation.SetDefaultLimitsForYAMLUnmarshalling(defaults)

    runtimeConfig, err := runtimeconfig.NewRuntimeConfigManager(cfg, prometheus.DefaultRegisterer)
    require.NoError(t, err)

    require.NoError(t, runtimeConfig.StartAsync(context.Background()))
    require.NoError(t, runtimeConfig.AwaitRunning(context.Background()))
    defer func() {
        runtimeConfig.StopAsync()
        require.NoError(t, runtimeConfig.AwaitTerminated(context.Background()))
    }()

    overrides, err := validation.NewOverrides(defaults, tenantLimitsFromRuntimeConfig(runtimeConfig))
    require.NoError(t, err)
    return overrides
}
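The test above checks how retention_stream overrides are parsed into validation.StreamRetention rules, but not how those rules are applied to data. The sketch below is only an illustration of the intended semantics, assuming that the highest-priority rule whose matchers all match a stream's labels supplies the period, and that the tenant-wide retention_period applies otherwise; streamRetention and retentionFor are hypothetical names rather than the retention.ExpirationChecker this PR adds.

package main

import (
    "fmt"
    "time"

    "github.com/prometheus/prometheus/pkg/labels"
)

// streamRetention mirrors the fields exercised in the test above.
type streamRetention struct {
    period   time.Duration
    priority int
    matchers []*labels.Matcher
}

// retentionFor returns the period of the highest-priority matching rule,
// or the tenant-wide default when no rule matches.
func retentionFor(ls labels.Labels, rules []streamRetention, tenantPeriod time.Duration) time.Duration {
    bestPriority := -1
    period := tenantPeriod
    for _, rule := range rules {
        matches := true
        for _, m := range rule.matchers {
            if !m.Matches(ls.Get(m.Name)) {
                matches = false
                break
            }
        }
        if matches && rule.priority > bestPriority {
            bestPriority = rule.priority
            period = rule.period
        }
    }
    return period
}

func main() {
    rules := []streamRetention{
        {period: 48 * time.Hour, priority: 10, matchers: []*labels.Matcher{
            labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
        }},
        {period: 24 * time.Hour, priority: 5, matchers: []*labels.Matcher{
            labels.MustNewMatcher(labels.MatchEqual, "namespace", "bar"),
        }},
    }
    stream := labels.FromStrings("app", "foo", "namespace", "bar")
    // Both rules match; the priority-10 rule wins over the tenant default of 1440h.
    fmt.Println(retentionFor(stream, rules, 1440*time.Hour)) // 48h0m0s
}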
@@ -7,6 +7,7 @@ import (
    "path/filepath"
    "reflect"
    "strings"
+   "sync"
    "time"

    "github.com/cortexproject/cortex/pkg/chunk"
@@ -17,17 +18,24 @@ import (
    "github.com/go-kit/kit/log/level"
    "github.com/prometheus/client_golang/prometheus"

+   loki_storage "github.com/grafana/loki/pkg/storage"
+   "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
    shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
    "github.com/grafana/loki/pkg/storage/stores/util"
+   errUtil "github.com/grafana/loki/pkg/util"
)

+const delimiter = "/"
+
type Config struct {
-   WorkingDirectory     string        `yaml:"working_directory"`
-   SharedStoreType      string        `yaml:"shared_store"`
-   SharedStoreKeyPrefix string        `yaml:"shared_store_key_prefix"`
-   CompactionInterval   time.Duration `yaml:"compaction_interval"`
+   WorkingDirectory         string        `yaml:"working_directory"`
+   SharedStoreType          string        `yaml:"shared_store"`
+   SharedStoreKeyPrefix     string        `yaml:"shared_store_key_prefix"`
+   CompactionInterval       time.Duration `yaml:"compaction_interval"`
+   RetentionEnabled         bool          `yaml:"retention_enabled"`
+   RetentionInterval        time.Duration `yaml:"retention_interval"`
+   RetentionDeleteDelay     time.Duration `yaml:"retention_delete_delay"`
+   RetentionDeleteWorkCount int           `yaml:"retention_delete_worker_count"`
}

// RegisterFlags registers flags.
@@ -36,6 +44,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
    f.StringVar(&cfg.SharedStoreType, "boltdb.shipper.compactor.shared-store", "", "Shared store used for storing boltdb files. Supported types: gcs, s3, azure, swift, filesystem")
    f.StringVar(&cfg.SharedStoreKeyPrefix, "boltdb.shipper.compactor.shared-store.key-prefix", "index/", "Prefix to add to Object Keys in Shared store. Path separator(if any) should always be a '/'. Prefix should never start with a separator but should always end with it.")
    f.DurationVar(&cfg.CompactionInterval, "boltdb.shipper.compactor.compaction-interval", 2*time.Hour, "Interval at which to re-run the compaction operation.")
+   f.DurationVar(&cfg.RetentionInterval, "boltdb.shipper.compactor.retention-interval", 10*time.Minute, "Interval at which to re-run the retention operation.")
+   f.DurationVar(&cfg.RetentionDeleteDelay, "boltdb.shipper.compactor.retention-delete-delay", 2*time.Hour, "Delay after which chunks will be fully deleted during retention.")
+   f.BoolVar(&cfg.RetentionEnabled, "boltdb.shipper.compactor.retention-enabled", false, "(Experimental) Activate custom (per-stream, per-tenant) retention.")
+   f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total number of workers used to delete chunks.")
}

func (cfg *Config) IsDefaults() bool {
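The retention-delete-delay flag above implies a mark-then-sweep flow: chunks are first marked, and only removed once the delay has elapsed. Below is a minimal sketch of that timing rule; it assumes nothing about the real retention.Sweeper beyond the configured delay, and markedChunk and sweepable are illustrative names.

package main

import (
    "fmt"
    "time"
)

// markedChunk records when a chunk was marked for deletion by the retention marker.
type markedChunk struct {
    chunkID  string
    markedAt time.Time
}

// sweepable reports whether a marked chunk may now be deleted, i.e. the
// configured delete delay has fully elapsed since it was marked.
func sweepable(c markedChunk, deleteDelay time.Duration, now time.Time) bool {
    return now.Sub(c.markedAt) >= deleteDelay
}

func main() {
    deleteDelay := 2 * time.Hour // default of -boltdb.shipper.compactor.retention-delete-delay
    now := time.Now()
    for _, c := range []markedChunk{
        {chunkID: "chunk-a", markedAt: now.Add(-3 * time.Hour)},    // past the delay: deletable
        {chunkID: "chunk-b", markedAt: now.Add(-30 * time.Minute)}, // still within the delay
    } {
        fmt.Printf("%s sweepable=%t\n", c.chunkID, sweepable(c, deleteDelay, now))
    }
}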
@@ -53,11 +65,13 @@ type Compactor struct {

    cfg          Config
    objectClient chunk.ObjectClient
+   tableMarker  *retention.Marker
+   sweeper      *retention.Sweeper

    metrics *metrics
}

-func NewCompactor(cfg Config, storageConfig storage.Config, r prometheus.Registerer) (*Compactor, error) {
+func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_storage.SchemaConfig, limits retention.Limits, r prometheus.Registerer) (*Compactor, error) {
    if cfg.IsDefaults() {
        return nil, errors.New("Must specify compactor config")
    }

@@ -71,11 +85,24 @@ func NewCompactor(cfg Config, storageConfig storage.Config, r prometheus.Registe
    if err != nil {
        return nil, err
    }
+   prefixedClient := util.NewPrefixedObjectClient(objectClient, cfg.SharedStoreKeyPrefix)
+
+   retentionWorkDir := filepath.Join(cfg.WorkingDirectory, "retention")
+
+   sweeper, err := retention.NewSweeper(retentionWorkDir, retention.NewDeleteClient(objectClient), cfg.RetentionDeleteWorkCount, cfg.RetentionDeleteDelay, r)
+   if err != nil {
+       return nil, err
+   }
+   marker, err := retention.NewMarker(retentionWorkDir, schemaConfig, prefixedClient, retention.NewExpirationChecker(limits), r)
+   if err != nil {
+       return nil, err
+   }
    compactor := Compactor{
        cfg:          cfg,
-       objectClient: util.NewPrefixedObjectClient(objectClient, cfg.SharedStoreKeyPrefix),
+       objectClient: prefixedClient,
        metrics:      newMetrics(r),
+       tableMarker:  marker,
+       sweeper:      sweeper,
    }

    compactor.Service = services.NewBasicService(nil, compactor.loop, nil)
@@ -84,28 +111,68 @@ func NewCompactor(cfg Config, storageConfig storage.Config, r prometheus.Registe

func (c *Compactor) loop(ctx context.Context) error {
    runCompaction := func() {
-       err := c.Run(ctx)
+       err := c.RunCompaction(ctx)
        if err != nil {
            level.Error(util_log.Logger).Log("msg", "failed to run compaction", "err", err)
        }
    }
-
-   runCompaction()
-
-   ticker := time.NewTicker(c.cfg.CompactionInterval)
-   defer ticker.Stop()
-
-   for {
-       select {
-       case <-ticker.C:
-           runCompaction()
-       case <-ctx.Done():
-           return nil
-       }
-   }
+   runRetention := func() {
+       err := c.RunRetention(ctx)
+       if err != nil {
+           level.Error(util_log.Logger).Log("msg", "failed to run retention", "err", err)
+       }
+   }
+   var wg sync.WaitGroup
+   wg.Add(1)
+   go func() {
+       defer wg.Done()
+       runCompaction()
+
+       ticker := time.NewTicker(c.cfg.CompactionInterval)
+       defer ticker.Stop()
+
+       for {
+           select {
+           case <-ticker.C:
+               runCompaction()
+           case <-ctx.Done():
+               return
+           }
+       }
+   }()
+   if c.cfg.RetentionEnabled {
+       wg.Add(2)
+       go func() {
+           // starts the chunk sweeper
+           defer func() {
+               c.sweeper.Stop()
+               wg.Done()
+           }()
+           c.sweeper.Start()
+           <-ctx.Done()
+       }()
+       go func() {
+           // start the index marker
+           defer wg.Done()
+           ticker := time.NewTicker(c.cfg.RetentionInterval)
+           defer ticker.Stop()
+
+           for {
+               select {
+               case <-ticker.C:
+                   runRetention()
+               case <-ctx.Done():
+                   return
+               }
+           }
+       }()
+   }
+
+   wg.Wait()
+   return nil
}

-func (c *Compactor) Run(ctx context.Context) error {
+func (c *Compactor) RunCompaction(ctx context.Context) error {
    status := statusSuccess
    start := time.Now()

Review thread on this hunk (left on the c.sweeper.Start() line):
- I think we need some way to lock the tables while the mark operation is running on a table, to avoid running compaction on them simultaneously and hence avoid deleted index entries re-appearing.
- Currently I don't touch tables that are not compacted.
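Below is a hedged sketch of the per-table locking idea raised in the thread above. It is not code from this PR; tableLocker, forTable, and the example table name are invented for illustration. The idea is that compaction and retention marking both acquire the same per-table mutex, so they never touch one table at the same time.

package main

import "sync"

// tableLocker hands out one mutex per table name so that compaction and
// retention marking never run concurrently on the same table.
type tableLocker struct {
    mtx   sync.Mutex
    locks map[string]*sync.Mutex
}

func newTableLocker() *tableLocker {
    return &tableLocker{locks: map[string]*sync.Mutex{}}
}

// forTable returns the mutex guarding a single table, creating it on first use.
func (t *tableLocker) forTable(table string) *sync.Mutex {
    t.mtx.Lock()
    defer t.mtx.Unlock()
    m, ok := t.locks[table]
    if !ok {
        m = &sync.Mutex{}
        t.locks[table] = m
    }
    return m
}

func main() {
    locker := newTableLocker()

    compactTable := func(table string) {
        m := locker.forTable(table)
        m.Lock()
        defer m.Unlock()
        // ... compact the table's index files ...
    }
    markTable := func(table string) {
        m := locker.forTable(table)
        m.Lock()
        defer m.Unlock()
        // ... mark expired entries of the table for deletion ...
    }

    var wg sync.WaitGroup
    wg.Add(2)
    go func() { defer wg.Done(); compactTable("index_18500") }()
    go func() { defer wg.Done(); markTable("index_18500") }()
    wg.Wait()
}

Whether such a lock is needed depends on the reply above, namely that the marker currently skips tables that have not yet been compacted.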
@@ -152,3 +219,40 @@ func (c *Compactor) Run(ctx context.Context) error {

    return nil
}
+
+func (c *Compactor) RunRetention(ctx context.Context) error {
+   status := statusSuccess
+   start := time.Now()
+
+   defer func() {
+       level.Debug(util_log.Logger).Log("msg", "finished processing retention on all tables", "status", status, "duration", time.Since(start))
+       c.metrics.retentionOperationTotal.WithLabelValues(status).Inc()
+       if status == statusSuccess {
+           c.metrics.retentionOperationDurationSeconds.Set(time.Since(start).Seconds())
+           c.metrics.retentionOperationLastSuccess.SetToCurrentTime()
+       }
+   }()
+   level.Debug(util_log.Logger).Log("msg", "starting to process retention on all tables")
+
+   _, dirs, err := c.objectClient.List(ctx, "", delimiter)
+   if err != nil {
+       status = statusFailure
+       return err
+   }
+
+   tables := make([]string, len(dirs))
+   for i, dir := range dirs {
+       tables[i] = strings.TrimSuffix(string(dir), delimiter)
+   }
+
+   var errs errUtil.MultiError
+
+   for _, tableName := range tables {
+       if err := c.tableMarker.MarkForDelete(ctx, tableName); err != nil {
+           level.Error(util_log.Logger).Log("msg", "failed to mark table for deletes", "table", tableName, "err", err)
+           errs.Add(err)
+           status = statusFailure
+       }
+   }
+   return errs.Err()
+}
Review comment:
- Let us add a comment that we assume chunks are always stored in the same object store as the index files? People who are migrating between stores might face an issue here.
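One possible wording for such a note, as a hedged sketch (the placement above NewCompactor and the exact phrasing are assumptions, not part of this PR):

// NOTE: retention assumes that chunks live in the same object store that
// backs the boltdb-shipper index (-boltdb.shipper.compactor.shared-store):
// the sweeper deletes chunks through retention.NewDeleteClient(objectClient),
// which is the same client used for the index files. Tenants whose chunks
// were migrated to a different object store would be left with chunks that
// retention cannot delete.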