Skip to content

Commit

Permalink
quickwit: Query first search before iterating over stream
Browse files Browse the repository at this point in the history
  • Loading branch information
tontinton committed Dec 6, 2024
1 parent 15e7d15 commit 4380529
Showing 1 changed file with 25 additions and 28 deletions.
53 changes: 25 additions & 28 deletions src/quickwit_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ use std::{
any::Any,
collections::{BTreeMap, HashMap},
sync::Arc,
time::Duration,
time::{Duration, Instant},
};

use async_stream::try_stream;
use axum::async_trait;
use color_eyre::eyre::{bail, Context, Result};
use futures_util::stream;
use reqwest::Client;
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::{json, to_string};
Expand Down Expand Up @@ -514,33 +515,34 @@ impl QuickwitConnector {
}
}

fn query_search(
async fn query_search(
url: String,
index: String,
query: Option<serde_json::Value>,
scroll_timeout: Duration,
scroll_size: u16,
limit: Option<u32>,
) -> LogTryStream {
Box::pin(try_stream! {
if let Some(limit) = limit {
if limit == 0 {
return;
}
) -> Result<LogTryStream> {
if let Some(limit) = limit {
if limit == 0 {
return Ok(Box::pin(stream::empty()));
}
}

let mut streamed = 0;
let (mut logs, mut scroll_id) = begin_search(
&url,
&index,
query,
&scroll_timeout,
scroll_size
).await?;
let start = Instant::now();

if logs.is_empty() {
return;
}
let mut streamed = 0;
let (mut logs, mut scroll_id) =
begin_search(&url, &index, query, &scroll_timeout, scroll_size).await?;

let duration = start.elapsed();
debug!(elapsed_time = ?duration, "Begin search time");

if logs.is_empty() {
return Ok(Box::pin(stream::empty()));
}

Ok(Box::pin(try_stream! {
for log in logs {
yield log;
streamed += 1;
Expand All @@ -566,7 +568,7 @@ impl QuickwitConnector {
}
}
}
})
}))
}

fn parse_last_bucket(
Expand Down Expand Up @@ -797,14 +799,9 @@ impl Connector for QuickwitConnector {
));
}

Ok(QueryResponse::Logs(Self::query_search(
url,
collection,
query,
scroll_timeout,
scroll_size,
limit,
)))
Ok(QueryResponse::Logs(
Self::query_search(url, collection, query, scroll_timeout, scroll_size, limit).await?,
))
}

fn apply_filter(
Expand Down

0 comments on commit 4380529

Please sign in to comment.