Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Explicit per-table PG stats #760

Merged
merged 15 commits into from
Jan 27, 2014
Merged
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ stats.dat
conf.d/*.yaml
!conf.d/network.yaml
packaging/build/
packaging/root/
packaging/root/
/.vagrant/machines/default/virtualbox/id
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be just /.vagrant/ on its own; we don't want anything from that subdirectory in git.

2 changes: 1 addition & 1 deletion MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
include datadog.conf.example
include checks/libs/jmxterm-1.0-alpha-4-uber.jar
include checks/libs/jmxfetch-0.1.2-jar-with-dependencies.jar
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are still bundling jmxterm for now, for debugging purposes.
We will remove it in a future version of the agent.

include pup/pup.html
include pup/static/*
261 changes: 191 additions & 70 deletions checks.d/postgres.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,89 @@
from checks import AgentCheck

GAUGE = 'gauge'
RATE = 'rate'

METRICS = {
'numbackends' : ('connections', GAUGE),
'xact_commit' : ('commits', RATE),
'xact_rollback' : ('rollbacks', RATE),
'blks_read' : ('disk_read', RATE),
'blks_hit' : ('buffer_hit', RATE),
'tup_returned' : ('rows_returned', RATE),
'tup_fetched' : ('rows_fetched', RATE),
'tup_inserted' : ('rows_inserted', RATE),
'tup_updated' : ('rows_updated', RATE),
'tup_deleted' : ('rows_deleted', RATE),

}

NEWER_92_METRICS = {
'deadlocks' : ('deadlocks', GAUGE),
'temp_bytes' : ('temp_bytes_per_sec', RATE),
'temp_files' : ('temp_files_per_sec', RATE),
}
from checks import AgentCheck, CheckException

class PostgreSql(AgentCheck):
"""Collects per-database, and optionally per-relation metrics
"""

# Submit functions taken from AgentCheck. The metric tables in this class store
# one of them next to each metric name, and the collection code calls the stored
# function directly, passing the check instance as its first argument.
RATE = AgentCheck.rate
GAUGE = AgentCheck.gauge

# Per-database metrics, read from pg_stat_database.
# Descriptor columns are turned into tags; metric columns are submitted as values.
DB_METRICS = {
# (result column, tag name) pairs: each row's value for the column
# is emitted as a "<tag name>:<value>" tag.
'descriptors': [
('datname', 'db')
],
# pg_stat_database column -> (metric name, submit function)
'metrics': {
'numbackends' : ('postgresql.connections', GAUGE),
'xact_commit' : ('postgresql.commits', RATE),
'xact_rollback' : ('postgresql.rollbacks', RATE),
'blks_read' : ('postgresql.disk_read', RATE),
'blks_hit' : ('postgresql.buffer_hit', RATE),
'tup_returned' : ('postgresql.rows_returned', RATE),
'tup_fetched' : ('postgresql.rows_fetched', RATE),
'tup_inserted' : ('postgresql.rows_inserted', RATE),
'tup_updated' : ('postgresql.rows_updated', RATE),
'tup_deleted' : ('postgresql.rows_deleted', RATE),
},
# Query template: the single %s is filled with the comma-joined metric columns
# using Python %-formatting, which is why the literal percent sign in the
# 'template%' pattern is written as %%.
'query': """
SELECT datname,
%s
FROM pg_stat_database
WHERE datname not ilike 'template%%'
AND datname not ilike 'postgres'
""",
# False: this query is not parameterized by the list of relations.
'relation': False,
}

# Additional pg_stat_database columns, merged into DB_METRICS['metrics'] by the
# collection code when the server version is 9.2 or above.
# Same layout as the other tables: column -> (metric name, submit function).
NEWER_92_METRICS = {
'deadlocks' : ('postgresql.deadlocks', GAUGE),
'temp_bytes' : ('postgresql.temp_bytes', RATE),
'temp_files' : ('postgresql.temp_files', RATE),
}

# Per-table metrics, read from pg_stat_user_tables for the configured relations.
REL_METRICS = {
# (result column, tag name) pairs: each row is tagged "table:<relname>".
'descriptors': [
('relname', 'table')
],
# pg_stat_user_tables column -> (metric name, submit function)
'metrics': {
'seq_scan' : ('postgresql.seq_scans', RATE),
'seq_tup_read' : ('postgresql.seq_rows_read', RATE),
'idx_scan' : ('postgresql.index_scans', RATE),
'idx_tup_fetch' : ('postgresql.index_rows_fetched', RATE),
'n_tup_ins' : ('postgresql.rows_inserted', RATE),
'n_tup_upd' : ('postgresql.rows_updated', RATE),
'n_tup_del' : ('postgresql.rows_deleted', RATE),
'n_tup_hot_upd' : ('postgresql.rows_hot_updated', RATE),
'n_live_tup' : ('postgresql.live_rows', GAUGE),
'n_dead_tup' : ('postgresql.dead_rows', GAUGE),
},
# Query template with two %s: the first is filled with the comma-joined metric
# columns; the second is re-inserted as a literal %s so that the list of
# relations can be bound as a query parameter at execute time.
'query': """
SELECT relname,
%s
FROM pg_stat_user_tables
WHERE relname = ANY(%s)""",
# True: this query takes the list of relations as a parameter.
'relation': True,
}

# Per-index metrics, read from pg_stat_user_indexes for the configured relations.
IDX_METRICS = {
# (result column, tag name) pairs: each row is tagged with both
# "table:<relname>" and "index:<indexrelname>".
'descriptors': [
('relname', 'table'),
('indexrelname', 'index')
],
# pg_stat_user_indexes column -> (metric name, submit function)
'metrics': {
'idx_scan' : ('postgresql.index_scans', RATE),
'idx_tup_read' : ('postgresql.index_rows_read', RATE),
'idx_tup_fetch' : ('postgresql.index_rows_fetched', RATE),
},
# Query template with two %s: the first is filled with the comma-joined metric
# columns; the second is re-inserted as a literal %s so that the list of
# relations can be bound as a query parameter at execute time.
'query': """
SELECT relname,
indexrelname,
%s
FROM pg_stat_user_indexes
WHERE relname = ANY(%s)""",
# True: this query takes the list of relations as a parameter.
'relation': True,
}


def __init__(self, name, init_config, agentConfig):
AgentCheck.__init__(self, name, init_config, agentConfig)
self.dbs = {}
Expand All @@ -37,7 +97,6 @@ def get_library_versions(self):
version = "Not Found"
except AttributeError:
version = "Unknown"

return {"psycopg2": version}

def _get_version(self, key, db):
Expand All @@ -56,63 +115,96 @@ def _get_version(self, key, db):
def _is_9_2_or_above(self, key, db):
version = self._get_version(key, db)
if type(version) == list:
return version >= [9,2,0]
return version >= [9, 2, 0]

return False

def _collect_stats(self, key, db, instance_tags, relations):
"""Query pg_stat_* for various metrics
If relations is not an empty list, gather per-relation metrics
on top of that.
"""

def _collect_stats(self, key, db, instance_tags):

metrics_to_collect = METRICS
# Extended 9.2+ metrics
if self._is_9_2_or_above(key, db):
metrics_to_collect.update(NEWER_92_METRICS)
self.DB_METRICS['metrics'].update(self.NEWER_92_METRICS)

# Do we need relation-specific metrics?
if relations is None or relations == []:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick but you can just do:

if not relations:

here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I miss string[] in the function signature...

metric_scope = (self.DB_METRICS,)
else:
metric_scope = (self.DB_METRICS, self.REL_METRICS, self.IDX_METRICS)

for scope in metric_scope:
# build query
cols = scope['metrics'].keys() # list of metrics to query, in some order
# we must remember that order to parse results
cursor = db.cursor()

# if this is a relation-specific query, we need to list all relations last
if scope['relation'] and len(relations) > 0:
query = scope['query'] % (", ".join(cols), "%s") # Keep the last %s intact
cursor.execute(query, (relations, ))
else:
query = scope['query'] % (", ".join(cols))
cursor.execute(query)

metrics_keys = metrics_to_collect.keys()
fields = ",".join(metrics_keys)
query = """SELECT datname,
%s
FROM pg_stat_database
WHERE datname not ilike 'template%%'
AND datname not ilike 'postgres'
;""" % fields

cursor = db.cursor()
cursor.execute(query)
result = cursor.fetchone()
while result is not None:
dbname = result[0]
try:
tags = ['db:%s' % dbname] + instance_tags
except Exception:
# if tags is none or is not of the right type
tags = ['db:%s' % dbname]

for i, value in enumerate(result):
if i == 0:
# This is the dbname
continue
results = cursor.fetchall()
cursor.close()

# parse & submit results
# A row should look like this
# (descriptor, descriptor, ..., value, value, value, value, ...)
# with descriptor a PG relation or index name, which we use to create the tags
for row in results:
# turn descriptors into tags
desc = scope['descriptors']
# Check that all columns will be processed
assert len(row) == len(cols) + len(desc)

# Build tags
# descriptors are: (pg_name, dd_tag_name): value
# Special-case the "db" tag, which overrides the one that is passed as instance_tag
# The reason is that pg_stat_database returns all databases regardless of the
# connection.
if not scope['relation']:
tags = [t for t in instance_tags if not t.startswith("db:")]
else:
tags = [t for t in instance_tags]

metric_name = "postgresql.%s" % metrics_to_collect[metrics_keys[i-1]][0]
metric_type = metrics_to_collect[metrics_keys[i-1]][1]
if metric_type == GAUGE:
self.gauge(metric_name, value, tags=tags)
elif metric_type == RATE:
self.rate(metric_name, value, tags=tags)
tags += ["%s:%s" % (d[0][1], d[1]) for d in zip(desc, row[:len(desc)])]

result = cursor.fetchone()
del cursor
# [(metric-map, value), (metric-map, value), ...]
# metric-map is: (dd_name, "rate"|"gauge")
# shift the results since the first columns will be the "descriptors"
values = zip([scope['metrics'][c] for c in cols], row[len(desc):])

# To submit simply call the function for each value v
# v[0] == (metric_name, submit_function)
# v[1] == the actual value
# tags are
[v[0][1](self, v[0][0], v[1], tags=tags) for v in values]


def get_connection(self, key, host, port, user, password, dbname):

"Get and memoize connections to instances"
if key in self.dbs:
return self.dbs[key]

elif host != '' and user != '':
elif host != "" and user != "":
try:
import psycopg2 as pg
if host == 'localhost' and password == '':
# Use ident method
return pg.connect("user=%s dbname=%s" % (user, dbname))
elif port != '':
return pg.connect(host=host, port=port, user=user,
password=password, database=dbname)
else:
return pg.connect(host=host, user=user, password=password,
database=dbname)
except ImportError:
raise ImportError("psycopg2 library can not be imported. Please check the installation instruction on the Datadog Website")
raise ImportError("psycopg2 library cannot be imported. Please check the installation instruction on the Datadog Website.")

if host == 'localhost' and password == '':
# Use ident method
Expand All @@ -123,6 +215,13 @@ def get_connection(self, key, host, port, user, password, dbname):
else:
connection = pg.connect(host=host, user=user, password=password,
database=dbname)
else:
if host is None or host == "":
raise CheckException("Please specify a Postgres host to connect to.")
elif user is None or user == "":
raise CheckException("Please specify a user to connect to Postgres as.")
else:
raise CheckException("Cannot connect to Postgres.")

self.dbs[key] = connection
return connection
Expand All @@ -135,20 +234,25 @@ def check(self, instance):
password = instance.get('password', '')
tags = instance.get('tags', [])
dbname = instance.get('database', 'postgres')
relations = instance.get('relations', [])

key = '%s:%s' % (host, port)
db = self.get_connection(key, host, port, user, password, dbname)

# Clean up tags in case there was a None entry in the instance
# e.g. if the yaml contains tags: but no actual tags
if tags is None:
tags = []
key = '%s:%s' % (host, port)

db = self.get_connection(key, host, port, user, password, dbname)

# preset tags to the database name
tags.extend(["db:%s" % dbname])

# Check version
version = self._get_version(key, db)
self.log.debug("Running check against version %s" % version)

# Collect metrics
self._collect_stats(key, db, tags)
self._collect_stats(key, db, tags, relations)

@staticmethod
def parse_agent_config(agentConfig):
Expand All @@ -168,3 +272,20 @@ def parse_agent_config(agentConfig):
}

return False

if __name__ == '__main__':
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you remove this?

The agent accepts a check argument if you want to run some tests, e.g.:

python agent.py check postgres

(It will read the postgres.yaml file in your conf.d directory)

# Assumes that you have a pg db that bears the same name
# as your unix username, and that you can connect to it
# using ident.
# To collect per-table stats, pass the table names as command-line arguments.
p = PostgreSql("", {}, {})
# get current username
import getpass, sys
usr = getpass.getuser()
p.check({"host": "localhost",
"port": 5432,
"username": usr,
"password": "",
"tags": ["code"],
"database": usr,
"relations": sys.argv[1:]})
13 changes: 12 additions & 1 deletion conf.d/postgres.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,15 @@ instances:
# dbname: db_name
# tags:
# - optional_tag1
# - optional_tag2
# - optional_tag2

# Custom-metrics section

# You can now track per-relation (table) metrics.
# You need to list the relations explicitly. Each relation
# generates a lot of metrics (10 per table, plus 3 per index),
# so only list the ones you really care about.

# relations:
# - my_table
# - my_other_table