Skip to content

Commit

Permalink
Merge pull request #760 from DataDog/pg-per-table-stats
Browse files Browse the repository at this point in the history
Explicit per-table PG stats
  • Loading branch information
Remi Hakim committed Jan 27, 2014
2 parents ac4c8d8 + 6f2a342 commit 31dc88f
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 71 deletions.
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
include datadog.conf.example
include checks/libs/jmxterm-1.0-alpha-4-uber.jar
include checks/libs/jmxfetch-0.1.2-jar-with-dependencies.jar
include pup/pup.html
include pup/static/*
245 changes: 175 additions & 70 deletions checks.d/postgres.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,89 @@
from checks import AgentCheck

GAUGE = 'gauge'
RATE = 'rate'

METRICS = {
'numbackends' : ('connections', GAUGE),
'xact_commit' : ('commits', RATE),
'xact_rollback' : ('rollbacks', RATE),
'blks_read' : ('disk_read', RATE),
'blks_hit' : ('buffer_hit', RATE),
'tup_returned' : ('rows_returned', RATE),
'tup_fetched' : ('rows_fetched', RATE),
'tup_inserted' : ('rows_inserted', RATE),
'tup_updated' : ('rows_updated', RATE),
'tup_deleted' : ('rows_deleted', RATE),

}

NEWER_92_METRICS = {
'deadlocks' : ('deadlocks', GAUGE),
'temp_bytes' : ('temp_bytes', RATE),
'temp_files' : ('temp_files', RATE),
}
from checks import AgentCheck, CheckException

class PostgreSql(AgentCheck):
"""Collects per-database, and optionally per-relation metrics
"""

RATE = AgentCheck.rate
GAUGE = AgentCheck.gauge

# turning columns into tags
DB_METRICS = {
'descriptors': [
('datname', 'db')
],
'metrics': {
'numbackends' : ('postgresql.connections', GAUGE),
'xact_commit' : ('postgresql.commits', RATE),
'xact_rollback' : ('postgresql.rollbacks', RATE),
'blks_read' : ('postgresql.disk_read', RATE),
'blks_hit' : ('postgresql.buffer_hit', RATE),
'tup_returned' : ('postgresql.rows_returned', RATE),
'tup_fetched' : ('postgresql.rows_fetched', RATE),
'tup_inserted' : ('postgresql.rows_inserted', RATE),
'tup_updated' : ('postgresql.rows_updated', RATE),
'tup_deleted' : ('postgresql.rows_deleted', RATE),
},
'query': """
SELECT datname,
%s
FROM pg_stat_database
WHERE datname not ilike 'template%%'
AND datname not ilike 'postgres'
""",
'relation': False,
}

NEWER_92_METRICS = {
'deadlocks' : ('postgresql.deadlocks', GAUGE),
'temp_bytes' : ('postgresql.temp_bytes', RATE),
'temp_files' : ('postgresql.temp_files', RATE),
}

REL_METRICS = {
'descriptors': [
('relname', 'table')
],
'metrics': {
'seq_scan' : ('postgresql.seq_scans', RATE),
'seq_tup_read' : ('postgresql.seq_rows_read', RATE),
'idx_scan' : ('postgresql.index_scans', RATE),
'idx_tup_fetch' : ('postgresql.index_rows_fetched', RATE),
'n_tup_ins' : ('postgresql.rows_inserted', RATE),
'n_tup_upd' : ('postgresql.rows_updated', RATE),
'n_tup_del' : ('postgresql.rows_deleted', RATE),
'n_tup_hot_upd' : ('postgresql.rows_hot_updated', RATE),
'n_live_tup' : ('postgresql.live_rows', GAUGE),
'n_dead_tup' : ('postgresql.dead_rows', GAUGE),
},
'query': """
SELECT relname,
%s
FROM pg_stat_user_tables
WHERE relname = ANY(%s)""",
'relation': True,
}

IDX_METRICS = {
'descriptors': [
('relname', 'table'),
('indexrelname', 'index')
],
'metrics': {
'idx_scan' : ('postgresql.index_scans', RATE),
'idx_tup_read' : ('postgresql.index_rows_read', RATE),
'idx_tup_fetch' : ('postgresql.index_rows_fetched', RATE),
},
'query': """
SELECT relname,
indexrelname,
%s
FROM pg_stat_user_indexes
WHERE relname = ANY(%s)""",
'relation': True,
}


def __init__(self, name, init_config, agentConfig):
AgentCheck.__init__(self, name, init_config, agentConfig)
self.dbs = {}
Expand All @@ -37,7 +97,6 @@ def get_library_versions(self):
version = "Not Found"
except AttributeError:
version = "Unknown"

return {"psycopg2": version}

def _get_version(self, key, db):
Expand All @@ -56,63 +115,96 @@ def _get_version(self, key, db):
def _is_9_2_or_above(self, key, db):
version = self._get_version(key, db)
if type(version) == list:
return version >= [9,2,0]
return version >= [9, 2, 0]

return False

def _collect_stats(self, key, db, instance_tags, relations):
"""Query pg_stat_* for various metrics
If relations is not an empty list, gather per-relation metrics
on top of that.
"""

def _collect_stats(self, key, db, instance_tags):

metrics_to_collect = METRICS
# Extended 9.2+ metrics
if self._is_9_2_or_above(key, db):
metrics_to_collect.update(NEWER_92_METRICS)
self.DB_METRICS['metrics'].update(self.NEWER_92_METRICS)

# Do we need relation-specific metrics?
if not relations:
metric_scope = (self.DB_METRICS,)
else:
metric_scope = (self.DB_METRICS, self.REL_METRICS, self.IDX_METRICS)

for scope in metric_scope:
# build query
cols = scope['metrics'].keys() # list of metrics to query, in some order
# we must remember that order to parse results
cursor = db.cursor()

# if this is a relation-specific query, we need to list all relations last
if scope['relation'] and len(relations) > 0:
query = scope['query'] % (", ".join(cols), "%s") # Keep the last %s intact
cursor.execute(query, (relations, ))
else:
query = scope['query'] % (", ".join(cols))
cursor.execute(query)

metrics_keys = metrics_to_collect.keys()
fields = ",".join(metrics_keys)
query = """SELECT datname,
%s
FROM pg_stat_database
WHERE datname not ilike 'template%%'
AND datname not ilike 'postgres'
;""" % fields

cursor = db.cursor()
cursor.execute(query)
result = cursor.fetchone()
while result is not None:
dbname = result[0]
try:
tags = ['db:%s' % dbname] + instance_tags
except Exception:
# if tags is none or is not of the right type
tags = ['db:%s' % dbname]

for i, value in enumerate(result):
if i == 0:
# This is the dbname
continue
results = cursor.fetchall()
cursor.close()

# parse & submit results
# A row should look like this
# (descriptor, descriptor, ..., value, value, value, value, ...)
# with descriptor a PG relation or index name, which we use to create the tags
for row in results:
# turn descriptors into tags
desc = scope['descriptors']
# Check that all columns will be processed
assert len(row) == len(cols) + len(desc)

# Build tags
# descriptors are: (pg_name, dd_tag_name): value
# Special-case the "db" tag, which overrides the one that is passed as instance_tag
# The reason is that pg_stat_database returns all databases regardless of the
# connection.
if not scope['relation']:
tags = [t for t in instance_tags if not t.startswith("db:")]
else:
tags = [t for t in instance_tags]

metric_name = "postgresql.%s" % metrics_to_collect[metrics_keys[i-1]][0]
metric_type = metrics_to_collect[metrics_keys[i-1]][1]
if metric_type == GAUGE:
self.gauge(metric_name, value, tags=tags)
elif metric_type == RATE:
self.rate(metric_name, value, tags=tags)
tags += ["%s:%s" % (d[0][1], d[1]) for d in zip(desc, row[:len(desc)])]

result = cursor.fetchone()
del cursor
# [(metric-map, value), (metric-map, value), ...]
# metric-map is: (dd_name, "rate"|"gauge")
# shift the results since the first columns will be the "descriptors"
values = zip([scope['metrics'][c] for c in cols], row[len(desc):])

# To submit simply call the function for each value v
# v[0] == (metric_name, submit_function)
# v[1] == the actual value
# tags are
[v[0][1](self, v[0][0], v[1], tags=tags) for v in values]


def get_connection(self, key, host, port, user, password, dbname):

"Get and memoize connections to instances"
if key in self.dbs:
return self.dbs[key]

elif host != '' and user != '':
elif host != "" and user != "":
try:
import psycopg2 as pg
if host == 'localhost' and password == '':
# Use ident method
return pg.connect("user=%s dbname=%s" % (user, dbname))
elif port != '':
return pg.connect(host=host, port=port, user=user,
password=password, database=dbname)
else:
return pg.connect(host=host, user=user, password=password,
database=dbname)
except ImportError:
raise ImportError("psycopg2 library can not be imported. Please check the installation instruction on the Datadog Website")
raise ImportError("psycopg2 library cannot be imported. Please check the installation instruction on the Datadog Website.")

if host == 'localhost' and password == '':
# Use ident method
Expand All @@ -123,6 +215,14 @@ def get_connection(self, key, host, port, user, password, dbname):
else:
connection = pg.connect(host=host, user=user, password=password,
database=dbname)
else:
if host is None or host == "":
raise CheckException("Please specify a Postgres host to connect to.")
elif user is None or user == "":
raise CheckException("Please specify a user to connect to Postgres as.")
else:
raise CheckException("Cannot connect to Postgres.")

connection.autocommit = True

self.dbs[key] = connection
Expand All @@ -136,20 +236,25 @@ def check(self, instance):
password = instance.get('password', '')
tags = instance.get('tags', [])
dbname = instance.get('database', 'postgres')
relations = instance.get('relations', [])

key = '%s:%s' % (host, port)
db = self.get_connection(key, host, port, user, password, dbname)

# Clean up tags in case there was a None entry in the instance
# e.g. if the yaml contains tags: but no actual tags
if tags is None:
tags = []
key = '%s:%s' % (host, port)

db = self.get_connection(key, host, port, user, password, dbname)

# preset tags to the database name
tags.extend(["db:%s" % dbname])

# Check version
version = self._get_version(key, db)
self.log.debug("Running check against version %s" % version)

# Collect metrics
self._collect_stats(key, db, tags)
self._collect_stats(key, db, tags, relations)

@staticmethod
def parse_agent_config(agentConfig):
Expand Down
13 changes: 12 additions & 1 deletion conf.d/postgres.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,15 @@ instances:
# dbname: db_name
# tags:
# - optional_tag1
# - optional_tag2
# - optional_tag2

# Custom-metrics section

# You can now track per-relation (table) metrics
# You need to specify the list. Each relation
# generates a lot of metrics (10 + 10 per index)
# so you want to only use the ones you really care about

# relations:
# - my_table
# - my_other_table

0 comments on commit 31dc88f

Please sign in to comment.