Skip to content

Commit

Permalink
[ENH] Add timeout for sysdb client. Also add interceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
HammadB committed Jun 18, 2024
1 parent 56d90e1 commit eef20fe
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 6 deletions.
4 changes: 4 additions & 0 deletions rust/worker/chroma_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ query_service:
Grpc:
host: "sysdb.chroma"
port: 50051
connect_timeout_ms: 5000
request_timeout_ms: 1000
storage:
S3:
bucket: "chroma-storage"
Expand Down Expand Up @@ -54,6 +56,8 @@ compaction_service:
Grpc:
host: "sysdb.chroma"
port: 50051
connect_timeout_ms: 5000
request_timeout_ms: 1000
storage:
S3:
bucket: "chroma-storage"
Expand Down
16 changes: 16 additions & 0 deletions rust/worker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ mod tests {
Grpc:
host: "localhost"
port: 50051
connect_timeout_ms: 5000
request_timeout_ms: 1000
storage:
S3:
bucket: "chroma"
Expand Down Expand Up @@ -204,6 +206,8 @@ mod tests {
Grpc:
host: "localhost"
port: 50051
connect_timeout_ms: 5000
request_timeout_ms: 1000
storage:
S3:
bucket: "chroma"
Expand Down Expand Up @@ -262,6 +266,8 @@ mod tests {
Grpc:
host: "localhost"
port: 50051
connect_timeout_ms: 5000
request_timeout_ms: 1000
storage:
S3:
bucket: "chroma"
Expand Down Expand Up @@ -296,6 +302,8 @@ mod tests {
Grpc:
host: "localhost"
port: 50051
connect_timeout_ms: 5000
request_timeout_ms: 1000
storage:
S3:
bucket: "chroma"
Expand Down Expand Up @@ -372,6 +380,8 @@ mod tests {
Grpc:
host: "localhost"
port: 50051
connect_timeout_ms: 5000
request_timeout_ms: 1000
storage:
S3:
bucket: "chroma"
Expand Down Expand Up @@ -406,6 +416,8 @@ mod tests {
Grpc:
host: "localhost"
port: 50051
connect_timeout_ms: 5000
request_timeout_ms: 1000
storage:
S3:
bucket: "chroma"
Expand Down Expand Up @@ -476,6 +488,8 @@ mod tests {
Grpc:
host: "localhost"
port: 50051
connect_timeout_ms: 5000
request_timeout_ms: 1000
storage:
S3:
bucket: "chroma"
Expand Down Expand Up @@ -508,6 +522,8 @@ mod tests {
Grpc:
host: "localhost"
port: 50051
connect_timeout_ms: 5000
request_timeout_ms: 1000
log:
Grpc:
host: "localhost"
Expand Down
2 changes: 2 additions & 0 deletions rust/worker/src/sysdb/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use serde::Deserialize;
pub(crate) struct GrpcSysDbConfig {
pub(crate) host: String,
pub(crate) port: u16,
pub(crate) connect_timeout_ms: u64,
pub(crate) request_timeout_ms: u64,
}

#[derive(Deserialize)]
Expand Down
43 changes: 37 additions & 6 deletions rust/worker/src/sysdb/sysdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ use super::config::SysDbConfig;
use super::test_sysdb::TestSysDb;
use crate::chroma_proto;
use crate::chroma_proto::sys_db_client;
use crate::chroma_proto::sys_db_client::SysDbClient;
use crate::config::Configurable;
use crate::errors::ChromaError;
use crate::errors::ErrorCodes;
use crate::tracing::util::client_interceptor;
use crate::types::Collection;
use crate::types::CollectionConversionError;
use crate::types::FlushCompactionResponse;
Expand All @@ -18,7 +20,12 @@ use crate::types::Tenant;
use async_trait::async_trait;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tonic::service::interceptor;
use tonic::transport::Endpoint;
use tonic::Request;
use tonic::Status;
use uuid::Uuid;

const DEFAULT_DATBASE: &str = "default_database";
Expand Down Expand Up @@ -122,7 +129,12 @@ impl SysDb {
// Since this uses tonic transport channel, cloning is cheap. Each client only supports
// one inflight request at a time, so we need to clone the client for each requester.
pub(crate) struct GrpcSysDb {
client: sys_db_client::SysDbClient<tonic::transport::Channel>,
client: SysDbClient<
interceptor::InterceptedService<
tonic::transport::Channel,
fn(Request<()>) -> Result<Request<()>, Status>,
>,
>,
}

#[derive(Error, Debug)]
Expand All @@ -148,15 +160,34 @@ impl Configurable<SysDbConfig> for GrpcSysDb {
let port = &my_config.port;
println!("Connecting to sysdb at {}:{}", host, port);
let connection_string = format!("http://{}:{}", host, port);
let client = sys_db_client::SysDbClient::connect(connection_string).await;
match client {
Ok(client) => {
let endpoint = match Endpoint::from_shared(connection_string) {
Ok(endpoint) => endpoint,
Err(e) => {
return Err(Box::new(GrpcSysDbError::FailedToConnect(
tonic::transport::Error::from(e),
)));
}
};

let endpoint = endpoint
.connect_timeout(Duration::from_millis(my_config.connect_timeout_ms))
.timeout(Duration::from_millis(my_config.request_timeout_ms));
match endpoint.connect().await {
Ok(channel) => {
let client: SysDbClient<
interceptor::InterceptedService<
tonic::transport::Channel,
fn(Request<()>) -> Result<Request<()>, Status>,
>,
> = SysDbClient::with_interceptor(channel, client_interceptor);
return Ok(GrpcSysDb { client });
}
Err(e) => {
return Err(Box::new(GrpcSysDbError::FailedToConnect(e)));
return Err(Box::new(GrpcSysDbError::FailedToConnect(
tonic::transport::Error::from(e),
)));
}
}
};
}
}
}
Expand Down

0 comments on commit eef20fe

Please sign in to comment.