Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Memcache] Improve tests and bug fixes #1490

Merged
merged 1 commit into from
Apr 2, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 21 additions & 17 deletions checks.d/mcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# 3rd party
import memcache

# Reference: http://code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt
# Ref: http://code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt
# Name Type Meaning
# ----------------------------------
# pid 32u Process id of this server process
Expand Down Expand Up @@ -61,6 +61,7 @@
# http://www.couchbase.org/wiki/display/membase/Membase+Statistics
# https://github.com/membase/ep-engine/blob/master/docs/stats.org


class Memcache(AgentCheck):

SOURCE_TYPE_NAME = 'memcached'
Expand Down Expand Up @@ -111,7 +112,10 @@ def _get_metrics(self, server, port, tags):
mc = memcache.Client(["%s:%s" % (server, port)])
raw_stats = mc.get_stats()

assert len(raw_stats) == 1 and len(raw_stats[0]) == 2, "Malformed response: %s" % raw_stats
assert len(raw_stats) == 1 and len(raw_stats[0]) == 2,\
"Malformed response: %s" % raw_stats


# Access the dict
stats = raw_stats[0][1]
for metric in stats:
Expand All @@ -123,7 +127,8 @@ def _get_metrics(self, server, port, tags):
# Tweak the name if it's a rate so that we don't use the exact
# same metric name as the memcache documentation
if metric in self.RATES:
our_metric = self.normalize(metric.lower() + "_rate", 'memcache')
our_metric = self.normalize(
"{0}_rate".format(metric.lower()), 'memcache')
self.rate(our_metric, float(stats[metric]), tags=tags)

# calculate some metrics based on other metrics.
Expand Down Expand Up @@ -157,38 +162,37 @@ def _get_metrics(self, server, port, tags):
pass

