Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Add Gitops dataset configuration options
Browse files Browse the repository at this point in the history
This change adds more options to the existing one (metrics chunk interval) for
setting global dataset configuration using the `-startup.dataset.config` flag.
This makes it a bit easier to configure and maintain these settings vs.
doing it through the database directly.
  • Loading branch information
antekresic committed Apr 11, 2022
1 parent f714452 commit c8970a6
Show file tree
Hide file tree
Showing 7 changed files with 369 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ We use the following categories for changes:
- Add database SQL stats as Prometheus metrics. These can be queried under `promscale_sql` namespace [#1193]
- Add alerts for database SQL metrics [#1193]
- Query Jaeger traces directly through Promscale [#1224]
- Additional dataset configuration options via `-startup.dataset.config` flag. Read more [here](docs/dataset.md) [#1276]

### Changed
- Enable tracing by default [#1213]
Expand Down
17 changes: 14 additions & 3 deletions docs/dataset.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ Example configuration in config.yaml:
startup.dataset.config: |
metrics:
default_chunk_interval: 6h
compress_data: true
ha_lease_refresh: 10s
ha_lease_timeout: 1m
default_retention_period: 90d
traces:
default_retention_period: 30d
```

The configuration above sets the default metric chunk interval to 6 hours, enables metric compression, sets the high availability lease refresh and timeout periods, and sets the retention periods for metric and trace data.
Expand All @@ -26,6 +32,11 @@ Note: Any configuration omitted from the configuration structure will be set to

## Default values

| Setting | Type | Default | Description |
|:--------|:----:|:-------:|:------------|
| default_chunk_interval | duration | 8h | Chunk interval used to create hypertable chunks that store the metric data |
| Section | Setting | Type | Default | Description |
|:-------|:-------------------|:------:|:-------:|:---------------------|
| metrics | default_chunk_interval | duration | 8h | Chunk interval used to create hypertable chunks that store the metric data |
| metrics | compress_data | bool | true | Boolean setting to turn on or off compression of metric data |
| metrics | ha_lease_refresh | duration | 10s | High availability lease refresh duration, period after which the lease will be refreshed |
| metrics | ha_lease_timeout | duration | 1m | High availability lease timeout duration, period after which the lease will be lost in case it wasn't refreshed |
| metrics | default_retention_period | duration | 90d | Retention period for metric data, all data older than this period will be dropped |
| traces | default_retention_period | duration | 30d | Retention period for tracing data, all data older than this period will be dropped |
104 changes: 94 additions & 10 deletions pkg/dataset/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,114 @@ import (
"gopkg.in/yaml.v2"
)

const defaultChunkInterval = 8 * time.Hour
// Defaults applied by applyDefaults for any dataset setting that is
// omitted from (or non-positive in) the supplied configuration.
const (
// defaultMetricChunkInterval is the hypertable chunk interval for metric data.
defaultMetricChunkInterval = 8 * time.Hour
// defaultMetricCompression is the default metric compression setting
// (only honored when TimescaleDB is present; see applyDefaults).
defaultMetricCompression = true
// defaultMetricHALeaseRefresh is the high availability lease refresh period.
defaultMetricHALeaseRefresh = 10 * time.Second
// defaultMetricHALeaseTimeout is the high availability lease timeout.
defaultMetricHALeaseTimeout = 1 * time.Minute
// defaultMetricRetentionPeriod is the metric data retention period (90 days).
defaultMetricRetentionPeriod = 90 * 24 * time.Hour
// defaultTraceRetentionPeriod is the trace data retention period (30 days).
defaultTraceRetentionPeriod = 30 * 24 * time.Hour
)

var (
setDefaultMetricChunkIntervalSQL = "SELECT prom_api.set_default_chunk_interval($1)"
setDefaultMetricCompressionSQL = "SELECT prom_api.set_default_compression_setting($1)"
// TODO: Add proper SQL function for setting this.
setDefaultMetricHAReleaseRefreshSQL = `INSERT INTO _prom_catalog.default(key, value)
VALUES ('ha_lease_refresh', $1::text)
ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value`
// TODO: Add proper SQL function for setting this.
setDefaultMetricHAReleaseTimeoutSQL = `INSERT INTO _prom_catalog.default(key, value)
VALUES ('ha_lease_timeout', $1::text)
ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value`
setDefaultMetricRetentionPeriodSQL = "SELECT prom_api.set_default_retention_period($1)"
setDefaultTraceRetentionPeriodSQL = "SELECT ps_trace.set_trace_retention_period($1)"

var setDefaultChunkIntervalSQL = "SELECT prom_api.set_default_chunk_interval($1)"
defaultMetricCompressionVar = defaultMetricCompression
)

// Config represents a dataset config.
type Config struct {
Metrics Metrics `yaml:"metrics"`
Metrics `yaml:"metrics"`
Traces `yaml:"traces"`

withTimescaleDB bool
}

// Metrics contains dataset configuration options for metrics data.
type Metrics struct {
ChunkInterval time.Duration `yaml:"default_chunk_interval"`
ChunkInterval DayDuration `yaml:"default_chunk_interval"`
Compression *bool `yaml:"compress_data"` // Using pointer to check if the the value was set.
HALeaseRefresh DayDuration `yaml:"ha_lease_refresh"`
HALeaseTimeout DayDuration `yaml:"ha_lease_timeout"`
RetentionPeriod DayDuration `yaml:"default_retention_period"`
}

// Traces contains dataset configuration options for traces data.
type Traces struct {
// RetentionPeriod: trace data older than this period is dropped.
// Defaults to defaultTraceRetentionPeriod (30d) when zero or negative.
RetentionPeriod DayDuration `yaml:"default_retention_period"`
}

func NewConfig(contents string) (cfg Config, err error) {
// NewConfig creates a new dataset config based on the configuration YAML contents and
// whether or not we are running TimescaleDB (used for determining default compression setting).
func NewConfig(contents string, withTimescaleDB bool) (cfg Config, err error) {
err = yaml.Unmarshal([]byte(contents), &cfg)
cfg.withTimescaleDB = withTimescaleDB
return cfg, err
}

// Apply applies the configuration to the database via the supplied DB connection.
func (c *Config) Apply(conn *pgx.Conn) error {
if c.Metrics.ChunkInterval <= 0 {
c.Metrics.ChunkInterval = defaultChunkInterval
c.applyDefaults()

log.Info("msg", fmt.Sprintf("Setting metric dataset default chunk interval to %s", c.Metrics.ChunkInterval))
log.Info("msg", fmt.Sprintf("Setting metric dataset default compression to %t", *c.Metrics.Compression))
log.Info("msg", fmt.Sprintf("Setting metric dataset default high availability lease refresh to %s", c.Metrics.HALeaseRefresh))
log.Info("msg", fmt.Sprintf("Setting metric dataset default high availability lease timeout to %s", c.Metrics.HALeaseTimeout))
log.Info("msg", fmt.Sprintf("Setting metric dataset default retention period to %s", c.Metrics.RetentionPeriod))
log.Info("msg", fmt.Sprintf("Setting trace dataset default retention period to %s", c.Traces.RetentionPeriod))

queries := map[string]interface{}{
setDefaultMetricChunkIntervalSQL: time.Duration(c.Metrics.ChunkInterval),
setDefaultMetricCompressionSQL: c.Metrics.Compression,
setDefaultMetricHAReleaseRefreshSQL: time.Duration(c.Metrics.HALeaseRefresh),
setDefaultMetricHAReleaseTimeoutSQL: time.Duration(c.Metrics.HALeaseTimeout),
setDefaultMetricRetentionPeriodSQL: time.Duration(c.Metrics.RetentionPeriod),
setDefaultTraceRetentionPeriodSQL: time.Duration(c.Traces.RetentionPeriod),
}

log.Info("msg", fmt.Sprintf("Setting dataset default chunk interval to %s", c.Metrics.ChunkInterval))
for sql, param := range queries {
if _, err := conn.Exec(context.Background(), sql, param); err != nil {
return err
}
}

_, err := conn.Exec(context.Background(), setDefaultChunkIntervalSQL, c.Metrics.ChunkInterval)
return err
return nil
}

// applyDefaults fills every unset dataset option with its package default.
// A duration is considered unset when it is zero or negative; compression
// is unset when the pointer is nil. The metric compression default is true
// only when running against TimescaleDB — without it compression is not
// available, so the default is false.
func (c *Config) applyDefaults() {
	if c.Metrics.ChunkInterval <= 0 {
		c.Metrics.ChunkInterval = DayDuration(defaultMetricChunkInterval)
	}
	if c.Metrics.Compression == nil {
		// Point at a fresh value: aliasing c.withTimescaleDB or a
		// package-level variable would let a later write through this
		// pointer silently mutate unrelated shared state.
		compression := c.withTimescaleDB && defaultMetricCompression
		c.Metrics.Compression = &compression
	}
	if c.Metrics.HALeaseRefresh <= 0 {
		c.Metrics.HALeaseRefresh = DayDuration(defaultMetricHALeaseRefresh)
	}
	if c.Metrics.HALeaseTimeout <= 0 {
		c.Metrics.HALeaseTimeout = DayDuration(defaultMetricHALeaseTimeout)
	}
	if c.Metrics.RetentionPeriod <= 0 {
		c.Metrics.RetentionPeriod = DayDuration(defaultMetricRetentionPeriod)
	}
	if c.Traces.RetentionPeriod <= 0 {
		c.Traces.RetentionPeriod = DayDuration(defaultTraceRetentionPeriod)
	}
}
117 changes: 113 additions & 4 deletions pkg/dataset/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"github.com/stretchr/testify/require"
)

// testCompressionSetting exists so tests can take its address for the
// Metrics.Compression *bool field.
var testCompressionSetting = true

func TestNewConfig(t *testing.T) {
testCases := []struct {
name string
Expand All @@ -20,16 +22,64 @@ func TestNewConfig(t *testing.T) {
err: "yaml: unmarshal errors:\n line 1: cannot unmarshal !!str `invalid` into dataset.Config",
},
{
name: "happy path",
input: "metrics:\n default_chunk_interval: 3h",
cfg: Config{Metrics: Metrics{ChunkInterval: 3 * time.Hour}},
name: "invalid duration format 1",
input: `metrics:
default_retention_period: d3d`,
err: `time: invalid duration "d3d"`,
},
{
name: "invalid duration format 2",
input: `metrics:
default_retention_period: 3d2h2`,
err: `time: invalid duration "3d2h2"`,
},
{
name: "invalid duration format 3",
input: `metrics:
default_retention_period: 3d2d`,
err: `time: invalid duration "3d2d"`,
},
{
name: "duration in days and hours",
input: `metrics:
default_retention_period: 3d2h`,
cfg: Config{
withTimescaleDB: true,
Metrics: Metrics{
RetentionPeriod: DayDuration(((3 * 24) + 2) * time.Hour),
},
},
},
{
name: "happy path",
input: `metrics:
default_chunk_interval: 3h
compress_data: true
ha_lease_refresh: 2m
ha_lease_timeout: 5s
default_retention_period: 30d
traces:
default_retention_period: 15d`,
cfg: Config{
withTimescaleDB: true,
Metrics: Metrics{
ChunkInterval: DayDuration(3 * time.Hour),
Compression: &testCompressionSetting,
HALeaseRefresh: DayDuration(2 * time.Minute),
HALeaseTimeout: DayDuration(5 * time.Second),
RetentionPeriod: DayDuration(30 * 24 * time.Hour),
},
Traces: Traces{
RetentionPeriod: DayDuration(15 * 24 * time.Hour),
},
},
},
}

for _, c := range testCases {
t.Run(c.name, func(t *testing.T) {

cfg, err := NewConfig(c.input)
cfg, err := NewConfig(c.input, true)

if c.err != "" {
require.EqualError(t, err, c.err)
Expand All @@ -40,3 +90,62 @@ func TestNewConfig(t *testing.T) {
})
}
}

// TestApplyDefaults verifies that applyDefaults fills unset fields with the
// package defaults, leaves an explicitly populated config untouched, and
// disables compression by default when TimescaleDB is absent.
func TestApplyDefaults(t *testing.T) {
	c := Config{withTimescaleDB: true}
	c.applyDefaults()

	require.Equal(
		t,
		Config{
			withTimescaleDB: true,
			Metrics: Metrics{
				ChunkInterval:   DayDuration(defaultMetricChunkInterval),
				Compression:     &defaultMetricCompressionVar,
				HALeaseRefresh:  DayDuration(defaultMetricHALeaseRefresh),
				HALeaseTimeout:  DayDuration(defaultMetricHALeaseTimeout),
				RetentionPeriod: DayDuration(defaultMetricRetentionPeriod),
			},
			Traces: Traces{
				RetentionPeriod: DayDuration(defaultTraceRetentionPeriod),
			},
		},
		c,
	)

	// A fully populated config must pass through applyDefaults unchanged.
	untouched := Config{
		Metrics: Metrics{
			ChunkInterval:   DayDuration(3 * time.Hour),
			Compression:     &testCompressionSetting,
			HALeaseRefresh:  DayDuration(2 * time.Minute),
			HALeaseTimeout:  DayDuration(5 * time.Second),
			RetentionPeriod: DayDuration(30 * 24 * time.Hour),
		},
		Traces: Traces{
			RetentionPeriod: DayDuration(15 * 24 * time.Hour),
		},
	}

	copyConfig := untouched
	copyConfig.applyDefaults()

	require.Equal(t, untouched, copyConfig)

	// No TSDB, no compression by default.
	c = Config{withTimescaleDB: false}
	noCompression := false
	c.applyDefaults()

	// Expected value first, actual second, so failure output is labeled
	// correctly (matches the assertions above).
	require.Equal(t, Config{
		Metrics: Metrics{
			ChunkInterval:   DayDuration(defaultMetricChunkInterval),
			Compression:     &noCompression,
			HALeaseRefresh:  DayDuration(defaultMetricHALeaseRefresh),
			HALeaseTimeout:  DayDuration(defaultMetricHALeaseTimeout),
			RetentionPeriod: DayDuration(defaultMetricRetentionPeriod),
		},
		Traces: Traces{
			RetentionPeriod: DayDuration(defaultTraceRetentionPeriod),
		},
	}, c)
}
66 changes: 66 additions & 0 deletions pkg/dataset/duration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package dataset

import (
"fmt"
"strings"
"time"
)

const (
// dayUnit is the custom duration unit character for days, which
// time.ParseDuration does not support natively.
dayUnit = 'd'
// unknownUnitDErrorPrefix is the prefix of the error returned by
// time.ParseDuration when a duration string uses the "d" unit.
unknownUnitDErrorPrefix = `time: unknown unit "d"`
)

// DayDuration acts like a time.Duration with support for a "d" unit,
// which is used for specifying a number of days in a duration
// (e.g. "30d" or "3d2h"). Parsing is implemented in UnmarshalText.
type DayDuration time.Duration

// UnmarshalText implements encoding.TextUnmarshaler for DayDuration.
// It accepts everything time.ParseDuration does, plus durations using the
// "d" (day) unit, e.g. "30d" or "3d2h".
func (d *DayDuration) UnmarshalText(s []byte) error {
	dur, parseErr := time.ParseDuration(string(s))
	switch {
	case parseErr == nil:
		// Plain time.Duration syntax; nothing more to do.
	case strings.HasPrefix(parseErr.Error(), unknownUnitDErrorPrefix):
		// The only unit ParseDuration rejected was "d": retry with
		// day-aware parsing.
		var err error
		if dur, err = handleDays(s); err != nil {
			return err
		}
	default:
		return parseErr
	}
	*d = DayDuration(dur)
	return nil
}

// handleDays parses a duration string that uses the custom "d" (day) unit,
// e.g. "3d" or "3d2h". The day count must be a bare (possibly signed or
// fractional) number, optionally followed by a suffix accepted by
// time.ParseDuration; anything else is rejected with the standard
// "invalid duration" error text.
func handleDays(s []byte) (time.Duration, error) {
	str := string(s)
	invalid := func() (time.Duration, error) {
		return 0, fmt.Errorf(`time: invalid duration "%s"`, str)
	}

	idx := strings.IndexByte(str, 'd')
	// Exactly one "d" may appear, and only as the day-count separator.
	if idx < 0 || strings.IndexByte(str[idx+1:], 'd') >= 0 {
		return invalid()
	}
	dayPart, rest := str[:idx], str[idx+1:]

	// The day count must be a bare number. Without this check, inputs like
	// "2h3d" were parsed as "2h3"+"h" and silently produced a wrong value.
	for _, r := range dayPart {
		if (r < '0' || r > '9') && r != '.' && r != '+' && r != '-' {
			return invalid()
		}
	}

	// Parse the day count as hours, then scale by 24 to get days.
	days, err := time.ParseDuration(dayPart + "h")
	if err != nil {
		return invalid()
	}
	days *= 24

	if rest == "" {
		return days, nil
	}
	remainder, err := time.ParseDuration(rest)
	if err != nil {
		return invalid()
	}
	return days + remainder, nil
}

// String implements fmt.Stringer, formatting the value using the standard
// time.Duration representation (days are rendered in hours, e.g. "72h0m0s").
func (d DayDuration) String() string {
return time.Duration(d).String()
}
Loading

0 comments on commit c8970a6

Please sign in to comment.