feat(symbolicator): Automatically add and remove projects to the LPQ based on their historical perf #28714

Merged: 37 commits, Oct 2, 2021

Commits
33a12b3
general start to start scanning for lpq candidates
relaxolotl Sep 21, 2021
b575c67
fill in the basics to get candidates and their associated counts
relaxolotl Sep 22, 2021
4945a9a
add some basic feedback
relaxolotl Sep 22, 2021
fa291f8
add soft timeouts to tasks
relaxolotl Sep 22, 2021
179ad70
be consistent with wording
relaxolotl Sep 22, 2021
13a0690
fix type
relaxolotl Sep 22, 2021
7c3511b
make timings sensible
relaxolotl Sep 22, 2021
06651a4
types
relaxolotl Sep 24, 2021
c08012d
missing celery import
relaxolotl Sep 28, 2021
5540347
use a better name for the task that recomputes the lpq project list
relaxolotl Sep 28, 2021
d602767
use a more project() instead of get_lpq_candidates()
relaxolotl Sep 29, 2021
c4ad504
remove bucketed from function name
relaxolotl Sep 29, 2021
3afe121
calculate commonly used key prefixes in one place
relaxolotl Sep 29, 2021
af46bb4
fix key prefixes to match new format
relaxolotl Sep 29, 2021
f8f9857
use a class for BucketedCount so the fields are typed
relaxolotl Sep 29, 2021
8648509
add the ability to grab durations
relaxolotl Sep 29, 2021
1e8c5f1
fix tests
relaxolotl Sep 29, 2021
02d1251
left logs behind
relaxolotl Sep 29, 2021
2faecf1
mention exception throwing everywhere if it's going to be mentioned i…
relaxolotl Sep 29, 2021
7606322
don't let eligibility calculations block scanning
relaxolotl Sep 29, 2021
1ebdc3a
remove a project from the LPQ to be safe if it isn't eligible for it
relaxolotl Sep 29, 2021
3cf56ac
gooder logging
relaxolotl Sep 29, 2021
11942c0
docstrings
relaxolotl Sep 29, 2021
3af74e1
patch up tests
relaxolotl Sep 29, 2021
8bc782d
style(lint): Auto commit lint changes
getsantry[bot] Sep 29, 2021
991cfbe
make typing happy and fix a silly bug
relaxolotl Sep 29, 2021
6c4ca3e
update tests
relaxolotl Sep 29, 2021
5f192ba
thanks sentrybot
relaxolotl Sep 29, 2021
460e71c
more test updates
relaxolotl Sep 29, 2021
59b4c78
raise an error instead of passing if an unimplemented method is invoked
relaxolotl Sep 29, 2021
b973c84
underscores are good visibility hints
relaxolotl Sep 29, 2021
4a502e1
lost doc changes from base
relaxolotl Sep 29, 2021
00b8a03
remove trycatches and just fail on conversion errors
relaxolotl Sep 29, 2021
9fb4316
compute also updates eligibility
relaxolotl Sep 29, 2021
48dc8cf
update docstring to match rename
relaxolotl Sep 29, 2021
1644da0
match the pattern that exists for this naming scheme
relaxolotl Sep 29, 2021
934093c
missed a rename
relaxolotl Oct 1, 2021
10 changes: 10 additions & 0 deletions src/sentry/conf/server.py
@@ -562,6 +562,7 @@ def SOCIAL_AUTH_DEFAULT_USERNAME():
"sentry.tasks.files",
"sentry.tasks.groupowner",
"sentry.tasks.integrations",
"sentry.tasks.low_priority_symbolication",
"sentry.tasks.members",
"sentry.tasks.merge",
"sentry.tasks.releasemonitor",
@@ -638,6 +639,10 @@ def SOCIAL_AUTH_DEFAULT_USERNAME():
Queue("sleep", routing_key="sleep"),
Queue("stats", routing_key="stats"),
Queue("subscriptions", routing_key="subscriptions"),
Queue(
"symbolications.compute_low_priority_projects",
routing_key="symbolications.compute_low_priority_projects",
),
Queue("unmerge", routing_key="unmerge"),
Queue("update", routing_key="update"),
]
@@ -779,6 +784,11 @@ def create_partitioned_queues(name):
"schedule": timedelta(minutes=20),
"options": {"expires": 20 * 60},
},
"check-symbolicator-lpq-project-eligibility": {
"task": "sentry.tasks.low_priority_symbolication.scan_for_suspect_projects",
"schedule": timedelta(seconds=10),
"options": {"expires": 10},
},
}

BGTASKS = {
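The schedule entry above runs the scan every 10 seconds on its own dedicated queue, with jobs expiring after 10 seconds so missed runs do not pile up. For context, a rough sketch of what the scheduled task could look like; this is an assumption based on Sentry's usual task conventions rather than code from this diff, `update_lpq_eligibility` is a hypothetical helper, and `realtime_metrics.projects()` assumes the module exposes the configured store's methods:

    # Hypothetical sketch; the real task lives in
    # src/sentry/tasks/low_priority_symbolication.py and is not shown in this diff.
    from sentry.processing import realtime_metrics
    from sentry.tasks.base import instrumented_task

    @instrumented_task(
        name="sentry.tasks.low_priority_symbolication.scan_for_suspect_projects",
        queue="symbolications.compute_low_priority_projects",
        soft_time_limit=10,
    )
    def scan_for_suspect_projects() -> None:
        # Walk every project with recorded metrics and recompute whether it
        # belongs in the low priority queue.
        for project_id in realtime_metrics.projects():
            update_lpq_eligibility(project_id)  # hypothetical helper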
95 changes: 80 additions & 15 deletions src/sentry/processing/realtime_metrics/base.py
@@ -1,8 +1,38 @@
from typing import Set
import dataclasses
from typing import Dict, Iterable, NewType, Set

from sentry.utils.services import Service


@dataclasses.dataclass(frozen=True)
class BucketedCount:
"""
Timestamp to count mapping. This represents some `count` amount of something performed
during `timestamp`. `timestamp` is stored in seconds.
"""

timestamp: int
count: int


# Duration to count mapping where the keys are durations and the values are counts. This represents
# some `count` instances of some action where each individual instance took some
# [`duration`, `duration`+10) seconds of time to complete. `duration` is stored in seconds.
BucketedDurations = NewType("BucketedDurations", Dict[int, int])


@dataclasses.dataclass(frozen=True)
class DurationHistogram:
"""
Mapping of timestamp to histogram-like dict of durations. This represents some `count` amount of
some action performed during `timestamp`, where `counts` are grouped by how long that action
took. `timestamp` is stored in seconds.
"""

timestamp: int
histogram: BucketedDurations


Contributor commented:
i like all the nice custom types 😄
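As a quick illustration (not part of the diff) of how these types compose, with all values invented:

    # Five requests finished in [0, 10) seconds and two in [30, 40) seconds
    # during the bucket starting at this (invented) timestamp.
    count = BucketedCount(timestamp=1632900000, count=7)
    histogram = DurationHistogram(
        timestamp=1632900000,
        histogram=BucketedDurations({0: 5, 30: 2}),
    )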

class RealtimeMetricsStore(Service): # type: ignore
"""A service for storing metrics about incoming requests within a given time window."""

@@ -23,7 +53,7 @@ def increment_project_event_counter(self, project_id: int, timestamp: int) -> None
time-window bucket with "timestamp" providing the time of the event
in seconds since the UNIX epoch (i.e., as returned by time.time()).
"""
pass
raise NotImplementedError

def increment_project_duration_counter(
self, project_id: int, timestamp: int, duration: int
@@ -34,32 +64,67 @@ def increment_project_duration_counter(
Calling this increments the counter of the current time-window bucket with "timestamp" providing
the time of the event in seconds since the UNIX epoch and "duration" the processing time in seconds.
"""
pass
raise NotImplementedError

def projects(self) -> Iterable[int]:
"""
Returns IDs of all projects that should be considered for the low priority queue.
"""
Comment on lines +70 to +72
loewenheim (Contributor) commented Sep 29, 2021:
I think this would be better as

Suggested change:
-    Returns IDs of all projects that should be considered for the low priority queue.
+    Returns IDs of all projects for which metrics have been recorded in the store.

or something like that. RealtimeMetricsStore is not billed as having anything to do with the low priority queue, so this mention of it comes a bit out of left field.

relaxolotl (Author) replied:
good point, thanks for catching this!

raise NotImplementedError

def get_counts_for_project(self, project_id: int) -> Iterable[BucketedCount]:
"""
Returns a sorted list of bucketed timestamps paired with the count of symbolicator requests
made during that time for some given project.
"""
raise NotImplementedError

def get_durations_for_project(self, project_id: int) -> Iterable[DurationHistogram]:
"""
Returns a sorted list of bucketed timestamps paired with a histogram-like dictionary of
symbolication durations, grouped into 10-second buckets, recorded during that time for some
given project.
"""
raise NotImplementedError

def get_lpq_projects(self) -> Set[int]:
"""
Fetches the list of projects that are currently using the low priority queue.

Returns a list of project IDs.
"""
pass
raise NotImplementedError

def add_project_to_lpq(self, project_id: int) -> bool:
"""
Assigns a project to the low priority queue.

This registers an intent to redirect all symbolication events triggered by the specified
project to the low priority queue.

Returns True if the project was a new addition to the list. Returns False if it was already
assigned to the low priority queue.
Comment on lines +104 to +105
Contributor commented:
What does this return if the project wasn't in the lpq and hasn't been added because it's on the never list?

"""
raise NotImplementedError

def add_project_to_lpq(self, project_id: int) -> None:
def remove_project_from_lpq(self, project_id: int) -> bool:
"""
Moves a project to the low priority queue.
Removes a project from the low priority queue.

This registers an intent to restore all specified projects back to the regular queue.

This forces all symbolication events triggered by the specified project to be redirected to
the low priority queue, unless the project is manually excluded from the low priority queue
via the `store.symbolicate-event-lpq-never` kill switch.
Returns True if the project was assigned to the queue prior to its removal. Returns False if
it wasn't assigned to the queue to begin with.
Comment on lines +115 to +116
Contributor commented:
Similar to above: what happens if the project was in the queue, but hasn't been removed because it's in the always list?

relaxolotl (Author) replied:
let me try to address both questions in one comment:

i think i phrased the documentation poorly, particularly when it comes to mentioning these two kill switches. any mutations of the value stored in redis (i.e. store.symbolicate-event-lpq-selected) do not read or take into consideration the two manual kill switches (store.symbolicate-event-lpq-never and store.symbolicate-event-lpq-always).

i mentioned the switches as a way to provide context if a project's events continued to be routed into the "wrong" queue despite these clearly being invoked, but i think that just made things more confusing. the only place that does collect all three of -selected, -never, and -always exclusively reads the contents of those values, but it does not mutate them.

i've changed the wording on the docstring to better reflect this in this PR and the parent PR: #28757

let me know what you think of the updated docstrings.

Contributor replied:
Ok, let me see if I understand this correctly. Before, my main issue with the documentation was this: does the return value of (e.g.) add_project_to_lpq reflect whether the project was added or whether it is now in the lpq? But if the killswitches aren't taken into account, the latter is meaningless because an addition always results in the project being in the lpq. Thus returning whether it was added (or removed) is the only useful option.

"""
pass
raise NotImplementedError

def remove_projects_from_lpq(self, project_ids: Set[int]) -> None:
def remove_projects_from_lpq(self, project_ids: Set[int]) -> int:
"""
Removes projects from the low priority queue.

This restores all specified projects back to the regular queue, unless they have been
manually forced into the low priority queue via the `store.symbolicate-event-lpq-always`
kill switch.
This registers an intent to restore all specified projects back to the regular queue.

Returns the number of projects that were actively removed from the queue. Any projects that
were not assigned to the low priority queue to begin with will be omitted from the return
value.
"""
pass
raise NotImplementedError
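Taken together, this interface gives a scanning task everything it needs to move projects in and out of the low priority queue. A hedged sketch of such a caller, with an invented threshold and helper name:

    # Hypothetical eligibility check; the real calculation performed by
    # sentry.tasks.low_priority_symbolication is not part of this diff.
    def update_lpq_eligibility(store: RealtimeMetricsStore, project_id: int) -> None:
        total = sum(bucket.count for bucket in store.get_counts_for_project(project_id))
        if total > 10_000:  # invented threshold
            store.add_project_to_lpq(project_id)
        else:
            store.remove_project_from_lpq(project_id)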
135 changes: 125 additions & 10 deletions src/sentry/processing/realtime_metrics/redis.py
@@ -1,5 +1,7 @@
import datetime
from typing import Set
import logging
from itertools import chain
from typing import Iterable, Set

from sentry.exceptions import InvalidConfiguration
from sentry.utils import redis
@@ -9,6 +11,8 @@
# redis key for entry storing current list of LPQ members
LPQ_MEMBERS_KEY = "store.symbolicate-event-lpq-selected"

logger = logging.getLogger(__name__)


class RedisRealtimeMetricsStore(base.RealtimeMetricsStore):
"""An implementation of RealtimeMetricsStore based on a Redis backend."""
@@ -46,6 +50,12 @@ def validate(self) -> None:
if self._histogram_bucket_size <= 0:
raise InvalidConfiguration("histogram bucket size must be at least 1")

def _counter_key_prefix(self) -> str:
return f"{self._prefix}:counter:{self._counter_bucket_size}"

def _histogram_key_prefix(self) -> str:
return f"{self._prefix}:histogram:{self._histogram_bucket_size}"

def increment_project_event_counter(self, project_id: int, timestamp: int) -> None:
"""Increment the event counter for the given project_id.

@@ -58,7 +68,7 @@ def increment_project_event_counter(self, project_id: int, timestamp: int) -> None
if self._counter_bucket_size > 1:
timestamp -= timestamp % self._counter_bucket_size

key = f"{self._prefix}:counter:{self._counter_bucket_size}:{project_id}:{timestamp}"
key = f"{self._counter_key_prefix()}:{project_id}:{timestamp}"

with self.cluster.pipeline() as pipeline:
pipeline.incr(key)
@@ -77,23 +87,112 @@ def increment_project_duration_counter(
if self._histogram_bucket_size > 1:
timestamp -= timestamp % self._histogram_bucket_size

key = f"{self._prefix}:histogram:{self._histogram_bucket_size}:{project_id}:{timestamp}"
key = f"{self._histogram_key_prefix()}:{project_id}:{timestamp}"
duration -= duration % 10

with self.cluster.pipeline() as pipeline:
pipeline.hincrby(key, duration, 1)
pipeline.pexpire(key, self._histogram_ttl)
pipeline.execute()

def projects(self) -> Iterable[int]:
"""
Returns IDs of all projects for which metrics have been recorded in the store.

This may throw an exception if there is some sort of issue scanning the redis store for
projects.
"""

already_seen = set()
# Normally if there's a histogram entry for a project then there should be a counter
# entry for it as well, but double check both to be safe
all_keys = chain(
self.cluster.scan_iter(
match=self._counter_key_prefix() + ":*",
),
self.cluster.scan_iter(
match=self._histogram_key_prefix() + ":*",
),
)

for item in all_keys:
# Because this could be one of two patterns, this splits based on the most basic
# delimiter ":" instead of splitting on known prefixes
_prefix, _metric_type, _bucket_size, project_id_raw, _else = item.split(":", maxsplit=4)
project_id = int(project_id_raw)
if project_id not in already_seen:
already_seen.add(project_id)
yield project_id
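A worked example of the split above, reusing the hypothetical prefix from earlier and assuming the configured prefix itself contains no ":":

    # "symbolicate_event_low_priority:counter:10:42:1632900000".split(":", maxsplit=4)
    # == ["symbolicate_event_low_priority", "counter", "10", "42", "1632900000"]
    # so project_id_raw is "42" and the trailing timestamp lands in _else.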

def get_counts_for_project(self, project_id: int) -> Iterable[base.BucketedCount]:
"""
Returns a sorted list of bucketed timestamps paired with the count of symbolicator requests
made during that time for some given project.

This may throw an exception if there is some sort of issue fetching counts from the redis
store.
"""
key_prefix = f"{self._counter_key_prefix()}:{project_id}:"

keys = sorted(
self.cluster.scan_iter(
match=key_prefix + "*",
)
)
counts = self.cluster.mget(keys)
for key, count_raw in zip(keys, counts):
_, timestamp_raw = key.split(key_prefix)

timestamp_bucket = int(timestamp_raw)
count = int(count_raw)
yield base.BucketedCount(timestamp=timestamp_bucket, count=count)

def get_durations_for_project(self, project_id: int) -> Iterable[base.DurationHistogram]:
"""
Returns a sorted list of bucketed timestamps paired with a histogram-like dictionary of
symbolication durations made during some timestamp for some given project.

For a given `{duration:count}` entry in the dictionary bound to a specific `timestamp`:

- `duration` represents the amount of time it took for a symbolication request to complete.
Durations are bucketed by 10secs, meaning that a `duration` of `30` covers all requests that
took between 30-39 seconds.
Contributor commented:
[30, 40) if you want to go mathematical 😉 (which you do below)

relaxolotl (Author) replied:
yep! added this in just in case the user's unfamiliar with the syntax: i would imagine trying to look this up would be a pain if all you knew was "square bracket" and "round bracket"


- `count` is the number of symbolication requests that took some amount of time within the
range of `[duration, duration+10)` to complete.

This may throw an exception if there is some sort of issue fetching durations from the redis
store.
"""
key_prefix = f"{self._histogram_key_prefix()}:{project_id}:"
keys = sorted(
self.cluster.scan_iter(
match=key_prefix + "*",
)
)

for key in keys:
_, timestamp_raw = key.split(key_prefix)
timestamp_bucket = int(timestamp_raw)

histogram_raw = self.cluster.hgetall(key)
histogram = base.BucketedDurations(
{int(duration): int(count) for duration, count in histogram_raw.items()}
)
yield base.DurationHistogram(timestamp=timestamp_bucket, histogram=histogram)
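A hedged read-side sketch (not in the diff) of consuming this generator; the configured store instance and all values are assumed:

    for entry in store.get_durations_for_project(42):
        # e.g. entry.timestamp == 1632900000 and entry.histogram == {0: 5, 30: 2},
        # i.e. five requests took [0, 10)s and two took [30, 40)s in that bucket.
        slow = sum(count for duration, count in entry.histogram.items() if duration >= 30)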

def get_lpq_projects(self) -> Set[int]:
"""
Fetches the list of projects that are currently using the low priority queue.

Returns a list of project IDs.

This may throw an exception if there is some sort of issue fetching the list from the redis
store.
"""
return {int(project_id) for project_id in self.cluster.smembers(LPQ_MEMBERS_KEY)}

def add_project_to_lpq(self, project_id: int) -> None:
def add_project_to_lpq(self, project_id: int) -> bool:
"""
Assigns a project to the low priority queue.

@@ -105,11 +204,25 @@ def add_project_to_lpq(self, project_id: int) -> None:
"""

# This returns 0 if project_id was already in the set, 1 if it was added, and throws an
# exception if there's a problem so it's fine if we just ignore the return value of this as
# the project is always added if this successfully completes.
self.cluster.sadd(LPQ_MEMBERS_KEY, project_id)
# exception if there's a problem. If this successfully completes then the project is
# expected to be in the set.
return int(self.cluster.sadd(LPQ_MEMBERS_KEY, project_id)) > 0

def remove_project_from_lpq(self, project_id: int) -> bool:
"""
Removes a project from the low priority queue.

This restores the specified project back to the regular queue, unless it has been
manually forced into the low priority queue via the `store.symbolicate-event-lpq-always`
kill switch.

This may throw an exception if there is some sort of issue deregistering the projects from
the queue.
"""

return self.remove_projects_from_lpq({project_id}) > 0

def remove_projects_from_lpq(self, project_ids: Set[int]) -> None:
def remove_projects_from_lpq(self, project_ids: Set[int]) -> int:
"""
Removes projects from the low priority queue.

@@ -119,6 +232,8 @@ the queue.
the queue.
"""
if len(project_ids) == 0:
return
return 0

self.cluster.srem(LPQ_MEMBERS_KEY, *project_ids)
# This returns the number of projects removed, and throws an exception if there's a problem.
# If this successfully completes then the projects are expected to no longer be in the set.
return int(self.cluster.srem(LPQ_MEMBERS_KEY, *project_ids))
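Following the kill-switch discussion above, store.symbolicate-event-lpq-never and store.symbolicate-event-lpq-always are only consulted at read time and are never mutated here. A hedged sketch of how the routing code might combine the three values; the membership sources are assumptions based on that discussion, not code from this diff:

    # Hypothetical routing check combining the three kill switches.
    def should_use_lpq(
        project_id: int,
        lpq_never: Set[int],
        lpq_always: Set[int],
        store: RedisRealtimeMetricsStore,
    ) -> bool:
        if project_id in lpq_never:
            return False
        if project_id in lpq_always:
            return True
        return project_id in store.get_lpq_projects()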