[SPARK-45787][SQL] Support Catalog.listColumns for clustering columns
### What changes were proposed in this pull request?

Support the `listColumns` API for clustering columns.
### Why are the changes needed?

The `listColumns` API should report clustering columns, just as it already does for partition and bucket columns.
### Does this PR introduce _any_ user-facing change?

Yes, `listColumns` now returns an additional field, `isCluster`, indicating whether a column is a clustering column.
Old output for `spark.catalog.listColumns`:
```
+----+-----------+--------+--------+-----------+--------+
|name|description|dataType|nullable|isPartition|isBucket|
+----+-----------+--------+--------+-----------+--------+
|   a|       null|     int|    true|      false|   false|
|   b|       null|  string|    true|      false|   false|
|   c|       null|     int|    true|      false|   false|
|   d|       null|  string|    true|      false|   false|
+----+-----------+--------+--------+-----------+--------+
```

New output:
```
+----+-----------+--------+--------+-----------+--------+---------+
|name|description|dataType|nullable|isPartition|isBucket|isCluster|
+----+-----------+--------+--------+-----------+--------+---------+
|   a|       null|     int|    true|      false|   false|    false|
|   b|       null|  string|    true|      false|   false|    false|
|   c|       null|     int|    true|      false|   false|    false|
|   d|       null|  string|    true|      false|   false|    false|
+----+-----------+--------+--------+-----------+--------+---------+
```
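
For illustration, here is a minimal sketch of how the new field surfaces for a clustered table (the table name is made up, and it assumes the running Spark build supports the `CLUSTER BY` clause in `CREATE TABLE`):

```scala
// Create a table clustered by columns a and b, then list its columns.
spark.sql(
  "CREATE TABLE demo_tbl (a INT, b STRING, c INT, d STRING) USING parquet CLUSTER BY (a, b)")
spark.catalog.listColumns("demo_tbl").show()
// Columns a and b should report isCluster = true; c and d should report false.
```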

### How was this patch tested?

New unit tests.
### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47451 from zedtang/list-clustering-columns.

Authored-by: Jiaheng Tang <jiaheng.tang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
zedtang authored and ilicmarkodb committed Jul 29, 2024
1 parent 59f0c4a commit 533ef2d
Showing 10 changed files with 87 additions and 28 deletions.
3 changes: 2 additions & 1 deletion R/pkg/tests/fulltests/test_sparkSQL.R
@@ -4152,7 +4152,8 @@ test_that("catalog APIs, listTables, getTable, listColumns, listFunctions, funct
c <- listColumns("cars")
expect_equal(nrow(c), 2)
expect_equal(colnames(c),
c("name", "description", "dataType", "nullable", "isPartition", "isBucket"))
c("name", "description", "dataType", "nullable", "isPartition", "isBucket",
"isCluster"))
expect_equal(collect(c)[[1]][[1]], "speed")
expect_error(listColumns("zxwtyswklpf", "default"),
"[TABLE_OR_VIEW_NOT_FOUND]*`spark_catalog`.`default`.`zxwtyswklpf`*")
@@ -153,6 +153,8 @@ class Table(
* whether the column is a partition column.
* @param isBucket
* whether the column is a bucket column.
* @param isCluster
* whether the column is a clustering column.
* @since 3.5.0
*/
class Column(
@@ -161,17 +163,29 @@
val dataType: String,
val nullable: Boolean,
val isPartition: Boolean,
val isBucket: Boolean)
val isBucket: Boolean,
val isCluster: Boolean)
extends DefinedByConstructorParams {

def this(
name: String,
description: String,
dataType: String,
nullable: Boolean,
isPartition: Boolean,
isBucket: Boolean) = {
this(name, description, dataType, nullable, isPartition, isBucket, isCluster = false)
}

override def toString: String = {
"Column[" +
s"name='$name', " +
Option(description).map { d => s"description='$d', " }.getOrElse("") +
s"dataType='$dataType', " +
s"nullable='$nullable', " +
s"isPartition='$isPartition', " +
s"isBucket='$isBucket']"
s"isBucket='$isBucket', " +
s"isCluster='$isCluster']"
}

}
2 changes: 2 additions & 0 deletions python/pyspark/sql/catalog.py
@@ -65,6 +65,7 @@ class Column(NamedTuple):
nullable: bool
isPartition: bool
isBucket: bool
isCluster: bool


class Function(NamedTuple):
@@ -663,6 +664,7 @@ def listColumns(self, tableName: str, dbName: Optional[str] = None) -> List[Colu
nullable=jcolumn.nullable(),
isPartition=jcolumn.isPartition(),
isBucket=jcolumn.isBucket(),
isCluster=jcolumn.isCluster(),
)
)
return columns
1 change: 1 addition & 0 deletions python/pyspark/sql/connect/catalog.py
@@ -208,6 +208,7 @@ def listColumns(self, tableName: str, dbName: Optional[str] = None) -> List[Colu
nullable=table[3][i].as_py(),
isPartition=table[4][i].as_py(),
isBucket=table[5][i].as_py(),
isCluster=table[6][i].as_py(),
)
for i in range(table.num_rows)
]
4 changes: 4 additions & 0 deletions python/pyspark/sql/tests/test_catalog.py
@@ -367,6 +367,7 @@ def test_list_columns(self):
nullable=True,
isPartition=False,
isBucket=False,
isCluster=False,
),
)
self.assertEqual(
@@ -378,6 +379,7 @@
nullable=True,
isPartition=False,
isBucket=False,
isCluster=False,
),
)
columns2 = sorted(
@@ -393,6 +395,7 @@
nullable=True,
isPartition=False,
isBucket=False,
isCluster=False,
),
)
self.assertEqual(
@@ -404,6 +407,7 @@
nullable=True,
isPartition=False,
isBucket=False,
isCluster=False,
),
)
self.assertRaisesRegex(
@@ -229,7 +229,10 @@ abstract class ExternalCatalogSuite extends SparkFunSuite {
catalog.alterTable(tbl1.copy(properties = Map("toh" -> "frem")))
val newTbl1 = catalog.getTable("db2", "tbl1")
assert(!tbl1.properties.contains("toh"))
assert(newTbl1.properties.size == tbl1.properties.size + 1)
// clusteringColumns property is injected during newTable, so we need
// to filter it out before comparing the properties.
assert(newTbl1.properties.size ==
tbl1.properties.filter { case (key, _) => key != "clusteringColumns" }.size + 1)
assert(newTbl1.properties.get("toh") == Some("frem"))
}

@@ -1111,7 +1114,9 @@ abstract class CatalogTestUtils {
},
provider = Some(defaultProvider),
partitionColumnNames = Seq("a", "b"),
bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil)))
bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil)),
properties = Map(
ClusterBySpec.toPropertyWithoutValidation(ClusterBySpec.fromColumnNames(Seq("c1", "c2")))))
}

def newView(
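For reference, a rough sketch of how the `clusteringColumns` property mentioned in the comment above ends up in the test table's metadata, using the `ClusterBySpec` helpers visible in this hunk (the import path is an assumption):

```scala
import org.apache.spark.sql.catalyst.catalog.ClusterBySpec

// fromColumnNames builds a ClusterBySpec from plain column names;
// toPropertyWithoutValidation turns it into the ("clusteringColumns" -> ...) pair
// that newTable stores in the table's properties map.
val clusterBy = ClusterBySpec.fromColumnNames(Seq("c1", "c2"))
val props: Map[String, String] = Map(ClusterBySpec.toPropertyWithoutValidation(clusterBy))
// props carries the injected "clusteringColumns" entry, which is why the
// property-count assertions above filter that key out before comparing sizes.
```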
@@ -529,7 +529,10 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
catalog.alterTable(tbl1.copy(properties = Map("toh" -> "frem")))
val newTbl1 = catalog.getTableRawMetadata(TableIdentifier("tbl1", Some("db2")))
assert(!tbl1.properties.contains("toh"))
assert(newTbl1.properties.size == tbl1.properties.size + 1)
// clusteringColumns property is injected during newTable, so we need
// to filter it out before comparing the properties.
assert(newTbl1.properties.size ==
tbl1.properties.filter { case (key, _) => key != "clusteringColumns" }.size + 1)
assert(newTbl1.properties.get("toh") == Some("frem"))
// Alter table without explicitly specifying database
catalog.setCurrentDatabase("db2")
@@ -135,6 +135,7 @@ class Table(
* @param nullable whether the column is nullable.
* @param isPartition whether the column is a partition column.
* @param isBucket whether the column is a bucket column.
* @param isCluster whether the column is a clustering column.
* @since 2.0.0
*/
@Stable
@@ -144,17 +145,29 @@ class Column(
val dataType: String,
val nullable: Boolean,
val isPartition: Boolean,
val isBucket: Boolean)
val isBucket: Boolean,
val isCluster: Boolean)
extends DefinedByConstructorParams {

def this(
name: String,
description: String,
dataType: String,
nullable: Boolean,
isPartition: Boolean,
isBucket: Boolean) = {
this(name, description, dataType, nullable, isPartition, isBucket, isCluster = false)
}

override def toString: String = {
"Column[" +
s"name='$name', " +
Option(description).map { d => s"description='$d', " }.getOrElse("") +
s"dataType='$dataType', " +
s"nullable='$nullable', " +
s"isPartition='$isPartition', " +
s"isBucket='$isBucket']"
s"isBucket='$isBucket', " +
s"isCluster='$isCluster']"
}

}
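A small usage sketch of the source-compatible constructors added above (assuming the class lives at `org.apache.spark.sql.catalog.Column`):

```scala
import org.apache.spark.sql.catalog.Column

// Existing call sites that omit the new flag still compile and default to
// isCluster = false via the auxiliary constructor.
val legacy = new Column("a", null, "int", nullable = true, isPartition = false, isBucket = false)
assert(!legacy.isCluster)

// New call sites can set the flag explicitly; toString now includes isCluster.
val clustered = new Column("b", null, "string", nullable = true,
  isPartition = false, isBucket = false, isCluster = true)
println(clustered) // Column[name='b', ..., isCluster='true']
```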
@@ -394,11 +394,14 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {

val columns = sparkSession.sessionState.executePlan(plan).analyzed match {
case ResolvedTable(_, _, table, _) =>
// TODO (SPARK-45787): Support clusterBySpec for listColumns().
val (partitionColumnNames, bucketSpecOpt, _) =
val (partitionColumnNames, bucketSpecOpt, clusterBySpecOpt) =
table.partitioning.toImmutableArraySeq.convertTransforms
val bucketColumnNames = bucketSpecOpt.map(_.bucketColumnNames).getOrElse(Nil)
schemaToColumns(table.schema(), partitionColumnNames.contains, bucketColumnNames.contains)
val clusteringColumnNames = clusterBySpecOpt.map { clusterBySpec =>
clusterBySpec.columnNames.map(_.toString)
}.getOrElse(Nil).toSet
schemaToColumns(table.schema(), partitionColumnNames.contains, bucketColumnNames.contains,
clusteringColumnNames.contains)

case ResolvedPersistentView(_, _, metadata) =>
schemaToColumns(metadata.schema)
@@ -415,15 +418,17 @@
private def schemaToColumns(
schema: StructType,
isPartCol: String => Boolean = _ => false,
isBucketCol: String => Boolean = _ => false): Seq[Column] = {
isBucketCol: String => Boolean = _ => false,
isClusteringCol: String => Boolean = _ => false): Seq[Column] = {
schema.map { field =>
new Column(
name = field.name,
description = field.getComment().orNull,
dataType = field.dataType.simpleString,
nullable = field.nullable,
isPartition = isPartCol(field.name),
isBucket = isBucketCol(field.name))
isBucket = isBucketCol(field.name),
isCluster = isClusteringCol(field.name))
}
}

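To make the shape of the `CatalogImpl` change easier to follow, here is a simplified, Spark-free sketch of the predicate-based flagging that `schemaToColumns` performs (all names in this snippet are illustrative, not part of the patch):

```scala
// Each field name is tested against the partition, bucket, and clustering
// column sets extracted from the table's metadata.
final case class ColumnFlags(name: String, isPartition: Boolean, isBucket: Boolean, isCluster: Boolean)

def flagColumns(
    fieldNames: Seq[String],
    isPartCol: String => Boolean = _ => false,
    isBucketCol: String => Boolean = _ => false,
    isClusteringCol: String => Boolean = _ => false): Seq[ColumnFlags] =
  fieldNames.map(n => ColumnFlags(n, isPartCol(n), isBucketCol(n), isClusteringCol(n)))

// Example: "a" and "b" are clustering columns, so only they get isCluster = true.
val clusteringCols = Set("a", "b")
val flagged = flagColumns(Seq("a", "b", "c", "d"), isClusteringCol = clusteringCols.contains)
// flagged.map(_.isCluster) == Seq(true, true, false, false)
```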
@@ -108,11 +108,16 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
assert(tableMetadata.schema.nonEmpty, "bad test")
assert(tableMetadata.partitionColumnNames.nonEmpty, "bad test")
assert(tableMetadata.bucketSpec.isDefined, "bad test")
assert(tableMetadata.clusterBySpec.isDefined, "bad test")
assert(columns.collect().map(_.name).toSet == tableMetadata.schema.map(_.name).toSet)
val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet
val clusteringColumnNames = tableMetadata.clusterBySpec.map { clusterBySpec =>
clusterBySpec.columnNames.map(_.toString)
}.getOrElse(Nil).toSet
columns.collect().foreach { col =>
assert(col.isPartition == tableMetadata.partitionColumnNames.contains(col.name))
assert(col.isBucket == bucketColumnNames.contains(col.name))
assert(col.isCluster == clusteringColumnNames.contains(col.name))
}

dbName.foreach { db =>
@@ -406,37 +411,42 @@

test("SPARK-39615: qualified name with catalog - listColumns") {
val answers = Map(
"col1" -> ("int", true, false, true),
"col2" -> ("string", true, false, false),
"a" -> ("int", true, true, false),
"b" -> ("string", true, true, false)
"col1" -> ("int", true, false, true, false),
"col2" -> ("string", true, false, false, false),
"a" -> ("int", true, true, false, false),
"b" -> ("string", true, true, false, false)
)

assert(spark.catalog.currentCatalog() === "spark_catalog")
createTable("my_table1")

val columns1 = spark.catalog.listColumns("my_table1").collect()
assert(answers ===
columns1.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
columns1.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket,
c.isCluster)).toMap)

val columns2 = spark.catalog.listColumns("default.my_table1").collect()
assert(answers ===
columns2.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
columns2.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket,
c.isCluster)).toMap)

val columns3 = spark.catalog.listColumns("spark_catalog.default.my_table1").collect()
assert(answers ===
columns3.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
columns3.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket,
c.isCluster)).toMap)

createDatabase("my_db1")
createTable("my_table2", Some("my_db1"))

val columns4 = spark.catalog.listColumns("my_db1.my_table2").collect()
assert(answers ===
columns4.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
columns4.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket,
c.isCluster)).toMap)

val columns5 = spark.catalog.listColumns("spark_catalog.my_db1.my_table2").collect()
assert(answers ===
columns5.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
columns5.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket,
c.isCluster)).toMap)

val catalogName = "testcat"
val dbName = "my_db2"
@@ -476,13 +486,13 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf

test("Column.toString") {
assert(new Column("namama", "descaca", "datatapa",
nullable = true, isPartition = false, isBucket = true).toString ==
nullable = true, isPartition = false, isBucket = true, isCluster = false).toString ==
"Column[name='namama', description='descaca', dataType='datatapa', " +
"nullable='true', isPartition='false', isBucket='true']")
"nullable='true', isPartition='false', isBucket='true', isCluster='false']")
assert(new Column("namama", null, "datatapa",
nullable = false, isPartition = true, isBucket = true).toString ==
nullable = false, isPartition = true, isBucket = true, isCluster = true).toString ==
"Column[name='namama', dataType='datatapa', " +
"nullable='false', isPartition='true', isBucket='true']")
"nullable='false', isPartition='true', isBucket='true', isCluster='true']")
}

test("catalog classes format in Dataset.show") {
@@ -491,7 +501,8 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
isTemporary = false)
val function = new Function("nama", "cataloa", Array("databasa"), "descripta", "classa", false)
val column = new Column(
"nama", "descripta", "typa", nullable = false, isPartition = true, isBucket = true)
"nama", "descripta", "typa", nullable = false, isPartition = true, isBucket = true,
isCluster = true)
val dbFields = getConstructorParameterValues(db)
val tableFields = getConstructorParameterValues(table)
val functionFields = getConstructorParameterValues(function)
Expand All @@ -503,7 +514,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
assert((functionFields(0), functionFields(1), functionFields(3), functionFields(4),
functionFields(5)) == ("nama", "cataloa", "descripta", "classa", false))
assert(functionFields(2).asInstanceOf[Array[String]].sameElements(Array("databasa")))
assert(columnFields == Seq("nama", "descripta", "typa", false, true, true))
assert(columnFields == Seq("nama", "descripta", "typa", false, true, true, true))
val dbString = CatalogImpl.makeDataset(Seq(db), spark).showString(10)
val tableString = CatalogImpl.makeDataset(Seq(table), spark).showString(10)
val functionString = CatalogImpl.makeDataset(Seq(function), spark).showString(10)
