Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support more SHOW ... WHERE #2649

Merged
merged 1 commit into from
Apr 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/materialized/tests/pgwire.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ fn test_persistence() -> Result<(), Box<dyn Error>> {
&[
("@1".into(), 1),
("@2".into(), 2),
("@4".into(), 4),
("c".into(), 3),
("@4".into(), 4)
],
);
assert_eq!(
Expand Down
7 changes: 6 additions & 1 deletion src/sql/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,12 @@ pub fn plan_show_where(
Ok((
row_expr,
RowSetFinishing {
order_by: vec![],
order_by: (0..num_cols)
.map(|c| ColumnOrder {
column: c,
desc: false,
})
.collect(),
limit: None,
offset: 0,
project: (0..num_cols).collect(),
Expand Down
223 changes: 119 additions & 104 deletions src/sql/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,52 @@ use tokio::io::AsyncBufReadExt;
lazy_static! {
static ref SHOW_DATABASES_DESC: RelationDesc =
{ RelationDesc::empty().add_column("Database", ScalarType::String) };
static ref SHOW_INDEXES_DESC: RelationDesc = RelationDesc::new(
RelationType::new(vec![
ColumnType::new(ScalarType::String),
ColumnType::new(ScalarType::String),
ColumnType::new(ScalarType::String).nullable(true),
ColumnType::new(ScalarType::String).nullable(true),
ColumnType::new(ScalarType::Bool),
ColumnType::new(ScalarType::Int64),
]),
vec![
"Source_or_view",
"Key_name",
"Column_name",
"Expression",
"Null",
"Seq_in_index",
]
.into_iter()
.map(Some),
);
static ref SHOW_COLUMNS_DESC: RelationDesc = RelationDesc::empty()
.add_column("Field", ScalarType::String)
.add_column("Nullable", ScalarType::String)
.add_column("Type", ScalarType::String);
}

pub fn make_show_objects_desc(
object_type: ObjectType,
materialized: bool,
full: bool,
) -> RelationDesc {
let col_name = object_type_as_plural_str(object_type);
if full {
let mut relation_desc = RelationDesc::empty()
.add_column(col_name, ScalarType::String)
.add_column("TYPE", ScalarType::String);
if ObjectType::View == object_type {
relation_desc = relation_desc.add_column("QUERYABLE", ScalarType::Bool);
}
if !materialized && (ObjectType::View == object_type || ObjectType::Source == object_type) {
relation_desc = relation_desc.add_column("MATERIALIZED", ScalarType::Bool);
}
relation_desc
} else {
RelationDesc::empty().add_column(col_name, ScalarType::String)
}
}

pub fn describe_statement(
Expand Down Expand Up @@ -124,40 +170,9 @@ pub fn describe_statement(
vec![],
),

Statement::ShowColumns { .. } => (
Some(
RelationDesc::empty()
.add_column("Field", ScalarType::String)
.add_column("Nullable", ScalarType::String)
.add_column("Type", ScalarType::String),
),
vec![],
),
Statement::ShowColumns { .. } => (Some(SHOW_COLUMNS_DESC.clone()), vec![]),

Statement::ShowIndexes { .. } => (
Some(RelationDesc::new(
RelationType::new(vec![
ColumnType::new(ScalarType::String),
ColumnType::new(ScalarType::String),
ColumnType::new(ScalarType::String).nullable(true),
ColumnType::new(ScalarType::String).nullable(true),
ColumnType::new(ScalarType::Bool),
ColumnType::new(ScalarType::Int64),
]),
vec![
"Source_or_view",
"Key_name",
"Column_name",
"Expression",
"Null",
"Seq_in_index",
]
.iter()
.map(|s| Some(*s))
.collect::<Vec<_>>(),
)),
vec![],
),
Statement::ShowIndexes { .. } => (Some(SHOW_INDEXES_DESC.clone()), vec![]),

Statement::ShowDatabases { .. } => (Some(SHOW_DATABASES_DESC.clone()), vec![]),

Expand All @@ -166,28 +181,10 @@ pub fn describe_statement(
full,
materialized,
..
} => {
let col_name = object_type_as_plural_str(object_type);
(
Some(if full {
let mut relation_desc = RelationDesc::empty()
.add_column(col_name, ScalarType::String)
.add_column("TYPE", ScalarType::String);
if ObjectType::View == object_type {
relation_desc = relation_desc.add_column("QUERYABLE", ScalarType::Bool);
}
if !materialized
&& (ObjectType::View == object_type || ObjectType::Source == object_type)
{
relation_desc = relation_desc.add_column("MATERIALIZED", ScalarType::Bool);
}
relation_desc
} else {
RelationDesc::empty().add_column(col_name, ScalarType::String)
}),
vec![],
)
}
} => (
Some(make_show_objects_desc(object_type, materialized, full)),
vec![],
),
Statement::ShowVariable { variable, .. } => {
if variable.value == unicase::Ascii::new("ALL") {
(
Expand Down Expand Up @@ -281,7 +278,7 @@ pub fn handle_statement(
from,
materialized,
filter,
} => handle_show_objects(scx, extended, full, materialized, ot, from, filter),
} => handle_show_objects(scx, extended, full, materialized, ot, from, filter.as_ref()),
Statement::ShowIndexes {
extended,
table_name,
Expand Down Expand Up @@ -346,17 +343,13 @@ fn handle_tail(scx: &StatementContext, from: ObjectName) -> Result<Plan, failure
}
}

fn handle_show_databases(
fn finish_show_where(
scx: &StatementContext,
filter: Option<&ShowStatementFilter>,
rows: Vec<Vec<Datum>>,
desc: &RelationDesc,
) -> Result<Plan, failure::Error> {
let rows = scx
.catalog
.databases()
.map(|database| vec![Datum::from(database)])
.collect();

let (r, finishing) = query::plan_show_where(scx, filter, rows, &SHOW_DATABASES_DESC)?;
let (r, finishing) = query::plan_show_where(scx, filter, rows, desc)?;

Ok(Plan::Peek {
source: r.decorrelate()?,
Expand All @@ -366,32 +359,45 @@ fn handle_show_databases(
})
}

fn handle_show_databases(
scx: &StatementContext,
filter: Option<&ShowStatementFilter>,
) -> Result<Plan, failure::Error> {
let rows = scx
.catalog
.databases()
.map(|database| vec![Datum::from(database)])
.collect();

finish_show_where(scx, filter, rows, &SHOW_DATABASES_DESC)
}

fn handle_show_objects(
scx: &StatementContext,
extended: bool,
full: bool,
materialized: bool,
object_type: ObjectType,
from: Option<ObjectName>,
filter: Option<ShowStatementFilter>,
filter: Option<&ShowStatementFilter>,
) -> Result<Plan, failure::Error> {
let classify_id = |id| match id {
GlobalId::System(_) => "SYSTEM",
GlobalId::User(_) => "USER",
};
let make_row = |name: &str, class| {
let arena = RowArena::new();
let make_row = |name: &str, class: &str| {
if full {
Row::pack(&[Datum::from(name), Datum::from(class)])
vec![
Datum::from(arena.push_string(name.to_string())),
Datum::from(arena.push_string(class.to_string())),
]
} else {
Row::pack(&[Datum::from(name)])
vec![Datum::from(arena.push_string(name.to_string()))]
}
};

if let ObjectType::Schema = object_type {
if filter.is_some() {
bail!("SHOW SCHEMAS ... {LIKE | WHERE} is not supported");
}

let schemas = if let Some(from) = from {
if from.0.len() != 1 {
bail!(
Expand Down Expand Up @@ -427,15 +433,15 @@ fn handle_show_objects(
rows.push(make_row(name, "SYSTEM"));
}
}
rows.sort_unstable_by(move |a, b| a.unpack_first().cmp(&b.unpack_first()));
Ok(Plan::SendRows(rows))
// TODO(justin): it's unfortunate that we call make_show_objects_desc twice, I think we
// should be able to restructure this so that it only gets called once.
finish_show_where(
scx,
filter,
rows,
&make_show_objects_desc(object_type, materialized, full),
)
} else {
let like_regex = match filter {
Some(ShowStatementFilter::Like(pattern)) => like_pattern::build_regex(&pattern)?,
Some(ShowStatementFilter::Where(_)) => bail!("SHOW ... WHERE is not supported"),
None => like_pattern::build_regex("%")?,
};

let empty_schema = BTreeMap::new();
let items = if let Some(mut from) = from {
if from.0.len() > 2 {
Expand Down Expand Up @@ -469,12 +475,20 @@ fn handle_show_objects(
let filtered_items = items
.iter()
.map(|(name, id)| (name, scx.catalog.get_by_id(id)))
.filter(|(_name, entry)| {
object_type_matches(object_type, entry.item())
&& like_regex.is_match(&entry.name().to_string())
});
.filter(|(_name, entry)| object_type_matches(object_type, entry.item()));

if object_type == ObjectType::View || object_type == ObjectType::Source {
// TODO(justin): we can't handle SHOW ... WHERE here yet because the coordinator adds
// extra columns to this result that we don't have access to here yet. This could be
// fixed by passing down this extra catalog info somehow.
let like_regex = match filter {
Some(ShowStatementFilter::Like(pattern)) => like_pattern::build_regex(&pattern)?,
Some(ShowStatementFilter::Where(_)) => bail!("SHOW ... WHERE is not supported"),
None => like_pattern::build_regex("%")?,
};

let filtered_items = filtered_items
.filter(|(_name, entry)| like_regex.is_match(&entry.name().to_string()));
Ok(Plan::ShowViews {
ids: filtered_items
.map(|(name, entry)| (name.clone(), entry.id()))
Expand All @@ -484,11 +498,16 @@ fn handle_show_objects(
limit_materialized: materialized,
})
} else {
let mut rows = filtered_items
let rows = filtered_items
.map(|(name, entry)| make_row(name, classify_id(entry.id())))
.collect::<Vec<_>>();
rows.sort_unstable_by(move |a, b| a.unpack_first().cmp(&b.unpack_first()));
Ok(Plan::SendRows(rows))

finish_show_where(
scx,
filter,
rows,
&make_show_objects_desc(object_type, materialized, full),
)
}
}
}
Expand All @@ -502,9 +521,6 @@ fn handle_show_indexes(
if extended {
bail!("SHOW EXTENDED INDEXES is not supported")
}
if filter.is_some() {
bail!("SHOW INDEXES ... WHERE is not supported");
}
let from_name = scx.resolve_name(from_name)?;
let from_entry = scx.catalog.get(&from_name)?;
if !object_type_matches(ObjectType::View, from_entry.item())
Expand All @@ -516,6 +532,7 @@ fn handle_show_indexes(
from_entry.item().type_string()
);
}
let arena = RowArena::new();
let rows = scx
.catalog
.iter()
Expand All @@ -541,7 +558,6 @@ fn handle_show_indexes(
for (i, (key_expr, key_sql)) in keys.iter().zip_eq(key_sqls).enumerate() {
let desc = scx.catalog.get_by_id(&on).desc().unwrap();
let key_sql = key_sql.to_string();
let arena = RowArena::new();
let (col_name, func) = match key_expr {
expr::ScalarExpr::Column(i) => {
let col_name = match desc.get_unambiguous_name(*i) {
Expand All @@ -552,21 +568,22 @@ fn handle_show_indexes(
}
_ => (Datum::Null, Datum::String(arena.push_string(key_sql))),
};
row_subset.push(Row::pack(&vec![
Datum::String(&from_entry.name().to_string()),
Datum::String(&entry.name().to_string()),
row_subset.push(vec![
Datum::String(arena.push_string(from_entry.name().to_string())),
Datum::String(arena.push_string(entry.name().to_string())),
col_name,
func,
Datum::from(key_expr.typ(desc.typ()).nullable),
Datum::from((i + 1) as i64),
]));
]);
}
row_subset
}
_ => unreachable!(),
})
.collect();
Ok(Plan::SendRows(rows))

finish_show_where(scx, filter, rows, &SHOW_INDEXES_DESC)
}

/// Create an immediate result that describes all the columns for the given table
Expand All @@ -583,27 +600,25 @@ fn handle_show_columns(
if full {
bail!("SHOW FULL COLUMNS is not supported");
}
if filter.is_some() {
bail!("SHOW COLUMNS ... { LIKE | WHERE } is not supported");
}

let arena = RowArena::new();
let table_name = scx.resolve_name(table_name)?;
let column_descriptions: Vec<_> = scx
let rows: Vec<_> = scx
.catalog
.get(&table_name)?
.desc()?
.iter()
.map(|(name, typ)| {
let name = name.map(|n| n.to_string());
Row::pack(&[
Datum::String(name.as_deref().unwrap_or("?")),
vec![
Datum::String(name.map(|n| arena.push_string(n)).unwrap_or("?")),
Datum::String(if typ.nullable { "YES" } else { "NO" }),
Datum::String(pgrepr::Type::from(&typ.scalar_type).name()),
])
]
})
.collect();

Ok(Plan::SendRows(column_descriptions))
finish_show_where(scx, filter, rows, &SHOW_COLUMNS_DESC)
}

fn handle_show_create_view(
Expand Down
Loading