Skip to content

Commit

Permalink
feat(feedback): produce to ingest-feedback-events topic (#3344)
Browse files Browse the repository at this point in the history
Option PR:
depends on getsentry/sentry#67839 (MERGED)

Feature flag PRs (outdated/unused):
~getsentry/getsentry#13426 (REVERTED)~
~getsentry/sentry#67747 (REVERTED)~
~getsentry/sentry-options-automator#973
(CLOSED)~

Relates to getsentry/sentry#66100
  • Loading branch information
aliu39 authored Apr 9, 2024
1 parent 9856073 commit 01311a4
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- Drop `event_id` and `remote_addr` from all outcomes. ([#3319](https://github.com/getsentry/relay/pull/3319))
- Support for AI token metrics ([#3250](https://github.com/getsentry/relay/pull/3250))
- Accept integers in `event.user.username`. ([#3328](https://github.com/getsentry/relay/pull/3328))
- Produce user feedback to ingest-feedback-events topic, with rollout rate. ([#3344](https://github.com/getsentry/relay/pull/3344))
- Extract `cache.item_size` and `cache.hit` data into span indexed ([#3367]https://github.com/getsentry/relay/pull/3367)
- Allow IP addresses in metrics domain tag. ([#3365](https://github.com/getsentry/relay/pull/3365))
- Support the full unicode character set via UTF-8 encoding for metric tags submitted via the statsd format. Certain restricted characters require escape sequences, see [docs](https://develop.sentry.dev/sdk/metrics/#normalization) for the precise rules. ([#3358](https://github.com/getsentry/relay/pull/3358))
Expand Down
11 changes: 11 additions & 0 deletions relay-dynamic-config/src/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,17 @@ pub struct Options {
)]
pub metric_stats_rollout_rate: f32,

/// Rollout rate for producing to the ingest-feedback-events topic.
///
/// Rate needs to be between `0.0` and `1.0`.
/// If set to `1.0` all organizations will ingest to the feedback topic.
#[serde(
rename = "feedback.ingest-topic.rollout-rate",
deserialize_with = "default_on_error",
skip_serializing_if = "is_default"
)]
pub feedback_ingest_topic_rollout_rate: f32,

/// Overall sampling of span extraction.
///
/// This number represents the fraction of transactions for which
Expand Down
10 changes: 9 additions & 1 deletion relay-kafka/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,16 @@ pub enum KafkaTopic {
MetricsSummaries,
/// COGS measurements topic.
Cogs,
/// Feedback events topic.
Feedback,
}

impl KafkaTopic {
/// Returns iterator over the variants of [`KafkaTopic`].
/// It will have to be adjusted if the new variants are added.
pub fn iter() -> std::slice::Iter<'static, Self> {
use KafkaTopic::*;
static TOPICS: [KafkaTopic; 14] = [
static TOPICS: [KafkaTopic; 15] = [
Events,
Attachments,
Transactions,
Expand All @@ -75,6 +77,7 @@ impl KafkaTopic {
Spans,
MetricsSummaries,
Cogs,
Feedback,
];
TOPICS.iter()
}
Expand Down Expand Up @@ -124,6 +127,9 @@ pub struct TopicAssignments {
/// COGS measurements.
#[serde(alias = "shared-resources-usage")]
pub cogs: TopicAssignment,
/// Feedback events topic name.
#[serde(alias = "ingest-feedback-events")]
pub feedback: TopicAssignment,
}

impl TopicAssignments {
Expand All @@ -145,6 +151,7 @@ impl TopicAssignments {
KafkaTopic::Spans => &self.spans,
KafkaTopic::MetricsSummaries => &self.metrics_summaries,
KafkaTopic::Cogs => &self.cogs,
KafkaTopic::Feedback => &self.feedback,
}
}
}
Expand All @@ -166,6 +173,7 @@ impl Default for TopicAssignments {
spans: "snuba-spans".to_owned().into(),
metrics_summaries: "snuba-metrics-summaries".to_owned().into(),
cogs: "shared-resources-usage".to_owned().into(),
feedback: "ingest-feedback-events".to_owned().into(),
}
}
}
Expand Down
16 changes: 15 additions & 1 deletion relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ use crate::services::global_config::GlobalConfigHandle;
use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::Processed;
use crate::statsd::RelayCounters;
use crate::utils::{self, ArrayEncoding, BucketEncoder, ExtractionMode, TypedEnvelope};
use crate::utils::{
self, is_rolled_out, ArrayEncoding, BucketEncoder, ExtractionMode, TypedEnvelope,
};

/// Fallback name used for attachment items without a `filename` header.
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";
Expand Down Expand Up @@ -184,6 +186,7 @@ impl StoreService {
) -> Result<(), StoreError> {
let retention = envelope.retention();
let event_id = envelope.event_id();

let event_item = envelope.as_mut().take_item_by(|item| {
matches!(
item.ty(),
Expand All @@ -199,6 +202,17 @@ impl StoreService {
KafkaTopic::Attachments
} else if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) {
KafkaTopic::Transactions
} else if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::UserReportV2) {
let feedback_ingest_topic_rollout_rate = self
.global_config
.current()
.options
.feedback_ingest_topic_rollout_rate;
if is_rolled_out(scoping.organization_id, feedback_ingest_topic_rollout_rate) {
KafkaTopic::Feedback
} else {
KafkaTopic::Events
}
} else {
KafkaTopic::Events
};
Expand Down
1 change: 1 addition & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
profiles_consumer,
metrics_summaries_consumer,
cogs_consumer,
feedback_consumer,
)


Expand Down
4 changes: 4 additions & 0 deletions tests/integration/fixtures/mini_sentry.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ def add_full_project_config(self, project_id, dsn_public_key=None, extra=None):
self.project_configs[project_id] = ret_val
return ret_val

def set_global_config_option(self, option_name, value):
# must be called before initializing relay fixture
self.global_config["options"][option_name] = value


def _get_project_id(public_key, project_configs):
for project_id, project_config in project_configs.items():
Expand Down
21 changes: 21 additions & 0 deletions tests/integration/fixtures/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def inner(options=None):
"profiles": get_topic_name("profiles"),
"metrics_summaries": get_topic_name("metrics_summaries"),
"cogs": get_topic_name("cogs"),
"feedback": get_topic_name("feedback"),
}

if not processing.get("redis"):
Expand Down Expand Up @@ -303,6 +304,16 @@ def replay_events_consumer(kafka_consumer):
)


@pytest.fixture
def feedback_consumer(kafka_consumer):
return lambda timeout=None: FeedbackConsumer(
timeout=timeout,
*kafka_consumer(
"feedback"
), # Corresponds to key in processing_config["processing"]["topics"]
)


@pytest.fixture
def monitors_consumer(kafka_consumer):
return lambda timeout=None: MonitorsConsumer(
Expand Down Expand Up @@ -449,6 +460,16 @@ def get_replay_event(self):
return payload, event


class FeedbackConsumer(ConsumerBase):
def get_event(self, timeout=None):
message = self.poll(timeout)
assert message is not None
assert message.error() is None

message_dict = msgpack.unpackb(message.value(), raw=False, use_list=False)
return json.loads(message_dict["payload"].decode("utf8")), message_dict


class MonitorsConsumer(ConsumerBase):
def get_check_in(self):
message = self.poll()
Expand Down
41 changes: 30 additions & 11 deletions tests/integration/test_feedback.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pytest
import json


Expand Down Expand Up @@ -42,24 +43,35 @@ def generate_feedback_sdk_event():
}


@pytest.mark.parametrize("use_feedback_topic", (False, True))
def test_feedback_event_with_processing(
mini_sentry, relay_with_processing, events_consumer
mini_sentry,
relay_with_processing,
events_consumer,
feedback_consumer,
use_feedback_topic,
):
relay = relay_with_processing()
mini_sentry.add_basic_project_config(
42, extra={"config": {"features": ["organizations:user-feedback-ingest"]}}
)

_events_consumer = events_consumer(timeout=20)
feedback = generate_feedback_sdk_event()
if use_feedback_topic:
mini_sentry.set_global_config_option("feedback.ingest-topic.rollout-rate", 1.0)
consumer = feedback_consumer(timeout=20)
other_consumer = events_consumer(timeout=20)
else:
mini_sentry.set_global_config_option("feedback.ingest-topic.rollout-rate", 0.0)
consumer = events_consumer(timeout=20)
other_consumer = feedback_consumer(timeout=20)

feedback = generate_feedback_sdk_event()
relay = relay_with_processing()
relay.send_user_feedback(42, feedback)

replay_event, replay_event_message = _events_consumer.get_event()
assert replay_event["type"] == "feedback"
# assert replay_event_message["retention_days"] == 90
event, message = consumer.get_event()
assert event["type"] == "feedback"

parsed_feedback = json.loads(bytes(replay_event_message["payload"]))
parsed_feedback = json.loads(message["payload"])
# Assert required fields were returned.
assert parsed_feedback["event_id"]
assert parsed_feedback["type"] == feedback["type"]
Expand Down Expand Up @@ -101,18 +113,25 @@ def test_feedback_event_with_processing(
},
}

# test message wasn't dup'd to the wrong topic
other_consumer.assert_empty()

def test_feedback_events_without_processing(mini_sentry, relay_chain):
relay = relay_chain(min_relay_version="latest")

@pytest.mark.parametrize("use_feedback_topic", (False, True))
def test_feedback_events_without_processing(
mini_sentry, relay_chain, use_feedback_topic
):
project_id = 42
mini_sentry.add_basic_project_config(
project_id,
extra={"config": {"features": ["organizations:user-feedback-ingest"]}},
)
mini_sentry.set_global_config_option(
"feedback.ingest-topic.rollout-rate", 1.0 if use_feedback_topic else 0.0
)

replay_item = generate_feedback_sdk_event()

relay = relay_chain(min_relay_version="latest")
relay.send_user_feedback(42, replay_item)

envelope = mini_sentry.captured_events.get(timeout=20)
Expand Down

0 comments on commit 01311a4

Please sign in to comment.