Skip to content
This repository has been archived by the owner on Oct 8, 2024. It is now read-only.

Commit

Permalink
Tempo client config fix (#89)
Browse files Browse the repository at this point in the history
Co-authored-by: Mateusz Kulewicz <mateusz.kulewicz@canonical.com>
  • Loading branch information
PietroPasotti and mmkay authored Apr 17, 2024
1 parent 3a28770 commit 35b960b
Show file tree
Hide file tree
Showing 16 changed files with 406 additions and 244 deletions.
60 changes: 59 additions & 1 deletion charmcraft.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,64 @@
# Copyright 2022 Canonical Ltd.
# See LICENSE file for licensing details.

name: tempo-k8s
type: charm

assumes:
- k8s-api

# Juju 3.0.3+ needed for secrets and open-port
- juju >= 3.0.3

description: |
Tempo is a distributed tracing backend by Grafana.
summary: |
Tempo is a distributed tracing backend by Grafana.
containers:
tempo:
resource: tempo-image
mounts:
- storage: data
location: /tmp/tempo

resources:
tempo-image:
type: oci-image
description: OCI image for Tempo
# Included for simplicity in integration tests
# see https://hub.docker.com/r/grafana/tempo/tags
upstream-source: grafana/tempo:2.4.0

provides:
profiling-endpoint:
interface: parca_scrape
grafana-dashboard:
interface: grafana_dashboard
grafana-source:
interface: grafana_datasource
metrics-endpoint:
interface: prometheus_scrape
tracing:
interface: tracing

requires:
logging:
interface: loki_push_api
ingress:
interface: ingress
limit: 1

storage:
data:
type: filesystem
location: /tempo-data

actions:
list-receivers:
description: |
Returns a list of all enabled receiver endpoints.
bases:
- build-on:
- name: "ubuntu"
Expand All @@ -15,3 +72,4 @@ parts:
charm-binary-python-packages:
- "pydantic>=2"
- "opentelemetry-exporter-otlp-proto-http==1.21.0"

4 changes: 3 additions & 1 deletion lib/charms/loki_k8s/v0/loki_push_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,9 @@ def _alert_rules_error(self, event):

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 28
LIBPATCH = 29

PYDEPS = ["cosl"]

logger = logging.getLogger(__name__)

Expand Down
13 changes: 6 additions & 7 deletions lib/charms/prometheus_k8s/v0/prometheus_scrape.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ def _on_scrape_targets_changed(self, event):

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 44
LIBPATCH = 45

PYDEPS = ["cosl"]

Expand Down Expand Up @@ -1537,12 +1537,11 @@ def set_scrape_job_spec(self, _=None):
relation.data[self._charm.app]["scrape_metadata"] = json.dumps(self._scrape_metadata)
relation.data[self._charm.app]["scrape_jobs"] = json.dumps(self._scrape_jobs)

if alert_rules_as_dict:
# Update relation data with the string representation of the rule file.
# Juju topology is already included in the "scrape_metadata" field above.
# The consumer side of the relation uses this information to name the rules file
# that is written to the filesystem.
relation.data[self._charm.app]["alert_rules"] = json.dumps(alert_rules_as_dict)
# Update relation data with the string representation of the rule file.
# Juju topology is already included in the "scrape_metadata" field above.
# The consumer side of the relation uses this information to name the rules file
# that is written to the filesystem.
relation.data[self._charm.app]["alert_rules"] = json.dumps(alert_rules_as_dict)

def _set_unit_ip(self, _=None):
"""Set unit host address.
Expand Down
54 changes: 34 additions & 20 deletions lib/charms/tempo_k8s/v1/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def __init__(self, *args):

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 3
LIBPATCH = 4

PYDEPS = ["pydantic>=2"]

Expand All @@ -106,7 +106,7 @@ def __init__(self, *args):
"otlp_grpc", "otlp_http", "zipkin", "tempo", "jaeger_http_thrift", "jaeger_grpc"
]

RawIngester = Tuple[IngesterProtocol, int]
RawIngester = Tuple[IngesterProtocol, int, str]
BUILTIN_JUJU_KEYS = {"ingress-address", "private-address", "egress-subnets"}


Expand All @@ -126,6 +126,8 @@ class DatabagModel(BaseModel):
"""Base databag model."""

model_config = ConfigDict(
# ignore extra fields in config
extra="ignore",
# Allow instantiating this class by field name (instead of forcing alias).
populate_by_name=True,
# Custom config key: whether to nest the whole datastructure (as json)
Expand Down Expand Up @@ -172,7 +174,7 @@ def dump(self, databag: Optional[MutableMapping] = None, clear: bool = True):
databag = {}
nest_under = self.model_config.get("_NEST_UNDER")
if nest_under:
databag[nest_under] = self.json()
databag[nest_under] = self.json(exclude_none=True)

dct = self.model_dump()
for key, field in self.model_fields.items(): # type: ignore
Expand All @@ -188,6 +190,7 @@ class Ingester(BaseModel): # noqa: D101

protocol: IngesterProtocol
port: int
path: Optional[str] = None


class TracingProviderAppData(DatabagModel): # noqa: D101
Expand Down Expand Up @@ -394,8 +397,8 @@ def _on_relation_event(self, _):
TracingProviderAppData(
host=self._host,
ingesters=[
Ingester(port=port, protocol=protocol)
for protocol, port in self._ingesters
Ingester(port=port, protocol=protocol, path=path)
for protocol, port, path in self._ingesters
],
).dump(relation.data[self._charm.app])

Expand Down Expand Up @@ -547,28 +550,39 @@ def get_all_endpoints(
self, relation: Optional[Relation] = None
) -> Optional[TracingProviderAppData]:
"""Unmarshalled relation data."""
if not self.is_ready(relation or self._relation):
relation = relation or self._relation
if not self.is_ready(relation):
return
return TracingProviderAppData.load(relation.data[relation.app]) # type: ignore

def _get_ingester(
self, relation: Optional[Relation], protocol: IngesterProtocol, ssl: bool = False
):
ep = self.get_all_endpoints(relation)
if not ep:
return None
try:
ingester: Ingester = next(filter(lambda i: i.protocol == protocol, ep.ingesters))
if ingester.protocol in ["otlp_grpc", "jaeger_grpc"]:
if ssl:
logger.warning("unused ssl argument - was the right protocol called?")
return f"{ep.host}:{ingester.port}"
if ssl:
return f"https://{ep.host}:{ingester.port}"
return f"http://{ep.host}:{ingester.port}"
except StopIteration:
logger.error(f"no ingester found with protocol={protocol!r}")
app_data = self.get_all_endpoints(relation)
if not app_data:
return None
receivers: List[Ingester] = list(
filter(lambda i: i.protocol == protocol, app_data.ingesters)
)
if not receivers:
logger.error(f"no receiver found with protocol={protocol!r}")
return
if len(receivers) > 1:
logger.error(
f"too many receivers with protocol={protocol!r}; using first one. Found: {receivers}"
)
return

receiver = receivers[0]
# FIXME: when to use https? ASSUME HTTP
base_url = f"http://{app_data.host}:{receiver.port}"

if receiver.protocol.endswith("grpc"):
# TCP protocols don't want an http/https scheme prefix
base_url = base_url.split("://")[1]

suffix = receiver.path or ""
return f"{base_url}{suffix}"

def otlp_grpc_endpoint(self, relation: Optional[Relation] = None) -> Optional[str]:
"""Ingester endpoint for the ``otlp_grpc`` protocol."""
Expand Down
74 changes: 49 additions & 25 deletions lib/charms/tempo_k8s/v2/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def __init__(self, *args):

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 2
LIBPATCH = 3

PYDEPS = ["pydantic"]

Expand All @@ -117,15 +117,13 @@ def __init__(self, *args):
"zipkin",
"kafka",
"opencensus",
"tempo", # legacy, renamed to tempo_http
"tempo_http",
"tempo_grpc",
"otlp_grpc",
"otlp_http",
"jaeger_grpc",
# "jaeger_grpc",
"jaeger_thrift_compact",
"jaeger_thrift_http",
"jaeger_http_thrift", # legacy, renamed to jaeger_thrift_http
"jaeger_thrift_binary",
]

Expand Down Expand Up @@ -302,11 +300,14 @@ class TracingProviderAppData(DatabagModel): # noqa: D101
"""Application databag model for the tracing provider."""

host: str
"""Server hostname."""
"""Server hostname (local fqdn)."""

receivers: List[Receiver]
"""Enabled receivers and ports at which they are listening."""

external_url: Optional[str] = None
"""Server url. If an ingress is present, it will be the ingress address."""


class TracingRequirerAppData(DatabagModel): # noqa: D101
"""Application databag model for the tracing requirer."""
Expand Down Expand Up @@ -492,13 +493,16 @@ def __init__(
self,
charm: CharmBase,
host: str,
external_url: Optional[str] = None,
relation_name: str = DEFAULT_RELATION_NAME,
):
"""Initialize.
Args:
charm: a `CharmBase` instance that manages this instance of the Tempo service.
host: address of the node hosting the tempo server.
external_url: external address of the node hosting the tempo server,
if an ingress is present.
relation_name: an optional string name of the relation between `charm`
and the Tempo charmed service. The default is "tracing".
Expand All @@ -519,6 +523,7 @@ def __init__(
super().__init__(charm, relation_name + "tracing-provider-v2")
self._charm = charm
self._host = host
self._external_url = external_url
self._relation_name = relation_name
self.framework.observe(
self._charm.on[relation_name].relation_joined, self._on_relation_event
Expand Down Expand Up @@ -585,6 +590,7 @@ def publish_receivers(self, receivers: Sequence[RawReceiver]):
try:
TracingProviderAppData(
host=self._host,
external_url=self._external_url,
receivers=[
Receiver(port=port, protocol=protocol) for protocol, port in receivers
],
Expand Down Expand Up @@ -612,16 +618,17 @@ class EndpointRemovedEvent(RelationBrokenEvent):
class EndpointChangedEvent(_AutoSnapshotEvent):
"""Event representing a change in one of the receiver endpoints."""

__args__ = ("host", "_ingesters")
__args__ = ("host", "external_url", "_receivers")

if TYPE_CHECKING:
host = "" # type: str
_ingesters = [] # type: List[dict]
external_url = "" # type: str
_receivers = [] # type: List[dict]

@property
def receivers(self) -> List[Receiver]:
"""Cast receivers back from dict."""
return [Receiver(**i) for i in self._ingesters]
return [Receiver(**i) for i in self._receivers]


class TracingEndpointRequirerEvents(CharmEvents):
Expand Down Expand Up @@ -776,7 +783,9 @@ def _on_tracing_relation_changed(self, event):
return

data = TracingProviderAppData.load(relation.data[relation.app])
self.on.endpoint_changed.emit(relation, data.host, [i.dict() for i in data.receivers]) # type: ignore
self.on.endpoint_changed.emit( # type: ignore
relation, data.host, data.external_url, [i.dict() for i in data.receivers]
)

def _on_tracing_relation_broken(self, event: RelationBrokenEvent):
"""Notify the providers that the endpoint is broken."""
Expand All @@ -787,28 +796,43 @@ def get_all_endpoints(
self, relation: Optional[Relation] = None
) -> Optional[TracingProviderAppData]:
"""Unmarshalled relation data."""
if not self.is_ready(relation or self._relation):
relation = relation or self._relation
if not self.is_ready(relation):
return
return TracingProviderAppData.load(relation.data[relation.app]) # type: ignore

def _get_endpoint(
self, relation: Optional[Relation], protocol: ReceiverProtocol, ssl: bool = False
):
ep = self.get_all_endpoints(relation)
if not ep:
self, relation: Optional[Relation], protocol: ReceiverProtocol
) -> Optional[str]:
app_data = self.get_all_endpoints(relation)
if not app_data:
return None
try:
receiver: Receiver = next(filter(lambda i: i.protocol == protocol, ep.receivers))
if receiver.protocol in ["otlp_grpc", "jaeger_grpc"]:
if ssl:
logger.warning("unused ssl argument - was the right protocol called?")
return f"{ep.host}:{receiver.port}"
if ssl:
return f"https://{ep.host}:{receiver.port}"
return f"http://{ep.host}:{receiver.port}"
except StopIteration:
receivers: List[Receiver] = list(
filter(lambda i: i.protocol == protocol, app_data.receivers)
)
if not receivers:
logger.error(f"no receiver found with protocol={protocol!r}")
return None
return
if len(receivers) > 1:
logger.error(
f"too many receivers with protocol={protocol!r}; using first one. Found: {receivers}"
)
return

receiver = receivers[0]
# if there's an external_url argument (v2.5+), use that. Otherwise, we use the tempo local fqdn
if app_data.external_url:
url = app_data.external_url
else:
# FIXME: if we don't get an external url but only a
# hostname, we don't know what scheme we need to be using. ASSUME HTTP
url = f"http://{app_data.host}:{receiver.port}"

if receiver.protocol.endswith("grpc"):
# TCP protocols don't want an http/https scheme prefix
url = url.split("://")[1]

return url

def get_endpoint(
self, protocol: ReceiverProtocol, relation: Optional[Relation] = None
Expand Down
Loading

0 comments on commit 35b960b

Please sign in to comment.