Skip to content

Commit

Permalink
workflow: summarize: Validate summarize field types are the same
Browse files Browse the repository at this point in the history
  • Loading branch information
tontinton committed Dec 5, 2024
1 parent 94dfe40 commit cd85c62
Showing 1 changed file with 30 additions and 9 deletions.
39 changes: 30 additions & 9 deletions src/workflow/summarize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{

use futures_util::StreamExt;

use color_eyre::Result;
use color_eyre::{eyre::bail, Result};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use tracing::info;
Expand Down Expand Up @@ -150,19 +150,40 @@ pub async fn summarize_stream(config: Summarize, mut input_stream: LogStream) ->

let by: Vec<KeyString> = config.by.into_iter().map(|x| x.into()).collect();

// Will be fixed once vrl is removed (value will be immutable).
// All of HashMap, HashSet, BTreeMap and BtreeSet rely on either the hash or the order of keys
// be unchanging, so having types with interior mutability is a bad idea.
// We don't mutate the key, so we ignore the lint error here.
#[allow(clippy::mutable_key_type)]
let mut group_aggregates: BTreeMap<Vec<Sortable>, Vec<Arc<dyn Aggregate>>> = BTreeMap::new();

// TODO: validate "by" types are all the same.
let mut tracked_types = vec![None; by.len()];

while let Some(log) = input_stream.next().await {
let group_keys: Vec<Sortable> = by
.iter()
.map(|x| log.get(x).unwrap_or_else(|| &Value::Null))
.cloned()
.map(Sortable)
.collect();
let mut group_keys = Vec::with_capacity(by.len());

for (tracked_type, key) in tracked_types.iter_mut().zip(&by) {
let value = log.get(key).unwrap_or_else(|| &Value::Null);
if value == &Value::Null {
group_keys.push(Sortable(value.clone()));
continue;
}

let value_type = std::mem::discriminant(value);
if let Some(t) = tracked_type {
if *t != value_type {
bail!(
"Cannot summarize over differing types (key '{}'): {:?} != {:?}",
key,
*t,
value_type
);
}
} else {
*tracked_type = Some(value_type);
}

group_keys.push(Sortable(value.clone()));
}

let entry = group_aggregates
.entry(group_keys)
Expand Down

0 comments on commit cd85c62

Please sign in to comment.