Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] Add timeout for sysdb client. Also add interceptor #2371

Merged
merged 1 commit into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions rust/worker/chroma_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ query_service:
Grpc:
host: "sysdb.chroma"
port: 50051
connect_timeout_ms: 5000
request_timeout_ms: 1000
storage:
S3:
bucket: "chroma-storage"
Expand Down Expand Up @@ -54,6 +56,8 @@ compaction_service:
Grpc:
host: "sysdb.chroma"
port: 50051
connect_timeout_ms: 5000
request_timeout_ms: 1000
storage:
S3:
bucket: "chroma-storage"
Expand Down
16 changes: 16 additions & 0 deletions rust/worker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ mod tests {
Grpc:
host: "localhost"
port: 50051
connect_timeout_ms: 5000
request_timeout_ms: 1000
storage:
S3:
bucket: "chroma"
Expand Down Expand Up @@ -204,6 +206,8 @@ mod tests {
Grpc:
host: "localhost"
port: 50051
connect_timeout_ms: 5000
request_timeout_ms: 1000
storage:
S3:
bucket: "chroma"
Expand Down Expand Up @@ -262,6 +266,8 @@ mod tests {
Grpc:
host: "localhost"
port: 50051
connect_timeout_ms: 5000
request_timeout_ms: 1000
storage:
S3:
bucket: "chroma"
Expand Down Expand Up @@ -296,6 +302,8 @@ mod tests {
Grpc:
host: "localhost"
port: 50051
connect_timeout_ms: 5000
request_timeout_ms: 1000
storage:
S3:
bucket: "chroma"
Expand Down Expand Up @@ -372,6 +380,8 @@ mod tests {
Grpc:
host: "localhost"
port: 50051
connect_timeout_ms: 5000
request_timeout_ms: 1000
storage:
S3:
bucket: "chroma"
Expand Down Expand Up @@ -406,6 +416,8 @@ mod tests {
Grpc:
host: "localhost"
port: 50051
connect_timeout_ms: 5000
request_timeout_ms: 1000
storage:
S3:
bucket: "chroma"
Expand Down Expand Up @@ -476,6 +488,8 @@ mod tests {
Grpc:
host: "localhost"
port: 50051
connect_timeout_ms: 5000
request_timeout_ms: 1000
storage:
S3:
bucket: "chroma"
Expand Down Expand Up @@ -508,6 +522,8 @@ mod tests {
Grpc:
host: "localhost"
port: 50051
connect_timeout_ms: 5000
request_timeout_ms: 1000
log:
Grpc:
host: "localhost"
Expand Down
2 changes: 2 additions & 0 deletions rust/worker/src/sysdb/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use serde::Deserialize;
pub(crate) struct GrpcSysDbConfig {
pub(crate) host: String,
pub(crate) port: u16,
pub(crate) connect_timeout_ms: u64,
pub(crate) request_timeout_ms: u64,
}

#[derive(Deserialize)]
Expand Down
43 changes: 37 additions & 6 deletions rust/worker/src/sysdb/sysdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ use super::config::SysDbConfig;
use super::test_sysdb::TestSysDb;
use crate::chroma_proto;
use crate::chroma_proto::sys_db_client;
use crate::chroma_proto::sys_db_client::SysDbClient;
use crate::config::Configurable;
use crate::errors::ChromaError;
use crate::errors::ErrorCodes;
use crate::tracing::util::client_interceptor;
use crate::types::Collection;
use crate::types::CollectionConversionError;
use crate::types::FlushCompactionResponse;
Expand All @@ -18,7 +20,12 @@ use crate::types::Tenant;
use async_trait::async_trait;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tonic::service::interceptor;
use tonic::transport::Endpoint;
use tonic::Request;
use tonic::Status;
use uuid::Uuid;

const DEFAULT_DATBASE: &str = "default_database";
Expand Down Expand Up @@ -122,7 +129,12 @@ impl SysDb {
// Since this uses tonic transport channel, cloning is cheap. Each client only supports
// one inflight request at a time, so we need to clone the client for each requester.
pub(crate) struct GrpcSysDb {
client: sys_db_client::SysDbClient<tonic::transport::Channel>,
client: SysDbClient<
interceptor::InterceptedService<
tonic::transport::Channel,
fn(Request<()>) -> Result<Request<()>, Status>,
>,
>,
}

#[derive(Error, Debug)]
Expand All @@ -148,15 +160,34 @@ impl Configurable<SysDbConfig> for GrpcSysDb {
let port = &my_config.port;
println!("Connecting to sysdb at {}:{}", host, port);
let connection_string = format!("http://{}:{}", host, port);
let client = sys_db_client::SysDbClient::connect(connection_string).await;
match client {
Ok(client) => {
let endpoint = match Endpoint::from_shared(connection_string) {
Ok(endpoint) => endpoint,
Err(e) => {
return Err(Box::new(GrpcSysDbError::FailedToConnect(
tonic::transport::Error::from(e),
)));
}
};

let endpoint = endpoint
.connect_timeout(Duration::from_millis(my_config.connect_timeout_ms))
.timeout(Duration::from_millis(my_config.request_timeout_ms));
match endpoint.connect().await {
Ok(channel) => {
let client: SysDbClient<
interceptor::InterceptedService<
tonic::transport::Channel,
fn(Request<()>) -> Result<Request<()>, Status>,
>,
> = SysDbClient::with_interceptor(channel, client_interceptor);
return Ok(GrpcSysDb { client });
}
Err(e) => {
return Err(Box::new(GrpcSysDbError::FailedToConnect(e)));
return Err(Box::new(GrpcSysDbError::FailedToConnect(
tonic::transport::Error::from(e),
)));
}
}
};
}
}
}
Expand Down
Loading