Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(post-process-forwarder) Allow different types of post process forwarders #29225

Merged
merged 8 commits into from
Oct 15, 2021
1 change: 1 addition & 0 deletions src/sentry/eventstream/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ def requires_post_process_forwarder(self):

def run_post_process_forwarder(
self,
entity,
consumer_group,
commit_log_topic,
synchronize_commit_group,
Expand Down
19 changes: 18 additions & 1 deletion src/sentry/eventstream/kafka/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
from sentry.eventstream.kafka.consumer import SynchronizedConsumer
from sentry.eventstream.kafka.postprocessworker import (
_CONCURRENCY_OPTION,
ErrorsPostProcessForwarderWorker,
PostProcessForwarderType,
PostProcessForwarderWorker,
TransactionsPostProcessForwarderWorker,
_sampled_eventstream_timer,
)
from sentry.eventstream.kafka.protocol import (
Expand Down Expand Up @@ -151,6 +154,7 @@ def requires_post_process_forwarder(self):

def _build_consumer(
self,
entity,
consumer_group,
commit_log_topic,
synchronize_commit_group,
Expand All @@ -169,7 +173,16 @@ def _build_consumer(
)

concurrency = options.get(_CONCURRENCY_OPTION)
worker = PostProcessForwarderWorker(concurrency=concurrency)
logger.info(f"Starting post process forwrader to consume {entity} messages")
if entity == PostProcessForwarderType.TRANSACTIONS:
worker = TransactionsPostProcessForwarderWorker(concurrency=concurrency)
elif entity == PostProcessForwarderType.ERRORS:
worker = ErrorsPostProcessForwarderWorker(concurrency=concurrency)
else:
# Default implementation which processes both errors and transactions
# irrespective of values in the header. This would most likely be the case
# for development environments.
worker = PostProcessForwarderWorker(concurrency=concurrency)

consumer = BatchingKafkaConsumer(
topics=self.topic,
Expand All @@ -183,6 +196,7 @@ def _build_consumer(

def run_batched_consumer(
self,
entity,
consumer_group,
commit_log_topic,
synchronize_commit_group,
Expand All @@ -191,6 +205,7 @@ def run_batched_consumer(
initial_offset_reset="latest",
):
consumer = self._build_consumer(
entity,
consumer_group,
commit_log_topic,
synchronize_commit_group,
Expand Down Expand Up @@ -404,6 +419,7 @@ def _get_task_kwargs_and_dispatch(self, message) -> None:

def run_post_process_forwarder(
self,
entity,
consumer_group,
commit_log_topic,
synchronize_commit_group,
Expand All @@ -416,6 +432,7 @@ def run_post_process_forwarder(
if settings.SENTRY_POST_PROCESS_FORWARDER_BATCHING:
logger.info("Starting batching consumer")
self.run_batched_consumer(
entity,
consumer_group,
commit_log_topic,
synchronize_commit_group,
Expand Down
7 changes: 7 additions & 0 deletions src/sentry/eventstream/kafka/postprocessworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import random
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from contextlib import contextmanager
from enum import Enum
from typing import Any, Generator, Mapping, Optional, Sequence

from sentry import options
Expand All @@ -25,6 +26,12 @@
_TRANSACTION_FORWARDER_HEADER = "transaction_forwarder"


class PostProcessForwarderType(str, Enum):
ERRORS = "errors"
TRANSACTIONS = "transactions"
ALL = "all"


@contextmanager
def _sampled_eventstream_timer(instance: str) -> Generator[None, None, None]:
record_metric = random.random() < 0.1
Expand Down
1 change: 1 addition & 0 deletions src/sentry/runner/commands/devserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"sentry",
"run",
"post-process-forwarder",
"--entity=all",
"--loglevel=debug",
"--commit-batch-size=1",
],
Expand Down
7 changes: 7 additions & 0 deletions src/sentry/runner/commands/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,12 @@ def cron(**options):
type=click.Choice(["earliest", "latest"]),
help="Position in the commit log topic to begin reading from when no prior offset has been recorded.",
)
@click.option(
"--entity",
default="all",
type=click.Choice(["all", "errors", "transactions"]),
help="The type of entity to process (all, errors, transactions).",
)
@log_options()
@configuration
def post_process_forwarder(**options):
Expand All @@ -332,6 +338,7 @@ def post_process_forwarder(**options):

try:
eventstream.run_post_process_forwarder(
entity=options["entity"],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You would need to cast this to the enum type to avoid future typing errors I think. I can't remember if mypy is smart about Enums.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened a ticket to add mypy typing to the whole post process forwarder. I will handle it at that time.

consumer_group=options["consumer_group"],
commit_log_topic=options["commit_log_topic"],
synchronize_commit_group=options["synchronize_commit_group"],
Expand Down
1 change: 1 addition & 0 deletions tests/sentry/eventstream/kafka/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,7 @@ def test_post_process_forwarder_batch_consumer(self, dispatch_post_process_group

eventstream = KafkaEventStream()
consumer = eventstream._build_consumer(
entity="all",
consumer_group=consumer_group,
commit_log_topic=self.commit_log_topic,
synchronize_commit_group=synchronize_commit_group,
Expand Down