Skip to content

Commit

Permalink
make redis mock indexer separate file
Browse files Browse the repository at this point in the history
  • Loading branch information
MeredithAnya committed Sep 20, 2021
1 parent 7d8cfce commit 23021e9
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 12 deletions.
17 changes: 8 additions & 9 deletions src/sentry/sentry_metrics/indexer/indexer_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ def get_metrics_consumer(topic=None, **options) -> None:

class MetricsIndexerWorker(AbstractBatchWorker):
def process_message(self, message):
original: Dict[str, Any] = json.loads(message.value())
new_message = original.copy()
parsed_message: Dict[str, Any] = json.loads(message.value(), use_rapid_json=True)

org_id = int(new_message["org_id"])
metric_name = new_message["name"]
tags = new_message["tags"]
org_id = int(parsed_message["org_id"])
metric_name = parsed_message["name"]
tags = parsed_message["tags"]

strings = {metric_name}
strings.update(tags.keys())
Expand All @@ -41,10 +40,10 @@ def process_message(self, message):
new_v = mapping[tag_v]
new_tags[new_k] = new_v

new_message["tags"] = new_tags
new_message["metric_id"] = mapping[metric_name]
new_message["retention_days"] = 90
return new_message
parsed_message["tags"] = new_tags
parsed_message["metric_id"] = mapping[metric_name]
parsed_message["retention_days"] = 90
return parsed_message

def flush_batch(self, batch):
# produce the translated message to snuba-metrics topic
Expand Down
4 changes: 1 addition & 3 deletions src/sentry/sentry_metrics/indexer/mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
from collections import defaultdict
from typing import DefaultDict, Dict, Optional

from django.conf import settings

from sentry.utils.redis import redis_clusters
from sentry.models import Organization

from .base import StringIndexer, UseCase

Expand Down
154 changes: 154 additions & 0 deletions src/sentry/sentry_metrics/indexer/redis_mock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import hashlib
from typing import Dict, List, Optional

from django.conf import settings

from sentry.utils.redis import redis_clusters

from .base import StringIndexer, UseCase


def get_client():
return redis_clusters.get(settings.SENTRY_METRICS_INDEXER_REDIS_CLUSTER)


class RedisMockIndexer(StringIndexer):
"""
Temporary mock string indexer that uses Redis to store data.
"""

def _get_key(self, org_id, instance) -> str:
if isinstance(instance, str):
return f"temp-metrics-indexer:{org_id}:1:str:{instance}"
elif isinstance(instance, int):
return f"temp-metrics-indexer:{org_id}:1:int:{instance}"
else:
raise Exception("Invalid: must be string or int")

def _bulk_record(self, org_id: str, mapping: Dict[str, None]) -> Dict[str, int]:
"""
Take a mapping of strings {"metric_id`": None} and populate the ints
for the corresponding strings.
This includes save the following key value pairs in Redis
"temp-metrics-indexer:{org_id}:1:str:{instance}" -> int
"temp-metrics-indexer:{org_id}:1:int:{instance}" -> string
"""

redis_key_values = {}

for string in mapping.keys():
# use hashlib instead of hash() because the latter uses a random value (unless PYTHONHASHSEED
# is set to an integer) to seed hashes of strs and bytes
# https://docs.python.org/3/using/cmdline.html#envvar-PYTHONHASHSEED
int_value = int.from_bytes(hashlib.md5(string.encode("utf-8")).digest(), "big") % (
10 ** 8
)
mapping[string] = int_value

int_key = self._get_key(org_id, int_value)
string_key = self._get_key(org_id, string)

redis_key_values[string_key] = int_value
redis_key_values[int_key] = string

get_client().mset(redis_key_values)

return mapping

def bulk_record(self, org_id: str, strings: List[str]) -> Dict[str, int]:
"""
Takes a list of strings that could be a metric names, tag keys or values
and returns a string -> int mapping.
1. Given a list of strings:
['release', 'production', 'environment', 'measurement.fp', '1.8.10']
2. We look up in Redis to see what int values we already have:
['1025825', '58876432', '98539986', None, '46186005']
3. Separate results into resolved and unresolved dictionaries:
{
"release": 1025825,
"production": 8876432,
"environment": 98539986,
"1.8.10": 6186005,
}
{ "measurement.fp": None }
4. If no unresolved, then return our resolved dict, otherwise
call self._bulk_record() on the unresolved dict, and updated
the resolved dictionary
{
"release": 1025825,
"production": 8876432,
"environment": 98539986,
"1.8.10": 6186005,
"measurement.fp": 83361614
}
"""
client = get_client()
string_keys = [self._get_key(org_id, s) for s in strings]
results = client.mget(string_keys)

resolved = {}
unresolved = {}
for i, result in enumerate(results):
if result:
resolved[strings[i]] = int(result)
else:
unresolved[strings[i]] = None

if len(unresolved.keys()) == 0:
return resolved

newly_resolved = self._bulk_record(org_id, unresolved)
resolved.update(newly_resolved)
return resolved

def record(self, org_id: str, string: str) -> int:
"""
If key already exists, grab that value, otherwise record both the
string to int and int to string relationships.
"""
client = get_client()

string_key = f"temp-metrics-indexer:{org_id}:1:str:{string}"
value = client.get(string_key)
if value is None:
value: int = abs(hash(string)) % (10 ** 8)
client.set(string_key, value)

# reverse record (int to string)
int_key = f"temp-metrics-indexer:{org_id}:1:int:{value}"
client.set(int_key, string)

return int(value)

def resolve(self, org_id: str, use_case: UseCase, string: str) -> Optional[int]:
client = get_client()
key = f"temp-metrics-indexer:{org_id}:1:str:{string}"

try:
return int(client.get(key))
except TypeError:
return None

def reverse_resolve(self, org_id: str, use_case: UseCase, id: int) -> Optional[str]:
# NOTE: Ignores ``use_case`` for simplicity.

client = get_client()
key = f"temp-metrics-indexer:{org_id}:1:int:{id}"

return client.get(key)

def delete_records(self):
"""
Easy way to delete all the data for the temporary indexer.
"""
client = get_client()
keys = list(client.scan_iter(match="temp-metrics-indexer*"))
client.delete(*keys)

0 comments on commit 23021e9

Please sign in to comment.