Fred/eng 212 implement fast fail support in the prometheus connector #330

Merged (12 commits, Sep 3, 2021)
5 changes: 4 additions & 1 deletion servo/api.py
@@ -12,6 +12,7 @@
import httpx
import pydantic

import servo.errors
import servo.types
import servo.utilities

@@ -95,6 +96,8 @@ def from_error(cls, error: servo.errors.BaseError) -> "Status":
"""Return a status object representation from the given error."""
if isinstance(error, servo.errors.AdjustmentRejectedError):
status = ServoStatuses.rejected
elif isinstance(error, servo.errors.EventAbortedError):
status = ServoStatuses.aborted
elif isinstance(error, servo.errors.EventCancelledError):
status = ServoStatuses.cancelled
else:
@@ -139,7 +142,7 @@ class CommandResponse(pydantic.BaseModel):
command: Commands = pydantic.Field(alias="cmd")
param: Optional[
Union[MeasureParams, Dict[str, Any]]
] # TODO: Switch to a union of supported types, remove isinstance check from ServoRunner.measure when done

class Config:
json_encoders = {
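For context, a minimal sketch of what the new branch does; this is illustrative and not part of the diff, and the assumption that the returned Status exposes a status field comes from the surrounding codebase rather than this hunk:

import servo.api
import servo.errors

# An SLO violation surfaced as EventAbortedError should now map to the
# "aborted" status instead of falling through to the generic else branch.
error = servo.errors.EventAbortedError(
    "sustained SLO violation", reason="slo-violation"
)
status = servo.api.Status.from_error(error)
assert status.status == servo.api.ServoStatuses.aborted  # field name assumed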
25 changes: 25 additions & 0 deletions servo/configuration.py
@@ -553,3 +553,28 @@ class Config(types.BaseModelConfig):
extra = pydantic.Extra.forbid
title = "Abstract Servo Configuration Schema"
env_prefix = "SERVO_"

class FastFailConfiguration(pydantic.BaseSettings):
"""Configuration providing support for fast fail behavior which returns early
from long running connector operations when SLO violations are observed"""

disabled: pydantic.conint(ge=0, le=1, multiple_of=1) = 0
"""Toggle fast-fail behavior on or off"""

period: servo.types.Duration = "60s"
"""How often to check the SLO metrics"""

span: servo.types.Duration = None
"""The span or window of time that SLO metrics are gathered for"""

skip: servo.types.Duration = 0
"""How long to wait before querying SLO metrics for potential violations"""

class Config:
extra = pydantic.Extra.forbid

@pydantic.validator('span', pre=True, always=True)
def span_defaults_to_period(cls, v, *, values, **kwargs):
if v is None:
return values['period']
return v
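A small usage sketch of the defaults and the validator above; illustrative only, and it assumes servo.types.Duration values compare equal when they represent the same interval:

import servo.configuration

# With no arguments: fast fail is enabled (disabled=0), SLO checks run every
# 60s, and the validator backfills span from period.
config = servo.configuration.FastFailConfiguration()
assert config.disabled == 0
assert config.span == config.period

# Explicit values are honored as given.
config = servo.configuration.FastFailConfiguration(period="30s", span="5m", skip="1m")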
42 changes: 39 additions & 3 deletions servo/connectors/prometheus.py
@@ -14,6 +14,8 @@
import pytz

import servo
import servo.configuration
import servo.fast_fail

DEFAULT_BASE_URL = "http://prometheus:9090"
API_PATH = "/api/v1"
@@ -676,6 +678,9 @@ class PrometheusConfiguration(servo.BaseConfiguration):
scraped by the Prometheus instance being queried.
"""

fast_fail: servo.configuration.FastFailConfiguration = pydantic.Field(default_factory=servo.configuration.FastFailConfiguration)
"""Configuration sub section for fast fail behavior. Defines toggle and timing of SLO observation"""

@classmethod
def generate(cls, **kwargs) -> "PrometheusConfiguration":
"""Generate a default configuration for capturing measurements from the
@@ -876,7 +881,31 @@ async def measure(

# Allow the maximum settlement time of eager metrics to elapse before eager return (None runs full duration)
progress = servo.EventProgress(timeout=measurement_duration, settlement=eager_settlement)

# Handle fast fail metrics
if self.config.fast_fail.disabled == 0 and control.userdata and control.userdata.slo:
self.logger.info(
"Fast Fail enabled, the following SLO Conditions will be monitored during measurement: "
f"{', '.join(map(str, control.userdata.slo.conditions))}"
)
fast_fail_observer = servo.fast_fail.FastFailObserver(
config=self.config.fast_fail,
input=control.userdata.slo,
metrics_getter=functools.partial(self._query_slo_metrics, metrics=metrics__)
)
fast_fail_progress = servo.EventProgress(timeout=measurement_duration, settlement=eager_settlement)
gather_tasks = [
asyncio.create_task(progress.watch(eager_observer.observe)),
asyncio.create_task(fast_fail_progress.watch(fast_fail_observer.observe, every=self.config.fast_fail.period))
]
try:
await asyncio.gather(*gather_tasks)
except:
[task.cancel() for task in gather_tasks]
await asyncio.gather(*gather_tasks, return_exceptions=True)
raise
else:
await progress.watch(eager_observer.observe)

# Capture the measurements
self.logger.info(f"Querying Prometheus for {len(metrics__)} metrics...")
@@ -899,7 +928,7 @@ async def _query_prometheus(
self, metric: PrometheusMetric, start: datetime, end: datetime
) -> List[servo.TimeSeries]:
client = Client(base_url=self.config.base_url)
response: MetricResponse = await client.query_range(metric, start, end)
self.logger.trace(f"Got response data type {response.__class__} for metric {metric}: {response}")
response.raise_for_error()

@@ -924,6 +953,13 @@

return []

async def _query_slo_metrics(self, start: datetime, end: datetime, metrics: List[PrometheusMetric]) -> Dict[str, List[servo.TimeSeries]]:
"""Query prometheus for the provided metrics and return mapping of metric names to their corresponding readings"""
readings = await asyncio.gather(
*list(map(lambda m: self._query_prometheus(m, start, end), metrics))
)
return dict(map(lambda tup: (tup[0].name, tup[1]), zip(metrics, readings)))

app = servo.cli.ConnectorCLI(PrometheusConnector, help="Metrics from Prometheus")

@app.command()
@@ -1006,7 +1042,7 @@ async def observe(self, progress: servo.EventProgress) -> None:
active_data_point = self.data_points.get(metric)
readings = await self._query_prometheus(metric)
if readings:
data_point: servo.DataPoint = readings[0][-1]
servo.logger.trace(f"Prometheus returned reading for the `{metric.name}` metric: {data_point}")
if data_point.value > 0:
if active_data_point is None:
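The try/except block added to measure() implements a cancel-siblings-on-failure pattern. A standalone restatement with made-up names, in case the intent is not obvious from the hunk:

import asyncio

async def watch_all(*coroutines) -> None:
    # Run the eager observer and the fast-fail observer side by side. If either
    # raises (for example EventAbortedError on a sustained SLO violation), cancel
    # the sibling task, let it unwind, then re-raise so measure() exits early.
    tasks = [asyncio.create_task(coro) for coro in coroutines]
    try:
        await asyncio.gather(*tasks)
    except BaseException:
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        raise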
8 changes: 8 additions & 0 deletions servo/errors.py
@@ -12,6 +12,7 @@
"AdjustmentFailedError",
"AdjustmentRejectedError",
"UnexpectedEventError",
"EventAbortedError",
)

class BaseError(RuntimeError):
@@ -112,3 +113,10 @@ class AdjustmentRejectedError(AdjustmentFailedError):
other such definitive error condition is encountered that excludes the
applied configuration from further consideration by the optimizer.
"""

class EventAbortedError(EventError):
"""Abort the currently running event

During long-running measurements (and, optionally, adjustments) it is often
necessary to complete the operation early, e.g. if there are sustained SLO violations.
"""
158 changes: 158 additions & 0 deletions servo/fast_fail.py
@@ -0,0 +1,158 @@
import collections
import datetime
import decimal
import enum
import operator
import statistics
from typing import Awaitable, Callable, Dict, List, Optional

import devtools
import pydantic

import servo
import servo.configuration
import servo.errors
import servo.types

SLO_FAILED_REASON = "slo-violation"

class SloOutcomeStatus(str, enum.Enum):
passed = "passed"
failed = "failed"
missing_metric = "missing_metric"
missing_threshold = "missing_threshold"

class SloOutcome(pydantic.BaseModel):
status: SloOutcomeStatus
metric_readings: Optional[List[servo.types.Reading]]
threshold_readings: Optional[List[servo.types.Reading]]
metric_value: Optional[decimal.Decimal]
threshold_value: Optional[decimal.Decimal]
checked_at: datetime.datetime

def to_message(self, condition: servo.types.SloCondition):
if self.status == SloOutcomeStatus.missing_metric:
message = f"Metric {condition.metric} was not found in readings"
elif self.status == SloOutcomeStatus.missing_threshold:
message = f"Threshold metric {condition.threshold_metric} was not found in readings"
elif self.status == SloOutcomeStatus.passed:
message = "SLO passed"
elif self.status == SloOutcomeStatus.failed:
message = f"SLO failed metric value {self.metric_value} was not {condition.keep} threshold value {self.threshold_value}"
else:
message = f"Uncrecognized outcome status {self.status}"
return f"{self.checked_at} {message}"


class FastFailObserver(pydantic.BaseModel):
config: servo.configuration.FastFailConfiguration
input: servo.types.SloInput
metrics_getter: Callable[[datetime.datetime, datetime.datetime], Awaitable[Dict[str, List[servo.types.Reading]]]]

_results: Dict[servo.types.SloCondition, List[SloOutcome]] = pydantic.PrivateAttr(default=collections.defaultdict(list))

async def observe(self, progress: servo.EventProgress) -> None:
if progress.elapsed < self.config.skip:
return

checked_at = datetime.datetime.now()
metrics = await self.metrics_getter(checked_at - self.config.span, checked_at)
self.check_readings(metrics=metrics, checked_at=checked_at)

def check_readings(self, metrics: Dict[str, List[servo.types.Reading]], checked_at: datetime.datetime) -> None:
failures: Dict[servo.types.SloCondition, List[SloOutcome]] = {}
for condition in self.input.conditions:
result_args = dict(checked_at=checked_at)
# Evaluate target metric
metric_readings = metrics.get(condition.metric)
if not metric_readings:
self._results[condition].append(SloOutcome(**result_args, status=SloOutcomeStatus.missing_metric))
continue

metric_value = _get_scalar_from_readings(metric_readings)
result_args.update(metric_value=metric_value, metric_readings=metric_readings)

# Evaluate threshold
threshold_readings = None
if condition.threshold is not None:
threshold_value = condition.threshold * condition.threshold_multiplier
elif condition.threshold_metric is not None:
threshold_readings = metrics.get(condition.threshold_metric)
if not threshold_readings:
self._results[condition].append(SloOutcome(**result_args, status=SloOutcomeStatus.missing_threshold))
continue

threshold_scalar = _get_scalar_from_readings(threshold_readings)
threshold_value = threshold_scalar * condition.threshold_multiplier

result_args.update(threshold_value=threshold_value, threshold_readings=threshold_readings)
# Check target against threshold
check_passed_op = _get_keep_operator(condition.keep)
if check_passed_op(metric_value, threshold_value):
self._results[condition].append(SloOutcome(**result_args, status=SloOutcomeStatus.passed))
else:
self._results[condition].append(SloOutcome(**result_args, status=SloOutcomeStatus.failed))

# Update window by slicing last n items from list where n is trigger_window
self._results[condition] = self._results[condition][-condition.trigger_window:]

if len(list(filter(lambda res: res.status == SloOutcomeStatus.failed, self._results[condition]))) >= condition.trigger_count:
failures[condition] = self._results[condition]

servo.logger.debug(f"SLO results: {devtools.pformat(self._results)}")

# Log the latest results
last_results_buckets: Dict[SloOutcomeStatus, List[str]] = collections.defaultdict(list)
for condition, results_list in self._results.items():
last_result = results_list[-1]
last_results_buckets[last_result.status].append(str(condition))

last_results_messages: List[str] = []
for status, condition_str_list in last_results_buckets.items():
last_results_messages.append(f"x{len(condition_str_list)} {status} [{', '.join(condition_str_list)}]")

servo.logger.info(f"SLO statuses from last check: {', '.join(last_results_messages)}")

if failures:
raise servo.errors.EventAbortedError(
f"SLO violation(s) observed: {_get_results_str(failures)}",
reason=SLO_FAILED_REASON
)


# Helper methods
def _get_keep_operator(keep: servo.types.SloKeep):
if keep == servo.types.SloKeep.below:
return operator.le
elif keep == servo.types.SloKeep.above:
return operator.ge
else:
raise ValueError(f"Unknown SloKeep type {keep}")

def _get_scalar_from_readings(metric_readings: List[servo.types.Reading]) -> decimal.Decimal:
instance_values = []
for reading in metric_readings:
# TODO: NewRelic APM returns 0 for missing metrics. Will need optional config to ignore 0 values
# when implementing fast fail for eventual servox newrelic connector
if isinstance(reading, servo.types.DataPoint):
instance_values.append(decimal.Decimal(reading.value))
elif isinstance(reading, servo.types.TimeSeries):
timeseries_values = list(map(lambda dp: decimal.Decimal(dp.value), reading.data_points))
if len(timeseries_values) > 1:
instance_values.append(statistics.mean(timeseries_values))
else:
instance_values.append(timeseries_values[0])
else:
raise ValueError(f"Unknown metric reading type {type(reading)}")

if len(instance_values) > 1:
return statistics.mean(instance_values)
else:
return decimal.Decimal(instance_values[0])

def _get_results_str(results: Dict[servo.types.SloCondition, List[SloOutcome]]) -> str:
fmt_outcomes = []
for condition, outcome_list in results.items():
outcome_str_list = list(map(lambda outcome: outcome.to_message(condition), outcome_list))
fmt_outcomes.append(f"{condition}[{', '.join(outcome_str_list)}]")
return ", ".join(fmt_outcomes)
3 changes: 3 additions & 0 deletions servo/runner.py
@@ -81,6 +81,9 @@ async def describe(self, control: Control) -> Description:
return aggregate_description

async def measure(self, param: servo.MeasureParams) -> Measurement:
if isinstance(param, dict):
# required parsing has failed in api.Mixin._post_event(), run parse_obj to surface the validation errors
servo.api.MeasureParams.parse_obj(param)
servo.logger.info(f"Measuring... [metrics={', '.join(param.metrics)}]")
servo.logger.trace(devtools.pformat(param))

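Roughly what the added guard accomplishes; the malformed payload below is made up for illustration:

import pydantic
import servo.api

# When the optimizer sends measure params that could not be parsed upstream, the
# raw dict reaches ServoRunner.measure. Re-running parse_obj raises a
# ValidationError that names the offending fields instead of an opaque
# AttributeError further down.
try:
    servo.api.MeasureParams.parse_obj({"metrics": 42})
except pydantic.ValidationError as error:
    print(error)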