Skip to content

Commit

Permalink
fix: node_id may be empty in query response.
Browse files Browse the repository at this point in the history
  • Loading branch information
youngsofun committed Nov 29, 2024
1 parent e8141dd commit a3050ba
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 10 deletions.
18 changes: 11 additions & 7 deletions core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,18 +405,20 @@ impl APIClient {
&self,
query_id: &str,
next_uri: &str,
node_id: &str,
node_id: &Option<String>,
) -> Result<QueryResponse> {
info!("query page: {}", next_uri);
let endpoint = self.endpoint.join(next_uri)?;
let headers = self.make_headers(Some(query_id))?;
let mut builder = self.cli.get(endpoint.clone());
builder = self.wrap_auth_or_session_token(builder)?;
let request = builder
builder = self
.wrap_auth_or_session_token(builder)?
.headers(headers.clone())
.header(HEADER_STICKY_NODE, node_id)
.timeout(self.page_request_timeout)
.build()?;
.timeout(self.page_request_timeout);
if let Some(node_id) = node_id {
builder = builder.header(HEADER_STICKY_NODE, node_id)
}
let request = builder.build()?;

let response = self.query_request_helper(request, false, true).await?;
let body = response.bytes().await?;
Expand Down Expand Up @@ -452,7 +454,9 @@ impl APIClient {
pub async fn wait_for_query(&self, resp: QueryResponse) -> Result<QueryResponse> {
info!("wait for query: {}", resp.id);
let node_id = resp.node_id.clone();
self.set_last_node_id(node_id.clone());
if let Some(node_id) = self.last_node_id() {
self.set_last_node_id(node_id.clone());
}
if let Some(next_uri) = &resp.next_uri {
let schema = resp.schema;
let mut data = resp.data;
Expand Down
3 changes: 2 additions & 1 deletion core/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ pub struct SchemaField {
#[derive(Deserialize, Debug)]
pub struct QueryResponse {
pub id: String,
pub node_id: String,
// response from gateway do not contain node_id
pub node_id: Option<String>,
pub session_id: Option<String>,
pub session: Option<SessionState>,
pub schema: Vec<SchemaField>,
Expand Down
6 changes: 4 additions & 2 deletions driver/src/rest_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,9 @@ impl<'o> RestAPIConnection {
return Ok(resp);
}
let node_id = resp.node_id.clone();
self.client.set_last_node_id(node_id.clone());
if let Some(node_id) = &node_id {
self.client.set_last_node_id(node_id.clone());
}
let mut result = resp;
// preserve schema since it is not included in the final response
while let Some(next_uri) = result.next_uri {
Expand Down Expand Up @@ -247,7 +249,7 @@ pub struct RestAPIRows {
data: VecDeque<Vec<Option<String>>>,
stats: Option<ServerStats>,
query_id: String,
node_id: String,
node_id: Option<String>,
next_uri: Option<String>,
next_page: Option<PageFut>,
}
Expand Down

0 comments on commit a3050ba

Please sign in to comment.