Skip to content

Commit

Permalink
quickwit: Reuse the same reqwest::Client for connection pooling
Browse files Browse the repository at this point in the history
  • Loading branch information
tontinton committed Dec 6, 2024
1 parent 4380529 commit 9440d5d
Showing 1 changed file with 35 additions and 15 deletions.
50 changes: 35 additions & 15 deletions src/quickwit_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ pub struct QuickwitConnector {
collections: SharedCollections,
interval_task: JoinHandle<()>,
shutdown_tx: watch::Sender<()>,
client: Client,
}

fn filter_ast_to_query(ast: &FilterAst) -> Option<serde_json::Value> {
Expand Down Expand Up @@ -329,6 +330,7 @@ fn filter_ast_to_query(ast: &FilterAst) -> Option<serde_json::Value> {

#[instrument(skip(query), name = "GET and parse quickwit begin search results")]
async fn begin_search(
client: &Client,
base_url: &str,
index: &str,
query: Option<serde_json::Value>,
Expand All @@ -342,7 +344,6 @@ async fn begin_search(
scroll_timeout.as_millis(),
scroll_size,
);
let client = Client::new();

let mut req = client.get(&url);
if let Some(query) = query {
Expand All @@ -368,6 +369,7 @@ async fn begin_search(

#[instrument(name = "GET and parse quickwit continue search results")]
async fn continue_search(
client: &Client,
base_url: &str,
scroll_id: String,
scroll_timeout: &Duration,
Expand Down Expand Up @@ -395,9 +397,13 @@ async fn continue_search(
}

#[instrument(skip(query), name = "GET and parse quickwit count result")]
async fn count(base_url: &str, index: &str, query: Option<serde_json::Value>) -> Result<i64> {
async fn count(
client: &Client,
base_url: &str,
index: &str,
query: Option<serde_json::Value>,
) -> Result<i64> {
let url = format!("{}/api/v1/_elastic/{}/_count", base_url, index);
let client = Client::new();

let mut req = client.get(&url);
if let Some(query) = query {
Expand All @@ -423,12 +429,12 @@ async fn count(base_url: &str, index: &str, query: Option<serde_json::Value>) ->
name = "GET and parse quickwit search aggregation results"
)]
async fn search_aggregation(
client: &Client,
base_url: &str,
index: &str,
query: Option<serde_json::Value>,
) -> Result<SearchAggregationResponse> {
let url = format!("{}/api/v1/_elastic/{}/_search", base_url, index,);
let client = Client::new();

let mut req = client.get(&url);
if let Some(query) = query {
Expand All @@ -449,9 +455,8 @@ async fn search_aggregation(
}

#[instrument(name = "GET and parse quickwit indexes")]
async fn get_indexes(base_url: &str) -> Result<Vec<String>> {
async fn get_indexes(client: &Client, base_url: &str) -> Result<Vec<String>> {
let url = format!("{}/api/v1/indexes", base_url);
let client = Client::new();
let response = client.get(&url).send().await.context("http request")?;
if !response.status().is_success() {
bail!("GET {} failed with status: {}", &url, response.status());
Expand All @@ -461,8 +466,8 @@ async fn get_indexes(base_url: &str) -> Result<Vec<String>> {
Ok(data.into_iter().map(|x| x.index_config.index_id).collect())
}

async fn refresh_indexes(url: &str, collections: &SharedCollections) {
match get_indexes(url).await {
async fn refresh_indexes(client: &Client, url: &str, collections: &SharedCollections) {
match get_indexes(client, url).await {
Ok(indexes) => {
debug!("Got indexes: {:?}", &indexes);
let mut guard = collections.write().await;
Expand All @@ -479,11 +484,13 @@ async fn run_interval_task(
collections: SharedCollections,
mut shutdown_rx: watch::Receiver<()>,
) {
let client = Client::new();

let future = async {
refresh_indexes(&config.url, &collections).await;
refresh_indexes(&client, &config.url, &collections).await;
loop {
sleep(config.refresh_interval).await;
refresh_indexes(&config.url, &collections).await;
refresh_indexes(&client, &config.url, &collections).await;
}
};

Expand Down Expand Up @@ -512,10 +519,12 @@ impl QuickwitConnector {
collections,
interval_task,
shutdown_tx,
client: Client::new(),
}
}

async fn query_search(
client: Client,
url: String,
index: String,
query: Option<serde_json::Value>,
Expand All @@ -533,7 +542,7 @@ impl QuickwitConnector {

let mut streamed = 0;
let (mut logs, mut scroll_id) =
begin_search(&url, &index, query, &scroll_timeout, scroll_size).await?;
begin_search(&client, &url, &index, query, &scroll_timeout, scroll_size).await?;

let duration = start.elapsed();
debug!(elapsed_time = ?duration, "Begin search time");
Expand All @@ -554,7 +563,7 @@ impl QuickwitConnector {
}

loop {
(logs, scroll_id) = continue_search(&url, scroll_id, &scroll_timeout).await?;
(logs, scroll_id) = continue_search(&client, &url, scroll_id, &scroll_timeout).await?;
if logs.is_empty() {
return;
}
Expand Down Expand Up @@ -648,13 +657,14 @@ impl QuickwitConnector {
}

async fn query_aggregation(
client: Client,
url: String,
index: String,
query: Option<serde_json::Value>,
group_by: Vec<String>,
count_fields: Vec<String>,
) -> Result<LogTryStream> {
let mut response = search_aggregation(&url, &index, query)
let mut response = search_aggregation(&client, &url, &index, query)
.await
.context("run quickwit aggregation query")?;

Expand Down Expand Up @@ -779,7 +789,7 @@ impl Connector for QuickwitConnector {
);

if handle.count {
let mut result = count(&url, &collection, query).await?;
let mut result = count(&self.client, &url, &collection, query).await?;
if let Some(limit) = limit {
result = (limit as i64).min(result);
}
Expand All @@ -789,6 +799,7 @@ impl Connector for QuickwitConnector {
if is_aggregation_query {
return Ok(QueryResponse::Logs(
Self::query_aggregation(
self.client.clone(),
url,
collection,
query,
Expand All @@ -800,7 +811,16 @@ impl Connector for QuickwitConnector {
}

Ok(QueryResponse::Logs(
Self::query_search(url, collection, query, scroll_timeout, scroll_size, limit).await?,
Self::query_search(
self.client.clone(),
url,
collection,
query,
scroll_timeout,
scroll_size,
limit,
)
.await?,
))
}

Expand Down

0 comments on commit 9440d5d

Please sign in to comment.