From 62ccf9d4102e5d582598691af3d844d3d6df10a6 Mon Sep 17 00:00:00 2001 From: Rene Treffer Date: Mon, 21 Sep 2015 10:56:30 +0200 Subject: [PATCH] Add information_schema.processlist based thread state counters The new processlist flag enables a query to generate a thread count based on the processlist table. This is useful to find out more about concurrency problems of a server (e.g. connection piling up, certain clients making exessive use of locks, ...). --- README.md | 1 + mysqld_exporter.go | 169 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 167 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index f8672be92..51842c982 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ collect.perf_schema.tableiowaits | Collect metrics from performance_sc collect.perf_schema.tableiowaitstime | Collect time metrics from performance_schema.table_io_waits_summary_by_table. collect.perf_schema.tablelocks | Collect metrics from performance_schema.table_lock_waits_summary_by_table. collect.perf_schema.tablelockstime | Collect time metrics from performance_schema.events_statements_summary_by_digest. +collect.info_schema.processlist | Collect thread state counts from information_schema.processlist. log.level | Logging verbosity (default: info) web.listen-address | Address to listen on for web interface and telemetry. web.telemetry-path | Path under which to expose metrics. diff --git a/mysqld_exporter.go b/mysqld_exporter.go index 94adedb0c..2d64dbdb4 100644 --- a/mysqld_exporter.go +++ b/mysqld_exporter.go @@ -26,6 +26,10 @@ var ( "web.telemetry-path", "/metrics", "Path under which to expose metrics.", ) + processlist = flag.Bool( + "collect.info_schema.processlist", false, + "Collect current thread state counts from the information_schema.processlist", + ) autoIncrementColumns = flag.Bool( "collect.auto_increment.columns", false, "Collect auto_increment columns and max values from information_schema", @@ -90,9 +94,15 @@ const ( // Metric SQL Queries. const ( - globalStatusQuery = `SHOW GLOBAL STATUS` - globalVariablesQuery = `SHOW GLOBAL VARIABLES` - slaveStatusQuery = `SHOW SLAVE STATUS` + globalStatusQuery = `SHOW GLOBAL STATUS` + globalVariablesQuery = `SHOW GLOBAL VARIABLES` + slaveStatusQuery = `SHOW SLAVE STATUS` + infoSchemaProcesslistQuery = ` + SELECT COALESCE(command,''),COALESCE(state,''),count(*) + FROM information_schema.processlist + WHERE ID != connection_id() + GROUP BY command,state + ORDER BY null` infoSchemaAutoIncrementQuery = ` SELECT table_schema, table_name, column_name, auto_increment, pow(2, case data_type @@ -416,6 +426,91 @@ const ( picoSeconds = 1e12 ) +// whitelist for connection/process states in SHOW PROCESSLIST +// tokudb uses the state column for "Queried about _______ rows" +var ( + // TODO: might need some more keys for other MySQL versions or other storage engines + // see https://dev.mysql.com/doc/refman/5.7/en/general-thread-states.html + threadStateCounterMap = map[string]uint32{ + "after create": uint32(0), + "altering table": uint32(0), + "analyzing": uint32(0), + "checking permissions": uint32(0), + "checking table": uint32(0), + "cleaning up": uint32(0), + "closing tables": uint32(0), + "converting heap to myisam": uint32(0), + "copying to tmp table": uint32(0), + "creating sort index": uint32(0), + "creating table": uint32(0), + "creating tmp table": uint32(0), + "deleting": uint32(0), + "executing": uint32(0), + "execution of init_command": uint32(0), + "end": uint32(0), + "freeing items": uint32(0), + "flushing tables": uint32(0), + "fulltext initialization": uint32(0), + "idle": uint32(0), + "init": uint32(0), + "killed": uint32(0), + "waiting for lock": uint32(0), + "logging slow query": uint32(0), + "login": uint32(0), + "manage keys": uint32(0), + "opening tables": uint32(0), + "optimizing": uint32(0), + "preparing": uint32(0), + "reading from net": uint32(0), + "removing duplicates": uint32(0), + "removing tmp table": uint32(0), + "reopen tables": uint32(0), + "repair by sorting": uint32(0), + "repair done": uint32(0), + "repair with keycache": uint32(0), + "replication master": uint32(0), + "rolling back": uint32(0), + "searching rows for update": uint32(0), + "sending data": uint32(0), + "sorting for group": uint32(0), + "sorting for order": uint32(0), + "sorting index": uint32(0), + "sorting result": uint32(0), + "statistics": uint32(0), + "updating": uint32(0), + "waiting for tables": uint32(0), + "waiting for table flush": uint32(0), + "waiting on cond": uint32(0), + "writing to net": uint32(0), + "other": uint32(0), + } + threadStateMapping = map[string]string{ + "user sleep": "idle", + "creating index": "altering table", + "committing alter table to storage engine": "altering table", + "discard or import tablespace": "altering table", + "rename": "altering table", + "setup": "altering table", + "renaming result table": "altering table", + "preparing for alter table": "altering table", + "copying to group table": "copying to tmp table", + "copy to tmp table": "copying to tmp table", + "query end": "end", + "update": "updating", + "updating main table": "updating", + "updating reference tables": "updating", + "system lock": "waiting for lock", + "user lock": "waiting for lock", + "table lock": "waiting for lock", + "deleting from main table": "deleting", + "deleting from reference tables": "deltting", + } + processlistDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, informationSchema, "threads"), + "The number of threads (connections) split by current state.", + []string{"state"}, nil) +) + // Various regexps. var ( globalStatusRE = regexp.MustCompile(`^(com|connection_errors|innodb_rows|performance_schema)_(.*)$`) @@ -522,6 +617,12 @@ func (e *Exporter) scrape(ch chan<- prometheus.Metric) { log.Println("Error scraping slave state:", err) return } + if *processlist { + if err = scrapeProcesslist(db, ch); err != nil { + log.Println("Error scraping process list:", err) + return + } + } if *autoIncrementColumns { if err = scrapeInformationSchema(db, ch); err != nil { log.Println("Error scraping information schema:", err) @@ -1151,6 +1252,68 @@ func scrapeUserStat(db *sql.DB, ch chan<- prometheus.Metric) error { return nil } +func deriveThreadState(command string, state string) string { + var normCmd = strings.Replace(strings.ToLower(command), "_", " ", -1) + var normState = strings.Replace(strings.ToLower(state), "_", " ", -1) + // check if it's already a valid state + _, knownState := threadStateCounterMap[normState] + if knownState { + return normState + } + // check if plain mapping applies + mappedState, canMap := threadStateMapping[normState] + if canMap { + return mappedState + } + // check special waiting for XYZ lock + if strings.Contains(normState, "waiting for") && strings.Contains(normState, "lock") { + return "waiting for lock" + } + if normCmd == "sleep" && normState == "" { + return "idle" + } + if normCmd == "query" { + return "executing" + } + if normCmd == "binlog dump" { + return "replication master" + } + return "other" +} + +func scrapeProcesslist(db *sql.DB, ch chan<- prometheus.Metric) error { + processlistRows, err := db.Query(infoSchemaProcesslistQuery) + if err != nil { + return err + } + defer processlistRows.Close() + + var ( + command string + state string + count uint32 + ) + stateCounts := make(map[string]uint32, len(threadStateCounterMap)) + for k, v := range threadStateCounterMap { + stateCounts[k] = v + } + + for processlistRows.Next() { + err = processlistRows.Scan(&command, &state, &count) + if err != nil { + return err + } + realState := deriveThreadState(command, state) + stateCounts[realState] += count + } + + for state, count := range stateCounts { + ch <- prometheus.MustNewConstMetric(processlistDesc, prometheus.GaugeValue, float64(count), state) + } + + return nil +} + func newDesc(subsystem, name, help string) *prometheus.Desc { return prometheus.NewDesc( prometheus.BuildFQName(namespace, subsystem, name),