From 9a7564b5331b4b355cc62020e9b7624344538abd Mon Sep 17 00:00:00 2001
From: Fred L Sharp
Date: Mon, 30 Aug 2021 18:22:58 -0500
Subject: [PATCH 01/11] First pass on fast fail in prometheus connector

- Update api Status.from_error to handle aborted status
- Define configuration for fast-fail functionality
- Define EventAbortedError to support aborted status
- Implement generic fast-fail observer for use in connectors
- Define models for SLO checking and add to userdata parsing
- Implement fast-fail support in the prometheus connector
- Add unit tests for FastFailObserver
- Add integration test for fast fail under prometheus connector
---
 servo/api.py                        |   3 +
 servo/configuration.py              |  25 +++
 servo/connectors/prometheus.py      |  38 +++-
 servo/errors.py                     |   8 +
 servo/fast_fail.py                  | 147 ++++++++++++++
 servo/types.py                      |  92 ++++++++-
 tests/connectors/prometheus_test.py | 224 ++++++++++++++++++++++
 tests/fast_fail_test.py             | 284 ++++++++++++++++++++++++++++
 8 files changed, 816 insertions(+), 5 deletions(-)
 create mode 100644 servo/fast_fail.py
 create mode 100644 tests/fast_fail_test.py

diff --git a/servo/api.py b/servo/api.py
index 2864c1f4e..9df205a8f 100644
--- a/servo/api.py
+++ b/servo/api.py
@@ -12,6 +12,7 @@
 import httpx
 import pydantic
 
+import servo.errors
 import servo.types
 import servo.utilities
 
@@ -94,6 +95,8 @@ def from_error(cls, error: servo.errors.BaseError) -> "Status":
         """Return a status object representation from the given error."""
         if isinstance(error, servo.errors.AdjustmentRejectedError):
             status = ServoStatuses.rejected
+        elif isinstance(error, servo.errors.EventAbortedError):
+            status = ServoStatuses.aborted
         else:
             status = ServoStatuses.failed
 
diff --git a/servo/configuration.py b/servo/configuration.py
index e5a2894fd..806a8a8dc 100644
--- a/servo/configuration.py
+++ b/servo/configuration.py
@@ -553,3 +553,28 @@ class Config(types.BaseModelConfig):
         extra = pydantic.Extra.forbid
         title = "Abstract Servo Configuration Schema"
         env_prefix = "SERVO_"
+
+class FastFailConfiguration(pydantic.BaseSettings):
+    """Configuration providing support for fast-fail behavior, which returns early
+    from long-running connector operations when SLO violations are observed."""
+
+    disabled: pydantic.conint(ge=0, le=1, multiple_of=1) = 0
+    """Toggle fast-fail behavior on or off"""
+
+    period: servo.types.Duration = "60s"
+    """How often to check the SLO metrics"""
+
+    span: servo.types.Duration = None
+    """The span or window of time that SLO metrics are gathered for"""
+
+    skip: servo.types.Duration = 0
+    """How long to wait before querying SLO metrics for potential violations"""
+
+    class Config:
+        extra = pydantic.Extra.forbid
+
+    @pydantic.validator('span', pre=True, always=True)
+    def span_defaults_to_period(cls, v, *, values, **kwargs):
+        if v is None:
+            return values['period']
+        return v
diff --git a/servo/connectors/prometheus.py b/servo/connectors/prometheus.py
index ee6602146..cfed13b75 100644
--- a/servo/connectors/prometheus.py
+++ b/servo/connectors/prometheus.py
@@ -14,6 +14,8 @@
 import pytz
 
 import servo
+import servo.configuration
+import servo.fast_fail
 
 DEFAULT_BASE_URL = "http://prometheus:9090"
 API_PATH = "/api/v1"
