Skip to content

Commit

Permalink
fix: the first resp may not contain schema. (#366)
Browse files Browse the repository at this point in the history
  • Loading branch information
youngsofun authored Mar 20, 2024
1 parent 58a0c12 commit def62dd
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 4 deletions.
2 changes: 1 addition & 1 deletion core/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub struct ProgressValues {
pub bytes: usize,
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone)]
pub struct SchemaField {
pub name: String,
#[serde(rename = "type")]
Expand Down
24 changes: 21 additions & 3 deletions driver/src/rest_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ impl Connection for RestAPIConnection {
async fn query_iter_ext(&self, sql: &str) -> Result<RowStatsIterator> {
info!("query iter ext: {}", sql);
let resp = self.client.start_query(sql).await?;
let resp = self.wait_for_schema(resp).await?;
let (schema, rows) = RestAPIRows::from_response(self.client.clone(), resp)?;
Ok(RowStatsIterator::new(Arc::new(schema), Box::pin(rows)))
}
Expand Down Expand Up @@ -199,16 +200,33 @@ impl<'o> RestAPIConnection {
if !pre.data.is_empty() {
return Ok(pre);
}
// preserve schema since it is not included in the final response in old servers
let pre_schema = pre.schema.clone();
let mut result = pre;
// preserve schema since it is no included in the final response
let schema = result.schema;
while let Some(next_uri) = result.next_uri {
result = self.client.query_page(&result.id, &next_uri).await?;
if !result.data.is_empty() {
break;
}
}
result.schema = schema;
if result.schema.is_empty() {
result.schema = pre_schema;
}
Ok(result)
}

async fn wait_for_schema(&self, pre: QueryResponse) -> Result<QueryResponse> {
if !pre.data.is_empty() || !pre.schema.is_empty() {
return Ok(pre);
}
let mut result = pre;
// preserve schema since it is no included in the final response
while let Some(next_uri) = result.next_uri {
result = self.client.query_page(&result.id, &next_uri).await?;
if !result.data.is_empty() || !result.schema.is_empty() {
break;
}
}
Ok(result)
}

Expand Down

0 comments on commit def62dd

Please sign in to comment.