Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(symbolication): Selection logic for low priority queue [NATIVE-211] #29258

Merged
merged 27 commits into from
Oct 20, 2021
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ files = src/sentry/api/bases/external_actor.py,
src/sentry/utils/kvstore,
src/sentry/web/decorators.py,
tests/sentry/processing/realtime_metrics/,
tests/sentry/tasks/test_low_priority_symbolication.py,
tests/sentry/utils/appleconnect/

; Enable all options used with --strict
Expand Down
13 changes: 11 additions & 2 deletions src/sentry/conf/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2374,7 +2374,11 @@ def build_cdc_postgres_init_db_volume(settings):
# so that projects that exceed a reasonable rate can be sent to the low
# priority queue. This setting determines how long we keep these rates
# around.
"counter_time_window": 300,
#
# The LPQ selection is computed using the rate of the most recent events covered by this
# time window. See sentry.tasks.low_priority_symbolication.excessive_event_rate for the
# exact implementation.
"counter_time_window": 10 * 60,
# The bucket size of the processing duration histogram.
#
# The size (in seconds) of the buckets that events are sorted into.
Expand All @@ -2385,7 +2389,12 @@ def build_cdc_postgres_init_db_volume(settings):
# so that projects that exceed a reasonable duration can be sent to the low
# priority queue. This setting determines how long we keep these duration values
# around.
"duration_time_window": 900,
#
# The LPQ selection is computed using the durations of the most recent events covered by
# this time window. See
# sentry.tasks.low_priority_symbolication.excessive_event_duration for the exact
# implementation.
"duration_time_window": 3 * 60,
# Number of seconds to wait after a project is made eligible or ineligible for the LPQ
# before its eligibility can be changed again.
#
Expand Down
131 changes: 113 additions & 18 deletions src/sentry/processing/realtime_metrics/base.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,131 @@
import collections
import dataclasses
from typing import Dict, Iterable, NewType, Set
import enum
from typing import ClassVar, DefaultDict, Iterable, List, Set, Union

from sentry.utils.services import Service


class _Period(enum.Enum):
    """An enum to represent a singleton, for mypy's sake.

    Used as a sentinel default for :meth:`BucketedCounts.rate` so that the
    "entire timespan" case can be distinguished from an integer period.
    """

    TOTAL_PERIOD = 0


@dataclasses.dataclass(frozen=True)
class BucketedCounts:
    """A count of something which occurred inside a certain timespan.

    The timespan is further split up in multiple buckets of ``width`` seconds each.
    ``timestamp`` denotes the POSIX timestamp of the start of the first bucket.
    """

    # POSIX timestamp of the start of the first bucket, in seconds.
    timestamp: int
    # Size of each bucket, in seconds.
    width: int
    # One count per bucket, oldest bucket first.
    counts: List[int]

    # Sentinel meaning "compute over all buckets"; see :meth:`rate`.
    TOTAL_PERIOD: ClassVar[_Period] = _Period.TOTAL_PERIOD

    def total_time(self) -> int:
        """Returns the total timespan covered by all buckets in seconds."""
        return self.width * len(self.counts)

    def total_count(self) -> int:
        """Returns the sum of the counts in all the buckets."""
        return sum(self.counts)

    def rate(self, period: Union[int, _Period] = TOTAL_PERIOD) -> float:
        """Computes the rate of counts in the buckets for the given period.

        The period must either be the special value :attr:`BucketedCounts.TOTAL_PERIOD` or a
        number of seconds. In the latter case the rate of the most recent number of seconds
        will be computed.

        :raises ValueError: if the given number of seconds is smaller than the
           :attr:`BucketedCounts.width`.
        """
        if period is self.TOTAL_PERIOD:
            timespan = len(self.counts) * self.width
        else:
            if period < self.width:
                raise ValueError(
                    f"Buckets of {self.width}s are too small to compute rate over {period}s"
                )
            timespan = period
        # A period that is not a multiple of the width is truncated to whole buckets,
        # while the rate is still divided by the requested timespan.
        bucket_count = int(timespan / self.width)
        return sum(self.counts[-bucket_count:]) / timespan


