Add default prometheus metrics to clients #652

Merged
16 commits merged on Apr 27, 2022
31 changes: 29 additions & 2 deletions baseplate/clients/memcache/__init__.py
@@ -8,6 +8,7 @@
from typing import Tuple
from typing import Union

from prometheus_client import Gauge
from pymemcache.client.base import PooledClient

from baseplate import Span
@@ -112,7 +113,7 @@ def parse(self, key_path: str, raw_config: config.RawConfig) -> "MemcacheContext
serializer=self.serializer,
deserializer=self.deserializer,
)
return MemcacheContextFactory(pool)
return MemcacheContextFactory(pool, key_path)


class MemcacheContextFactory(ContextFactory):
@@ -129,9 +130,35 @@ class MemcacheContextFactory(ContextFactory):

"""

def __init__(self, pooled_client: PooledClient):
PROM_PREFIX = "bp_memcached_pool"
PROM_LABELS = ["pool"]

pool_size_gauge = Gauge(
f"{PROM_PREFIX}_max_size",
"Maximum number of connections allowed in this pool",
PROM_LABELS,
)

used_connections_gauge = Gauge(
f"{PROM_PREFIX}_active_connections",
"Number of connections in this pool currently in use",
PROM_LABELS,
)

free_connections_gauge = Gauge(
f"{PROM_PREFIX}_free_connections",
"Number of free connections in this pool",
PROM_LABELS,
)

def __init__(self, pooled_client: PooledClient, name: str = "default"):
self.pooled_client = pooled_client

pool = self.pooled_client.client_pool
self.pool_size_gauge.labels(name).set_function(lambda: pool.max_size)
self.free_connections_gauge.labels(name).set_function(lambda: len(pool.free))
self.used_connections_gauge.labels(name).set_function(lambda: len(pool.used))

def report_memcache_runtime_metrics(self, batch: metrics.Client) -> None:
Contributor commented:
I wonder why this memcache method is report_memcache_runtime_metrics while on all the other clients the equivalent method is report_runtime_metrics.

Collaborator (author) replied:
Hm, this might be a bug, unless it's called from outside of baseplate. It's not referenced anywhere, while the methods with the latter name are called from:

elif hasattr(value, "report_runtime_metrics"):
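
For reference, a minimal sketch of the dispatch pattern that line belongs to (the function and variable names here are illustrative, not baseplate's actual internals):

```python
# Hypothetical sketch of the hasattr-based dispatch discussed above: the
# monitoring loop probes each attached context factory for a
# report_runtime_metrics method, so a factory that only defines
# report_memcache_runtime_metrics is never picked up.
def publish_runtime_metrics(context_factories: dict, batch) -> None:
    for name, factory in context_factories.items():
        if hasattr(factory, "report_runtime_metrics"):
            factory.report_runtime_metrics(batch)
```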

Contributor replied:
lol. i see. thanks for the explanation

pool = self.pooled_client.client_pool
batch.gauge("pool.in_use").replace(len(pool.used))
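All of these pool gauges rely on prometheus_client's set_function, so the exported value is computed by a callback at collection (scrape) time rather than being pushed on every operation. A minimal, self-contained sketch of that pattern, using a stand-in pool object rather than a real client pool:

```python
from prometheus_client import Gauge


class FakePool:
    """Stand-in for a client connection pool, used only for illustration."""
    max_size = 10
    used = ["conn-1", "conn-2"]


pool = FakePool()

active_connections = Gauge(
    "example_pool_active_connections",
    "Number of connections in this pool currently in use",
    ["pool"],
)
# The lambda runs every time the gauge is collected, so the exposed value
# always reflects the pool's current state with no per-operation bookkeeping.
active_connections.labels("example").set_function(lambda: len(pool.used))

sample = active_connections.collect()[0].samples[0]
assert sample.labels == {"pool": "example"}
assert sample.value == 2.0
```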
32 changes: 30 additions & 2 deletions baseplate/clients/redis.py
@@ -5,6 +5,8 @@

import redis

from prometheus_client import Gauge

# redis.client.StrictPipeline was renamed to redis.client.Pipeline in version 3.0
try:
from redis.client import StrictPipeline as Pipeline # type: ignore
@@ -75,7 +77,7 @@ def __init__(self, **kwargs: Any):

def parse(self, key_path: str, raw_config: config.RawConfig) -> "RedisContextFactory":
connection_pool = pool_from_config(raw_config, f"{key_path}.", **self.kwargs)
return RedisContextFactory(connection_pool)
return RedisContextFactory(connection_pool, key_path)


class RedisContextFactory(ContextFactory):
@@ -92,9 +94,35 @@ class RedisContextFactory(ContextFactory):

"""

def __init__(self, connection_pool: redis.ConnectionPool):
PROM_PREFIX = "bp_redis_pool"
PROM_LABELS = ["pool"]

max_connections = Gauge(
f"{PROM_PREFIX}_max_size",
"Maximum number of connections allowed in this redisbp pool",
PROM_LABELS,
)
idle_connections = Gauge(
f"{PROM_PREFIX}_idle_connections",
"Number of idle connections in this redisbp pool",
PROM_LABELS,
)
open_connections = Gauge(
f"{PROM_PREFIX}_active_connections",
"Number of open connections in this redisbp pool",
PROM_LABELS,
)

def __init__(self, connection_pool: redis.ConnectionPool, name: str = "redis"):
self.connection_pool = connection_pool

if isinstance(connection_pool, redis.BlockingConnectionPool):
self.max_connections.labels(name).set_function(lambda: connection_pool.max_connections)
self.idle_connections.labels(name).set_function(connection_pool.pool.qsize)
self.open_connections.labels(name).set_function(
lambda: len(connection_pool._connections) # type: ignore
)

def report_runtime_metrics(self, batch: metrics.Client) -> None:
if not isinstance(self.connection_pool, redis.BlockingConnectionPool):
return
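As with report_runtime_metrics, these gauges are only wired up when the pool is a redis.BlockingConnectionPool, which is what pool_from_config returns once redis.max_connections is set (the unit test further down relies on exactly that). A small usage sketch along the same lines; the URL and pool name are illustrative:

```python
from baseplate.clients.redis import RedisContextFactory, pool_from_config

pool = pool_from_config(
    {"redis.url": "redis://localhost:6379/0", "redis.max_connections": "25"}
)
factory = RedisContextFactory(pool, "example_cache")

# The gauge callbacks read the pool lazily, so collecting the metric already
# reflects the configured maximum before any connection has been opened.
samples = factory.max_connections.collect()[0].samples
assert any(s.labels == {"pool": "example_cache"} and s.value == 25.0 for s in samples)
```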
29 changes: 27 additions & 2 deletions baseplate/clients/redis_cluster.py
@@ -9,6 +9,7 @@

import rediscluster

from prometheus_client import Gauge
from rediscluster.pipeline import ClusterPipeline

from baseplate import Span
@@ -331,7 +332,7 @@ def __init__(self, **kwargs: Any):

def parse(self, key_path: str, raw_config: config.RawConfig) -> "ClusterRedisContextFactory":
connection_pool = cluster_pool_from_config(raw_config, f"{key_path}.", **self.kwargs)
return ClusterRedisContextFactory(connection_pool)
return ClusterRedisContextFactory(connection_pool, key_path)


class ClusterRedisContextFactory(ContextFactory):
@@ -346,9 +347,33 @@ class ClusterRedisContextFactory(ContextFactory):
:param connection_pool: A connection pool.
"""

def __init__(self, connection_pool: rediscluster.ClusterConnectionPool):
PROM_PREFIX = "bp_redis_cluster_pool"
PROM_LABELS = ["pool"]

max_connections_gauge = Gauge(
f"{PROM_PREFIX}_max_size",
"Maximum number of connections allowed in this redis cluster pool",
PROM_LABELS,
)
open_connections_gauge = Gauge(
f"{PROM_PREFIX}_open_connections",
"Number of open connections in this redis cluster pool",
PROM_LABELS,
)

def __init__(
self, connection_pool: rediscluster.ClusterConnectionPool, name: str = "redis_cluster"
):
self.connection_pool = connection_pool

if isinstance(connection_pool, rediscluster.ClusterBlockingConnectionPool):
self.max_connections_gauge.labels(name).set_function(
lambda: connection_pool.max_connections
)
self.open_connections_gauge.labels(name).set_function(
lambda: len(connection_pool._connections)
)

def report_runtime_metrics(self, batch: metrics.Client) -> None:
if not isinstance(self.connection_pool, rediscluster.ClusterBlockingConnectionPool):
return
39 changes: 37 additions & 2 deletions baseplate/clients/sqlalchemy.py
@@ -7,6 +7,7 @@
from typing import Tuple
from typing import Union

from prometheus_client import Gauge
from sqlalchemy import create_engine
from sqlalchemy import event
from sqlalchemy.engine import Connection
@@ -125,7 +126,7 @@ def parse(
engine = engine_from_config(
raw_config, secrets=self.secrets, prefix=f"{key_path}.", **self.kwargs
)
return SQLAlchemySessionContextFactory(engine)
return SQLAlchemySessionContextFactory(engine, key_path)


Parameters = Optional[Union[Dict[str, Any], Sequence[Any]]]
@@ -155,12 +156,46 @@ class SQLAlchemyEngineContextFactory(ContextFactory):

"""

def __init__(self, engine: Engine):
PROM_PREFIX = "bp_sqlalchemy_pool"
PROM_LABELS = ["pool"]

max_connections_gauge = Gauge(
f"{PROM_PREFIX}_max_size",
"Maximum number of connections allowed in this pool",
PROM_LABELS,
)

checked_in_connections_gauge = Gauge(
f"{PROM_PREFIX}_idle_connections",
"Number of available, checked in, connections in this pool",
PROM_LABELS,
)

checked_out_connections_gauge = Gauge(
f"{PROM_PREFIX}_active_connections",
"Number of connections in use, or checked out, in this pool",
PROM_LABELS,
)

overflow_connections_gauge = Gauge(
f"{PROM_PREFIX}_overflow_connections",
"Number of connections over the desired size of this pool",
PROM_LABELS,
)

def __init__(self, engine: Engine, name: str = "sqlalchemy"):
self.engine = engine.execution_options()
event.listen(self.engine, "before_cursor_execute", self.on_before_execute, retval=True)
event.listen(self.engine, "after_cursor_execute", self.on_after_execute)
event.listen(self.engine, "handle_error", self.on_error)

pool = self.engine.pool
if isinstance(pool, QueuePool):
self.max_connections_gauge.labels(name).set_function(pool.size)
self.checked_in_connections_gauge.labels(name).set_function(pool.checkedin)
self.checked_out_connections_gauge.labels(name).set_function(pool.checkedout)
self.overflow_connections_gauge.labels(name).set_function(pool.overflow)

def report_runtime_metrics(self, batch: metrics.Client) -> None:
pool = self.engine.pool
if not isinstance(pool, QueuePool):
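These four gauges hand QueuePool's bound methods directly to set_function, which works because size, checkedin, checkedout, and overflow are all zero-argument methods on the pool. A quick way to watch those counters move, using SQLite purely as a dependency-free stand-in (the explicit poolclass is needed because an in-memory SQLite engine would not pick QueuePool by default):

```python
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine("sqlite://", poolclass=QueuePool, pool_size=5, max_overflow=10)
pool = engine.pool

conn = engine.connect()  # check one connection out of the pool
print(pool.size(), pool.checkedin(), pool.checkedout(), pool.overflow())
conn.close()             # returning it moves the count from checked out to checked in
print(pool.checkedin(), pool.checkedout())
```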
20 changes: 20 additions & 0 deletions baseplate/clients/thrift.py
@@ -8,6 +8,7 @@
from typing import Iterator
from typing import Optional

from prometheus_client import Gauge
from thrift.protocol.TProtocol import TProtocolException
from thrift.Thrift import TApplicationException
from thrift.Thrift import TException
@@ -70,6 +71,21 @@ class ThriftContextFactory(ContextFactory):

"""

PROM_PREFIX = "bp_thrift_pool"
PROM_LABELS = ["client_cls"]

max_connections_gauge = Gauge(
f"{PROM_PREFIX}_max_size",
"Maximum number of connections in this thrift pool before blocking",
PROM_LABELS,
)

active_connections_gauge = Gauge(
f"{PROM_PREFIX}_active_connections",
"Number of connections currently in use in this thrift pool",
PROM_LABELS,
)

def __init__(self, pool: ThriftConnectionPool, client_cls: Any):
self.pool = pool
self.client_cls = client_cls
@@ -83,6 +99,10 @@ def __init__(self, pool: ThriftConnectionPool, client_cls: Any):
},
)

pool_name = type(self.client_cls).__name__
self.max_connections_gauge.labels(pool_name).set_function(lambda: self.pool.size)
self.active_connections_gauge.labels(pool_name).set_function(lambda: self.pool.checkedout)

def report_runtime_metrics(self, batch: metrics.Client) -> None:
batch.gauge("pool.size").replace(self.pool.size)
batch.gauge("pool.in_use").replace(self.pool.checkedout)
1 change: 1 addition & 0 deletions setup.py
@@ -43,6 +43,7 @@
"requests>=2.21.0,<3.0",
"thrift-unofficial>=0.14.1,<1.0",
"gevent>=20.5.0",
"prometheus-client>=0.12.0",
],
extras_require=extras_require,
scripts=[
12 changes: 11 additions & 1 deletion tests/unit/clients/memcache_tests.py
@@ -11,7 +11,7 @@
del pymemcache

from baseplate.lib.config import ConfigurationError
from baseplate.clients.memcache import pool_from_config
from baseplate.clients.memcache import pool_from_config, MemcacheContextFactory
from baseplate.clients.memcache import lib as memcache_lib


@@ -54,6 +54,16 @@ def test_nodelay(self):
)
self.assertEqual(pool.no_delay, False)

def test_metrics(self):
max_pool_size = "123"
ctx = MemcacheContextFactory(
pool_from_config(
{"memcache.endpoint": "localhost:1234", "memcache.max_pool_size": max_pool_size}
)
)
metric = ctx.total_connections_gauge.collect()
self.assertEqual(metric[0].samples[0].value, float(max_pool_size))
Contributor commented:
Would it be useful to assert that the labels are set correctly?
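
A possible shape for that assertion, reusing the pool_size_gauge defined in the diff above with an explicit pool name (the method name and the "my_pool" label are hypothetical):

```python
# Hypothetical addition to the TestCase above.
def test_metric_labels(self):
    ctx = MemcacheContextFactory(
        pool_from_config({"memcache.endpoint": "localhost:1234"}), "my_pool"
    )
    labels = [s.labels for s in ctx.pool_size_gauge.collect()[0].samples]
    self.assertIn({"pool": "my_pool"}, labels)
```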



class SerdeTests(unittest.TestCase):
def test_serialize_str(self):
12 changes: 11 additions & 1 deletion tests/unit/clients/redis_tests.py
@@ -8,7 +8,7 @@
del redis

from baseplate.lib.config import ConfigurationError
from baseplate.clients.redis import pool_from_config
from baseplate.clients.redis import pool_from_config, RedisContextFactory


class PoolFromConfigTests(unittest.TestCase):
@@ -23,6 +23,16 @@ def test_basic_url(self):
self.assertEqual(pool.connection_kwargs["port"], 1234)
self.assertEqual(pool.connection_kwargs["db"], 0)

def test_metrics(self):
max_connections = "123"
ctx = RedisContextFactory(
pool_from_config(
{"redis.url": "redis://localhost:1234/0", "redis.max_connections": max_connections}
)
)
metric = ctx.max_connections.collect()
self.assertEqual(metric[0].samples[0].value, float(max_connections))

def test_timeouts(self):
pool = pool_from_config(
{