
Add custom threshold support for Nebula #115

Merged: 7 commits into master on Jun 20, 2024
Conversation

DJAndries (Collaborator):

No description provided.

@DJAndries force-pushed the custom-threshold branch 2 times, most recently from 26ae9ca to d4b46eb on March 20, 2024 at 07:22
@DJAndries changed the title from "WIP: Add custom threshold support for Nebula" to "Add custom threshold support for Nebula" on Mar 20, 2024
@DJAndries marked this pull request as ready for review on March 20, 2024 at 19:16
@DJAndries requested a review from a team as a code owner on March 20, 2024 at 19:16
@rillian (Contributor) left a comment:

lgtm. So the idea here is to group submissions by their threshold value (stated or implicit) and decode and aggregate them separately, so the db will have separate entries for each threshold class and we can join them in the analysis dashboard queries?

```diff
-let record: FutureRecord<str, [u8]> = FutureRecord::to(&self.topic).payload(record);
+let mut record: FutureRecord<str, [u8]> = FutureRecord::to(&self.topic).payload(record);
+if let Some(threshold) = request_threshold {
+    let threshold = (threshold as u32).to_le_bytes();
```
rillian (Contributor):

I don't know anything about kafka, but it looks like this field really is just 'bytes' and it's custom serialization up to the application even for a base type like this. Is that right?

DJAndries (Collaborator, Author):

correct, rust-rdkafka requires a &[u8] for header values, so we're responsible for serialization.
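Since rust-rdkafka leaves header encoding to the application, the little-endian `u32` scheme from the diff above can be sketched as a round trip (a minimal illustration; the helper names are hypothetical, only the `to_le_bytes` encoding comes from the PR):

```rust
use std::convert::TryInto;

// Encode a k-threshold as 4 little-endian bytes, matching the PR's
// `(threshold as u32).to_le_bytes()` header value.
fn encode_threshold(threshold: u32) -> [u8; 4] {
    threshold.to_le_bytes()
}

// Decode defensively: a header of the wrong length yields None
// instead of panicking.
fn decode_threshold(bytes: &[u8]) -> Option<u32> {
    bytes.try_into().ok().map(u32::from_le_bytes)
}

fn main() {
    let bytes = encode_threshold(30);
    assert_eq!(decode_threshold(&bytes), Some(30));
    assert_eq!(decode_threshold(&[1, 2, 3]), None);
}
```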

Comment on lines +121 to +122

```rust
if !state.request_threshold_range.contains(&threshold) {
    return Err(WebError::BadThreshold);
```
rillian (Contributor):

Ok, the server only accepts threshold values within whatever range we set (between 20 and 50 by default) so there's a bound to how many queues it will build. Excellent.
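That bound can be sketched in isolation (illustrative types only; the real code uses `state.request_threshold_range` and the server's own `WebError`):

```rust
use std::ops::RangeInclusive;

#[derive(Debug, PartialEq)]
enum WebError {
    BadThreshold,
}

// Accept a client-supplied threshold only if it falls inside the
// configured inclusive range, limiting how many queues the server builds.
fn validate_threshold(range: &RangeInclusive<u8>, threshold: u8) -> Result<u8, WebError> {
    if !range.contains(&threshold) {
        return Err(WebError::BadThreshold);
    }
    Ok(threshold)
}

fn main() {
    let range = 20..=50; // the default range mentioned in the review
    assert_eq!(validate_threshold(&range, 30), Ok(30));
    assert_eq!(validate_threshold(&range, 51), Err(WebError::BadThreshold));
}
```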

DJAndries (Collaborator, Author) commented Mar 20, 2024:

> So the idea here is to group submissions by their threshold value (stated or implicit) and decode and aggregate them separately, so the db will have separate entries for each threshold class and we can join them in the analysis dashboard queries?

Yes. Pending messages (messages stored in PG from previous aggregation attempts) and new messages (messages just received via Kafka during the current iteration) will be grouped by tag, then by threshold. New pending messages will be stored by tag and threshold.

However, recovered messages (which include the key, metric name and metric value) will continue to be grouped by tag. If there are multiple threshold values for a given tag (i.e. if we switched the Nebula k-threshold at some point), we're covered. It turns out that the key for a given tag remains the same, regardless of the multiple threshold values in the wild.
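The two-level grouping described above can be sketched with a nested map (a simplified stand-in; the real `GroupedMessages` and message types in `src/aggregator/group.rs` differ):

```rust
use std::collections::HashMap;

type Tag = Vec<u8>;
type Threshold = u32;

// Messages are bucketed first by tag, then by threshold, so each
// (tag, threshold) class is decoded and aggregated independently.
#[derive(Default)]
struct GroupedMessages {
    groups: HashMap<Tag, HashMap<Threshold, Vec<String>>>,
}

impl GroupedMessages {
    fn add(&mut self, tag: Tag, threshold: Threshold, msg: String) {
        self.groups
            .entry(tag)
            .or_default()
            .entry(threshold)
            .or_default()
            .push(msg);
    }
}

fn main() {
    let mut grouped = GroupedMessages::default();
    grouped.add(vec![1], 20, "a".into());
    grouped.add(vec![1], 50, "b".into());
    // Same tag with two threshold values yields two separate buckets,
    // covering a mid-deployment k-threshold switch.
    assert_eq!(grouped.groups[&vec![1u8]].len(), 2);
}
```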

```rust
let msgs = if recovery_threshold == Some(threshold) && key_recovery_msgs.is_some() {
    // messages for this threshold were already drained in the key
    // recovery step, so use this existing vec
    key_recovery_msgs.take().unwrap()
```


reported by reviewdog 🐶
[semgrep] expect or unwrap called in function returning a Result

Source: https://semgrep.dev/r/trailofbits.rs.panic-in-function-returning-result.panic-in-function-returning-result


Cc @thypon @bcaller
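The finding above flags `unwrap()` inside a function that returns `Result`. A minimal sketch of the usual remedy, with hypothetical names (not the PR's actual types): surface the missing value as the function's error type so it propagates instead of panicking.

```rust
#[derive(Debug, PartialEq)]
enum AggregatorError {
    MissingMetricName,
}

// Instead of `metric_name.unwrap()`, convert the Option into an error
// the caller's Result can carry.
fn require_metric_name(metric_name: Option<String>) -> Result<String, AggregatorError> {
    metric_name.ok_or(AggregatorError::MissingMetricName)
}

fn main() {
    assert_eq!(require_metric_name(Some("latency".into())), Ok("latency".to_string()));
    assert_eq!(require_metric_name(None), Err(AggregatorError::MissingMetricName));
}
```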

```diff
@@ -112,22 +198,15 @@
 id: 0,
 msg_tag: msg_tag.clone(),
 epoch_tag: *epoch as i16,
-metric_name: measurement.0,
-metric_value: measurement.1,
+metric_name: metric_name.unwrap(),
```


reported by reviewdog 🐶
[semgrep] expect or unwrap called in function returning a Result

Source: https://semgrep.dev/r/trailofbits.rs.panic-in-function-returning-result.panic-in-function-returning-result


Cc @thypon @bcaller

```diff
-metric_name: measurement.0,
-metric_value: measurement.1,
+metric_name: metric_name.unwrap(),
+metric_value: metric_value.unwrap(),
```


reported by reviewdog 🐶
[semgrep] expect or unwrap called in function returning a Result

Source: https://semgrep.dev/r/trailofbits.rs.panic-in-function-returning-result.panic-in-function-returning-result


Cc @thypon @bcaller

```rust
let key = if let Some(rec_msg) = existing_rec_msg {
    rec_msg.key.clone()
} else {
    let threshold = recovery_threshold.unwrap();
```


reported by reviewdog 🐶
[semgrep] expect or unwrap called in function returning a Result

Source: https://semgrep.dev/r/trailofbits.rs.panic-in-function-returning-result.panic-in-function-returning-result


Cc @thypon @bcaller

```rust
AppSTARError::Recovery(ConstellationError::ShareRecovery) => {
    // Store new messages until we receive more shares in the future.
    for msg in msgs.drain(..new_msg_count) {
        chunk.new_msgs.get_mut(&threshold).unwrap().push(msg);
```


reported by reviewdog 🐶
[semgrep] expect or unwrap called in function returning a Result

Source: https://semgrep.dev/r/trailofbits.rs.panic-in-function-returning-result.panic-in-function-returning-result


Cc @thypon @bcaller

diracdeltas (Member):

I see this adds new dependencies; is there something in the pre-merge pipeline that runs cargo audit? If not, it should be added.

thypon (Member) commented Apr 8, 2024:

Is the modified code part of a command or a server? @DJAndries

DJAndries (Collaborator, Author):

> Is the modified code part of a command or a server? @DJAndries

Modifications are mostly for the server and the server-side aggregator. Some changes were made to the test client as well.

rillian (Contributor) commented Apr 11, 2024:

> i see this adds new dependencies; is there something in the pre-merge pipeline which runs cargo audit? if not it should be added.

I filed #133 to address this. An unmaintained warning about the rusoto crates is the only thing reported after running cargo update.

@DJAndries force-pushed the custom-threshold branch 2 times, most recently from 6878772 to e81c32d on April 11, 2024 at 18:41
@rillian mentioned this pull request Jun 3, 2024

[puLL-Merge] - brave/constellation-processors@115

Description

This PR makes changes to support custom k-threshold values in requests to the encrypted message submission endpoint. The k-threshold value is passed in a request header and, if present, overrides the default threshold. Pending messages are now stored in the database with their threshold value. Message grouping and key recovery are done separately for each threshold group.

Changes


  • migrations/: Adds a new migration to add a threshold column to the pending_msgs table
  • misc/test-client/:
    • Adds support for submitting requests to a randomness server instead of using a local fetcher
    • Adds a threshold header to requests based on a CLI argument
    • Updates dependencies
  • src/aggregator/:
    • consume.rs: Passes the default k-threshold value to message grouping
    • group.rs:
      • Updates MessageChunk to store pending/new messages in a map keyed by threshold
      • Updates GroupedMessages.add() to use MessageWithThreshold
      • Updates pending message storage to include threshold
    • mod.rs: Renames k-threshold constants
    • processing.rs:
      • Updates layer processing to handle each threshold group separately
      • Removes k-threshold argument since it's no longer a single value
  • src/models/:
    • Adds MessageWithThreshold struct
    • Adds threshold field to PendingMessage
  • src/record_stream.rs:
    • Adds support for threshold header in Kafka records
    • Introduces ConsumedRecord to include the threshold value
  • src/server.rs:
    • Adds support for brave-p3a-constellation-threshold header
    • Validates the threshold header value is within configured range
    • Passes threshold value when producing Kafka record
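The header-override behavior from `src/server.rs` above can be sketched as follows (a hedged illustration; the helper name and parsing details are assumptions, only the "header overrides default" behavior comes from the description):

```rust
// Resolve the effective k-threshold for a request: an absent header falls
// back to the server default, a present header overrides it, and a
// malformed header yields None so the request can be rejected.
fn effective_threshold(header: Option<&str>, default: u32) -> Option<u32> {
    match header {
        Some(raw) => raw.trim().parse::<u32>().ok(),
        None => Some(default),
    }
}

fn main() {
    assert_eq!(effective_threshold(None, 50), Some(50));
    assert_eq!(effective_threshold(Some("30"), 50), Some(30));
    assert_eq!(effective_threshold(Some("abc"), 50), None);
}
```

The resolved value would then still be checked against the configured range before a Kafka record is produced.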

Security Hotspots

  • The brave-p3a-constellation-threshold header value is user controlled. The value is validated to be within a configured range to mitigate abuse.
  • The randomness server URL is user controlled in the test client. Ensure the randomness server is trusted.

@DJAndries DJAndries merged commit 3283da2 into master Jun 20, 2024
5 checks passed