From b76cde7cbeebb49f130e7792ead8f9d94bc392c4 Mon Sep 17 00:00:00 2001 From: tanghengjian <1040104807@qq.com> Date: Fri, 25 Nov 2022 17:04:23 +0800 Subject: [PATCH] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size` to configure the size of the in-memory queue used before flushing chunks to the disk. #5000 Signed-off-by: tanghengjian <1040104807@qq.com> --- CHANGELOG.md | 1 + docs/configuration/config-file-reference.md | 5 +++ pkg/ingester/ingester.go | 1 + pkg/ingester/metrics.go | 45 ++++++++++++--------- pkg/storage/tsdb/config.go | 3 ++ 5 files changed, 36 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5fe6bd8bacb..2ec95d2af9b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978 * [FEATURE] Ingester: Add active series to all_user_stats page. #4972 +* [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size` to configure the size of the in-memory queue used before flushing chunks to the disk. #5000 ## 1.14.0 in progress diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index e717e6ec7a7..a4de24ae4b4 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3714,6 +3714,11 @@ tsdb: # be stored. 0 or less means disabled. # CLI flag: -blocks-storage.tsdb.max-exemplars [max_exemplars: | default = 0] + + # The size of the in-memory queue used before flushing chunks to the disk.
+ # CLI flag: -blocks-storage.tsdb.head-chunks-write-queue-size + [head_chunks_write_queue_size: <int> | default = 0] + ``` ### `compactor_config` diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 8273c5117cf..8593c89952a 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1846,6 +1846,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { EnableExemplarStorage: enableExemplars, IsolationDisabled: true, MaxExemplars: int64(i.cfg.BlocksStorageConfig.TSDB.MaxExemplars), + HeadChunksWriteQueueSize: i.cfg.BlocksStorageConfig.TSDB.HeadChunksWriteQueueSize, }, nil) if err != nil { return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir) diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 585cc45df94..42cf86b97ff 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -336,25 +336,26 @@ type tsdbMetrics struct { uploadFailures *prometheus.Desc // sum(thanos_shipper_upload_failures_total) // Metrics aggregated from TSDB.
- tsdbCompactionsTotal *prometheus.Desc - tsdbCompactionDuration *prometheus.Desc - tsdbFsyncDuration *prometheus.Desc - tsdbPageFlushes *prometheus.Desc - tsdbPageCompletions *prometheus.Desc - tsdbWALTruncateFail *prometheus.Desc - tsdbWALTruncateTotal *prometheus.Desc - tsdbWALTruncateDuration *prometheus.Desc - tsdbWALCorruptionsTotal *prometheus.Desc - tsdbWALWritesFailed *prometheus.Desc - tsdbHeadTruncateFail *prometheus.Desc - tsdbHeadTruncateTotal *prometheus.Desc - tsdbHeadGcDuration *prometheus.Desc - tsdbActiveAppenders *prometheus.Desc - tsdbSeriesNotFound *prometheus.Desc - tsdbChunks *prometheus.Desc - tsdbChunksCreatedTotal *prometheus.Desc - tsdbChunksRemovedTotal *prometheus.Desc - tsdbMmapChunkCorruptionTotal *prometheus.Desc + tsdbCompactionsTotal *prometheus.Desc + tsdbCompactionDuration *prometheus.Desc + tsdbFsyncDuration *prometheus.Desc + tsdbPageFlushes *prometheus.Desc + tsdbPageCompletions *prometheus.Desc + tsdbWALTruncateFail *prometheus.Desc + tsdbWALTruncateTotal *prometheus.Desc + tsdbWALTruncateDuration *prometheus.Desc + tsdbWALCorruptionsTotal *prometheus.Desc + tsdbWALWritesFailed *prometheus.Desc + tsdbHeadTruncateFail *prometheus.Desc + tsdbHeadTruncateTotal *prometheus.Desc + tsdbHeadGcDuration *prometheus.Desc + tsdbActiveAppenders *prometheus.Desc + tsdbSeriesNotFound *prometheus.Desc + tsdbChunks *prometheus.Desc + tsdbChunksCreatedTotal *prometheus.Desc + tsdbChunksRemovedTotal *prometheus.Desc + tsdbMmapChunkCorruptionTotal *prometheus.Desc + tsdbChunkwriteQueueOperationsTotal *prometheus.Desc tsdbExemplarsTotal *prometheus.Desc tsdbExemplarsInStorage *prometheus.Desc @@ -478,6 +479,10 @@ func newTSDBMetrics(r prometheus.Registerer) *tsdbMetrics { "cortex_ingester_tsdb_mmap_chunk_corruptions_total", "Total number of memory-mapped TSDB chunk corruptions.", nil, nil), + tsdbChunkwriteQueueOperationsTotal: prometheus.NewDesc( + "cortex_ingester_tsdb_chunk_write_queue_operations_total", + "Number of currently tsdb chunk 
write queues.", + []string{"user", "operation"}, nil), tsdbLoadedBlocks: prometheus.NewDesc( "cortex_ingester_tsdb_blocks_loaded", "Number of currently loaded data blocks", @@ -579,6 +584,7 @@ func (sm *tsdbMetrics) Describe(out chan<- *prometheus.Desc) { out <- sm.tsdbChunksCreatedTotal out <- sm.tsdbChunksRemovedTotal out <- sm.tsdbMmapChunkCorruptionTotal + out <- sm.tsdbChunkwriteQueueOperationsTotal out <- sm.tsdbLoadedBlocks out <- sm.tsdbSymbolTableSize out <- sm.tsdbReloads @@ -628,6 +634,7 @@ func (sm *tsdbMetrics) Collect(out chan<- prometheus.Metric) { data.SendSumOfCountersPerUser(out, sm.tsdbChunksCreatedTotal, "prometheus_tsdb_head_chunks_created_total") data.SendSumOfCountersPerUser(out, sm.tsdbChunksRemovedTotal, "prometheus_tsdb_head_chunks_removed_total") data.SendSumOfCounters(out, sm.tsdbMmapChunkCorruptionTotal, "prometheus_tsdb_mmap_chunk_corruptions_total") + data.SendSumOfCountersPerUserWithLabels(out, sm.tsdbChunkwriteQueueOperationsTotal, "prometheus_tsdb_chunk_write_queue_operations_total", "operation") data.SendSumOfGauges(out, sm.tsdbLoadedBlocks, "prometheus_tsdb_blocks_loaded") data.SendSumOfGaugesPerUser(out, sm.tsdbSymbolTableSize, "prometheus_tsdb_symbol_table_size_bytes") data.SendSumOfCounters(out, sm.tsdbReloads, "prometheus_tsdb_reloads_total") diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 2fcc0cded15..2fc3e86abb0 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -136,6 +136,8 @@ type TSDBConfig struct { WALSegmentSizeBytes int `yaml:"wal_segment_size_bytes"` FlushBlocksOnShutdown bool `yaml:"flush_blocks_on_shutdown"` CloseIdleTSDBTimeout time.Duration `yaml:"close_idle_tsdb_timeout"` + //The size of the in-memory queue used before flushing chunks to the disk. + HeadChunksWriteQueueSize int `yaml:"head_chunks_write_queue_size"` // MaxTSDBOpeningConcurrencyOnStartup limits the number of concurrently opening TSDB's during startup. 
MaxTSDBOpeningConcurrencyOnStartup int `yaml:"max_tsdb_opening_concurrency_on_startup"` @@ -173,6 +175,7 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.FlushBlocksOnShutdown, "blocks-storage.tsdb.flush-blocks-on-shutdown", false, "True to flush blocks to storage on shutdown. If false, incomplete blocks will be reused after restart.") f.DurationVar(&cfg.CloseIdleTSDBTimeout, "blocks-storage.tsdb.close-idle-tsdb-timeout", 0, "If TSDB has not received any data for this duration, and all blocks from TSDB have been shipped, TSDB is closed and deleted from local disk. If set to positive value, this value should be equal or higher than -querier.query-ingesters-within flag to make sure that TSDB is not closed prematurely, which could cause partial query results. 0 or negative value disables closing of idle TSDB.") f.IntVar(&cfg.MaxExemplars, "blocks-storage.tsdb.max-exemplars", 0, "Enables support for exemplars in TSDB and sets the maximum number that will be stored. 0 or less means disabled.") + f.IntVar(&cfg.HeadChunksWriteQueueSize, "blocks-storage.tsdb.head-chunks-write-queue-size", chunks.DefaultWriteQueueSize, "The size of the in-memory queue used before flushing chunks to the disk.") } // Validate the config.