Skip to content

Commit

Permalink
Add information_schema.processlist based thread state counters
Browse files Browse the repository at this point in the history
The new processlist flag enables a query to generate a thread count
based on the processlist table.

This is useful to find out more about concurrency problems of a
server (e.g. connection piling up, certain clients making exessive
use of locks, ...).
  • Loading branch information
rtreffer committed Sep 22, 2015
1 parent 2e0e13b commit 62ccf9d
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 3 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ collect.perf_schema.tableiowaits | Collect metrics from performance_sc
collect.perf_schema.tableiowaitstime | Collect time metrics from performance_schema.table_io_waits_summary_by_table.
collect.perf_schema.tablelocks | Collect metrics from performance_schema.table_lock_waits_summary_by_table.
collect.perf_schema.tablelockstime | Collect time metrics from performance_schema.events_statements_summary_by_digest.
collect.info_schema.processlist | Collect thread state counts from information_schema.processlist.
log.level | Logging verbosity (default: info)
web.listen-address | Address to listen on for web interface and telemetry.
web.telemetry-path | Path under which to expose metrics.
Expand Down
169 changes: 166 additions & 3 deletions mysqld_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ var (
"web.telemetry-path", "/metrics",
"Path under which to expose metrics.",
)
processlist = flag.Bool(
"collect.info_schema.processlist", false,
"Collect current thread state counts from the information_schema.processlist",
)
autoIncrementColumns = flag.Bool(
"collect.auto_increment.columns", false,
"Collect auto_increment columns and max values from information_schema",
Expand Down Expand Up @@ -90,9 +94,15 @@ const (

// Metric SQL Queries.
const (
globalStatusQuery = `SHOW GLOBAL STATUS`
globalVariablesQuery = `SHOW GLOBAL VARIABLES`
slaveStatusQuery = `SHOW SLAVE STATUS`
globalStatusQuery = `SHOW GLOBAL STATUS`
globalVariablesQuery = `SHOW GLOBAL VARIABLES`
slaveStatusQuery = `SHOW SLAVE STATUS`
infoSchemaProcesslistQuery = `
SELECT COALESCE(command,''),COALESCE(state,''),count(*)
FROM information_schema.processlist
WHERE ID != connection_id()
GROUP BY command,state
ORDER BY null`
infoSchemaAutoIncrementQuery = `
SELECT table_schema, table_name, column_name, auto_increment,
pow(2, case data_type
Expand Down Expand Up @@ -416,6 +426,91 @@ const (
picoSeconds = 1e12
)

// whitelist for connection/process states in SHOW PROCESSLIST
// tokudb uses the state column for "Queried about _______ rows"
var (
// TODO: might need some more keys for other MySQL versions or other storage engines
// see https://dev.mysql.com/doc/refman/5.7/en/general-thread-states.html
threadStateCounterMap = map[string]uint32{
"after create": uint32(0),
"altering table": uint32(0),
"analyzing": uint32(0),
"checking permissions": uint32(0),
"checking table": uint32(0),
"cleaning up": uint32(0),
"closing tables": uint32(0),
"converting heap to myisam": uint32(0),
"copying to tmp table": uint32(0),
"creating sort index": uint32(0),
"creating table": uint32(0),
"creating tmp table": uint32(0),
"deleting": uint32(0),
"executing": uint32(0),
"execution of init_command": uint32(0),
"end": uint32(0),
"freeing items": uint32(0),
"flushing tables": uint32(0),
"fulltext initialization": uint32(0),
"idle": uint32(0),
"init": uint32(0),
"killed": uint32(0),
"waiting for lock": uint32(0),
"logging slow query": uint32(0),
"login": uint32(0),
"manage keys": uint32(0),
"opening tables": uint32(0),
"optimizing": uint32(0),
"preparing": uint32(0),
"reading from net": uint32(0),
"removing duplicates": uint32(0),
"removing tmp table": uint32(0),
"reopen tables": uint32(0),
"repair by sorting": uint32(0),
"repair done": uint32(0),
"repair with keycache": uint32(0),
"replication master": uint32(0),
"rolling back": uint32(0),
"searching rows for update": uint32(0),
"sending data": uint32(0),
"sorting for group": uint32(0),
"sorting for order": uint32(0),
"sorting index": uint32(0),
"sorting result": uint32(0),
"statistics": uint32(0),
"updating": uint32(0),
"waiting for tables": uint32(0),
"waiting for table flush": uint32(0),
"waiting on cond": uint32(0),
"writing to net": uint32(0),
"other": uint32(0),
}
threadStateMapping = map[string]string{
"user sleep": "idle",
"creating index": "altering table",
"committing alter table to storage engine": "altering table",
"discard or import tablespace": "altering table",
"rename": "altering table",
"setup": "altering table",
"renaming result table": "altering table",
"preparing for alter table": "altering table",
"copying to group table": "copying to tmp table",
"copy to tmp table": "copying to tmp table",
"query end": "end",
"update": "updating",
"updating main table": "updating",
"updating reference tables": "updating",
"system lock": "waiting for lock",
"user lock": "waiting for lock",
"table lock": "waiting for lock",
"deleting from main table": "deleting",
"deleting from reference tables": "deltting",
}
processlistDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, informationSchema, "threads"),
"The number of threads (connections) split by current state.",
[]string{"state"}, nil)
)

// Various regexps.
var (
globalStatusRE = regexp.MustCompile(`^(com|connection_errors|innodb_rows|performance_schema)_(.*)$`)
Expand Down Expand Up @@ -522,6 +617,12 @@ func (e *Exporter) scrape(ch chan<- prometheus.Metric) {
log.Println("Error scraping slave state:", err)
return
}
if *processlist {
if err = scrapeProcesslist(db, ch); err != nil {
log.Println("Error scraping process list:", err)
return
}
}
if *autoIncrementColumns {
if err = scrapeInformationSchema(db, ch); err != nil {
log.Println("Error scraping information schema:", err)
Expand Down Expand Up @@ -1151,6 +1252,68 @@ func scrapeUserStat(db *sql.DB, ch chan<- prometheus.Metric) error {
return nil
}

func deriveThreadState(command string, state string) string {
var normCmd = strings.Replace(strings.ToLower(command), "_", " ", -1)
var normState = strings.Replace(strings.ToLower(state), "_", " ", -1)
// check if it's already a valid state
_, knownState := threadStateCounterMap[normState]
if knownState {
return normState
}
// check if plain mapping applies
mappedState, canMap := threadStateMapping[normState]
if canMap {
return mappedState
}
// check special waiting for XYZ lock
if strings.Contains(normState, "waiting for") && strings.Contains(normState, "lock") {
return "waiting for lock"
}
if normCmd == "sleep" && normState == "" {
return "idle"
}
if normCmd == "query" {
return "executing"
}
if normCmd == "binlog dump" {
return "replication master"
}
return "other"
}

func scrapeProcesslist(db *sql.DB, ch chan<- prometheus.Metric) error {
processlistRows, err := db.Query(infoSchemaProcesslistQuery)
if err != nil {
return err
}
defer processlistRows.Close()

var (
command string
state string
count uint32
)
stateCounts := make(map[string]uint32, len(threadStateCounterMap))
for k, v := range threadStateCounterMap {
stateCounts[k] = v
}

for processlistRows.Next() {
err = processlistRows.Scan(&command, &state, &count)
if err != nil {
return err
}
realState := deriveThreadState(command, state)
stateCounts[realState] += count
}

for state, count := range stateCounts {
ch <- prometheus.MustNewConstMetric(processlistDesc, prometheus.GaugeValue, float64(count), state)
}

return nil
}

func newDesc(subsystem, name, help string) *prometheus.Desc {
return prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, name),
Expand Down

0 comments on commit 62ccf9d

Please sign in to comment.