uptime = stats.get("uptime", 0)
self.service_check(self.SERVICE_CHECK, AgentCheck.OK,
self.service_check(
self.SERVICE_CHECK, AgentCheck.OK,
tags=service_check_tags,
message="Server has been up for %s seconds" % uptime)
except AssertionError:
self.service_check(self.SERVICE_CHECK, AgentCheck.CRITICAL,
self.service_check(
self.SERVICE_CHECK, AgentCheck.CRITICAL,
tags=service_check_tags,
message="Unable to fetch stats from server")
raise Exception("Unable to retrieve stats from memcache instance: " + server + ":" + str(port) + ". Please check your configuration")
# Adjacent string literals are joined verbatim before .format() is applied,
# so the first literal must end with a space to keep the two sentences
# separated in the final message.
raise Exception(
    "Unable to retrieve stats from memcache instance: {0}:{1}. "
    "Please check your configuration".format(server, port))

if mc is not None:
mc.disconnect_all()
self.log.debug("Disconnected from memcached")
del mc

def check(self, instance):
socket = instance.get('socket', None)
server = instance.get('url', None)
socket = instance.get('socket')
server = instance.get('url')
if not server and not socket:
raise Exception("Missing or null 'url' and 'socket' in mcache config")

# Hacky monkeypatch to fix a memory leak in the memcache library.
# See https://github.com/DataDog/dd-agent/issues/278 for details.
try:
memcache.Client.debuglog = None
except Exception:
pass
raise Exception('Either "url" or "socket" must be configured')

if socket:
server = 'unix'
port = socket
else:
port = int(instance.get('port', self.DEFAULT_PORT))
tags = instance.get('tags', None)
custom_tags = instance.get('tags') or []

tags = ["url:{0}:{1}".format(server, port)] + custom_tags

self._get_metrics(server, port, tags)
8 changes: 6 additions & 2 deletions tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,13 @@ def __init__(self, *args, **kwargs):

self.check = None

def is_travis(self):
    """Return True when a ``TRAVIS`` variable is present in the environment."""
    # Environment values are always strings, so a missing key is the only
    # way .get() can hand back None here.
    return os.environ.get("TRAVIS") is not None

# Helper function when testing rates
def run_check_twice(self, config, agent_config=None, mocks=None):
self.run_check(config, agent_config, mocks)
def run_check_twice(self, config, agent_config=None, mocks=None,
force_reload=False):
self.run_check(config, agent_config, mocks, force_reload)
time.sleep(1)
self.run_check(config, agent_config, mocks)

Expand Down
232 changes: 126 additions & 106 deletions tests/test_mcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,129 @@
import time
from subprocess import Popen, PIPE

from tests.common import load_check
from tests.common import AgentCheckTest

from checks import AgentCheck

# 3rd party
import memcache


# Metric names (without the "memcache." prefix) that the tests in this file
# expect to be reported under their own name.
GAUGES = [
    "total_items", "curr_items", "limit_maxbytes", "uptime", "bytes",
    "curr_connections", "connection_structures", "threads", "pointer_size",
    # Computed metrics
    "get_hit_percent", "fill_percent", "avg_item_size",
]

# Metric names that the tests in this file expect with a "_rate" suffix
# appended, i.e. asserted as "memcache.<name>_rate".
RATES = [
    "rusage_user", "rusage_system",
    "cmd_get", "cmd_set", "cmd_flush",
    "get_hits", "get_misses",
    "delete_misses", "delete_hits",
    "evictions",
    "bytes_read", "bytes_written",
    "cas_misses", "cas_hits", "cas_badval",
    "total_connections",
]

# Service check name asserted on by the tests in this file.
SERVICE_CHECK = 'memcache.can_connect'

# Port used for the memcache client and the instance configs in these tests.
PORT = 11211


@attr(requires='memcache')
class TestMemCache(unittest.TestCase):
def is_travis(self):
return 'TRAVIS' in os.environ
class TestMemCache(AgentCheckTest):

CHECK_NAME = "mcache"

def setUp(self):
self.agent_config = {
"memcache_server": "localhost",
"memcache_instance_1": "localhost:11211:mytag",
"memcache_instance_2": "localhost:11211:mythirdtag",
c = memcache.Client(["localhost:{0}".format(PORT)])
c.set("foo", "bar")
c.get("foo")

def testCoverage(self):
config = {
'init_config': {},
'instances': [
{'url': "localhost"},
{'url': "localhost", 'port': PORT, 'tags': ['instance:mytag']},
{'url': "localhost", 'port': PORT, 'tags': ['foo']},
{'socket': "foo/bar"}
]
}

self.assertRaises(Exception, self.run_check, config)

tag_set = [
["url:localhost:11211"],
["url:localhost:11211", "instance:mytag"],
["url:localhost:11211", "foo"]
]

for tags in tag_set:
for m in GAUGES:
self.assertMetric("memcache.{0}".format(m), tags=tags, count=1)

good_service_check_tags = ["host:localhost", "port:{0}".format(PORT)]
bad_service_check_tags = ["host:unix", "port:foo/bar"]

self.assertServiceCheck(
SERVICE_CHECK, status=AgentCheck.OK,
tags=good_service_check_tags, count=3)
self.assertServiceCheck(
SERVICE_CHECK, status=AgentCheck.CRITICAL,
tags=bad_service_check_tags, count=1)

self.coverage_report()

config = {
'init_config': {},
'instances': [
{'url': "localhost"},
{'url': "localhost", 'port': PORT, 'tags': ['instance:mytag']},
{'url': "localhost", 'port': PORT, 'tags': ['foo']},
]
}
self.conf = {'init_config': {}, 'instances': [
{'url': "localhost"},
{'url': "localhost", 'port': 11211, 'tags': ['instance:mytag']},
{'url': "localhost", 'port': 11211, 'tags': ['instance:mythirdtag']},
]}
self.c = load_check('mcache', self.conf, self.agent_config)

self.run_check_twice(config, force_reload=True)
for tags in tag_set:
for m in GAUGES:
self.assertMetric("memcache.{0}".format(m), tags=tags, count=1)
for m in RATES:
self.assertMetric(
"memcache.{0}_rate".format(m), tags=tags, count=1)

good_service_check_tags = ["host:localhost", "port:{0}".format(PORT)]

self.assertServiceCheck(
SERVICE_CHECK, status=AgentCheck.OK,
tags=good_service_check_tags, count=3)

self.coverage_report()

def _countConnections(self, port):
pid = os.getpid()
p1 = Popen(['lsof', '-a', '-p%s' %
pid, '-i4'], stdout=PIPE)
p1 = Popen(
['lsof', '-a', '-p%s' % pid, '-i4'], stdout=PIPE)
p2 = Popen(["grep", ":%s" % port], stdin=p1.stdout, stdout=PIPE)
p3 = Popen(["wc", "-l"], stdin=p2.stdout, stdout=PIPE)
output = p3.communicate()[0]
Expand All @@ -39,73 +136,24 @@ def testConnectionLeaks(self):
for i in range(3):
# Count open connections to localhost:11211, should be 0
self.assertEquals(self._countConnections(11211), 0)
new_conf = {'init_config': {}, 'instances': [
{'url': "localhost"},]
new_conf = {'init_config': {}, 'instances': [
{'url': "localhost"}]
}
self.c.check(new_conf['instances'][0])
self.run_check(new_conf)
# Verify that the count is still 0
self.assertEquals(self._countConnections(11211), 0)

def testMetrics(self):
for instance in self.conf['instances']:
self.c.check(instance)
# Sleep for 1 second so the rate interval >=1
time.sleep(1)
self.c.check(instance)

r = self.c.get_metrics()

# Check that we got metrics from 3 hosts (aka all but the dummy host)
self.assertEquals(len([t for t in r if t[0] == "memcache.total_items"]), 3, r)

# Check that we got 26 metrics for a specific host
self.assertEquals(len([t for t in r if t[3].get('tags') == ["instance:mythirdtag"]]), 26, r)

def testTagging(self):
instance = {
'url': 'localhost',
'port': 11211,
'tags': ['regular_old_tag']
}

self.c.check(instance)
# Sleep for 1 second so the rate interval >=1
time.sleep(1)
self.c.check(instance)

r = self.c.get_metrics()

# Check the tags
self.assertEquals(len([t for t in r if t[3].get('tags') == ["regular_old_tag"]]), 26, r)

conf = {
def testMemoryLeak(self):
config = {
'init_config': {},
'instances': [{
'url': 'localhost',
'port': 11211,
'tags': ["instance:localhost_11211"],
}
'instances': [
{'url': "localhost"},
{'url': "localhost", 'port': PORT, 'tags': ['instance:mytag']},
{'url': "localhost", 'port': PORT, 'tags': ['foo']},
]
}
instance = conf['instances'][0]

self.c.check(instance)
# Sleep for 1 second so the rate interval >=1
time.sleep(1)
self.c.check(instance)

r = self.c.get_metrics()

# Check the tags
self.assertEquals(len([t for t in r if t[3].get('tags') == ["instance:localhost_11211"]]), 26, r)

def testDummyHost(self):
self.assertRaises(Exception, self.c.check, {'url': 'dummy', 'port': 11211, 'tags': ['instance:myothertag']})

def testMemoryLeak(self):
for instance in self.conf['instances']:
self.c.check(instance)
self.c.get_metrics()
self.run_check(config)

import gc
if not self.is_travis():
Expand All @@ -114,39 +162,11 @@ def testMemoryLeak(self):
try:
start = len(gc.garbage)
for i in range(10):
for instance in self.conf['instances']:
self.c.check(instance)
time.sleep(1)
self.c.get_metrics()
self.run_check(config)
time.sleep(0.3)
self.check.get_metrics()

end = len(gc.garbage)
self.assertEquals(end - start, 0, gc.garbage)
finally:
gc.set_debug(0)

def test_service_checks(self):
for instance in self.conf['instances']:
self.c.check(instance)
svc_checks = self.c.get_service_checks()
self.assertEquals(len(svc_checks), len(self.conf['instances']))

self.assertEquals(svc_checks[0]['check'], self.c.SERVICE_CHECK)
self.assertEquals(svc_checks[0]['status'], AgentCheck.OK)
assert 'up for' in svc_checks[0]['message']

# Check an invalid one.
try:
self.c.check({
'url': 'localhost',
'port': 12345
})
except Exception:
# We expect an exception here. Just ignore it.
pass
svc_checks = self.c.get_service_checks()
self.assertEquals(len(svc_checks), 1)
self.assertEquals(svc_checks[0]['check'], self.c.SERVICE_CHECK)
self.assertEquals(svc_checks[0]['status'], AgentCheck.CRITICAL)

if __name__ == '__main__':
unittest.main()