Skip to content

Commit

Permalink
workflow: Send count as log with one field named "count"
Browse files Browse the repository at this point in the history
  • Loading branch information
tontinton committed Dec 6, 2024
1 parent b6f7da0 commit c139fa2
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ macro_rules! downcast_unwrap {

pub enum QueryResponse {
Logs(LogTryStream),
Count(u64),
Count(i64),
}

#[typetag::serde(tag = "type")]
Expand Down
6 changes: 3 additions & 3 deletions src/quickwit_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ struct ContinueSearchRequest {

#[derive(Debug, Deserialize)]
struct CountResponse {
count: u64,
count: i64,
}

#[derive(Debug, Deserialize)]
Expand Down Expand Up @@ -396,7 +396,7 @@ async fn continue_search(
}

#[instrument(skip(query), name = "GET and parse quickwit count result")]
async fn count(base_url: &str, index: &str, query: Option<serde_json::Value>) -> Result<u64> {
async fn count(base_url: &str, index: &str, query: Option<serde_json::Value>) -> Result<i64> {
let url = format!("{}/api/v1/_elastic/{}/_count", base_url, index);
let client = Client::new();

Expand Down Expand Up @@ -781,7 +781,7 @@ impl Connector for QuickwitConnector {
if handle.count {
let mut result = count(&url, &collection, query).await?;
if let Some(limit) = limit {
result = (limit as u64).min(result);
result = (limit as i64).min(result);
}
return Ok(QueryResponse::Count(result));
}
Expand Down
23 changes: 19 additions & 4 deletions src/workflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use summarize::{summarize_stream, Summarize};
use tokio::{spawn, sync::mpsc, task::JoinHandle};
use topn::topn_stream;
use tracing::debug;
use vrl::core::Value;

use crate::{
connector::{Connector, QueryHandle, QueryResponse, Split},
Expand All @@ -28,6 +29,8 @@ pub mod summarize;
pub mod topn;
pub mod vrl_utils;

const COUNT_LOG_FIELD_NAME: &str = "count";

type WorkflowTasks = FuturesUnordered<JoinHandle<Result<()>>>;

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -81,6 +84,13 @@ fn rx_stream(mut rx: mpsc::Receiver<Log>) -> LogStream {
})
}

async fn count_to_tx(count: i64, tx: mpsc::Sender<Log>) -> Result<()> {
let mut count_log = Log::new();
count_log.insert(COUNT_LOG_FIELD_NAME.into(), Value::Integer(count));
tx.send(count_log).await.context("send count to tx")?;
Ok(())
}

async fn stream_to_tx(mut stream: LogTryStream, tx: mpsc::Sender<Log>, tag: &str) -> Result<()> {
while let Some(log) = stream.next().await {
if let Err(e) = tx.send(log.context(format!("tx {tag}"))?).await {
Expand Down Expand Up @@ -134,7 +144,7 @@ impl WorkflowStep {
QueryResponse::Count(count) => return Ok(Some(count)),
}

Ok::<Option<u64>, color_eyre::eyre::Error>(None)
Ok::<Option<i64>, color_eyre::eyre::Error>(None)
}));
}

Expand All @@ -154,7 +164,9 @@ impl WorkflowStep {
}

if let Some(inner) = count {
println!("{}", inner);
count_to_tx(inner, tx)
.await
.context("send count from scan")?;
}
}
WorkflowStep::Filter(ast) => {
Expand Down Expand Up @@ -209,11 +221,14 @@ impl WorkflowStep {
WorkflowStep::Count => {
let mut rx = rx.unwrap();

let mut count: u64 = 0;
let mut count: i64 = 0;
while rx.recv().await.is_some() {
count += 1;
}
println!("{}", count);

count_to_tx(count, tx)
.await
.context("send count from count")?;
}
}

Expand Down

0 comments on commit c139fa2

Please sign in to comment.