@@ -676,6 +678,9 @@ class PrometheusConfiguration(servo.BaseConfiguration):
     scraped by the Prometheus instance being queried.
     """
 
+    fast_fail: servo.configuration.FastFailConfiguration = pydantic.Field(default_factory=servo.configuration.FastFailConfiguration)
+    """Configuration subsection for fast-fail behavior. 
Defines toggle and timing of SLO observation""" + @classmethod def generate(cls, **kwargs) -> "PrometheusConfiguration": """Generate a default configuration for capturing measurements from the @@ -876,7 +881,27 @@ async def measure( # Allow the maximum settlement time of eager metrics to elapse before eager return (None runs full duration) progress = servo.EventProgress(timeout=measurement_duration, settlement=eager_settlement) - await progress.watch(eager_observer.observe) + + # Handle fast fail metrics + if self.config.fast_fail.disabled == 0 and control.userdata and control.userdata.slo: + fast_fail_observer = servo.fast_fail.FastFailObserver( + config=self.config.fast_fail, + input=control.userdata.slo, + metrics_getter=functools.partial(self._query_slo_metrics, metrics=metrics__) + ) + fast_fail_progress = servo.EventProgress(timeout=measurement_duration, settlement=eager_settlement) + gather_tasks = [ + asyncio.create_task(progress.watch(eager_observer.observe)), + asyncio.create_task(fast_fail_progress.watch(fast_fail_observer.observe, every=self.config.fast_fail.period)) + ] + try: + await asyncio.gather(*gather_tasks) + except: + [task.cancel() for task in gather_tasks] + await asyncio.gather(*gather_tasks, return_exceptions=True) + raise + else: + await progress.watch(eager_observer.observe) # Capture the measurements self.logger.info(f"Querying Prometheus for {len(metrics__)} metrics...") @@ -899,7 +924,7 @@ async def _query_prometheus( self, metric: PrometheusMetric, start: datetime, end: datetime ) -> List[servo.TimeSeries]: client = Client(base_url=self.config.base_url) - response = await client.query_range(metric, start, end) + response: MetricResponse = await client.query_range(metric, start, end) self.logger.trace(f"Got response data type {response.__class__} for metric {metric}: {response}") response.raise_for_error() @@ -924,6 +949,13 @@ async def _query_prometheus( return [] + async def _query_slo_metrics(self, start: datetime, end: datetime, metrics: List[PrometheusMetric]) -> Dict[str, List[servo.TimeSeries]]: + """Query prometheus for the provided metrics and return mapping of metric names to their corresponding readings""" + readings = await asyncio.gather( + *list(map(lambda m: self._query_prometheus(m, start, end), metrics)) + ) + return dict(map(lambda tup: (tup[0].name, tup[1]), zip(metrics, readings))) + app = servo.cli.ConnectorCLI(PrometheusConnector, help="Metrics from Prometheus") @app.command() @@ -1006,7 +1038,7 @@ async def observe(self, progress: servo.EventProgress) -> None: active_data_point = self.data_points.get(metric) readings = await self._query_prometheus(metric) if readings: - data_point = readings[0][-1] + data_point: servo.DataPoint = readings[0][-1] servo.logger.trace(f"Prometheus returned reading for the `{metric.name}` metric: {data_point}") if data_point.value > 0: if active_data_point is None: diff --git a/servo/errors.py b/servo/errors.py index 220bb4259..9d8b8a8a7 100644 --- a/servo/errors.py +++ b/servo/errors.py @@ -12,6 +12,7 @@ "AdjustmentFailedError", "AdjustmentRejectedError", "UnexpectedEventError", + "EventAbortedError", ) class BaseError(RuntimeError): @@ -112,3 +113,10 @@ class AdjustmentRejectedError(AdjustmentFailedError): other such definitive error condition is encountered that excludes the applied configuration from further consideration by the optimizer. 
""" + +class EventAbortedError(EventError): + """Abort the currently running event + + During long running measurements (and, optionally, adjustments) it is often + neccessary to complete the operation early eg. if there are sustained SLO violations. + """ diff --git a/servo/fast_fail.py b/servo/fast_fail.py new file mode 100644 index 000000000..eed277f94 --- /dev/null +++ b/servo/fast_fail.py @@ -0,0 +1,147 @@ +import collections +import decimal +import enum +import datetime +import operator +import pydantic +import statistics +from typing import Awaitable, Callable, Dict, List, MutableMapping, Optional + +import devtools + +import servo +import servo.errors +import servo.configuration +import servo.types + +SLO_FAILED_REASON = "slo-violation" + +class SloOutcomeStatus(str, enum.Enum): + passed = "passed" + failed = "failed" + missing_metric = "missing_metric" + missing_threshold = "missing_threshold" + +class SloOutcome(pydantic.BaseModel): + status: SloOutcomeStatus + metric_readings: Optional[List[servo.types.Reading]] + threshold_readings: Optional[List[servo.types.Reading]] + metric_value: Optional[decimal.Decimal] + threshold_value: Optional[decimal.Decimal] + checked_at: datetime.datetime + + def to_message(self, condition: servo.types.SloCondition): + if self.status == SloOutcomeStatus.missing_metric: + message = f"Metric {condition.metric} was not found in readings" + elif self.status == SloOutcomeStatus.missing_threshold: + message = f"Threshold metric {condition.threshold_metric} was not found in readings" + elif self.status == SloOutcomeStatus.passed: + message = "SLO passed" + elif self.status == SloOutcomeStatus.failed: + message = f"SLO failed metric value {self.metric_value} was not {condition.keep} threshold value {self.threshold_value}" + else: + message = f"Uncrecognized outcome status {self.status}" + return f"{self.checked_at} {message}" + + +class FastFailObserver(pydantic.BaseModel): + config: servo.configuration.FastFailConfiguration + input: servo.types.SloInput + metrics_getter: Callable[[datetime.datetime, datetime.datetime], Awaitable[Dict[str, List[servo.types.Reading]]]] + + _results: Dict[servo.types.SloCondition, List[SloOutcome]] = pydantic.PrivateAttr(default=collections.defaultdict(list)) + + async def observe(self, progress: servo.EventProgress) -> None: + if progress.elapsed < self.config.skip: + return + + checked_at = datetime.datetime.now() + metrics = await self.metrics_getter(checked_at - self.config.span, checked_at) + self.check_readings(metrics=metrics, checked_at=checked_at) + + def check_readings(self, metrics: Dict[str, List[servo.types.Reading]], checked_at: datetime.datetime) -> None: + failures: Dict[servo.types.SloCondition, List[SloOutcome]] = {} + for condition in self.input.conditions: + result_args = dict(checked_at=checked_at) + # Evaluate target metric + metric_readings = metrics.get(condition.metric) + if not metric_readings: + self._results[condition].append(SloOutcome(**result_args, status=SloOutcomeStatus.missing_metric)) + continue + + metric_value = _get_scalar_from_readings(metric_readings) + result_args.update(metric_value=metric_value, metric_readings=metric_readings) + + # Evaluate threshold + threshold_readings = None + if condition.threshold is not None: + threshold_value = condition.threshold * condition.threshold_multiplier + elif condition.threshold_metric is not None: + threshold_readings = metrics.get(condition.threshold_metric) + if not threshold_readings: + 
self._results[condition].append(SloOutcome(**result_args, status=SloOutcomeStatus.missing_threshold)) + continue + + threshold_scalar = _get_scalar_from_readings(threshold_readings) + threshold_value = threshold_scalar * condition.threshold_multiplier + + result_args.update(threshold_value=threshold_value, threshold_readings=threshold_readings) + # Check target against threshold + check_passed_op = _get_keep_operator(condition.keep) + if check_passed_op(metric_value, threshold_value): + self._results[condition].append(SloOutcome(**result_args, status=SloOutcomeStatus.passed)) + else: + self._results[condition].append(SloOutcome(**result_args, status=SloOutcomeStatus.failed)) + + # Update window by slicing last n items from list where n is trigger_window + self._results[condition] = self._results[condition][-condition.trigger_window:] + + if len(list(filter(lambda res: res.status == SloOutcomeStatus.failed, self._results[condition]))) >= condition.trigger_count: + failures[condition] = self._results[condition] + + servo.logger.debug(f"SLO results: {devtools.pformat(self._results)}") + + if failures: + raise servo.errors.EventAbortedError( + f"SLO violation(s) observed: {_get_results_str(failures)}", + reason=SLO_FAILED_REASON + ) + + +# Helper methods +def _get_keep_operator(keep: servo.types.SloKeep): + if keep == servo.types.SloKeep.below: + return operator.lt + elif keep == servo.types.SloKeep.above: + return operator.gt + else: + raise ValueError(f"Unknown SloKeep type {keep}") + +def _get_scalar_from_readings(metric_readings: List[servo.types.Reading]) -> decimal.Decimal: + instance_values = [] + for reading in metric_readings: + # TODO: NewRelic APM returns 0 for missing metrics. Will need optional config to ignore 0 values + # when implementing fast fail for eventual servox newrelic connector + if isinstance(reading, servo.types.DataPoint): + instance_values.append(decimal.Decimal(reading.value)) + elif isinstance(reading, servo.types.TimeSeries): + timeseries_values = list(map(lambda dp: decimal.Decimal(dp.value), reading.data_points)) + if len(timeseries_values) > 1: + instance_values.append(statistics.mean(timeseries_values)) + else: + instance_values.append(timeseries_values[0]) + else: + raise ValueError(f"Unknown metric reading type {type(reading)}") + + if len(instance_values) > 1: + return statistics.mean(instance_values) + else: + return decimal.Decimal(instance_values[0]) + +def _get_results_str(results: Dict[servo.types.SloCondition, List[SloOutcome]]) -> str: + fmt_outcomes = [] + for condition, outcome_list in results.items(): + outcome_str_list = list(map(lambda outcome: outcome.to_message(condition), outcome_list)) + fmt_outcomes.append(f"{condition}[{', '.join(outcome_str_list)}]") + return ", ".join(fmt_outcomes) + diff --git a/servo/types.py b/servo/types.py index d4b003146..d3a5f6e3b 100644 --- a/servo/types.py +++ b/servo/types.py @@ -5,6 +5,7 @@ import abc import asyncio +import collections import datetime import decimal import enum @@ -19,6 +20,7 @@ Callable, Dict, List, + Mapping, Optional, Protocol, Tuple, @@ -1215,6 +1217,92 @@ def __opsani_repr__(self) -> dict: return {self.name: settings_dict} +class SloKeep(str, enum.Enum): + above = "above" + below = "below" + +class SloCondition(BaseModel): + description: Optional[str] = None + metric: str + threshold_multiplier: decimal.Decimal = decimal.Decimal(1) + keep: SloKeep = SloKeep.below + trigger_count: pydantic.conint(ge=1, multiple_of=1) = 1 + trigger_window: pydantic.conint(ge=1, multiple_of=1) = None + 
threshold: Optional[decimal.Decimal] + threshold_metric: Optional[str] + + @pydantic.root_validator + @classmethod + def _check_threshold_values(cls, values): + if values.get('threshold') is not None and values.get('threshold_metric') is not None: + raise ValueError("SLO Condition cannot specify both threshold and threshold_metric") + + if values.get('threshold') is None and values.get('threshold_metric') is None: + raise ValueError("SLO Condition must specify either threshold or threshold_metric") + + return values + + @pydantic.validator('trigger_window', pre=True, always=True) + @classmethod + def _trigger_window_defaults_to_trigger_count(cls, v, *, values, **kwargs): + if v is None: + return values['trigger_count'] + return v + + @pydantic.root_validator(skip_on_failure=True) + @classmethod + def _trigger_count_cannot_be_greater_than_window(cls, values) -> Numeric: + trigger_window, trigger_count = values["trigger_window"], values["trigger_count"] + if trigger_count > trigger_window: + raise ValueError(f"trigger_count cannot be greater than trigger_window ({trigger_count} > {trigger_window})") + + return values + + def __str__(self) -> str: + ret_str = f"{self.metric} {self.keep}" + + if self.threshold: + ret_str = f"{ret_str} {self.threshold}" + elif self.threshold_metric: + ret_str = f"{ret_str} {self.threshold_metric}" + + if self.description is None: + return f"({ret_str})" + else: + return f"({ret_str} -> {self.description})" + + def __hash__(self) -> int: + return hash(str(self)) + + class Config(BaseModel.Config): + extra = pydantic.Extra.forbid + +class SloInput(BaseModel): + conditions: List[SloCondition] + + @pydantic.validator('conditions') + def _conditions_are_unique(cls, value: List[SloCondition]): + condition_counts = collections.defaultdict(int) + for cond in value: + condition_counts[str(cond)] += 1 + + non_unique = list(filter(lambda item: item[1] > 1, condition_counts.items())) + if non_unique: + raise ValueError( + f"Slo conditions must be unique. Redundant conditions found: {', '.join(map(lambda nu: nu[0] , non_unique))}" + ) + return value + + class Config(BaseModel.Config): + extra = pydantic.Extra.forbid + +class UserData(BaseModel): + slo: Optional[SloInput] = None + + class Config(BaseModel.Config): + # Support connector level experimentation without needing to update core servox code + extra = pydantic.Extra.allow + class Control(BaseModel): """Control objects model parameters returned by the optimizer that govern aspects of the operation to be performed. @@ -1248,9 +1336,9 @@ class files into memory, just-in-time compilation being appliied to critical profile. """ - userdata: Optional[Dict[str, Any]] = None + userdata: Optional[UserData] = None """An optional dictionary of supplemental metadata with no platform defined - semantics. + semantics for most keys. 
""" environment: Optional[Dict[str, Any]] = None diff --git a/tests/connectors/prometheus_test.py b/tests/connectors/prometheus_test.py index 8b831a410..0d5b051f2 100644 --- a/tests/connectors/prometheus_test.py +++ b/tests/connectors/prometheus_test.py @@ -12,7 +12,9 @@ import respx import typer +import servo.connectors.kubernetes import servo.connectors.prometheus +import servo.errors import servo.utilities from servo.connectors.prometheus import ( Client, @@ -574,6 +576,228 @@ async def burst_traffic() -> None: assert time_series[0].value == 0.0 assert time_series[-1].value == 0.0 + @pytest.fixture + def tuning_config(self, kube) -> servo.connectors.kubernetes.KubernetesConfiguration: + return servo.connectors.kubernetes.KubernetesConfiguration( + namespace=kube.namespace, + timeout="60s", + deployments=[ + servo.connectors.kubernetes.DeploymentConfiguration( + name="fiber-http", + strategy = "canary", + replicas=Replicas( + min=0, + max=1, + ), + containers=[ + servo.connectors.kubernetes.ContainerConfiguration( + name="fiber-http", + cpu=servo.connectors.kubernetes.CPU(min="125m", max="875m", step="125m", request="250m", limit="250m"), + memory=servo.connectors.kubernetes.Memory(min="128MiB", max="0.75GiB", step="32MiB", request="256MiB", limit="256MiB"), + ) + ], + ) + ], + ) + + @pytest.mark.applymanifests( + "../manifests", + files=[ + "fiber-http-opsani-dev.yaml", + ], + ) + async def test_fast_fail_passes( + self, + optimizer: servo.Optimizer, + kube_port_forward: Callable[[str, int], AsyncIterator[str]], + tuning_config: servo.connectors.kubernetes.KubernetesConfiguration + ) -> None: + # NOTE: TODO add notes + servo.logging.set_level("DEBUG") + # Create a tuning instance + canary_opt = await servo.connectors.kubernetes.CanaryOptimization.create( + deployment_or_rollout_config=tuning_config.deployments[0], timeout=tuning_config.timeout + ) + await canary_opt.create_tuning_pod() + + async with kube_port_forward("deploy/prometheus", 9090) as prometheus_url: + # Ugly workaround for lack of load balancing on port forward + async with kube_port_forward("deploy/fiber-http", 9980) as main_fiber_url: + async with kube_port_forward("pod/fiber-http-tuning", 9980) as tuning_fiber_url: + config = PrometheusConfiguration.generate( + base_url=prometheus_url, + metrics=[ + servo.connectors.prometheus.PrometheusMetric( + "main_p50_latency", + servo.types.Unit.milliseconds, + query='avg(histogram_quantile(0.5,rate(envoy_cluster_upstream_rq_time_bucket{opsani_role!="tuning"}[15s])))', + absent="fail", + step="5s", + ), + servo.connectors.prometheus.PrometheusMetric( + "tuning_p50_latency", + servo.types.Unit.milliseconds, + query='avg(histogram_quantile(0.5,rate(envoy_cluster_upstream_rq_time_bucket{opsani_role="tuning"}[15s])))', + absent="fail", + step="5s", + ), + ], + ) + + # TODO: Replace this with the load tester fixture + sending_traffic = True + async def send_traffic() -> None: + # Ugly workaround for lack of load balancing on port forward + async with httpx.AsyncClient(base_url=main_fiber_url) as main_client: + async with httpx.AsyncClient(base_url=tuning_fiber_url) as tuning_client: + servo.logger.info(f"Sending traffic to {main_fiber_url} and {tuning_fiber_url}...") + count = 0 + try: + while sending_traffic: + response = await main_client.get("/") + response.raise_for_status() + response = await tuning_client.get("/") + response.raise_for_status() + count += 1 + finally: + servo.logger.success(f"Sent {count} requests to {main_fiber_url} and {tuning_fiber_url}.") + + 
config.fast_fail.period = Duration("20s") + connector = PrometheusConnector(config=config, optimizer=optimizer) + control = Control( + duration="10s", + warmup="10s", + userdata=UserData(slo=SloInput(conditions=[ + SloCondition( + metric="tuning_p50_latency", + threshold=0.3, + ), + SloCondition( + metric="tuning_p50_latency", + threshold_metric="main_p50_latency", + ) + ])) + ) + # Send traffic in the background + traffic_task = asyncio.create_task(send_traffic()) + + try: + measurement = await asyncio.wait_for( + connector.measure(control=control), + timeout=90 + ) + assert measurement + finally: + sending_traffic = False + await traffic_task + + @pytest.mark.applymanifests( + "../manifests", + files=[ + "fiber-http-opsani-dev.yaml", + ], + ) + async def test_fast_fail_fails( + self, + optimizer: servo.Optimizer, + kube_port_forward: Callable[[str, int], AsyncIterator[str]], + tuning_config: servo.connectors.kubernetes.KubernetesConfiguration + ) -> None: + # NOTE: TODO add notes + servo.logging.set_level("DEBUG") + # Create a tuning instance + canary_opt = await servo.connectors.kubernetes.CanaryOptimization.create( + deployment_or_rollout_config=tuning_config.deployments[0], timeout=tuning_config.timeout + ) + await canary_opt.create_tuning_pod() + + async with kube_port_forward("deploy/prometheus", 9090) as prometheus_url: + # Ugly workaround for lack of load balancing on port forward + async with kube_port_forward("deploy/fiber-http", 9980) as main_fiber_url: + async with kube_port_forward("pod/fiber-http-tuning", 9980) as tuning_fiber_url: + config = PrometheusConfiguration.generate( + base_url=prometheus_url, + metrics=[ + servo.connectors.prometheus.PrometheusMetric( + "main_p50_latency", + servo.types.Unit.milliseconds, + query='avg(histogram_quantile(0.5,rate(envoy_cluster_upstream_rq_time_bucket{opsani_role!="tuning"}[15s])))', + absent="fail", + step="5s", + ), + servo.connectors.prometheus.PrometheusMetric( + "tuning_p50_latency", + servo.types.Unit.milliseconds, + query='avg(histogram_quantile(0.5,rate(envoy_cluster_upstream_rq_time_bucket{opsani_role="tuning"}[15s])))', + absent="fail", + step="5s", + ), + ], + ) + + # TODO: Replace this with the load tester fixture + sending_traffic = True + async def send_traffic() -> None: + # Ugly workaround for lack of load balancing on port forward + async with httpx.AsyncClient(base_url=main_fiber_url) as main_client: + async with httpx.AsyncClient(base_url=tuning_fiber_url) as tuning_client: + servo.logger.info(f"Sending traffic to {main_fiber_url} and {tuning_fiber_url}...") + count = 0 + try: + while sending_traffic: + response = await main_client.get("/") + response.raise_for_status() + response = await tuning_client.get("/") + response.raise_for_status() + count += 1 + finally: + servo.logger.success(f"Sent {count} requests to {main_fiber_url} and {tuning_fiber_url}.") + + config.fast_fail.period = Duration("10s") + connector = PrometheusConnector(config=config, optimizer=optimizer) + control = Control( + duration="10s", + warmup="10s", + userdata=UserData(slo=SloInput(conditions=[ + SloCondition( + metric="tuning_p50_latency", + threshold=0.2, + trigger_count=2, + trigger_window=2, + ), + SloCondition( + metric="tuning_p50_latency", + threshold_metric="main_p50_latency", + threshold_multiplier=0.9, + trigger_count=2, + trigger_window=2, + ) + ])) + ) + # Send traffic in the background + traffic_task = asyncio.create_task(send_traffic()) + + date_matcher = r"[\s0-9-:.]*" + float_matcher = r"[0-9.]*" + error_text = 
(re.escape("SLO violation(s) observed: (tuning_p50_latency below 0.2)[") + date_matcher + + "SLO failed metric value " + float_matcher + re.escape(" was not below threshold value 0.2, ") + date_matcher + + "SLO failed metric value " + float_matcher + re.escape(" was not below threshold value 0.2], ") + + re.escape("(tuning_p50_latency below main_p50_latency)[") + date_matcher +" SLO failed metric value " + + float_matcher + " was not below threshold value " + float_matcher + ", " + date_matcher + + " SLO failed metric value " + float_matcher + " was not below threshold value " + float_matcher + + re.escape("]")) + + try: + with pytest.raises(servo.errors.EventAbortedError, match=error_text): + measurement = await asyncio.wait_for( + connector.measure(control=control), + timeout=90 + ) + debug(measurement) + finally: + sending_traffic = False + await traffic_task + def empty_targets_response() -> Dict[str, Any]: return json.load("{'status': 'success', 'data': {'activeTargets': [], 'droppedTargets': []}}") diff --git a/tests/fast_fail_test.py b/tests/fast_fail_test.py new file mode 100644 index 000000000..09e9a2914 --- /dev/null +++ b/tests/fast_fail_test.py @@ -0,0 +1,284 @@ +from datetime import datetime +import freezegun +import pydantic +import pytest +from typing import Callable, Dict, List + +import servo +import servo.configuration +from servo.fast_fail import FastFailObserver +from servo.types import DataPoint, Metric, Reading, SloCondition, SloInput, SloKeep, TimeSeries, Unit + +def test_non_unique_conditions() -> None: + conditions = [ + SloCondition( + metric="same", + threshold=6000, + ), + SloCondition( + metric="same", + threshold=6000, + ), + SloCondition( + metric="same2", + threshold_metric="same3", + ), + SloCondition( + metric="same2", + threshold_metric="same3", + ), + SloCondition( + metric="not_same", + threshold=6000, + ), + SloCondition( + metric="not_same", + keep=SloKeep.above, + threshold=6000, + ) + ] + with pytest.raises(pydantic.ValidationError) as err_info: + SloInput(conditions=conditions) + + assert str(err_info.value) == ("1 validation error for SloInput\n" + "conditions\n" + " Slo conditions must be unique. 
Redundant conditions found: (same below 6000), (same2 below same3) (type=value_error)") + +def test_trigget_count_greter_than_window() -> None: + with pytest.raises(pydantic.ValidationError) as err_info: + SloCondition( + metric="test", + threshold=1, + trigger_count=2, + trigger_window=1, + ) + assert str(err_info.value) == ("1 validation error for SloCondition\n" + "__root__\n" + " trigger_count cannot be greater than trigger_window (2 > 1) (type=value_error)") + +@pytest.fixture +def metric() -> Metric: + return Metric("throughput", Unit.requests_per_minute) + +@pytest.fixture +def tuning_metric() -> Metric: + return Metric("tuning_throughput", Unit.requests_per_minute) + +@pytest.fixture +def config() -> servo.configuration.FastFailConfiguration: + return servo.configuration.FastFailConfiguration() + +@pytest.fixture +def slo_input(metric: Metric, tuning_metric: Metric) -> SloInput: + return SloInput(conditions=[ + SloCondition( + metric=metric.name, + threshold=6000, + trigger_window=2 + ), + SloCondition( + metric=metric.name, + threshold_metric=tuning_metric.name, + trigger_window=2 + ) + ]) + +@pytest.fixture +async def metrics_getter(): + async def test_metrics_getter(start, end) -> Dict[str, List[Reading]]: + return [] + + return test_metrics_getter + +@pytest.fixture +def observer(config, slo_input, metrics_getter) -> Callable[[], FastFailObserver]: + return FastFailObserver( + config=config, + input=slo_input, + metrics_getter=metrics_getter, + ) + +@freezegun.freeze_time("2020-01-21 12:00:01", auto_tick_seconds=600) +def _make_time_series_list(metric: Metric, values: List[List[float]]) -> List[TimeSeries]: + ret_list = [] + for index, val_list in enumerate(values): + points = list(map(lambda v: DataPoint(metric, datetime.now(), v), val_list)) + ret_list.append(TimeSeries(metric=metric, data_points=points, id=index)) + return ret_list + +@pytest.mark.parametrize( + "checked_at, values, tuning_values", + [ + ( + datetime(2020, 1, 21, 12, 0, 1), + [[21337.0, 566.0, 87.0, 320.0, 59.0]], + [[31337.0, 666.0, 187.0, 420.0, 69.0]], + ), + ( + datetime(2020, 1, 21, 12, 10, 1), + [[21337.0, 566.0, 87.0, 320.0, 59.0], [31337.0, 666.0, 187.0, 420.0, 69.0]], + [[31337.0, 666.0, 187.0, 420.0, 69.0], [31337.0, 666.0, 187.0, 420.0, 69.0]], + ), + ( + datetime(2020, 1, 21, 12, 20, 1), + [[21337.0, 566.0, 87.0, 320.0, 59.0]], + [], + ), + ( + datetime(2020, 1, 21, 12, 30, 1), + [], + [[31337.0, 666.0, 187.0, 420.0, 69.0]], + ), + ( + datetime(2020, 1, 21, 12, 30, 1), + [[1337.0], [566.0]], + [[2337.0], [666.0]], + ) + ], +) +def test_timeseries_slos_pass( + observer: FastFailObserver, + checked_at: datetime, + metric: Metric, + tuning_metric: Metric, + values: List[List[float]], + tuning_values: List[List[float]] +) -> None: + slo_check_readings: Dict[str, List[TimeSeries]] = { + metric.name: _make_time_series_list(metric, values), + tuning_metric.name: _make_time_series_list(tuning_metric, tuning_values) + } + + servo.logging.set_level("DEBUG") + observer.check_readings(slo_check_readings, checked_at) + +@pytest.mark.parametrize( + "checked_at, values, tuning_values, error_str", + [ + ( + datetime(2020, 1, 21, 12, 0, 1), + [[31337.0, 666.0, 187.0, 420.0, 69.0]], + [[21337.0, 566.0, 87.0, 320.0, 59.0]], + "SLO violation(s) observed: (throughput below 6000)[2020-01-21 12:00:01 SLO failed metric value 6535.8 was" + " not below threshold value 6000], (throughput below tuning_throughput)[2020-01-21 12:00:01 SLO failed metric" + " value 6535.8 was not below threshold value 4473.8]", + ), + ( + 
datetime(2020, 1, 21, 12, 10, 1), + [[31337.0, 666.0, 187.0, 420.0, 69.0], [31337.0, 666.0, 187.0, 420.0, 69.0]], + [[21337.0, 566.0, 87.0, 320.0, 59.0], [31337.0, 666.0, 187.0, 420.0, 69.0]], + "SLO violation(s) observed: (throughput below 6000)[2020-01-21 12:10:01 SLO failed metric value 6535.8 was" + " not below threshold value 6000], (throughput below tuning_throughput)[2020-01-21 12:10:01 SLO failed metric" + " value 6535.8 was not below threshold value 5504.8]", + ), + ( + datetime(2020, 1, 21, 12, 20, 1), + [[31337.0, 666.0, 187.0, 420.0, 69.0]], + [], + "SLO violation(s) observed: (throughput below 6000)[2020-01-21 12:20:01 SLO failed metric value 6535.8 was" + " not below threshold value 6000]", + ), + ( + datetime(2020, 1, 21, 12, 30, 1), + [[31337.0], [666.0]], + [[21337.0], [566.0]], + "SLO violation(s) observed: (throughput below 6000)[2020-01-21 12:30:01 SLO failed metric value 16001.5 was" + " not below threshold value 6000], (throughput below tuning_throughput)[2020-01-21 12:30:01 SLO failed metric" + " value 16001.5 was not below threshold value 10951.5]" + ) + ], +) +def test_timeseries_slos_fail( + observer: FastFailObserver, + checked_at: datetime, + metric: Metric, + tuning_metric: Metric, + values: List[List[float]], + tuning_values: List[List[float]], + error_str: str, +) -> None: + slo_check_readings: Dict[str, List[TimeSeries]] = { + metric.name: _make_time_series_list(metric, values), + tuning_metric.name: _make_time_series_list(tuning_metric, tuning_values) + } + + servo.logging.set_level("DEBUG") + with pytest.raises(servo.EventAbortedError) as err_info: + observer.check_readings(slo_check_readings, checked_at) + + assert str(err_info.value) == error_str + +@freezegun.freeze_time("2020-01-21 12:00:01", auto_tick_seconds=600) +def _make_data_point_list(metric: Metric, values: List[float]) -> List[DataPoint]: + return list(map(lambda v: DataPoint(metric, datetime.now(), v), values)) + +@pytest.mark.parametrize( + "checked_at, values, tuning_values", + [ + ( + datetime(2020, 1, 21, 12, 0, 1), + [21337.0, 566.0, 87.0, 320.0, 59.0], + [31337.0, 666.0, 187.0, 420.0, 69.0], + ), + ( + datetime(2020, 1, 21, 12, 10, 1), + [2.0], + [3.0], + ), + ], +) +def test_data_point_slos_pass( + observer: FastFailObserver, + checked_at: datetime, + metric: Metric, + tuning_metric: Metric, + values: List[float], + tuning_values: List[float] +) -> None: + slo_check_readings: Dict[str, List[DataPoint]] = { + metric.name: _make_data_point_list(metric, values), + tuning_metric.name: _make_data_point_list(tuning_metric, tuning_values) + } + + servo.logging.set_level("DEBUG") + observer.check_readings(slo_check_readings, checked_at) + +@pytest.mark.parametrize( + "checked_at, values, tuning_values, error_str", + [ + ( + datetime(2020, 1, 21, 12, 0, 1), + [31337.0, 666.0, 187.0, 420.0, 69.0], + [21337.0, 566.0, 87.0, 320.0, 59.0], + "SLO violation(s) observed: (throughput below 6000)[2020-01-21 12:00:01 SLO failed metric value 6535.8 was" + " not below threshold value 6000], (throughput below tuning_throughput)[2020-01-21 12:00:01 SLO failed metric" + " value 6535.8 was not below threshold value 4473.8]" + ), + ( + datetime(2020, 1, 21, 12, 10, 1), + [3.0], + [2.0], + "SLO violation(s) observed: (throughput below tuning_throughput)[2020-01-21 12:10:01 SLO failed metric value" + " 3 was not below threshold value 2]" + ), + ], +) +def test_data_point_slos_fail( + observer: FastFailObserver, + checked_at: datetime, + metric: Metric, + tuning_metric: Metric, + values: List[float], + 
tuning_values: List[float], + error_str: str, +) -> None: + slo_check_readings: Dict[str, List[DataPoint]] = { + metric.name: _make_data_point_list(metric, values), + tuning_metric.name: _make_data_point_list(tuning_metric, tuning_values) + } + + servo.logging.set_level("DEBUG") + with pytest.raises(servo.EventAbortedError) as err_info: + observer.check_readings(slo_check_readings, checked_at) + + assert str(err_info.value) == error_str From 30f4802976a8fbaa75d21c7bf10a51824dc004ae Mon Sep 17 00:00:00 2001 From: Fred L Sharp Date: Mon, 30 Aug 2021 18:30:13 -0500 Subject: [PATCH 02/11] Fix failing unit test --- tests/connectors/prometheus_test.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/connectors/prometheus_test.py b/tests/connectors/prometheus_test.py index 0d5b051f2..80c12a804 100644 --- a/tests/connectors/prometheus_test.py +++ b/tests/connectors/prometheus_test.py @@ -138,6 +138,12 @@ def test_generate_default_config(self): " absent: ignore\n" " eager: null\n" "targets: null\n" + "fast_fail:\n" + " disabled: 0\n" + " period: 1m\n" + " span: 1m\n" + " skip: '0'\n" + ) def test_generate_override_metrics(self): From 531fc96bdb92996f2c5f889561e717f48fba5d43 Mon Sep 17 00:00:00 2001 From: Fred L Sharp Date: Tue, 31 Aug 2021 10:51:16 -0500 Subject: [PATCH 03/11] Fix flakey integration test and pre-commit --- servo/fast_fail.py | 5 ++--- tests/connectors/prometheus_test.py | 2 +- tests/fast_fail_test.py | 28 ++++++++++++++-------------- 3 files changed, 17 insertions(+), 18 deletions(-) diff --git a/servo/fast_fail.py b/servo/fast_fail.py index eed277f94..a380b7c79 100644 --- a/servo/fast_fail.py +++ b/servo/fast_fail.py @@ -54,7 +54,7 @@ class FastFailObserver(pydantic.BaseModel): async def observe(self, progress: servo.EventProgress) -> None: if progress.elapsed < self.config.skip: return - + checked_at = datetime.datetime.now() metrics = await self.metrics_getter(checked_at - self.config.span, checked_at) self.check_readings(metrics=metrics, checked_at=checked_at) @@ -98,7 +98,7 @@ def check_readings(self, metrics: Dict[str, List[servo.types.Reading]], checked_ if len(list(filter(lambda res: res.status == SloOutcomeStatus.failed, self._results[condition]))) >= condition.trigger_count: failures[condition] = self._results[condition] - + servo.logger.debug(f"SLO results: {devtools.pformat(self._results)}") if failures: @@ -144,4 +144,3 @@ def _get_results_str(results: Dict[servo.types.SloCondition, List[SloOutcome]]) outcome_str_list = list(map(lambda outcome: outcome.to_message(condition), outcome_list)) fmt_outcomes.append(f"{condition}[{', '.join(outcome_str_list)}]") return ", ".join(fmt_outcomes) - diff --git a/tests/connectors/prometheus_test.py b/tests/connectors/prometheus_test.py index 80c12a804..b64892130 100644 --- a/tests/connectors/prometheus_test.py +++ b/tests/connectors/prometheus_test.py @@ -143,7 +143,6 @@ def test_generate_default_config(self): " period: 1m\n" " span: 1m\n" " skip: '0'\n" - ) def test_generate_override_metrics(self): @@ -681,6 +680,7 @@ async def send_traffic() -> None: SloCondition( metric="tuning_p50_latency", threshold_metric="main_p50_latency", + threshold_multiplier=1.1, ) ])) ) diff --git a/tests/fast_fail_test.py b/tests/fast_fail_test.py index 09e9a2914..fca3d87b3 100644 --- a/tests/fast_fail_test.py +++ b/tests/fast_fail_test.py @@ -112,12 +112,12 @@ def _make_time_series_list(metric: Metric, values: List[List[float]]) -> List[Ti ( datetime(2020, 1, 21, 12, 0, 1), [[21337.0, 566.0, 87.0, 320.0, 59.0]], - [[31337.0, 666.0, 
187.0, 420.0, 69.0]], + [[31337.0, 666.0, 187.0, 420.0, 69.0]], ), ( datetime(2020, 1, 21, 12, 10, 1), [[21337.0, 566.0, 87.0, 320.0, 59.0], [31337.0, 666.0, 187.0, 420.0, 69.0]], - [[31337.0, 666.0, 187.0, 420.0, 69.0], [31337.0, 666.0, 187.0, 420.0, 69.0]], + [[31337.0, 666.0, 187.0, 420.0, 69.0], [31337.0, 666.0, 187.0, 420.0, 69.0]], ), ( datetime(2020, 1, 21, 12, 20, 1), @@ -145,10 +145,10 @@ def test_timeseries_slos_pass( tuning_values: List[List[float]] ) -> None: slo_check_readings: Dict[str, List[TimeSeries]] = { - metric.name: _make_time_series_list(metric, values), + metric.name: _make_time_series_list(metric, values), tuning_metric.name: _make_time_series_list(tuning_metric, tuning_values) } - + servo.logging.set_level("DEBUG") observer.check_readings(slo_check_readings, checked_at) @@ -165,7 +165,7 @@ def test_timeseries_slos_pass( ), ( datetime(2020, 1, 21, 12, 10, 1), - [[31337.0, 666.0, 187.0, 420.0, 69.0], [31337.0, 666.0, 187.0, 420.0, 69.0]], + [[31337.0, 666.0, 187.0, 420.0, 69.0], [31337.0, 666.0, 187.0, 420.0, 69.0]], [[21337.0, 566.0, 87.0, 320.0, 59.0], [31337.0, 666.0, 187.0, 420.0, 69.0]], "SLO violation(s) observed: (throughput below 6000)[2020-01-21 12:10:01 SLO failed metric value 6535.8 was" " not below threshold value 6000], (throughput below tuning_throughput)[2020-01-21 12:10:01 SLO failed metric" @@ -198,10 +198,10 @@ def test_timeseries_slos_fail( error_str: str, ) -> None: slo_check_readings: Dict[str, List[TimeSeries]] = { - metric.name: _make_time_series_list(metric, values), + metric.name: _make_time_series_list(metric, values), tuning_metric.name: _make_time_series_list(tuning_metric, tuning_values) } - + servo.logging.set_level("DEBUG") with pytest.raises(servo.EventAbortedError) as err_info: observer.check_readings(slo_check_readings, checked_at) @@ -218,12 +218,12 @@ def _make_data_point_list(metric: Metric, values: List[float]) -> List[DataPoint ( datetime(2020, 1, 21, 12, 0, 1), [21337.0, 566.0, 87.0, 320.0, 59.0], - [31337.0, 666.0, 187.0, 420.0, 69.0], + [31337.0, 666.0, 187.0, 420.0, 69.0], ), ( datetime(2020, 1, 21, 12, 10, 1), [2.0], - [3.0], + [3.0], ), ], ) @@ -236,10 +236,10 @@ def test_data_point_slos_pass( tuning_values: List[float] ) -> None: slo_check_readings: Dict[str, List[DataPoint]] = { - metric.name: _make_data_point_list(metric, values), + metric.name: _make_data_point_list(metric, values), tuning_metric.name: _make_data_point_list(tuning_metric, tuning_values) } - + servo.logging.set_level("DEBUG") observer.check_readings(slo_check_readings, checked_at) @@ -248,7 +248,7 @@ def test_data_point_slos_pass( [ ( datetime(2020, 1, 21, 12, 0, 1), - [31337.0, 666.0, 187.0, 420.0, 69.0], + [31337.0, 666.0, 187.0, 420.0, 69.0], [21337.0, 566.0, 87.0, 320.0, 59.0], "SLO violation(s) observed: (throughput below 6000)[2020-01-21 12:00:01 SLO failed metric value 6535.8 was" " not below threshold value 6000], (throughput below tuning_throughput)[2020-01-21 12:00:01 SLO failed metric" @@ -256,7 +256,7 @@ def test_data_point_slos_pass( ), ( datetime(2020, 1, 21, 12, 10, 1), - [3.0], + [3.0], [2.0], "SLO violation(s) observed: (throughput below tuning_throughput)[2020-01-21 12:10:01 SLO failed metric value" " 3 was not below threshold value 2]" @@ -273,7 +273,7 @@ def test_data_point_slos_fail( error_str: str, ) -> None: slo_check_readings: Dict[str, List[DataPoint]] = { - metric.name: _make_data_point_list(metric, values), + metric.name: _make_data_point_list(metric, values), tuning_metric.name: _make_data_point_list(tuning_metric, 
tuning_values) } From 6b13dc8984c2ddbcf4a947fe2f93de1c78ea224e Mon Sep 17 00:00:00 2001 From: Fred L Sharp Date: Wed, 1 Sep 2021 14:47:42 -0500 Subject: [PATCH 04/11] Add info logs for fast fail slo monitoring --- servo/connectors/prometheus.py | 4 ++++ servo/fast_fail.py | 12 ++++++++++++ 2 files changed, 16 insertions(+) diff --git a/servo/connectors/prometheus.py b/servo/connectors/prometheus.py index cfed13b75..b0ed76638 100644 --- a/servo/connectors/prometheus.py +++ b/servo/connectors/prometheus.py @@ -884,6 +884,10 @@ async def measure( # Handle fast fail metrics if self.config.fast_fail.disabled == 0 and control.userdata and control.userdata.slo: + self.logger.info( + "Fast Fail enabled, the following SLO Conditions will be monitored during measurement: " + f"{', '.join(map(str, control.userdata.slo.conditions))}" + ) fast_fail_observer = servo.fast_fail.FastFailObserver( config=self.config.fast_fail, input=control.userdata.slo, diff --git a/servo/fast_fail.py b/servo/fast_fail.py index a380b7c79..5b24ec4ec 100644 --- a/servo/fast_fail.py +++ b/servo/fast_fail.py @@ -101,6 +101,18 @@ def check_readings(self, metrics: Dict[str, List[servo.types.Reading]], checked_ servo.logger.debug(f"SLO results: {devtools.pformat(self._results)}") + # Log the latest results + last_results_buckets: Dict[SloOutcomeStatus, List[str]] = collections.defaultdict(list) + for condition, results_list in self._results.items(): + last_result = results_list[-1] + last_results_buckets[last_result.status].append(str(condition)) + + last_results_messages: List[str] = [] + for status, condition_str_list in last_results_buckets.items(): + last_results_messages.append(f"x{len(condition_str_list)} {status} [{', '.join(condition_str_list)}]") + + servo.logger.info(f"SLO statuses from last check: {', '.join(last_results_messages)}") + if failures: raise servo.errors.EventAbortedError( f"SLO violation(s) observed: {_get_results_str(failures)}", From 3cc73655f9b0fba855fe2a614247b946d4aef5f4 Mon Sep 17 00:00:00 2001 From: Fred L Sharp Date: Thu, 2 Sep 2021 14:14:51 -0500 Subject: [PATCH 05/11] Add unit test for EventAbortedError --- tests/api_test.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/api_test.py b/tests/api_test.py index cb93aca11..bdcc89630 100644 --- a/tests/api_test.py +++ b/tests/api_test.py @@ -10,6 +10,12 @@ def test_adjustment_rejected_from_error(self) -> None: assert status.message == 'foo' assert status.status == 'rejected' + def test_event_aborted_from_error(self) -> None: + error = servo.errors.EventAbortedError("bar") + status = servo.api.Status.from_error(error) + assert status.message == 'bar' + assert status.status == 'aborted' + def test_event_cancelled_from_error(self) -> None: error = servo.errors.EventCancelledError("Command cancelled") status = servo.api.Status.from_error(error) From d6e0132f386281a7e9772d86db2ce721fbbaa7af Mon Sep 17 00:00:00 2001 From: Fred L Sharp Date: Thu, 2 Sep 2021 15:00:57 -0500 Subject: [PATCH 06/11] Remove unused type imports --- servo/fast_fail.py | 2 +- servo/types.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/servo/fast_fail.py b/servo/fast_fail.py index 5b24ec4ec..bb3503298 100644 --- a/servo/fast_fail.py +++ b/servo/fast_fail.py @@ -5,7 +5,7 @@ import operator import pydantic import statistics -from typing import Awaitable, Callable, Dict, List, MutableMapping, Optional +from typing import Awaitable, Callable, Dict, List, Optional import devtools diff --git a/servo/types.py b/servo/types.py index d3a5f6e3b..e4741117b 
100644 --- a/servo/types.py +++ b/servo/types.py @@ -20,7 +20,6 @@ Callable, Dict, List, - Mapping, Optional, Protocol, Tuple, From 807da49cc03cc5128e7123ac04dd88bed6f93069 Mon Sep 17 00:00:00 2001 From: Fred L Sharp Date: Thu, 2 Sep 2021 17:24:14 -0500 Subject: [PATCH 07/11] Fix MeasureParams parse failure not logged --- servo/api.py | 2 +- servo/runner.py | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/servo/api.py b/servo/api.py index 587ff6db2..c353980cb 100644 --- a/servo/api.py +++ b/servo/api.py @@ -142,7 +142,7 @@ class CommandResponse(pydantic.BaseModel): command: Commands = pydantic.Field(alias="cmd") param: Optional[ Union[MeasureParams, Dict[str, Any]] - ] # TODO: Switch to a union of supported types + ] # TODO: Switch to a union of supported types, remove isinstance check from ServoRunner.measure when done class Config: json_encoders = { diff --git a/servo/runner.py b/servo/runner.py index ec0d08231..04eb935e6 100644 --- a/servo/runner.py +++ b/servo/runner.py @@ -81,6 +81,9 @@ async def describe(self, control: Control) -> Description: return aggregate_description async def measure(self, param: servo.MeasureParams) -> Measurement: + if isinstance(param, dict): + # required parsing has failed in api.Mixin._post_event(), run parse_obj to surface the validation errors + servo.api.MeasureParams.parse_obj(param) servo.logger.info(f"Measuring... [metrics={', '.join(param.metrics)}]") servo.logger.trace(devtools.pformat(param)) From 048a7c3e0b1bd29ef1d421bcdc11e093b5476c4b Mon Sep 17 00:00:00 2001 From: Fred L Sharp Date: Thu, 2 Sep 2021 19:01:56 -0500 Subject: [PATCH 08/11] Add condition config for equal values --- servo/fast_fail.py | 4 +++- servo/types.py | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/servo/fast_fail.py b/servo/fast_fail.py index bb3503298..ac9314bd0 100644 --- a/servo/fast_fail.py +++ b/servo/fast_fail.py @@ -88,7 +88,9 @@ def check_readings(self, metrics: Dict[str, List[servo.types.Reading]], checked_ result_args.update(threshold_value=threshold_value, threshold_readings=threshold_readings) # Check target against threshold check_passed_op = _get_keep_operator(condition.keep) - if check_passed_op(metric_value, threshold_value): + if condition.allow_equals and metric_value == threshold_value: + self._results[condition].append(SloOutcome(**result_args, status=SloOutcomeStatus.passed)) + elif check_passed_op(metric_value, threshold_value): self._results[condition].append(SloOutcome(**result_args, status=SloOutcomeStatus.passed)) else: self._results[condition].append(SloOutcome(**result_args, status=SloOutcomeStatus.failed)) diff --git a/servo/types.py b/servo/types.py index e4741117b..27061494d 100644 --- a/servo/types.py +++ b/servo/types.py @@ -1225,6 +1225,7 @@ class SloCondition(BaseModel): metric: str threshold_multiplier: decimal.Decimal = decimal.Decimal(1) keep: SloKeep = SloKeep.below + allow_equals: bool = False trigger_count: pydantic.conint(ge=1, multiple_of=1) = 1 trigger_window: pydantic.conint(ge=1, multiple_of=1) = None threshold: Optional[decimal.Decimal] From 38e9c02fbd0283d7f519c1563206135c66489b89 Mon Sep 17 00:00:00 2001 From: Fred L Sharp Date: Fri, 3 Sep 2021 12:48:51 -0500 Subject: [PATCH 09/11] Implicitly allow equal values in fast fail checks - remove allow_equals condition config --- servo/fast_fail.py | 8 +++----- servo/types.py | 1 - 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/servo/fast_fail.py b/servo/fast_fail.py index ac9314bd0..a1f00d346 100644 --- a/servo/fast_fail.py +++ 
b/servo/fast_fail.py @@ -88,9 +88,7 @@ def check_readings(self, metrics: Dict[str, List[servo.types.Reading]], checked_ result_args.update(threshold_value=threshold_value, threshold_readings=threshold_readings) # Check target against threshold check_passed_op = _get_keep_operator(condition.keep) - if condition.allow_equals and metric_value == threshold_value: - self._results[condition].append(SloOutcome(**result_args, status=SloOutcomeStatus.passed)) - elif check_passed_op(metric_value, threshold_value): + if check_passed_op(metric_value, threshold_value): self._results[condition].append(SloOutcome(**result_args, status=SloOutcomeStatus.passed)) else: self._results[condition].append(SloOutcome(**result_args, status=SloOutcomeStatus.failed)) @@ -125,9 +123,9 @@ def check_readings(self, metrics: Dict[str, List[servo.types.Reading]], checked_ # Helper methods def _get_keep_operator(keep: servo.types.SloKeep): if keep == servo.types.SloKeep.below: - return operator.lt + return operator.le elif keep == servo.types.SloKeep.above: - return operator.gt + return operator.ge else: raise ValueError(f"Unknown SloKeep type {keep}") diff --git a/servo/types.py b/servo/types.py index 27061494d..e4741117b 100644 --- a/servo/types.py +++ b/servo/types.py @@ -1225,7 +1225,6 @@ class SloCondition(BaseModel): metric: str threshold_multiplier: decimal.Decimal = decimal.Decimal(1) keep: SloKeep = SloKeep.below - allow_equals: bool = False trigger_count: pydantic.conint(ge=1, multiple_of=1) = 1 trigger_window: pydantic.conint(ge=1, multiple_of=1) = None threshold: Optional[decimal.Decimal] From 89060c1ca9da606b371c6bb1c7599d16d05a66b7 Mon Sep 17 00:00:00 2001 From: Fred L Sharp Date: Fri, 3 Sep 2021 13:45:44 -0500 Subject: [PATCH 10/11] Change timing of flakey integration test --- tests/connectors/prometheus_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/connectors/prometheus_test.py b/tests/connectors/prometheus_test.py index b64892130..f5a522e2a 100644 --- a/tests/connectors/prometheus_test.py +++ b/tests/connectors/prometheus_test.py @@ -759,7 +759,8 @@ async def send_traffic() -> None: finally: servo.logger.success(f"Sent {count} requests to {main_fiber_url} and {tuning_fiber_url}.") - config.fast_fail.period = Duration("10s") + config.fast_fail.skip = Duration("10s") + config.fast_fail.period = Duration("2s") connector = PrometheusConnector(config=config, optimizer=optimizer) control = Control( duration="10s", From ec9529437562b89a24c2a2b9b574024ccdce1394 Mon Sep 17 00:00:00 2001 From: Fred L Sharp Date: Fri, 3 Sep 2021 14:12:49 -0500 Subject: [PATCH 11/11] Adjust timing of flakey integration test --- tests/connectors/prometheus_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/connectors/prometheus_test.py b/tests/connectors/prometheus_test.py index f5a522e2a..8637310d4 100644 --- a/tests/connectors/prometheus_test.py +++ b/tests/connectors/prometheus_test.py @@ -759,7 +759,7 @@ async def send_traffic() -> None: finally: servo.logger.success(f"Sent {count} requests to {main_fiber_url} and {tuning_fiber_url}.") - config.fast_fail.skip = Duration("10s") + config.fast_fail.skip = Duration("14s") config.fast_fail.period = Duration("2s") connector = PrometheusConnector(config=config, optimizer=optimizer) control = Control(