Skip to content

Commit

Permalink
quickwit: Fix aggregation query
Browse files Browse the repository at this point in the history
To work on both empty `by` (meaning no group by), and to actually get
the aggregation results other than count (min & max).
  • Loading branch information
tontinton committed Dec 6, 2024
1 parent 912098f commit 3498a15
Showing 1 changed file with 110 additions and 35 deletions.
145 changes: 110 additions & 35 deletions src/quickwit_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,17 +150,40 @@ struct SearchAggregationBucket {
key: Value,

#[serde(flatten)]
values: HashMap<String, SearchAggregationBuckets>,
buckets_or_value: HashMap<String, SearchAggregationBucketsOrValue>,
}

#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum SearchAggregationBucketsOrValue {
Value(SearchAggregationValue),
Buckets(SearchAggregationBuckets),
}

#[derive(Debug, Deserialize)]
struct SearchAggregationValue {
value: Value,
}

#[derive(Debug, Deserialize)]
struct SearchAggregationBuckets {
buckets: Vec<SearchAggregationBucket>,
}

#[derive(Debug, Deserialize)]
struct SearchAggregationHitsTotal {
value: i64,
}

#[derive(Debug, Deserialize)]
struct SearchAggregationHits {
total: SearchAggregationHitsTotal,
}

#[derive(Debug, Deserialize)]
struct SearchAggregationResponse {
aggregations: HashMap<String, SearchAggregationBuckets>,
hits: SearchAggregationHits,
aggregations: HashMap<String, SearchAggregationBucketsOrValue>,
}

fn default_refresh_interval() -> Duration {
Expand Down Expand Up @@ -396,7 +419,10 @@ async fn count(base_url: &str, index: &str, query: Option<serde_json::Value>) ->
Ok(data.count)
}

#[instrument(skip(query), name = "GET and parse quickwit search aggregation results")]
#[instrument(
skip(query),
name = "GET and parse quickwit search aggregation results"
)]
async fn search_aggregation(
base_url: &str,
index: &str,
Expand All @@ -419,8 +445,8 @@ async fn search_aggregation(
bail!("GET {} failed with status {}", &url, status);
}
}
let text = response.text().await.context("text from response")?;
Ok(serde_json::from_str(&text)?)
let text = response.text().await.context("get text from response")?;
serde_json::from_str(&text).context("parse response")
}

#[instrument(name = "GET and parse quickwit indexes")]
Expand Down Expand Up @@ -545,47 +571,77 @@ impl QuickwitConnector {
})
}

fn parse_last_bucket(
buckets_or_value_wrap: HashMap<String, SearchAggregationBucketsOrValue>,
doc_count: i64,
group_by: &[String],
count_fields: &[String],
keys_stack: &[Value],
logs: &mut Vec<Log>,
) -> Result<()> {
let mut log = Log::new();

for (key, value) in group_by.iter().zip(keys_stack.iter()) {
log.insert(key.clone().into(), value.clone());
}
for key in count_fields {
log.insert(key.clone().into(), Value::Integer(doc_count));
}

for (field, buckets_or_value) in buckets_or_value_wrap {
let SearchAggregationBucketsOrValue::Value(value_wrap) = buckets_or_value else {
bail!("expected value, not bucket");
};
log.insert(field.into(), value_wrap.value);
}

logs.push(log);

Ok(())
}

fn parse_buckets(
mut buckets_wrap: HashMap<String, SearchAggregationBuckets>,
buckets_or_value: SearchAggregationBucketsOrValue,
index: usize,
group_by: &[String],
count_fields: &[String],
keys_stack: &mut Vec<Value>,
logs: &mut Vec<Log>,
) -> Result<()> {
let bucket_name = format!("{}_{}", AGGREGATION_RESULTS_NAME, index);
let Some(buckets) = buckets_wrap.remove(&bucket_name) else {
bail!("Bucket '{bucket_name}' not found");
let SearchAggregationBucketsOrValue::Buckets(buckets_wrap) = buckets_or_value else {
bail!("expected buckets, not value");
};

for bucket in buckets.buckets {
if bucket.values.is_empty() {
keys_stack.push(bucket.key);
for mut bucket in buckets_wrap.buckets {
keys_stack.push(bucket.key);

let mut log = Log::new();
for (key, value) in group_by.iter().zip(keys_stack.iter()) {
log.insert(key.clone().into(), value.clone());
}
for key in count_fields {
log.insert(key.clone().into(), Value::Integer(bucket.doc_count));
}
logs.push(log);

keys_stack.pop();
if keys_stack.len() == group_by.len() {
Self::parse_last_bucket(
bucket.buckets_or_value,
bucket.doc_count,
group_by,
count_fields,
keys_stack,
logs,
)?;
} else {
keys_stack.push(bucket.key);
let bucket_name = format!("{}_{}", AGGREGATION_RESULTS_NAME, index);
let Some(next_buckets_or_value) = bucket.buckets_or_value.remove(&bucket_name)
else {
bail!("bucket '{bucket_name}' not found");
};

Self::parse_buckets(
bucket.values,
next_buckets_or_value,
index + 1,
group_by,
count_fields,
keys_stack,
logs,
)?;

keys_stack.pop();
}

keys_stack.pop();
}

Ok(())
Expand All @@ -598,17 +654,34 @@ impl QuickwitConnector {
group_by: Vec<String>,
count_fields: Vec<String>,
) -> Result<LogTryStream> {
let response = search_aggregation(&url, &index, query).await?;
let mut response = search_aggregation(&url, &index, query)
.await
.context("run quickwit aggregation query")?;

let mut logs = Vec::new();
Self::parse_buckets(
response.aggregations,
0,
&group_by,
&count_fields,
&mut Vec::new(),
&mut logs,
)?;

let first_bucket_name = format!("{}_0", AGGREGATION_RESULTS_NAME);
if let Some(buckets_or_value) = response.aggregations.remove(&first_bucket_name) {
Self::parse_buckets(
buckets_or_value,
1,
&group_by,
&count_fields,
&mut Vec::new(),
&mut logs,
)
.context("parse quickwit aggregation response (group by)")?;
} else {
Self::parse_last_bucket(
response.aggregations,
response.hits.total.value,
&[],
&count_fields,
&[],
&mut logs,
)
.context("parse quickwit aggregation response (no group by)")?;
}

Ok(Box::pin(try_stream! {
if logs.is_empty() {
Expand Down Expand Up @@ -848,6 +921,8 @@ impl Connector for QuickwitConnector {
current_agg = current_agg.get_mut("aggs").unwrap().get_mut(&name).unwrap();
}

current_agg["aggs"] = json!(inner_aggs);

Some(Box::new(handle.with_summarize(
aggs,
config.by.clone(),
Expand Down

0 comments on commit 3498a15

Please sign in to comment.