class DurationsHistogram:
    """A histogram with fixed bucket sizes.

    :ivar bucket_size: the size of the buckets in the histogram, in seconds.

    The counters are stored in a sparse dict, with the keys being the start of a bucket
    duration, so e.g. with a bucket_size of ``10`` key ``30`` would cover durations in the
    range [30, 40).

    TODO: Not sure how much we gain from being sparse, maybe not being sparse and using a
    list as storage is more efficient.
    """

    def __init__(self, bucket_size: int = 10):
        self.bucket_size = bucket_size
        self._data: DefaultDict[int, int] = collections.defaultdict(int)

    def incr(self, duration: int, count: int = 1) -> None:
        """Increments the histogram counter of the bucket in which `duration` falls."""
        bucket_key = duration - (duration % self.bucket_size)
        self._data[bucket_key] += count

    def incr_from(self, other: "DurationsHistogram") -> None:
        """Add the counts from another histogram to this one.

        The bucket_size must match.
        """
        assert self.bucket_size == other.bucket_size
        for key, count in other._data.items():
            self._data[key] += count

    def percentile(self, percentile: float) -> int:
        """Returns the requested percentile of the histogram.

        The percentile should be expressed as a number between 0 and 1, i.e. 0.75 is the
        75th percentile.

        :raises ValueError: if the histogram is empty (no percentile can be computed).
        """
        required_count = percentile * self.total_count()
        running_count = 0
        # The dict is insertion-ordered, not key-ordered, so iterate the buckets
        # sorted by duration or the cumulative count would be meaningless.
        for duration, count in sorted(self._data.items()):
            running_count += count
            if running_count >= required_count:
                return duration
        else:
            raise ValueError("Failed to compute percentile")

    def total_count(self) -> int:
        """Returns the sum of the counts of all the buckets in the histogram."""
        return sum(self._data.values())

    def __repr__(self) -> str:
        return f"<DurationsHistogram [{sorted(self._data.items())}]>"


@dataclasses.dataclass(frozen=True)
class BucketedDurationsHistograms:
    """Histograms of counters for a certain timespan.

    The timespan is split up in multiple buckets of ``width`` seconds, with ``timestamp``
    denoting the POSIX timestamp of the start of the first bucket. Each bucket contains a
    :class:`DurationsHistogram`.
    """

    # POSIX timestamp of the start of the first bucket, in seconds.
    timestamp: int
    # Size of each bucket, in seconds.
    width: int
    # One histogram per bucket, oldest bucket first.
    histograms: List[DurationsHistogram]

    def total_time(self) -> int:
        """Returns the total timespan covered by the buckets in seconds."""
        return self.width * len(self.histograms)


