Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for target_info metric w/ shared label cache #19397

Merged
merged 26 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
961ca03
Collect target info metric
UTXOnly Jan 14, 2025
1ceb079
Corrected cache error
UTXOnly Jan 14, 2025
27a1a97
Log cleanup
UTXOnly Jan 14, 2025
cf56931
Debug log cleanup
UTXOnly Jan 14, 2025
9bae29c
Add tests to test cache w/ target_info
UTXOnly Jan 14, 2025
8836f4b
Add test for unordered target info with cache shared labels
UTXOnly Jan 14, 2025
fcd9b4a
Add transformer for info metric type
UTXOnly Jan 14, 2025
940c741
Cleaned but not working first iteration
UTXOnly Jan 15, 2025
6a32e2e
Refactor tag caching logic
UTXOnly Jan 15, 2025
9e8877a
Validate type for config options
UTXOnly Jan 15, 2025
b5ffdd1
Added changelog entry
UTXOnly Jan 15, 2025
6b8bdf4
Fix validate shared labels failing test
UTXOnly Jan 15, 2025
3060d91
Fix whitespace causing lint failure
UTXOnly Jan 15, 2025
690adb0
Fix target info with cache, add tests
UTXOnly Jan 16, 2025
89793e8
Linting
UTXOnly Jan 16, 2025
a1d937a
Cleanup debug log
UTXOnly Jan 16, 2025
8d33715
Comment cleanup
UTXOnly Jan 16, 2025
066036d
Refactor label population
UTXOnly Jan 16, 2025
707d427
Reformat imports
UTXOnly Jan 16, 2025
77e635d
Reorder target_info_labels definition
UTXOnly Jan 16, 2025
f9311c2
Merge branch 'master' into UTXOnly/openmetrics-target-info-4
UTXOnly Jan 16, 2025
fd82661
Add consume w/ target info method
UTXOnly Jan 22, 2025
5b43847
Merge branch 'UTXOnly/openmetrics-target-info-4' of github.com:DataDo…
UTXOnly Jan 22, 2025
6d09d4c
Add comments
UTXOnly Jan 24, 2025
97d9433
Update datadog_checks_base/tests/base/checks/openmetrics/test_v2/test…
UTXOnly Feb 6, 2025
898e55b
lint test
UTXOnly Feb 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datadog_checks_base/changelog.d/19397.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for target_info metric w/ shared label cache
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
# (C) Datadog, Inc. 2020-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from collections import ChainMap

from ....utils.functions import no_op


class LabelAggregator:
def __init__(self, check, config):
share_labels = config.get('share_labels', {})
if not isinstance(share_labels, dict):
raise TypeError('Setting `share_labels` must be a mapping')
elif not share_labels:
self.target_info = config.get('target_info', False)
self.target_info_labels = {}

self._validate_type(share_labels, dict, "Setting `share_labels` must be a mapping")
self._validate_type(self.target_info, bool, "Setting `target_info` must be a boolean")

if not share_labels and not self.target_info:
self.populate = no_op
return

self.cache_shared_labels = config.get('cache_shared_labels', True)
self.shared_labels_cached = False

self.info_metric = {'target_info': {}}
self.metric_config = {}
for metric, config in share_labels.items():
data = self.metric_config[metric] = {}
Expand All @@ -31,6 +37,7 @@ def __init__(self, check, config):
raise TypeError(f'Option `values` for metric `{metric}` of setting `share_labels` must be an array')

allowed_values = set()

for i, value in enumerate(values, 1):
value = str(value)

Expand Down Expand Up @@ -72,34 +79,32 @@ def __init__(self, check, config):

self.unconditional_labels = {}

def _validate_type(self, value, expected_type, error_message):
if not isinstance(value, expected_type):
raise TypeError(error_message)

def __call__(self, metrics):
if self.cache_shared_labels:
if self.shared_labels_cached:
yield from metrics
else:
metric_config = self.metric_config.copy()
metric_config, target_info_metric = self.copy_configs()

for metric in metrics:
if metric_config and metric.name in metric_config:
self.collect(metric, metric_config.pop(metric.name))

self.process_metric(metric, metric_config, target_info_metric)
yield metric

self.shared_labels_cached = True
else:
try:
metric_config = self.metric_config.copy()

# Cache every encountered metric until the desired labels have been collected
metric_config, target_info_metric = self.copy_configs()
cached_metrics = []

for metric in metrics:
if metric.name in metric_config:
self.collect(metric, metric_config.pop(metric.name))

self.process_metric(metric, metric_config, target_info_metric)
cached_metrics.append(metric)

if not metric_config:
if not (metric_config or target_info_metric):
break

yield from cached_metrics
Expand All @@ -108,6 +113,24 @@ def __call__(self, metrics):
self.label_sets.clear()
self.unconditional_labels.clear()

