Skip to content

Commit

Permalink
Ingester: Added -blocks-storage.tsdb.head-chunks-write-queue-size a…
Browse files Browse the repository at this point in the history
…llowing to configure the size of the in-memory queue used before flushing chunks to the disk . #5000

Signed-off-by: tanghengjian <1040104807@qq.com>
  • Loading branch information
t00350320 committed Nov 29, 2022
1 parent d595cba commit fe220e4
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978

* [FEATURE] Ingester: Add active series to all_user_stats page. #4972
* [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size` allowing to configure the size of the in-memory queue used before flushing chunks to the disk . #5000

## 1.14.0 in progress

Expand Down
4 changes: 4 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,10 @@ blocks_storage:
# CLI flag: -blocks-storage.tsdb.close-idle-tsdb-timeout
[close_idle_tsdb_timeout: <duration> | default = 0s]
# The size of the in-memory queue used before flushing chunks to the disk.
# CLI flag: -blocks-storage.tsdb.head-chunks-write-queue-size
[head_chunks_write_queue_size: <int> | default = 0]
# limit the number of concurrently opening TSDB's on startup
# CLI flag: -blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup
[max_tsdb_opening_concurrency_on_startup: <int> | default = 10]
Expand Down
4 changes: 4 additions & 0 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,10 @@ blocks_storage:
# CLI flag: -blocks-storage.tsdb.close-idle-tsdb-timeout
[close_idle_tsdb_timeout: <duration> | default = 0s]
# The size of the in-memory queue used before flushing chunks to the disk.
# CLI flag: -blocks-storage.tsdb.head-chunks-write-queue-size
[head_chunks_write_queue_size: <int> | default = 0]
# limit the number of concurrently opening TSDB's on startup
# CLI flag: -blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup
[max_tsdb_opening_concurrency_on_startup: <int> | default = 10]
Expand Down
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3706,6 +3706,10 @@ tsdb:
# CLI flag: -blocks-storage.tsdb.close-idle-tsdb-timeout
[close_idle_tsdb_timeout: <duration> | default = 0s]
# The size of the in-memory queue used before flushing chunks to the disk.
# CLI flag: -blocks-storage.tsdb.head-chunks-write-queue-size
[head_chunks_write_queue_size: <int> | default = 0]
# limit the number of concurrently opening TSDB's on startup
# CLI flag: -blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup
[max_tsdb_opening_concurrency_on_startup: <int> | default = 10]
Expand All @@ -3714,6 +3718,7 @@ tsdb:
# be stored. 0 or less means disabled.
# CLI flag: -blocks-storage.tsdb.max-exemplars
[max_exemplars: <int> | default = 0]
```

### `compactor_config`
Expand Down
1 change: 1 addition & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1846,6 +1846,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
EnableExemplarStorage: enableExemplars,
IsolationDisabled: true,
MaxExemplars: int64(i.cfg.BlocksStorageConfig.TSDB.MaxExemplars),
HeadChunksWriteQueueSize: i.cfg.BlocksStorageConfig.TSDB.HeadChunksWriteQueueSize,
}, nil)
if err != nil {
return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir)
Expand Down
45 changes: 26 additions & 19 deletions pkg/ingester/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,25 +336,26 @@ type tsdbMetrics struct {
uploadFailures *prometheus.Desc // sum(thanos_shipper_upload_failures_total)

// Metrics aggregated from TSDB.
tsdbCompactionsTotal *prometheus.Desc
tsdbCompactionDuration *prometheus.Desc
tsdbFsyncDuration *prometheus.Desc
tsdbPageFlushes *prometheus.Desc
tsdbPageCompletions *prometheus.Desc
tsdbWALTruncateFail *prometheus.Desc
tsdbWALTruncateTotal *prometheus.Desc
tsdbWALTruncateDuration *prometheus.Desc
tsdbWALCorruptionsTotal *prometheus.Desc
tsdbWALWritesFailed *prometheus.Desc
tsdbHeadTruncateFail *prometheus.Desc
tsdbHeadTruncateTotal *prometheus.Desc
tsdbHeadGcDuration *prometheus.Desc
tsdbActiveAppenders *prometheus.Desc
tsdbSeriesNotFound *prometheus.Desc
tsdbChunks *prometheus.Desc
tsdbChunksCreatedTotal *prometheus.Desc
tsdbChunksRemovedTotal *prometheus.Desc
tsdbMmapChunkCorruptionTotal *prometheus.Desc
tsdbCompactionsTotal *prometheus.Desc
tsdbCompactionDuration *prometheus.Desc
tsdbFsyncDuration *prometheus.Desc
tsdbPageFlushes *prometheus.Desc
tsdbPageCompletions *prometheus.Desc
tsdbWALTruncateFail *prometheus.Desc
tsdbWALTruncateTotal *prometheus.Desc
tsdbWALTruncateDuration *prometheus.Desc
tsdbWALCorruptionsTotal *prometheus.Desc
tsdbWALWritesFailed *prometheus.Desc
tsdbHeadTruncateFail *prometheus.Desc
tsdbHeadTruncateTotal *prometheus.Desc
tsdbHeadGcDuration *prometheus.Desc
tsdbActiveAppenders *prometheus.Desc
tsdbSeriesNotFound *prometheus.Desc
tsdbChunks *prometheus.Desc
tsdbChunksCreatedTotal *prometheus.Desc
tsdbChunksRemovedTotal *prometheus.Desc
tsdbMmapChunkCorruptionTotal *prometheus.Desc
tsdbChunkwriteQueueOperationsTotal *prometheus.Desc

tsdbExemplarsTotal *prometheus.Desc
tsdbExemplarsInStorage *prometheus.Desc
Expand Down Expand Up @@ -478,6 +479,10 @@ func newTSDBMetrics(r prometheus.Registerer) *tsdbMetrics {
"cortex_ingester_tsdb_mmap_chunk_corruptions_total",
"Total number of memory-mapped TSDB chunk corruptions.",
nil, nil),
tsdbChunkwriteQueueOperationsTotal: prometheus.NewDesc(
"cortex_ingester_tsdb_chunk_write_queue_operations_total",
"Number of currently tsdb chunk write queues.",
[]string{"user", "operation"}, nil),
tsdbLoadedBlocks: prometheus.NewDesc(
"cortex_ingester_tsdb_blocks_loaded",
"Number of currently loaded data blocks",
Expand Down Expand Up @@ -579,6 +584,7 @@ func (sm *tsdbMetrics) Describe(out chan<- *prometheus.Desc) {
out <- sm.tsdbChunksCreatedTotal
out <- sm.tsdbChunksRemovedTotal
out <- sm.tsdbMmapChunkCorruptionTotal
out <- sm.tsdbChunkwriteQueueOperationsTotal
out <- sm.tsdbLoadedBlocks
out <- sm.tsdbSymbolTableSize
out <- sm.tsdbReloads
Expand Down Expand Up @@ -628,6 +634,7 @@ func (sm *tsdbMetrics) Collect(out chan<- prometheus.Metric) {
data.SendSumOfCountersPerUser(out, sm.tsdbChunksCreatedTotal, "prometheus_tsdb_head_chunks_created_total")
data.SendSumOfCountersPerUser(out, sm.tsdbChunksRemovedTotal, "prometheus_tsdb_head_chunks_removed_total")
data.SendSumOfCounters(out, sm.tsdbMmapChunkCorruptionTotal, "prometheus_tsdb_mmap_chunk_corruptions_total")
data.SendSumOfCountersPerUserWithLabels(out, sm.tsdbChunkwriteQueueOperationsTotal, "prometheus_tsdb_chunk_write_queue_operations_total", "operation")
data.SendSumOfGauges(out, sm.tsdbLoadedBlocks, "prometheus_tsdb_blocks_loaded")
data.SendSumOfGaugesPerUser(out, sm.tsdbSymbolTableSize, "prometheus_tsdb_symbol_table_size_bytes")
data.SendSumOfCounters(out, sm.tsdbReloads, "prometheus_tsdb_reloads_total")
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ type TSDBConfig struct {
WALSegmentSizeBytes int `yaml:"wal_segment_size_bytes"`
FlushBlocksOnShutdown bool `yaml:"flush_blocks_on_shutdown"`
CloseIdleTSDBTimeout time.Duration `yaml:"close_idle_tsdb_timeout"`
//The size of the in-memory queue used before flushing chunks to the disk.
HeadChunksWriteQueueSize int `yaml:"head_chunks_write_queue_size"`

// MaxTSDBOpeningConcurrencyOnStartup limits the number of concurrently opening TSDB's during startup.
MaxTSDBOpeningConcurrencyOnStartup int `yaml:"max_tsdb_opening_concurrency_on_startup"`
Expand Down Expand Up @@ -173,6 +175,7 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.FlushBlocksOnShutdown, "blocks-storage.tsdb.flush-blocks-on-shutdown", false, "True to flush blocks to storage on shutdown. If false, incomplete blocks will be reused after restart.")
f.DurationVar(&cfg.CloseIdleTSDBTimeout, "blocks-storage.tsdb.close-idle-tsdb-timeout", 0, "If TSDB has not received any data for this duration, and all blocks from TSDB have been shipped, TSDB is closed and deleted from local disk. If set to positive value, this value should be equal or higher than -querier.query-ingesters-within flag to make sure that TSDB is not closed prematurely, which could cause partial query results. 0 or negative value disables closing of idle TSDB.")
f.IntVar(&cfg.MaxExemplars, "blocks-storage.tsdb.max-exemplars", 0, "Enables support for exemplars in TSDB and sets the maximum number that will be stored. 0 or less means disabled.")
f.IntVar(&cfg.HeadChunksWriteQueueSize, "blocks-storage.tsdb.head-chunks-write-queue-size", chunks.DefaultWriteQueueSize, "The size of the in-memory queue used before flushing chunks to the disk.")
}

// Validate the config.
Expand Down

0 comments on commit fe220e4

Please sign in to comment.