Skip to content

Commit

Permalink
ref(releasehealth): switch metric querires to manual aggregation [ING…
Browse files Browse the repository at this point in the history
…EST-249] (#29319)

switch metric querires to manual aggregation
  • Loading branch information
RaduW authored Oct 19, 2021
1 parent 3da86f8 commit c8bd11e
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 33 deletions.
45 changes: 33 additions & 12 deletions src/sentry/release_health/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ def _get_crash_free_rate_data(
count_query = Query(
dataset=Dataset.Metrics.value,
match=Entity(EntityKey.MetricsCounters.value),
select=[Column("value")],
select=[Function("sum", [Column("value")], "value")],
where=[
Condition(Column("org_id"), Op.EQ, org_id),
Condition(Column("project_id"), Op.IN, project_ids),
Expand Down Expand Up @@ -322,7 +322,7 @@ def _count_sessions(total: bool, referrer: str) -> Dict[Any, int]:
query = Query(
dataset=Dataset.Metrics.value,
match=Entity(EntityKey.MetricsCounters.value),
select=[Column("value")],
select=[Function("sum", [Column("value")], "value")],
where=_get_common_where(total)
+ [
Condition(Column("metric_id"), Op.EQ, metric_id(org_id, "session")),
Expand All @@ -343,7 +343,7 @@ def _count_users(total: bool, referrer: str) -> Dict[Any, int]:
query = Query(
dataset=Dataset.Metrics.value,
match=Entity(EntityKey.MetricsSets.value),
select=[Column("value")],
select=[Function("uniq", [Column("value")], "value")],
where=_get_common_where(total)
+ [
Condition(Column("metric_id"), Op.EQ, metric_id(org_id, "user")),
Expand Down Expand Up @@ -680,7 +680,14 @@ def _get_session_duration_data_for_overview(
Query(
dataset=Dataset.Metrics.value,
match=Entity(EntityKey.MetricsDistributions.value),
select=aggregates + [Column("percentiles")],
select=aggregates
+ [
Function(
alias="percentiles",
function="quantiles(0.5,0.9)",
parameters=[Column("value")],
)
],
where=where
+ [
Condition(Column("metric_id"), Op.EQ, metric_id(org_id, "session.duration")),
Expand All @@ -698,7 +705,7 @@ def _get_session_duration_data_for_overview(
key = (row["project_id"], reverse_tag_value(org_id, row[release_column_name]))
rv_durations[key] = {
"duration_p50": row["percentiles"][0],
"duration_p90": row["percentiles"][2],
"duration_p90": row["percentiles"][1],
}

return rv_durations
Expand All @@ -722,7 +729,7 @@ def _get_errored_sessions_for_overview(
Query(
dataset=Dataset.Metrics.value,
match=Entity(EntityKey.MetricsSets.value),
select=aggregates + [Column("value")],
select=aggregates + [Function("uniq", [Column("value")], "value")],
where=where
+ [
Condition(Column("metric_id"), Op.EQ, metric_id(org_id, "session.error")),
Expand Down Expand Up @@ -758,7 +765,7 @@ def _get_session_by_status_for_overview(
Query(
dataset=Dataset.Metrics.value,
match=Entity(EntityKey.MetricsCounters.value),
select=aggregates + [Column("value")],
select=aggregates + [Function("sum", [Column("value")], "value")],
where=where
+ [
Condition(Column("metric_id"), Op.EQ, metric_id(org_id, "session")),
Expand Down Expand Up @@ -798,7 +805,7 @@ def _get_users_and_crashed_users_for_overview(
rv_users: Dict[Tuple[int, str, str], int] = {}

# Avoid mutating input parameters here
select = aggregates + [Column("value")]
select = aggregates + [Function("uniq", [Column("value")], "value")]
where = where + [
Condition(Column("metric_id"), Op.EQ, metric_id(org_id, "user")),
Condition(
Expand Down Expand Up @@ -856,13 +863,18 @@ def _get_health_stats_for_overview(
"sessions": EntityKey.MetricsCounters.value,
}[stat]

value_column = {
"users": Function("uniq", [Column("value")], "value"),
"sessions": Function("sum", [Column("value")], "value"),
}[stat]

metric_name = metric_id(org_id, {"sessions": "session", "users": "user"}[stat])

for row in raw_snql_query(
Query(
dataset=Dataset.Metrics.value,
match=Entity(entity),
select=aggregates + [Column("value")],
select=aggregates + [value_column],
where=where
+ [
Condition(Column("metric_id"), Op.EQ, metric_name),
Expand Down Expand Up @@ -1075,11 +1087,20 @@ def _get_data(entity_key: EntityKey, metric_name: str) -> Tuple[int, int]:
Condition(Column("metric_id"), Op.EQ, metric_id),
Condition(Column("timestamp"), Op.LT, end),
]

if entity_key == EntityKey.MetricsCounters:
aggregation_function = "sum"
elif entity_key == EntityKey.MetricsSets:
aggregation_function = "uniq"
else:
raise NotImplementedError(f"No support for entity: {entity_key}")
columns = [Function(aggregation_function, [Column("value")], "value")]

data = raw_snql_query(
Query(
dataset=Dataset.Metrics.value,
match=Entity(entity_key.value),
select=[Column("value")],
select=columns,
where=where,
groupby=[Column(status_key)],
),
Expand Down Expand Up @@ -1296,7 +1317,7 @@ def get_project_releases_count(

# Filter out releases with zero users when sorting by either `users` or `crash_free_users`
if scope in ["users", "crash_free_users"]:
having.append(Condition(Column("value"), Op.GT, 0))
having.append(Condition(Function("uniq", [Column("value")], "value"), Op.GT, 0))
match = Entity(EntityKey.MetricsSets.value)
else:
match = Entity(EntityKey.MetricsCounters.value)
Expand Down Expand Up @@ -1657,7 +1678,7 @@ def get_project_sessions_count(
) -> int:

org_id = self._get_org_id([project_id])
columns = [Column("value")]
columns = [Function("sum", [Column("value")], "value")]

try:
status_key = tag_key(org_id, "session.status")
Expand Down
97 changes: 76 additions & 21 deletions src/sentry/release_health/metrics_sessions_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from snuba_sdk.expressions import Granularity
from snuba_sdk.function import Function
from snuba_sdk.legacy import json_to_snql
from snuba_sdk.query import SelectableExpression
from typing_extensions import Literal, TypedDict

from sentry.release_health.base import (
Expand Down Expand Up @@ -103,7 +104,6 @@ class _SessionStatusValue:

_MetricName = Literal["session", "session.duration", "session.error", "user"]


#: Actual column name in snuba
_SnubaColumnName = Literal["value", "avg", "max", "percentiles"]

Expand All @@ -112,6 +112,54 @@ class _SessionStatusValue:
_VirtualColumnName = Literal["value", "avg", "max", "p50", "p75", "p90", "p95", "p99"]


def _to_column(query_func: SessionsQueryFunction) -> SelectableExpression:
"""
Converts query a function into an expression that can be directly plugged into anywhere
columns are used (like the select argument of a Query)
"""
# distribution columns
if query_func in [
"p50(session.duration)",
"p75(session.duration)",
"p90(session.duration)",
"p95(session.duration)",
"p99(session.duration)",
]:
return Function(
alias="percentiles",
function="quantiles(0.5,0.75,0.9,0.95,0.99)",
parameters=[Column("value")],
)
if query_func == "avg(session.duration)":
return Function(
alias="avg",
function="avg",
parameters=[Column("value")],
)
if query_func == "max(session.duration)":
return Function(
alias="max",
function="max",
parameters=[Column("value")],
)
# counters
if query_func == "sum(session)":
return Function(
alias="sum",
function="sum",
parameters=[Column("value")],
)
# sets
if query_func == "count_unique(user)":
return Function(
alias="count_unique",
function="uniq",
parameters=[Column("value")],
)

raise ValueError("Unmapped metrics column", query_func)


@dataclass(frozen=True)
class _DataPointKey:
"""A key to access a data point in _DataPoints.
Expand Down Expand Up @@ -263,7 +311,7 @@ def _get_snuba_query(
query: QueryDefinition,
entity_key: EntityKey,
metric_id: int,
columns: Sequence[str],
columns: Sequence[SelectableExpression],
series: bool,
extra_conditions: List[Condition],
remove_groupby: Set[Column],
Expand Down Expand Up @@ -298,7 +346,7 @@ def _get_snuba_query(
return Query(
dataset=Dataset.Metrics.value,
match=Entity(entity_key.value),
select=[Column(column) for column in columns],
select=list(columns),
groupby=list(full_groupby),
where=conditions,
granularity=Granularity(query.rollup),
Expand All @@ -311,7 +359,7 @@ def _get_snuba_query_data(
entity_key: EntityKey,
metric_name: _MetricName,
metric_id: int,
columns: Sequence[str],
columns: Sequence[SelectableExpression],
extra_conditions: Optional[List[Condition]] = None,
remove_groupby: Optional[Set[Column]] = None,
) -> Generator[Tuple[_MetricName, _SnubaData], None, None]:
Expand Down Expand Up @@ -361,7 +409,12 @@ def _fetch_data(
if metric_id is not None:
data.extend(
_get_snuba_query_data(
org_id, query, EntityKey.MetricsSets, "user", metric_id, ["value"]
org_id,
query,
EntityKey.MetricsSets,
"user",
metric_id,
[Function("uniq", [Column("value")], "value")],
)
)
metric_to_output_field[("user", "value")] = _UserField()
Expand All @@ -374,15 +427,15 @@ def _fetch_data(
def get_virtual_column(field: SessionsQueryFunction) -> _VirtualColumnName:
return cast(_VirtualColumnName, field[:3])

def get_snuba_column(field: SessionsQueryFunction) -> _SnubaColumnName:
"""Get the actual snuba column needed by this function"""
virtual_column = get_virtual_column(field)
if virtual_column in ("p50", "p75", "p90", "p95", "p99"):
return "percentiles"

return cast(_SnubaColumnName, virtual_column)

snuba_columns = {get_snuba_column(field) for field in duration_fields}
# eliminate duplicate fields (p50...p99) all generate the same field "percentiles"
seen_fields = set()
snuba_columns = []
for field in duration_fields:
column = _to_column(field)
if column.alias not in seen_fields:
# a new field add it to the columns and remember it
snuba_columns.append(column)
seen_fields.add(column.alias)

# sessions_v2 only exposes healthy session's durations
healthy = _resolve("exited")
Expand All @@ -399,7 +452,7 @@ def get_snuba_column(field: SessionsQueryFunction) -> _SnubaColumnName:
EntityKey.MetricsDistributions,
"session.duration",
metric_id,
list(snuba_columns),
snuba_columns,
extra_conditions=extra_conditions,
remove_groupby=remove_groupby,
)
Expand All @@ -420,7 +473,12 @@ def get_snuba_column(field: SessionsQueryFunction) -> _SnubaColumnName:
# 1 session counters
data.extend(
_get_snuba_query_data(
org_id, query, EntityKey.MetricsCounters, "session", metric_id, ["value"]
org_id,
query,
EntityKey.MetricsCounters,
"session",
metric_id,
[Function("sum", [Column("value")], "value")],
)
)

Expand All @@ -435,7 +493,7 @@ def get_snuba_column(field: SessionsQueryFunction) -> _SnubaColumnName:
EntityKey.MetricsSets,
"session.error",
error_metric_id,
["value"],
[Function("uniq", [Column("value")], "value")],
remove_groupby=remove_groupby,
)
)
Expand All @@ -453,7 +511,7 @@ def get_snuba_column(field: SessionsQueryFunction) -> _SnubaColumnName:
EntityKey.MetricsCounters,
"session",
metric_id,
["value"],
[Function("sum", [Column("value")], "value")],
extra_conditions,
)
)
Expand Down Expand Up @@ -615,21 +673,18 @@ def _translate_conditions(org_id: int, input_: Any) -> Any:
return _resolve(input_)

if isinstance(input_, Function):

return Function(
function=input_.function, parameters=_translate_conditions(org_id, input_.parameters)
)

if isinstance(input_, Condition):

return Condition(
lhs=_translate_conditions(org_id, input_.lhs),
op=input_.op,
rhs=_translate_conditions(org_id, input_.rhs),
)

if isinstance(input_, (int, float)):

return input_

assert isinstance(input_, list), input_
Expand Down

0 comments on commit c8bd11e

Please sign in to comment.