def copy_configs(self):
    """Return fresh shallow copies of the share-labels metric config and the target_info config.

    Copies are handed out so that callers may ``pop`` entries as metrics are
    encountered without mutating the configuration cached on the instance.
    """
    return dict(self.metric_config), dict(self.info_metric)

def process_metric(self, metric, *configs):
    """
    Collect labels for *metric* from any pending config (shared_labels and/or
    target_info) that still lists it, removing the entry once consumed.
    """
    name = metric.name
    for pending in configs:
        if not pending:
            continue
        if name in pending:
            self.collect(metric, pending.pop(name))

def process_target_info(self, metric):
    """
    Update the cached `target_info` labels from the metric's first sample.

    The cache is only replaced when the labels actually differ, so repeated
    scrapes with an unchanged `target_info` metric are cheap. A `target_info`
    metric that carries no samples is ignored — indexing `samples[0]`
    unconditionally would raise an IndexError.
    """
    if not metric.samples:
        return

    labels = metric.samples[0].labels
    if labels != self.target_info_labels:
        self.target_info_labels = labels

def collect(self, metric, config):
allowed_values = config.get('values')

Expand Down Expand Up @@ -147,13 +170,28 @@ def collect(self, metric, config):
if label in labels:
self.unconditional_labels[label] = value
else:
for sample in self.allowed_samples(metric, allowed_values):
for label, value in sample.labels.items():
self.unconditional_labels[label] = value
# Store target_info metric labels to be applied to other metrics in payload
if metric.name == 'target_info':
self.target_info_labels.update(
{
label: value
for sample in self.allowed_samples(metric, allowed_values)
for label, value in sample.labels.items()
}
)
else:
# Store shared labels in a separate attribute
self.unconditional_labels.update(
{
label: value
for sample in self.allowed_samples(metric, allowed_values)
for label, value in sample.labels.items()
}
)

def populate(self, labels):
label_set = frozenset(labels.items())
labels.update(self.unconditional_labels)
labels.update(ChainMap(self.unconditional_labels, self.target_info_labels))

for matching_label_set, shared_labels in self.label_sets:
# Check for subset without incurring the cost of a `.issubset` lookup and call
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(self, check, config):

# Parse the configuration
self.endpoint = config['openmetrics_endpoint']
self.target_info = config.get('target_info', False)

self.metric_transformer = MetricTransformer(self.check, config)
self.label_aggregator = LabelAggregator(self.check, config)
Expand Down Expand Up @@ -238,7 +239,13 @@ def scrape(self):
"""
runtime_data = {'flush_first_value': self.flush_first_value, 'static_tags': self.static_tags}

for metric in self.consume_metrics(runtime_data):
# Determine which consume method to use based on target_info config
if self.target_info:
consume_method = self.consume_metrics_w_target_info
else:
consume_method = self.consume_metrics

for metric in consume_method(runtime_data):
transformer = self.metric_transformer.get(metric)
if transformer is None:
continue
Expand All @@ -248,23 +255,52 @@ def scrape(self):
self.flush_first_value = True

def consume_metrics(self, runtime_data):
    """
    Yield processed metrics, dropping any that match the exclusion
    settings. Does not perform any `target_info` handling.
    """
    parser = self.parse_metrics()

    if not self.flush_first_value and self.use_process_start_time:
        parser = first_scrape_handler(parser, runtime_data, datadog_agent.get_process_start_time())
    if self.label_aggregator.configured:
        parser = self.label_aggregator(parser)

    for metric in parser:
        name = metric.name
        excluded = name in self.exclude_metrics or (
            self.exclude_metrics_pattern is not None and self.exclude_metrics_pattern.search(name)
        )
        if excluded:
            # Track how many samples were dropped by the exclusion settings
            self.submit_telemetry_number_of_ignored_metric_samples(metric)
        else:
            yield metric

def consume_metrics_w_target_info(self, runtime_data):
    """
    Yield the processed metrics and filter out excluded metrics.
    Additionally, handle `target_info` metrics.

    Delegates parsing, first-scrape handling, label aggregation and
    exclusion filtering to `consume_metrics` instead of duplicating that
    pipeline, and only layers the `target_info` label caching on top.
    """
    for metric in self.consume_metrics(runtime_data):
        # Cache the target_info labels so they can be shared with the
        # other metrics in the payload
        if metric.name == 'target_info':
            self.label_aggregator.process_target_info(metric)

        yield metric

def parse_metrics(self):
Expand Down Expand Up @@ -312,7 +348,6 @@ def generate_sample_data(self, metric):
"""
Yield a sample of processed data.
"""

label_normalizer = get_label_normalizer(metric.type)

for sample in metric.samples:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@


def test_submission(dd_run_check, datadog_agent):

class TestLogStream(LogStream):
def __init__(self, start, **kwargs):
super().__init__(**kwargs)
Expand Down
Loading
Loading