diff --git a/CHANGELOG.md b/CHANGELOG.md index 5bfd681212..5829ccf858 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ We use the following categories for changes: - Add database SQL stats as Prometheus metrics. These can be queried under `promscale_sql` namespace [#1193] - Add alerts for database SQL metrics [#1193] - Query Jaeger traces directly through Promscale [#1224] +- Additional dataset configuration options via `-startup.dataset.config` flag. Read more [here](docs/dataset.md) [#1276] ### Changed - Enable tracing by default [#1213] diff --git a/docs/dataset.md b/docs/dataset.md index 09f6832f63..0fe9de4cf8 100644 --- a/docs/dataset.md +++ b/docs/dataset.md @@ -17,6 +17,12 @@ Example configuration in config.yaml: startup.dataset.config: | metrics: default_chunk_interval: 6h + compress_data: true + ha_lease_refresh: 10s + ha_lease_timeout: 1m + default_retention_period: 90d + traces: + default_retention_period: 30d ``` Above configuration will set the default chunk interval to 6 hours. 
@@ -26,6 +32,11 @@ Note: Any configuration omitted from the configuration structure will be set to ## Default values -| Setting | Type | Default | Description | -|:--------|:----:|:-------:|:------------| -| default_chunk_interval | duration | 8h | Chunk interval used to create hypertable chunks that store the metric data | +| Section | Setting | Type | Default | Description | +|:-------|:-------------------|:------:|:-------:|:---------------------| +| metrics | default_chunk_interval | duration | 8h | Chunk interval used to create hypertable chunks that store the metric data | +| metrics | compress_data | bool | true | Boolean setting to turn on or off compression of metric data | +| metrics | ha_lease_refresh | duration | 10s | High availability lease refresh duration, period after which the lease will be refreshed | +| metrics | ha_lease_timeout | duration | 1m | High availability lease timeout duration, period after which the lease will be lost in case it wasn't refreshed | +| metrics | default_retention_period | duration | 90d | Retention period for metric data, all data older than this period will be dropped | +| traces | default_retention_period | duration | 30d | Retention period for tracing data, all data older than this period will be dropped | diff --git a/pkg/dataset/config.go b/pkg/dataset/config.go index 17f888a271..d21595c26d 100644 --- a/pkg/dataset/config.go +++ b/pkg/dataset/config.go @@ -10,30 +10,114 @@ import ( "gopkg.in/yaml.v2" ) -const defaultChunkInterval = 8 * time.Hour +const ( + defaultMetricChunkInterval = 8 * time.Hour + defaultMetricCompression = true + defaultMetricHALeaseRefresh = 10 * time.Second + defaultMetricHALeaseTimeout = 1 * time.Minute + defaultMetricRetentionPeriod = 90 * 24 * time.Hour + defaultTraceRetentionPeriod = 30 * 24 * time.Hour +) + +var ( + setDefaultMetricChunkIntervalSQL = "SELECT prom_api.set_default_chunk_interval($1)" + setDefaultMetricCompressionSQL = "SELECT prom_api.set_default_compression_setting($1)" + // 
TODO: Add proper SQL function for setting this. + setDefaultMetricHAReleaseRefreshSQL = `INSERT INTO _prom_catalog.default(key, value) + VALUES ('ha_lease_refresh', $1::text) + ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value` + // TODO: Add proper SQL function for setting this. + setDefaultMetricHAReleaseTimeoutSQL = `INSERT INTO _prom_catalog.default(key, value) + VALUES ('ha_lease_timeout', $1::text) + ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value` + setDefaultMetricRetentionPeriodSQL = "SELECT prom_api.set_default_retention_period($1)" + setDefaultTraceRetentionPeriodSQL = "SELECT ps_trace.set_trace_retention_period($1)" -var setDefaultChunkIntervalSQL = "SELECT prom_api.set_default_chunk_interval($1)" + defaultMetricCompressionVar = defaultMetricCompression +) +// Config represents a dataset config. type Config struct { - Metrics Metrics `yaml:"metrics"` + Metrics `yaml:"metrics"` + Traces `yaml:"traces"` + + withTimescaleDB bool } +// Metrics contains dataset configuration options for metrics data. type Metrics struct { - ChunkInterval time.Duration `yaml:"default_chunk_interval"` + ChunkInterval DayDuration `yaml:"default_chunk_interval"` + Compression *bool `yaml:"compress_data"` // Using pointer to check if the value was set. + HALeaseRefresh DayDuration `yaml:"ha_lease_refresh"` + HALeaseTimeout DayDuration `yaml:"ha_lease_timeout"` + RetentionPeriod DayDuration `yaml:"default_retention_period"` +} + +// Traces contains dataset configuration options for traces data. +type Traces struct { + RetentionPeriod DayDuration `yaml:"default_retention_period"` } -func NewConfig(contents string) (cfg Config, err error) { +// NewConfig creates a new dataset config based on the configuration YAML contents and +// whether or not we are running TimescaleDB (used for determining default compression setting). 
+func NewConfig(contents string, withTimescaleDB bool) (cfg Config, err error) { err = yaml.Unmarshal([]byte(contents), &cfg) + cfg.withTimescaleDB = withTimescaleDB return cfg, err } +// Apply applies the configuration to the database via the supplied DB connection. func (c *Config) Apply(conn *pgx.Conn) error { - if c.Metrics.ChunkInterval <= 0 { - c.Metrics.ChunkInterval = defaultChunkInterval + c.applyDefaults() + + log.Info("msg", fmt.Sprintf("Setting metric dataset default chunk interval to %s", c.Metrics.ChunkInterval)) + log.Info("msg", fmt.Sprintf("Setting metric dataset default compression to %t", *c.Metrics.Compression)) + log.Info("msg", fmt.Sprintf("Setting metric dataset default high availability lease refresh to %s", c.Metrics.HALeaseRefresh)) + log.Info("msg", fmt.Sprintf("Setting metric dataset default high availability lease timeout to %s", c.Metrics.HALeaseTimeout)) + log.Info("msg", fmt.Sprintf("Setting metric dataset default retention period to %s", c.Metrics.RetentionPeriod)) + log.Info("msg", fmt.Sprintf("Setting trace dataset default retention period to %s", c.Traces.RetentionPeriod)) + + queries := map[string]interface{}{ + setDefaultMetricChunkIntervalSQL: time.Duration(c.Metrics.ChunkInterval), + setDefaultMetricCompressionSQL: c.Metrics.Compression, + setDefaultMetricHAReleaseRefreshSQL: time.Duration(c.Metrics.HALeaseRefresh), + setDefaultMetricHAReleaseTimeoutSQL: time.Duration(c.Metrics.HALeaseTimeout), + setDefaultMetricRetentionPeriodSQL: time.Duration(c.Metrics.RetentionPeriod), + setDefaultTraceRetentionPeriodSQL: time.Duration(c.Traces.RetentionPeriod), } - log.Info("msg", fmt.Sprintf("Setting dataset default chunk interval to %s", c.Metrics.ChunkInterval)) + for sql, param := range queries { + if _, err := conn.Exec(context.Background(), sql, param); err != nil { + return err + } + } - _, err := conn.Exec(context.Background(), setDefaultChunkIntervalSQL, c.Metrics.ChunkInterval) - return err + return nil +} + +func (c *Config) 
applyDefaults() { + if c.Metrics.ChunkInterval <= 0 { + c.Metrics.ChunkInterval = DayDuration(defaultMetricChunkInterval) + } + if c.Metrics.Compression == nil { + switch c.withTimescaleDB { + case false: + // No TSDB, no compression. + c.Metrics.Compression = &c.withTimescaleDB + default: + c.Metrics.Compression = &defaultMetricCompressionVar + } + } + if c.Metrics.HALeaseRefresh <= 0 { + c.Metrics.HALeaseRefresh = DayDuration(defaultMetricHALeaseRefresh) + } + if c.Metrics.HALeaseTimeout <= 0 { + c.Metrics.HALeaseTimeout = DayDuration(defaultMetricHALeaseTimeout) + } + if c.Metrics.RetentionPeriod <= 0 { + c.Metrics.RetentionPeriod = DayDuration(defaultMetricRetentionPeriod) + } + if c.Traces.RetentionPeriod <= 0 { + c.Traces.RetentionPeriod = DayDuration(defaultTraceRetentionPeriod) + } } diff --git a/pkg/dataset/config_test.go b/pkg/dataset/config_test.go index fae851d182..f9d2d5bde1 100644 --- a/pkg/dataset/config_test.go +++ b/pkg/dataset/config_test.go @@ -7,6 +7,8 @@ import ( "github.com/stretchr/testify/require" ) +var testCompressionSetting = true + func TestNewConfig(t *testing.T) { testCases := []struct { name string @@ -20,16 +22,64 @@ func TestNewConfig(t *testing.T) { err: "yaml: unmarshal errors:\n line 1: cannot unmarshal !!str `invalid` into dataset.Config", }, { - name: "happy path", - input: "metrics:\n default_chunk_interval: 3h", - cfg: Config{Metrics: Metrics{ChunkInterval: 3 * time.Hour}}, + name: "invalid duration format 1", + input: `metrics: + default_retention_period: d3d`, + err: `time: invalid duration "d3d"`, + }, + { + name: "invalid duration format 2", + input: `metrics: + default_retention_period: 3d2h2`, + err: `time: invalid duration "3d2h2"`, + }, + { + name: "invalid duration format 3", + input: `metrics: + default_retention_period: 3d2d`, + err: `time: invalid duration "3d2d"`, + }, + { + name: "duration in days and hours", + input: `metrics: + default_retention_period: 3d2h`, + cfg: Config{ + withTimescaleDB: true, + Metrics: 
Metrics{ + RetentionPeriod: DayDuration(((3 * 24) + 2) * time.Hour), + }, + }, + }, + { + name: "happy path", + input: `metrics: + default_chunk_interval: 3h + compress_data: true + ha_lease_refresh: 2m + ha_lease_timeout: 5s + default_retention_period: 30d +traces: + default_retention_period: 15d`, + cfg: Config{ + withTimescaleDB: true, + Metrics: Metrics{ + ChunkInterval: DayDuration(3 * time.Hour), + Compression: &testCompressionSetting, + HALeaseRefresh: DayDuration(2 * time.Minute), + HALeaseTimeout: DayDuration(5 * time.Second), + RetentionPeriod: DayDuration(30 * 24 * time.Hour), + }, + Traces: Traces{ + RetentionPeriod: DayDuration(15 * 24 * time.Hour), + }, + }, }, } for _, c := range testCases { t.Run(c.name, func(t *testing.T) { - cfg, err := NewConfig(c.input) + cfg, err := NewConfig(c.input, true) if c.err != "" { require.EqualError(t, err, c.err) @@ -40,3 +90,62 @@ func TestNewConfig(t *testing.T) { }) } } + +func TestApplyDefaults(t *testing.T) { + c := Config{withTimescaleDB: true} + c.applyDefaults() + + require.Equal( + t, + Config{ + withTimescaleDB: true, + Metrics: Metrics{ + ChunkInterval: DayDuration(defaultMetricChunkInterval), + Compression: &defaultMetricCompressionVar, + HALeaseRefresh: DayDuration(defaultMetricHALeaseRefresh), + HALeaseTimeout: DayDuration(defaultMetricHALeaseTimeout), + RetentionPeriod: DayDuration(defaultMetricRetentionPeriod), + }, + Traces: Traces{ + RetentionPeriod: DayDuration(defaultTraceRetentionPeriod), + }, + }, + c, + ) + + untouched := Config{ + Metrics: Metrics{ + ChunkInterval: DayDuration(3 * time.Hour), + Compression: &testCompressionSetting, + HALeaseRefresh: DayDuration(2 * time.Minute), + HALeaseTimeout: DayDuration(5 * time.Second), + RetentionPeriod: DayDuration(30 * 24 * time.Hour), + }, + Traces: Traces{ + RetentionPeriod: DayDuration(15 * 24 * time.Hour), + }, + } + + copyConfig := untouched + copyConfig.applyDefaults() + + require.Equal(t, untouched, copyConfig) + + // No TSDB, no compression by 
default. + c = Config{withTimescaleDB: false} + noCompression := false + c.applyDefaults() + + require.Equal(t, c, Config{ + Metrics: Metrics{ + ChunkInterval: DayDuration(defaultMetricChunkInterval), + Compression: &noCompression, + HALeaseRefresh: DayDuration(defaultMetricHALeaseRefresh), + HALeaseTimeout: DayDuration(defaultMetricHALeaseTimeout), + RetentionPeriod: DayDuration(defaultMetricRetentionPeriod), + }, + Traces: Traces{ + RetentionPeriod: DayDuration(defaultTraceRetentionPeriod), + }, + }) +} diff --git a/pkg/dataset/duration.go b/pkg/dataset/duration.go new file mode 100644 index 0000000000..14d6b9bfaf --- /dev/null +++ b/pkg/dataset/duration.go @@ -0,0 +1,66 @@ +package dataset + +import ( + "fmt" + "strings" + "time" +) + +const ( + dayUnit = 'd' + unknownUnitDErrorPrefix = `time: unknown unit "d"` +) + +// DayDuration acts like a time.Duration with support for "d" unit +// which is used for specifying number of days in duration. +type DayDuration time.Duration + +// UnmarshalText unmarshals strings into DayDuration values while +// handling the day unit. It leans heavily into time.ParseDuration. +func (d *DayDuration) UnmarshalText(s []byte) error { + val, err := time.ParseDuration(string(s)) + if err != nil { + // Check for specific error indicating we are using days unit. + if !strings.HasPrefix(err.Error(), unknownUnitDErrorPrefix) { + return err + } + + val, err = handleDays(s) + if err != nil { + return err + } + } + *d = DayDuration(val) + return nil +} + +func handleDays(s []byte) (time.Duration, error) { + parts := strings.Split(string(s), string(dayUnit)) + + if len(parts) > 2 { + return 0, fmt.Errorf(`time: invalid duration "%s"`, string(s)) + } + + // Treating first part as hours and multiplying with 24 to get duration in days. 
+ days, err := time.ParseDuration(parts[0] + "h") + if err != nil { + return 0, fmt.Errorf(`time: invalid duration "%s"`, string(s)) + } + days = days * 24 + + if s[len(s)-1] == dayUnit { + return days, nil + } + + val, err := time.ParseDuration(parts[1]) + if err != nil { + return 0, fmt.Errorf(`time: invalid duration "%s"`, string(s)) + } + + return val + days, nil +} + +// String returns a string value of DayDuration. +func (d DayDuration) String() string { + return time.Duration(d).String() +} diff --git a/pkg/runner/client.go b/pkg/runner/client.go index cfc07e6765..6e129dd9ca 100644 --- a/pkg/runner/client.go +++ b/pkg/runner/client.go @@ -122,7 +122,7 @@ func CreateClient(cfg *Config) (*pgclient.Client, error) { } if cfg.InstallExtensions { - // Only check for background workers if TimessaleDB is installed. + // Only check for background workers if TimescaleDB is installed. if notOk, err := isBGWLessThanDBs(conn); err != nil { return nil, fmt.Errorf("Error checking the number of background workers: %w", err) } else if notOk { @@ -131,7 +131,7 @@ func CreateClient(cfg *Config) (*pgclient.Client, error) { } } - isLicenseOSS, err := isTimescaleDBOSS(conn) + isTimescaleDB, isLicenseOSS, err := getDatabaseDetails(conn) if err != nil { return nil, fmt.Errorf("fetching license information: %w", err) } @@ -164,7 +164,7 @@ func CreateClient(cfg *Config) (*pgclient.Client, error) { } if cfg.DatasetConfig != "" { - err = ApplyDatasetConfig(conn, cfg.DatasetConfig) + err = ApplyDatasetConfig(conn, cfg.DatasetConfig, isTimescaleDB) if err != nil { return nil, fmt.Errorf("error applying dataset configuration: %w", err) } @@ -180,24 +180,20 @@ func CreateClient(cfg *Config) (*pgclient.Client, error) { return client, nil } -func isTimescaleDBOSS(conn *pgx.Conn) (bool, error) { - var ( - isTimescaleDB bool - isLicenseOSS bool - ) - err := conn.QueryRow(context.Background(), "SELECT _prom_catalog.is_timescaledb_installed()").Scan(&isTimescaleDB) +func getDatabaseDetails(conn 
*pgx.Conn) (isTimescaleDB, isLicenseOSS bool, err error) { + err = conn.QueryRow(context.Background(), "SELECT _prom_catalog.is_timescaledb_installed()").Scan(&isTimescaleDB) if err != nil { - return false, fmt.Errorf("error fetching whether TimescaleDB is installed: %w", err) + return false, false, fmt.Errorf("error fetching whether TimescaleDB is installed: %w", err) } if !isTimescaleDB { // Return false so that we don't warn for OSS TimescaleDB. - return false, nil + return false, false, nil } err = conn.QueryRow(context.Background(), "SELECT _prom_catalog.is_timescaledb_oss()").Scan(&isLicenseOSS) if err != nil { - return false, fmt.Errorf("error fetching TimescaleDB license: %w", err) + return isTimescaleDB, false, fmt.Errorf("error fetching TimescaleDB license: %w", err) } - return isLicenseOSS, nil + return isTimescaleDB, isLicenseOSS, nil } // isBGWLessThanDBs checks if the background workers count is less than the database count. It should be @@ -225,8 +221,8 @@ func isBGWLessThanDBs(conn *pgx.Conn) (bool, error) { return false, nil } -func ApplyDatasetConfig(conn *pgx.Conn, cfgFilename string) error { - cfg, err := dataset.NewConfig(cfgFilename) +func ApplyDatasetConfig(conn *pgx.Conn, cfgFilename string, withTimescaleDB bool) error { + cfg, err := dataset.NewConfig(cfgFilename, withTimescaleDB) if err != nil { return err } diff --git a/pkg/tests/end_to_end_tests/config_dataset_test.go b/pkg/tests/end_to_end_tests/config_dataset_test.go index 9b00b502ea..a06d36e209 100644 --- a/pkg/tests/end_to_end_tests/config_dataset_test.go +++ b/pkg/tests/end_to_end_tests/config_dataset_test.go @@ -12,36 +12,98 @@ import ( ) func TestDatasetConfigApply(t *testing.T) { - withDB(t, *testDatabase, func(dbOwner *pgxpool.Pool, t testing.TB) { conn, err := dbOwner.Acquire(context.Background()) require.NoError(t, err) defer conn.Release() + disableCompression := false pgxConn := conn.Conn() - require.Equal(t, getDefaultChunkInterval(t, pgxConn), 8*time.Hour) + 
require.Equal(t, 8*time.Hour, getMetricsDefaultChunkInterval(t, pgxConn)) + require.Equal(t, *useTimescaleDB, getMetricsDefaultCompressionSetting(t, pgxConn)) + require.Equal(t, 10*time.Second, getMetricsDefaultHALeaseRefresh(t, pgxConn)) + require.Equal(t, 1*time.Minute, getMetricsDefaultHALeaseTimeout(t, pgxConn)) + require.Equal(t, 90*24*time.Hour, getMetricsDefaultRetention(t, pgxConn)) + require.Equal(t, 30*24*time.Hour, getTracesDefaultRetention(t, pgxConn)) - cfg := dataset.Config{Metrics: dataset.Metrics{ChunkInterval: 4 * time.Hour}} + cfg := dataset.Config{ + Metrics: dataset.Metrics{ + ChunkInterval: dataset.DayDuration(4 * time.Hour), + Compression: &disableCompression, + HALeaseRefresh: dataset.DayDuration(15 * time.Second), + HALeaseTimeout: dataset.DayDuration(2 * time.Minute), + RetentionPeriod: dataset.DayDuration(15 * 24 * time.Hour), + }, + Traces: dataset.Traces{ + RetentionPeriod: dataset.DayDuration(10 * 24 * time.Hour), + }, + } err = cfg.Apply(pgxConn) require.NoError(t, err) - require.Equal(t, getDefaultChunkInterval(t, pgxConn), 4*time.Hour) + require.Equal(t, 4*time.Hour, getMetricsDefaultChunkInterval(t, pgxConn)) + require.Equal(t, false, getMetricsDefaultCompressionSetting(t, pgxConn)) + require.Equal(t, 15*time.Second, getMetricsDefaultHALeaseRefresh(t, pgxConn)) + require.Equal(t, 2*time.Minute, getMetricsDefaultHALeaseTimeout(t, pgxConn)) + require.Equal(t, 15*24*time.Hour, getMetricsDefaultRetention(t, pgxConn)) + require.Equal(t, 10*24*time.Hour, getTracesDefaultRetention(t, pgxConn)) // Set to default if chunk interval is not specified. 
- cfg.Metrics.ChunkInterval = 0 + cfg, err = dataset.NewConfig("", *useTimescaleDB) + require.NoError(t, err) err = cfg.Apply(pgxConn) require.NoError(t, err) - require.Equal(t, getDefaultChunkInterval(t, pgxConn), 8*time.Hour) + require.Equal(t, 8*time.Hour, getMetricsDefaultChunkInterval(t, pgxConn)) + require.Equal(t, *useTimescaleDB, getMetricsDefaultCompressionSetting(t, pgxConn)) + require.Equal(t, 10*time.Second, getMetricsDefaultHALeaseRefresh(t, pgxConn)) + require.Equal(t, 1*time.Minute, getMetricsDefaultHALeaseTimeout(t, pgxConn)) + require.Equal(t, 90*24*time.Hour, getMetricsDefaultRetention(t, pgxConn)) + require.Equal(t, 30*24*time.Hour, getTracesDefaultRetention(t, pgxConn)) }) } -func getDefaultChunkInterval(t testing.TB, conn *pgx.Conn) (chunkInterval time.Duration) { +func getMetricsDefaultChunkInterval(t testing.TB, conn *pgx.Conn) (chunkInterval time.Duration) { err := conn.QueryRow(context.Background(), "SELECT _prom_catalog.get_default_chunk_interval()").Scan(&chunkInterval) if err != nil { - t.Fatal("error getting default chunk interval", err) + t.Fatal("error getting default metric chunk interval", err) } return chunkInterval } +func getMetricsDefaultCompressionSetting(t testing.TB, conn *pgx.Conn) (compressionSetting bool) { + err := conn.QueryRow(context.Background(), "SELECT _prom_catalog.get_default_compression_setting()").Scan(&compressionSetting) + if err != nil { + t.Fatal("error getting default metric compression setting", err) + } + return compressionSetting +} +func getMetricsDefaultHALeaseRefresh(t testing.TB, conn *pgx.Conn) (haRefresh time.Duration) { + err := conn.QueryRow(context.Background(), "SELECT value::interval from _prom_catalog.default where key = 'ha_lease_refresh' LIMIT 1").Scan(&haRefresh) + if err != nil { + t.Fatal("error getting default metric HA lease refresh duration", err) + } + return haRefresh +} +func getMetricsDefaultHALeaseTimeout(t testing.TB, conn *pgx.Conn) (haTimeout time.Duration) { + err := 
conn.QueryRow(context.Background(), "SELECT value::interval from _prom_catalog.default where key = 'ha_lease_timeout' LIMIT 1").Scan(&haTimeout) + if err != nil { + t.Fatal("error getting default metric HA lease timeout duration", err) + } + return haTimeout +} +func getMetricsDefaultRetention(t testing.TB, conn *pgx.Conn) (retention time.Duration) { + err := conn.QueryRow(context.Background(), "SELECT _prom_catalog.get_default_retention_period()").Scan(&retention) + if err != nil { + t.Fatal("error getting default metric retention period", err) + } + return retention +} +func getTracesDefaultRetention(t testing.TB, conn *pgx.Conn) (retention time.Duration) { + err := conn.QueryRow(context.Background(), "SELECT ps_trace.get_trace_retention_period()").Scan(&retention) + if err != nil { + t.Fatal("error getting default metric retention period", err) + } + return retention +}