Skip to content

Commit

Permalink
Support more SHOW ... WHERE
Browse files Browse the repository at this point in the history
Fixes #1529.

The only remaining unsupported case is SHOW VIEWS/SOURCES, because we
don't plumb down the needed info via the catalog.

I got rid of some ordering stuff, I can add it back in the form of a
RowSetFinishing, but it doesn't seem to have been tested so I'm not sure
it was actually important.
  • Loading branch information
justinj committed Apr 15, 2020
1 parent b5f136b commit 1a021ee
Show file tree
Hide file tree
Showing 4 changed files with 266 additions and 113 deletions.
7 changes: 6 additions & 1 deletion src/sql/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,12 @@ pub fn plan_show_where(
Ok((
row_expr,
RowSetFinishing {
order_by: vec![],
order_by: (0..num_cols)
.map(|c| ColumnOrder {
column: c,
desc: false,
})
.collect(),
limit: None,
offset: 0,
project: (0..num_cols).collect(),
Expand Down
223 changes: 119 additions & 104 deletions src/sql/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,52 @@ use tokio::io::AsyncBufReadExt;
lazy_static! {
static ref SHOW_DATABASES_DESC: RelationDesc =
{ RelationDesc::empty().add_column("Database", ScalarType::String) };
static ref SHOW_INDEXES_DESC: RelationDesc = RelationDesc::new(
RelationType::new(vec![
ColumnType::new(ScalarType::String),
ColumnType::new(ScalarType::String),
ColumnType::new(ScalarType::String).nullable(true),
ColumnType::new(ScalarType::String).nullable(true),
ColumnType::new(ScalarType::Bool),
ColumnType::new(ScalarType::Int64),
]),
vec![
"Source_or_view",
"Key_name",
"Column_name",
"Expression",
"Null",
"Seq_in_index",
]
.into_iter()
.map(Some),
);
static ref SHOW_COLUMNS_DESC: RelationDesc = RelationDesc::empty()
.add_column("Field", ScalarType::String)
.add_column("Nullable", ScalarType::String)
.add_column("Type", ScalarType::String);
}

pub fn make_show_objects_desc(
object_type: ObjectType,
materialized: bool,
full: bool,
) -> RelationDesc {
let col_name = object_type_as_plural_str(object_type);
if full {
let mut relation_desc = RelationDesc::empty()
.add_column(col_name, ScalarType::String)
.add_column("TYPE", ScalarType::String);
if ObjectType::View == object_type {
relation_desc = relation_desc.add_column("QUERYABLE", ScalarType::Bool);
}
if !materialized && (ObjectType::View == object_type || ObjectType::Source == object_type) {
relation_desc = relation_desc.add_column("MATERIALIZED", ScalarType::Bool);
}
relation_desc
} else {
RelationDesc::empty().add_column(col_name, ScalarType::String)
}
}

pub fn describe_statement(
Expand Down Expand Up @@ -124,40 +170,9 @@ pub fn describe_statement(
vec![],
),

Statement::ShowColumns { .. } => (
Some(
RelationDesc::empty()
.add_column("Field", ScalarType::String)
.add_column("Nullable", ScalarType::String)
.add_column("Type", ScalarType::String),
),
vec![],
),
Statement::ShowColumns { .. } => (Some(SHOW_COLUMNS_DESC.clone()), vec![]),

Statement::ShowIndexes { .. } => (
Some(RelationDesc::new(
RelationType::new(vec![
ColumnType::new(ScalarType::String),
ColumnType::new(ScalarType::String),
ColumnType::new(ScalarType::String).nullable(true),
ColumnType::new(ScalarType::String).nullable(true),
ColumnType::new(ScalarType::Bool),
ColumnType::new(ScalarType::Int64),
]),
vec![
"Source_or_view",
"Key_name",
"Column_name",
"Expression",
"Null",
"Seq_in_index",
]
.iter()
.map(|s| Some(*s))
.collect::<Vec<_>>(),
)),
vec![],
),
Statement::ShowIndexes { .. } => (Some(SHOW_INDEXES_DESC.clone()), vec![]),

Statement::ShowDatabases { .. } => (Some(SHOW_DATABASES_DESC.clone()), vec![]),

Expand All @@ -166,28 +181,10 @@ pub fn describe_statement(
full,
materialized,
..
} => {
let col_name = object_type_as_plural_str(object_type);
(
Some(if full {
let mut relation_desc = RelationDesc::empty()
.add_column(col_name, ScalarType::String)
.add_column("TYPE", ScalarType::String);
if ObjectType::View == object_type {
relation_desc = relation_desc.add_column("QUERYABLE", ScalarType::Bool);
}
if !materialized
&& (ObjectType::View == object_type || ObjectType::Source == object_type)
{
relation_desc = relation_desc.add_column("MATERIALIZED", ScalarType::Bool);
}
relation_desc
} else {
RelationDesc::empty().add_column(col_name, ScalarType::String)
}),
vec![],
)
}
} => (
Some(make_show_objects_desc(object_type, materialized, full)),
vec![],
),
Statement::ShowVariable { variable, .. } => {
if variable.value == unicase::Ascii::new("ALL") {
(
Expand Down Expand Up @@ -281,7 +278,7 @@ pub fn handle_statement(
from,
materialized,
filter,
} => handle_show_objects(scx, extended, full, materialized, ot, from, filter),
} => handle_show_objects(scx, extended, full, materialized, ot, from, filter.as_ref()),
Statement::ShowIndexes {
extended,
table_name,
Expand Down Expand Up @@ -346,17 +343,13 @@ fn handle_tail(scx: &StatementContext, from: ObjectName) -> Result<Plan, failure
}
}

fn handle_show_databases(
fn finish_show_where(
scx: &StatementContext,
filter: Option<&ShowStatementFilter>,
rows: Vec<Vec<Datum>>,
desc: &RelationDesc,
) -> Result<Plan, failure::Error> {
let rows = scx
.catalog
.databases()
.map(|database| vec![Datum::from(database)])
.collect();

let (r, finishing) = query::plan_show_where(scx, filter, rows, &SHOW_DATABASES_DESC)?;
let (r, finishing) = query::plan_show_where(scx, filter, rows, desc)?;

Ok(Plan::Peek {
source: r.decorrelate()?,
Expand All @@ -366,32 +359,45 @@ fn handle_show_databases(
})
}

fn handle_show_databases(
scx: &StatementContext,
filter: Option<&ShowStatementFilter>,
) -> Result<Plan, failure::Error> {
let rows = scx
.catalog
.databases()
.map(|database| vec![Datum::from(database)])
.collect();

finish_show_where(scx, filter, rows, &SHOW_DATABASES_DESC)
}

fn handle_show_objects(
scx: &StatementContext,
extended: bool,
full: bool,
materialized: bool,
object_type: ObjectType,
from: Option<ObjectName>,
filter: Option<ShowStatementFilter>,
filter: Option<&ShowStatementFilter>,
) -> Result<Plan, failure::Error> {
let classify_id = |id| match id {
GlobalId::System(_) => "SYSTEM",
GlobalId::User(_) => "USER",
};
let make_row = |name: &str, class| {
let arena = RowArena::new();
let make_row = |name: &str, class: &str| {
if full {
Row::pack(&[Datum::from(name), Datum::from(class)])
vec![
Datum::from(arena.push_string(name.to_string())),
Datum::from(arena.push_string(class.to_string())),
]
} else {
Row::pack(&[Datum::from(name)])
vec![Datum::from(arena.push_string(name.to_string()))]
}
};

if let ObjectType::Schema = object_type {
if filter.is_some() {
bail!("SHOW SCHEMAS ... {LIKE | WHERE} is not supported");
}

let schemas = if let Some(from) = from {
if from.0.len() != 1 {
bail!(
Expand Down Expand Up @@ -427,15 +433,15 @@ fn handle_show_objects(
rows.push(make_row(name, "SYSTEM"));
}
}
rows.sort_unstable_by(move |a, b| a.unpack_first().cmp(&b.unpack_first()));
Ok(Plan::SendRows(rows))
// TODO(justin): it's unfortunate that we call make_show_objects_desc twice, I think we
// should be able to restructure this so that it only gets called once.
finish_show_where(
scx,
filter,
rows,
&make_show_objects_desc(object_type, materialized, full),
)
} else {
let like_regex = match filter {
Some(ShowStatementFilter::Like(pattern)) => like_pattern::build_regex(&pattern)?,
Some(ShowStatementFilter::Where(_)) => bail!("SHOW ... WHERE is not supported"),
None => like_pattern::build_regex("%")?,
};

let empty_schema = BTreeMap::new();
let items = if let Some(mut from) = from {
if from.0.len() > 2 {
Expand Down Expand Up @@ -469,12 +475,20 @@ fn handle_show_objects(
let filtered_items = items
.iter()
.map(|(name, id)| (name, scx.catalog.get_by_id(id)))
.filter(|(_name, entry)| {
object_type_matches(object_type, entry.item())
&& like_regex.is_match(&entry.name().to_string())
});
.filter(|(_name, entry)| object_type_matches(object_type, entry.item()));

if object_type == ObjectType::View || object_type == ObjectType::Source {
// TODO(justin): we can't handle SHOW ... WHERE here yet because the coordinator adds
// extra columns to this result that we don't have access to here yet. This could be
// fixed by passing down this extra catalog info somehow.
let like_regex = match filter {
Some(ShowStatementFilter::Like(pattern)) => like_pattern::build_regex(&pattern)?,
Some(ShowStatementFilter::Where(_)) => bail!("SHOW ... WHERE is not supported"),
None => like_pattern::build_regex("%")?,
};

let filtered_items = filtered_items
.filter(|(_name, entry)| like_regex.is_match(&entry.name().to_string()));
Ok(Plan::ShowViews {
ids: filtered_items
.map(|(name, entry)| (name.clone(), entry.id()))
Expand All @@ -484,11 +498,16 @@ fn handle_show_objects(
limit_materialized: materialized,
})
} else {
let mut rows = filtered_items
let rows = filtered_items
.map(|(name, entry)| make_row(name, classify_id(entry.id())))
.collect::<Vec<_>>();
rows.sort_unstable_by(move |a, b| a.unpack_first().cmp(&b.unpack_first()));
Ok(Plan::SendRows(rows))

finish_show_where(
scx,
filter,
rows,
&make_show_objects_desc(object_type, materialized, full),
)
}
}
}
Expand All @@ -502,9 +521,6 @@ fn handle_show_indexes(
if extended {
bail!("SHOW EXTENDED INDEXES is not supported")
}
if filter.is_some() {
bail!("SHOW INDEXES ... WHERE is not supported");
}
let from_name = scx.resolve_name(from_name)?;
let from_entry = scx.catalog.get(&from_name)?;
if !object_type_matches(ObjectType::View, from_entry.item())
Expand All @@ -516,6 +532,7 @@ fn handle_show_indexes(
from_entry.item().type_string()
);
}
let arena = RowArena::new();
let rows = scx
.catalog
.iter()
Expand All @@ -541,7 +558,6 @@ fn handle_show_indexes(
for (i, (key_expr, key_sql)) in keys.iter().zip_eq(key_sqls).enumerate() {
let desc = scx.catalog.get_by_id(&on).desc().unwrap();
let key_sql = key_sql.to_string();
let arena = RowArena::new();
let (col_name, func) = match key_expr {
expr::ScalarExpr::Column(i) => {
let col_name = match desc.get_unambiguous_name(*i) {
Expand All @@ -552,21 +568,22 @@ fn handle_show_indexes(
}
_ => (Datum::Null, Datum::String(arena.push_string(key_sql))),
};
row_subset.push(Row::pack(&vec![
Datum::String(&from_entry.name().to_string()),
Datum::String(&entry.name().to_string()),
row_subset.push(vec![
Datum::String(arena.push_string(from_entry.name().to_string())),
Datum::String(arena.push_string(entry.name().to_string())),
col_name,
func,
Datum::from(key_expr.typ(desc.typ()).nullable),
Datum::from((i + 1) as i64),
]));
]);
}
row_subset
}
_ => unreachable!(),
})
.collect();
Ok(Plan::SendRows(rows))

finish_show_where(scx, filter, rows, &SHOW_INDEXES_DESC)
}

/// Create an immediate result that describes all the columns for the given table
Expand All @@ -583,27 +600,25 @@ fn handle_show_columns(
if full {
bail!("SHOW FULL COLUMNS is not supported");
}
if filter.is_some() {
bail!("SHOW COLUMNS ... { LIKE | WHERE } is not supported");
}

let arena = RowArena::new();
let table_name = scx.resolve_name(table_name)?;
let column_descriptions: Vec<_> = scx
let rows: Vec<_> = scx
.catalog
.get(&table_name)?
.desc()?
.iter()
.map(|(name, typ)| {
let name = name.map(|n| n.to_string());
Row::pack(&[
Datum::String(name.as_deref().unwrap_or("?")),
vec![
Datum::String(name.map(|n| arena.push_string(n)).unwrap_or("?")),
Datum::String(if typ.nullable { "YES" } else { "NO" }),
Datum::String(pgrepr::Type::from(&typ.scalar_type).name()),
])
]
})
.collect();

Ok(Plan::SendRows(column_descriptions))
finish_show_where(scx, filter, rows, &SHOW_COLUMNS_DESC)
}

fn handle_show_create_view(
Expand Down
Loading

0 comments on commit 1a021ee

Please sign in to comment.