Skip to content

Commit

Permalink
test(metrics): Write all session metrics in test [INGEST-386] (#28667)
Browse files Browse the repository at this point in the history
Write more session metrics to snuba in metrics test case, mimicking the
extract_session_metrics function in relay.

This prepares for https://getsentry.atlassian.net/browse/INGEST-386.
  • Loading branch information
jjbayer authored Sep 20, 2021
1 parent 5e359d1 commit 575932d
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 48 deletions.
1 change: 0 additions & 1 deletion mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ files = src/sentry/api/bases/external_actor.py,
src/sentry/utils/dates.py,
src/sentry/utils/jwt.py,
src/sentry/utils/kvstore,
src/sentry/utils/snql.py,
src/sentry/web/decorators.py,
tests/sentry/utils/appleconnect/

Expand Down
76 changes: 49 additions & 27 deletions src/sentry/sentry_metrics/indexer/mock.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,64 @@
from typing import Optional
import itertools
from collections import defaultdict
from typing import DefaultDict, Dict, Optional

from sentry.models import Organization

from .base import StringIndexer, UseCase

_STRINGS = {
"abnormal": 0,
"crashed": 1,
"environment": 2,
"errored": 3,
"healthy": 4,
"production": 5,
"release": 6,
"session.duration": 7,
"session.status": 8,
"session": 9,
"staging": 10,
"user": 11,
"init": 12,
}
_REVERSE = {v: k for k, v in _STRINGS.items()}


class MockIndexer(StringIndexer):
"""
Mock string indexer
"""
_STRINGS = (
"abnormal",
"crashed",
"environment",
"errored",
"healthy",
"production",
"release",
"session.duration",
"session.status",
"session",
"staging",
"user",
"init",
"session.error",
)


class SimpleIndexer(StringIndexer):

"""Simple indexer with in-memory store. Do not use in production."""

def __init__(self) -> None:
self._counter = itertools.count()
self._strings: DefaultDict[str, int] = defaultdict(self._counter.__next__)
self._reverse: Dict[int, str] = {}

def record(self, organization: Organization, use_case: UseCase, string: str) -> int:
"""Mock indexer cannot record."""
raise NotImplementedError()
# NOTE: Ignores ``use_case`` for simplicity.
return self._record(string)

def resolve(self, organization: Organization, use_case: UseCase, string: str) -> Optional[int]:
# NOTE: Ignores ``use_case`` for simplicity.
return _STRINGS.get(string)
return self._strings.get(string)

def reverse_resolve(
self, organization: Organization, use_case: UseCase, id: int
) -> Optional[str]:
# NOTE: Ignores ``use_case`` for simplicity.
return _REVERSE.get(id)
return self._reverse.get(id)

def _record(self, string: str) -> int:
index = self._strings[string]
self._reverse[index] = string
return index


class MockIndexer(SimpleIndexer):
"""
Mock string indexer. Comes with a prepared set of strings.
"""

def __init__(self) -> None:
super().__init__()
for string in _STRINGS:
self._record(string)
70 changes: 50 additions & 20 deletions src/sentry/testutils/cases.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"SCIMAzureTestCase",
)

import hashlib
import inspect
import os
import os.path
Expand Down Expand Up @@ -941,56 +942,85 @@ class SessionMetricsTestCase(SnubaTestCase):
# NOTE: This endpoint does not exist yet, but we need something alike
# because /tests/<dataset>/insert always writes to the default entity
# (in the case of metrics, that's "metrics_sets")
snuba_endpoint = "/tests/entities/metrics_counters/insert"
snuba_endpoint = "/tests/entities/{entity}/insert"

def store_session(self, session):
"""Mimic relays behavior of always emitting a metric for a started session,
and emitting an additional one if the session is fatal
https://github.com/getsentry/relay/blob/e3c064e213281c36bde5d2b6f3032c6d36e22520/relay-server/src/actors/envelopes.rs#L357
"""
user = session["distinct_id"]

self._push_metric(session, "counter", "session", {"session.status": "init"}, +1)
self._push_metric(session, "set", "user", {"session.status": "init"}, user)

status = session["status"]

if status in ("abnormal", "crashed"):

self._push_metric(session, "counter", "session", {"session.status": status}, +1)
self._push_metric(session, "set", "user", {"session.status": status}, user)

# Mark the session as errored, which includes fatal sessions.
if session.get("errors", 0) > 0 or status not in ("ok", "exited"):

self._push_metric(session, "set", "session.error", {}, session["session_id"])
self._push_metric(session, "set", "user", {"session.status": status}, user)

if status != "ok": # terminal
self._push_metric(session, "distribution", "session.duration", {}, session["duration"])

@classmethod
def _push_metric(cls, session, type, name, tags, value):
def metric_id(name):
res = indexer.resolve(session["org_id"], UseCase.METRIC, name)
res = indexer.record(session["org_id"], UseCase.METRIC, name)
assert res is not None, name
return res

def tag_key(name):
res = indexer.resolve(session["org_id"], UseCase.TAG_KEY, name)
res = indexer.record(session["org_id"], UseCase.TAG_KEY, name)
assert res is not None, name
return res

def tag_value(name):
res = indexer.resolve(session["org_id"], UseCase.TAG_KEY, name)
res = indexer.record(session["org_id"], UseCase.TAG_KEY, name)
assert res is not None, name
return res

base_tags = {
tag_key(tag): tag_value(session[tag])
for tag in (
"release",
"environment",
)
}

extra_tags = {tag_key(k): tag_value(v) for k, v in tags.items()}

if type == "set":
# Relay uses a different hashing algorithm, but that's ok
value = [int.from_bytes(hashlib.md5(value.encode()).digest()[:8], "big")]
elif type == "distribution":
value = [value]

msg = {
"org_id": session["org_id"],
"project_id": session["project_id"],
"metric_id": metric_id("session"),
"metric_id": metric_id(name),
"timestamp": session["started"],
"tags": {tag_key("session.status"): tag_value("init")},
"type": "c",
"value": 1.0,
"tags": {**base_tags, **extra_tags},
"type": {"counter": "c", "set": "s", "distribution": "d"}[type],
"value": value,
"retention_days": 90,
}

self._send(msg)

status = session["status"]

if status in ("abnormal", "crashed"):
# Count as fatal
msg["tags"][tag_key("session.status")] = tag_value(status)
self._send(msg)

# TODO: emit metric "session.error" of type "set"
cls._send(msg, entity=f"metrics_{type}s")

@classmethod
def _send(cls, msg):
def _send(cls, msg, entity):
assert (
requests.post(
settings.SENTRY_SNUBA + cls.snuba_endpoint,
settings.SENTRY_SNUBA + cls.snuba_endpoint.format(entity=entity),
data=json.dumps([msg]),
).status_code
== 200
Expand Down

0 comments on commit 575932d

Please sign in to comment.