class RealtimeMetricsStore(Service): # type: ignore
Expand Down Expand Up @@ -75,7 +170,7 @@ def projects(self) -> Iterable[int]:
"""
raise NotImplementedError

def get_counts_for_project(self, project_id: int, timestamp: int) -> Iterable[BucketedCount]:
def get_counts_for_project(self, project_id: int, timestamp: int) -> BucketedCounts:
"""
Returns a sorted list of bucketed timestamps paired with the count of symbolicator requests
made during that time for some given project.
Expand All @@ -84,7 +179,7 @@ def get_counts_for_project(self, project_id: int, timestamp: int) -> Iterable[Bu

def get_durations_for_project(
self, project_id: int, timestamp: int
) -> Iterable[DurationHistogram]:
) -> BucketedDurationsHistograms:
"""
Returns a sorted list of bucketed timestamps paired with a histogram-like dictionary of
symbolication durations made during some timestamp for some given project.
Expand Down
60 changes: 31 additions & 29 deletions src/sentry/processing/realtime_metrics/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,17 @@ def __init__(
self.validate()

def validate(self) -> None:
    """Validate the configured bucket sizes and time windows.

    Bucket sizes must divide evenly into a minute (checked here only as being
    within 1-60 seconds) and the time windows must cover at least one minute.

    :raises InvalidConfiguration: if any configured value is out of range.
    """
    if not 0 < self._counter_bucket_size <= 60:
        raise InvalidConfiguration("counter bucket size must be 1-60 seconds")

    if not 0 < self._duration_bucket_size <= 60:
        raise InvalidConfiguration("duration bucket size must be 1-60 seconds")

    if self._counter_time_window < 60:
        raise InvalidConfiguration("counter time window must be at least a minute")

    if self._duration_time_window < 60:
        raise InvalidConfiguration("duration time window must be at least a minute")

def _counter_key_prefix(self) -> str:
return f"{self._prefix}:counter:{self._counter_bucket_size}"
Expand Down Expand Up @@ -154,14 +154,12 @@ def projects(self) -> Iterable[int]:
already_seen.add(project_id)
yield project_id

def get_counts_for_project(
self, project_id: int, timestamp: int
) -> Iterable[base.BucketedCount]:
"""
Returns a sorted list of bucketed timestamps paired with the count of symbolicator requests
def get_counts_for_project(self, project_id: int, timestamp: int) -> base.BucketedCounts:
"""Returns a sorted list of bucketed timestamps paired with the count of symbolicator requests
made during that time for some given project.

The first bucket returned is the one that `timestamp - self._counter_time_window` falls into. The last bucket returned is the one that `timestamp` falls into.
The first bucket returned is the one that `timestamp - self._counter_time_window`
falls into. The last bucket returned is the one that `timestamp` falls into.

This may throw an exception if there is some sort of issue fetching counts from the redis
store.
Expand All @@ -174,20 +172,19 @@ def get_counts_for_project(

buckets = range(first_bucket, now_bucket + bucket_size, bucket_size)
keys = [f"{self._counter_key_prefix()}:{project_id}:{ts}" for ts in buckets]

counts = self.cluster.mget(keys)
for ts, count_raw in zip(buckets, counts):
count = int(count_raw) if count_raw else 0
yield base.BucketedCount(timestamp=ts, count=count)
return base.BucketedCounts(
timestamp=buckets[0], width=bucket_size, counts=[int(c) if c else 0 for c in counts]
)

def get_durations_for_project(
self, project_id: int, timestamp: int
) -> Iterable[base.DurationHistogram]:
"""
Returns a sorted list of bucketed timestamps paired with a histogram-like dictionary of
) -> base.BucketedDurationsHistograms:
"""Returns a sorted list of bucketed timestamps paired with a histogram-like dictionary of
symbolication durations made during some timestamp for some given project.

The first bucket returned is the one that `timestamp - self._duration_time_window` falls into. The last bucket returned is the one that `timestamp` falls into.
The first bucket returned is the one that `timestamp - self._duration_time_window`
falls into. The last bucket returned is the one that `timestamp` falls into.

For a given `{duration:count}` entry in the dictionary bound to a specific `timestamp`:

Expand All @@ -214,13 +211,18 @@ def get_durations_for_project(
pipeline.hgetall(f"{self._duration_key_prefix()}:{project_id}:{ts}")
histograms = pipeline.execute()

for ts, histogram_redis_raw in zip(buckets, histograms):
histogram = {duration: 0 for duration in range(0, 600, 10)}
histogram_redis = {
int(duration): int(count) for duration, count in histogram_redis_raw.items()
}
histogram.update(histogram_redis)
yield base.DurationHistogram(timestamp=ts, histogram=base.BucketedDurations(histogram))
all_histograms = []
for ts, histogram_redis in zip(buckets, histograms):
histogram = base.DurationsHistogram(bucket_size=10)
for duration, count in histogram_redis.items():
histogram.incr(int(duration), int(count))
all_histograms.append(histogram)

return base.BucketedDurationsHistograms(
timestamp=first_bucket,
width=bucket_size,
histograms=all_histograms,
)

def get_lpq_projects(self) -> Set[int]:
"""
Expand Down
Loading