Skip to content

Commit

Permalink
fix(postgres): guarantee the type name on a PgTypeInfo to always be set
Browse files Browse the repository at this point in the history
fixes #241
  • Loading branch information
mehcode committed Apr 10, 2020
1 parent cd6735b commit d360f68
Show file tree
Hide file tree
Showing 7 changed files with 316 additions and 231 deletions.
1 change: 1 addition & 0 deletions sqlx-core/src/describe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub struct Describe<DB>
where
DB: Database + ?Sized,
{
// TODO: Describe#param_types should probably be Option<TypeInfo[]> as we either know all the params or we know none
/// The expected types for the parameters of the query.
pub param_types: Box<[Option<DB::TypeInfo>]>,

Expand Down
86 changes: 9 additions & 77 deletions sqlx-core/src/postgres/cursor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::collections::HashMap;
use std::sync::Arc;

use futures_core::future::BoxFuture;
Expand All @@ -7,8 +6,8 @@ use crate::connection::ConnectionSource;
use crate::cursor::Cursor;
use crate::executor::Execute;
use crate::pool::Pool;
use crate::postgres::protocol::{DataRow, Message, ReadyForQuery, RowDescription, StatementId};
use crate::postgres::row::{Column, Statement};
use crate::postgres::protocol::{DataRow, Message, ReadyForQuery, RowDescription};
use crate::postgres::row::Statement;
use crate::postgres::{PgArguments, PgConnection, PgRow, Postgres};

pub struct PgCursor<'c, 'q> {
Expand Down Expand Up @@ -53,76 +52,6 @@ impl<'c, 'q> Cursor<'c, 'q> for PgCursor<'c, 'q> {
}
}

fn parse_row_description(conn: &mut PgConnection, rd: RowDescription) -> Statement {
let mut names = HashMap::new();
let mut columns = Vec::new();

columns.reserve(rd.fields.len());
names.reserve(rd.fields.len());

for (index, field) in rd.fields.iter().enumerate() {
if let Some(name) = &field.name {
names.insert(name.clone(), index);
}

let type_info = conn.get_type_info_by_oid(field.type_id.0);

columns.push(Column {
type_info,
format: field.type_format,
});
}

Statement {
columns: columns.into_boxed_slice(),
names,
}
}

// Used to describe the incoming results
// We store the column map in an Arc and share it among all rows
async fn expect_desc(conn: &mut PgConnection) -> crate::Result<Statement> {
let description: Option<_> = loop {
match conn.stream.receive().await? {
Message::ParseComplete | Message::BindComplete => {}

Message::RowDescription => {
break Some(RowDescription::read(conn.stream.buffer())?);
}

Message::NoData => {
break None;
}

message => {
return Err(
protocol_err!("next/describe: unexpected message: {:?}", message).into(),
);
}
}
};

if let Some(description) = description {
Ok(parse_row_description(conn, description))
} else {
Ok(Statement::default())
}
}

// A form of describe that uses the statement cache
async fn get_or_describe(
conn: &mut PgConnection,
id: StatementId,
) -> crate::Result<Arc<Statement>> {
if !conn.cache_statement.contains_key(&id) {
let statement = expect_desc(conn).await?;

conn.cache_statement.insert(id, Arc::new(statement));
}

Ok(Arc::clone(&conn.cache_statement[&id]))
}

async fn next<'a, 'c: 'a, 'q: 'a>(
cursor: &'a mut PgCursor<'c, 'q>,
) -> crate::Result<Option<PgRow<'a>>> {
Expand All @@ -136,9 +65,8 @@ async fn next<'a, 'c: 'a, 'q: 'a>(

// If there is a statement ID, this is a non-simple or prepared query
if let Some(statement) = statement {
// A prepared statement will re-use the previous column map if
// this query has been executed before
cursor.statement = get_or_describe(&mut *conn, statement).await?;
// A prepared statement will re-use the previous column map
cursor.statement = Arc::clone(&conn.cache_statement[&statement]);
}

// A non-prepared query must be described each time
Expand All @@ -164,8 +92,12 @@ async fn next<'a, 'c: 'a, 'q: 'a>(
}

Message::RowDescription => {
// NOTE: This is only encountered for unprepared statements
let rd = RowDescription::read(conn.stream.buffer())?;
cursor.statement = Arc::new(parse_row_description(conn, rd));
cursor.statement = Arc::new(
conn.parse_row_description(rd, Default::default(), None, false)
.await?,
);
}

Message::DataRow => {
Expand Down
Loading

0 comments on commit d360f68

Please sign in to comment.