diff --git a/src/quickwit_connector.rs b/src/quickwit_connector.rs
index 66d610e..7bdc5ec 100644
--- a/src/quickwit_connector.rs
+++ b/src/quickwit_connector.rs
@@ -1,4 +1,9 @@
-use std::{any::Any, collections::BTreeMap, sync::Arc, time::Duration};
+use std::{
+    any::Any,
+    collections::{BTreeMap, HashMap},
+    sync::Arc,
+    time::Duration,
+};
 
 use async_stream::try_stream;
 use axum::async_trait;
@@ -13,14 +18,25 @@ use tokio::{
     time::sleep,
 };
 use tracing::{debug, error, info, instrument};
+use vrl::core::Value;
 
 use crate::{
     connector::{Connector, QueryHandle, QueryResponse, Split},
     downcast_unwrap,
-    log::Log,
-    workflow::{filter::FilterAst, sort::Sort},
+    log::{Log, LogTryStream},
+    workflow::{
+        filter::FilterAst,
+        sort::Sort,
+        summarize::{Aggregation, Summarize},
+    },
 };
 
+static AGGREGATION_RESULTS_NAME: &str = "summarize";
+
+/// Quickwit doesn't yet support pagination over aggregation queries.
+/// This is the maximum number of groups we pull from it (value taken from Quickwit's code).
+const MAX_NUM_GROUPS: usize = 65000;
+
 #[derive(Debug, Serialize, Deserialize)]
 pub struct QuickwitSplit {}
 
@@ -35,6 +51,9 @@ impl Split for QuickwitSplit {
 struct QuickwitHandle {
     queries: Vec<serde_json::Value>,
     sorts: Option<serde_json::Value>,
+    aggs: Option<serde_json::Value>,
+    group_by: Vec<String>,
+    count_fields: Vec<String>,
     limit: Option<u32>,
     count: bool,
 }
@@ -71,6 +90,19 @@ impl QuickwitHandle {
         handle.count = true;
         handle
     }
+
+    fn with_summarize(
+        &self,
+        aggs: serde_json::Value,
+        group_by: Vec<String>,
+        count_fields: Vec<String>,
+    ) -> QuickwitHandle {
+        let mut handle = self.clone();
+        handle.aggs = Some(aggs);
+        handle.group_by = group_by;
+        handle.count_fields = count_fields;
+        handle
+    }
 }
 
 #[derive(Debug, Deserialize)]
@@ -112,6 +144,25 @@ struct CountResponse {
     count: u64,
 }
 
+#[derive(Debug, Deserialize)]
+struct SearchAggregationBucket {
+    doc_count: i64,
+    key: Value,
+
+    #[serde(flatten)]
+    values: HashMap<String, SearchAggregationBuckets>,
+}
+
+#[derive(Debug, Deserialize)]
+struct SearchAggregationBuckets {
+    buckets: Vec<SearchAggregationBucket>,
+}
+
+#[derive(Debug, Deserialize)]
+struct SearchAggregationResponse {
+    aggregations: HashMap<String, SearchAggregationBuckets>,
+}
+
 fn default_refresh_interval() -> Duration {
     humantime::parse_duration("1m").expect("Invalid duration format")
 }
@@ -345,6 +396,33 @@ async fn count(base_url: &str, index: &str, query: Option<serde_json::Value>) -> Result<u64> {
     Ok(data.count)
 }
 
+#[instrument(name = "GET and parse quickwit search aggregation results")]
+async fn search_aggregation(
+    base_url: &str,
+    index: &str,
+    query: Option<serde_json::Value>,
+) -> Result<SearchAggregationResponse> {
+    let url = format!("{}/api/v1/_elastic/{}/_search", base_url, index);
+    let client = Client::new();
+
+    let mut req = client.get(&url);
+    if let Some(query) = query {
+        req = req.json(&query);
+    }
+
+    let response = req.send().await.context("http request")?;
+    let status = response.status();
+    if !status.is_success() {
+        if let Ok(text) = response.text().await {
+            bail!("GET {} failed with status {}: {}", &url, status, text);
+        } else {
+            bail!("GET {} failed with status {}", &url, status);
+        }
+    }
+    let text = response.text().await.context("text from response")?;
+    Ok(serde_json::from_str(&text)?)
+}
+
 #[instrument(name = "GET and parse quickwit indexes")]
 async fn get_indexes(base_url: &str) -> Result<Vec<String>> {
     let url = format!("{}/api/v1/indexes", base_url);
@@ -411,6 +489,137 @@ impl QuickwitConnector {
             shutdown_tx,
         }
     }
+
+    fn query_search(
+        url: String,
+        index: String,
+        query: Option<serde_json::Value>,
+        scroll_timeout: Duration,
+        scroll_size: u16,
+        limit: Option<u32>,
+    ) -> LogTryStream {
+        Box::pin(try_stream! {
+            if let Some(limit) = limit {
+                if limit == 0 {
+                    return;
+                }
+            }
+
+            let mut streamed = 0;
+            let (mut logs, mut scroll_id) = begin_search(
+                &url,
+                &index,
+                query,
+                &scroll_timeout,
+                scroll_size
+            ).await?;
+
+            if logs.is_empty() {
+                return;
+            }
+            for log in logs {
+                yield log;
+                streamed += 1;
+                if let Some(limit) = limit {
+                    if streamed >= limit {
+                        return;
+                    }
+                }
+            }
+
+            loop {
+                (logs, scroll_id) = continue_search(&url, scroll_id, &scroll_timeout).await?;
+                if logs.is_empty() {
+                    return;
+                }
+                for log in logs {
+                    yield log;
+                    streamed += 1;
+                    if let Some(limit) = limit {
+                        if streamed >= limit {
+                            return;
+                        }
+                    }
+                }
+            }
+        })
+    }
+
+    fn parse_buckets(
+        mut buckets_wrap: HashMap<String, SearchAggregationBuckets>,
+        index: usize,
+        group_by: &[String],
+        count_fields: &[String],
+        keys_stack: &mut Vec<Value>,
+        logs: &mut Vec<Log>,
+    ) -> Result<()> {
+        let bucket_name = format!("{}_{}", AGGREGATION_RESULTS_NAME, index);
+        let Some(buckets) = buckets_wrap.remove(&bucket_name) else {
+            bail!("Bucket '{bucket_name}' not found");
+        };
+
+        for bucket in buckets.buckets {
+            if bucket.values.is_empty() {
+                keys_stack.push(bucket.key);
+
+                let mut log = Log::new();
+                for (key, value) in group_by.iter().zip(keys_stack.iter()) {
+                    log.insert(key.clone().into(), value.clone());
+                }
+                for key in count_fields {
+                    log.insert(key.clone().into(), Value::Integer(bucket.doc_count));
+                }
+                logs.push(log);
+
+                keys_stack.pop();
+            } else {
+                keys_stack.push(bucket.key);
+
+                Self::parse_buckets(
+                    bucket.values,
+                    index + 1,
+                    group_by,
+                    count_fields,
+                    keys_stack,
+                    logs,
+                )?;
+
+                keys_stack.pop();
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn query_aggregation(
+        url: String,
+        index: String,
+        query: Option<serde_json::Value>,
+        group_by: Vec<String>,
+        count_fields: Vec<String>,
+    ) -> Result<LogTryStream> {
+        let response = search_aggregation(&url, &index, query).await?;
+
+        let mut logs = Vec::new();
+        Self::parse_buckets(
+            response.aggregations,
+            0,
+            &group_by,
+            &count_fields,
+            &mut Vec::new(),
+            &mut logs,
+        )?;
+
+        Ok(Box::pin(try_stream! {
+            if logs.is_empty() {
+                return;
+            }
+
+            for log in logs {
+                yield log;
+            }
+        }))
+    }
 }
 
 #[async_trait]
@@ -468,6 +677,19 @@ impl Connector for QuickwitConnector {
             query_map.insert("sort", sorts.clone());
         }
 
+        let is_aggregation_query = if let Some(aggs) = &handle.aggs {
+            query_map.insert("size", json!(0));
+            for (key, value) in aggs.as_object().unwrap() {
+                query_map.insert(key, value.clone());
+            }
+            true
+        } else {
+            if let Some(limit) = limit {
+                query_map.insert("size", limit.into());
+            }
+            false
+        };
+
         let query = if !query_map.is_empty() {
             Some(json!(query_map))
         } else {
@@ -491,51 +713,27 @@ impl Connector for QuickwitConnector {
             return Ok(QueryResponse::Count(result));
         }
 
-        Ok(QueryResponse::Logs(Box::pin(try_stream! {
-            if let Some(limit) = limit {
-                if limit == 0 {
-                    return;
-                }
-            }
-
-            let mut streamed = 0;
-            let (mut logs, mut scroll_id) = begin_search(
-                &url,
-                &collection,
-                query,
-                &scroll_timeout,
-                scroll_size
-            ).await?;
-
-            if logs.is_empty() {
-                return;
-            }
-            for log in logs {
-                yield log;
-                streamed += 1;
-                if let Some(limit) = limit {
-                    if streamed >= limit {
-                        return;
-                    }
-                }
-            }
+        if is_aggregation_query {
+            return Ok(QueryResponse::Logs(
+                Self::query_aggregation(
+                    url,
+                    collection,
+                    query,
+                    handle.group_by.clone(),
+                    handle.count_fields.clone(),
+                )
+                .await?,
+            ));
+        }
 
-            loop {
-                (logs, scroll_id) = continue_search(&url, scroll_id, &scroll_timeout).await?;
-                if logs.is_empty() {
-                    return;
-                }
-                for log in logs {
-                    yield log;
-                    streamed += 1;
-                    if let Some(limit) = limit {
-                        if streamed >= limit {
-                            return;
-                        }
-                    }
-                }
-            }
-        })))
+        Ok(QueryResponse::Logs(Self::query_search(
+            url,
+            collection,
+            query,
+            scroll_timeout,
+            scroll_size,
+            limit,
+        )))
     }
 
     fn apply_filter(
@@ -588,9 +786,81 @@ impl Connector for QuickwitConnector {
     fn apply_count(&self, handle: &dyn QueryHandle) -> Option<Box<dyn QueryHandle>> {
         let handle = downcast_unwrap!(handle, QuickwitHandle);
+        if handle.aggs.is_some() {
+            // Quickwit's count query returns the number of documents, not the
+            // number of unique groups. That's fine: aggregation queries usually
+            // return few results, so we can count them ourselves.
+            return None;
+        }
         Some(Box::new(handle.with_count()))
     }
 
+    fn apply_summarize(
+        &self,
+        config: &Summarize,
+        handle: &dyn QueryHandle,
+    ) -> Option<Box<dyn QueryHandle>> {
+        let handle = downcast_unwrap!(handle, QuickwitHandle);
+
+        let mut count_fields = Vec::new();
+        let mut inner_aggs = BTreeMap::new();
+
+        for (output_field, agg) in &config.aggs {
+            let value = match agg {
+                Aggregation::Min(agg_field) => {
+                    json!({
+                        "min": {
+                            "field": agg_field,
+                        }
+                    })
+                }
+                Aggregation::Max(agg_field) => {
+                    json!({
+                        "max": {
+                            "field": agg_field,
+                        }
+                    })
+                }
+                Aggregation::Count => {
+                    // Count is always returned in doc_count.
+                    count_fields.push(output_field.clone());
+                    continue;
+                }
+            };
+
+            inner_aggs.insert(output_field, value);
+        }
+
+        let mut aggs = json!({});
+
+        let mut current_agg = &mut aggs;
+        for (i, field) in config.by.iter().enumerate() {
+            let name = format!("{}_{}", AGGREGATION_RESULTS_NAME, i);
+            let nested_agg = json!({
+                &name: {
+                    "terms": {
+                        "field": field,
+                        "size": MAX_NUM_GROUPS,
+                    }
+                }
+            });
+            current_agg["aggs"] = nested_agg;
+            current_agg = current_agg.get_mut("aggs").unwrap().get_mut(&name).unwrap();
+        }
+
+        // Attach the collected min/max sub-aggregations to the innermost
+        // terms bucket so they are computed per group.
+        if !inner_aggs.is_empty() {
+            current_agg["aggs"] = json!(inner_aggs);
+        }
+
+        Some(Box::new(handle.with_summarize(
+            aggs,
+            config.by.clone(),
+            count_fields,
+        )))
+    }
+
     async fn close(self) {
         if let Err(e) = self.shutdown_tx.send(()) {
             error!("Failed to send shutdown to quickwit interval task: {}", e);
diff --git a/src/workflow/summarize.rs b/src/workflow/summarize.rs
index 9b5bd39..5adbf6a 100644
--- a/src/workflow/summarize.rs
+++ b/src/workflow/summarize.rs
@@ -29,8 +29,8 @@ pub enum Aggregation {
 
 #[derive(Debug, Serialize, Deserialize, Clone)]
 pub struct Summarize {
-    aggs: HashMap<String, Aggregation>,
-    by: Vec<String>,
+    pub aggs: HashMap<String, Aggregation>,
+    pub by: Vec<String>,
 }
 
 /// An on-going aggregation (including the needed state to compute the next value of the
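
Note, to make the pushdown concrete: for a hypothetical pipeline step `summarize c=count() by level, region` (query text and field names are made up for illustration), `apply_summarize` builds one terms aggregation per group-by field, nested innermost-last, named after the `AGGREGATION_RESULTS_NAME` prefix (`summarize_0`, `summarize_1`, ...). A count aggregation adds no JSON at all; it is read back from `doc_count`. A minimal sketch, assuming only serde_json as a dependency:

use serde_json::json;

fn main() {
    // What with_summarize() would receive for the hypothetical
    // `summarize c=count() by level, region`. Each terms level is capped
    // at MAX_NUM_GROUPS (65000) because Quickwit cannot paginate
    // aggregation results.
    let aggs = json!({
        "aggs": {
            "summarize_0": {
                "terms": { "field": "level", "size": 65000 },
                "aggs": {
                    "summarize_1": {
                        "terms": { "field": "region", "size": 65000 }
                    }
                }
            }
        }
    });

    // query() later merges these top-level keys into the request body
    // together with "size": 0, so Quickwit returns only buckets, no hits.
    println!("{}", serde_json::to_string_pretty(&aggs).unwrap());
}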
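On the response side, `parse_buckets` walks an Elasticsearch-style bucket tree keyed by the same `summarize_{i}` names, emitting one log per leaf key path with the count taken from `doc_count`. The sketch below shows that shape and traversal with simplified mirror structs (vrl's `Value` stood in by `serde_json::Value`, error handling elided); it assumes serde with the derive feature and serde_json:

use std::collections::HashMap;

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Bucket {
    doc_count: i64,
    key: serde_json::Value,
    // Nested `summarize_{i+1}` levels land here via flatten; leaves have none.
    #[serde(flatten)]
    values: HashMap<String, Buckets>,
}

#[derive(Debug, Deserialize)]
struct Buckets {
    buckets: Vec<Bucket>,
}

fn walk(index: usize, levels: &HashMap<String, Buckets>, keys: &mut Vec<serde_json::Value>) {
    let Some(level) = levels.get(&format!("summarize_{index}")) else {
        return;
    };
    for bucket in &level.buckets {
        keys.push(bucket.key.clone());
        if bucket.values.is_empty() {
            // Leaf: one output row per key path, count from doc_count.
            println!("{:?} -> {}", keys, bucket.doc_count);
        } else {
            walk(index + 1, &bucket.values, keys);
        }
        keys.pop();
    }
}

fn main() {
    // A hypothetical reply for `summarize c=count() by level, region`.
    let raw = r#"{
        "summarize_0": { "buckets": [
            { "key": "error", "doc_count": 7, "summarize_1": { "buckets": [
                { "key": "eu", "doc_count": 4 },
                { "key": "us", "doc_count": 3 }
            ] } }
        ] }
    }"#;
    let aggregations: HashMap<String, Buckets> = serde_json::from_str(raw).unwrap();
    walk(0, &aggregations, &mut Vec::new());
    // Prints:
    // [String("error"), String("eu")] -> 4
    // [String("error"), String("us")] -> 3
}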