[SPARK-51350][SQL] Implement Show Procedures #50109

Open: wants to merge 5 commits into base: master
docs/sql-ref-ansi-compliance.md (1 addition, 0 deletions)
@@ -648,6 +648,7 @@ Below is a list of all the keywords in Spark SQL.
|PRECEDING|non-reserved|non-reserved|non-reserved|
|PRIMARY|reserved|non-reserved|reserved|
|PRINCIPALS|non-reserved|non-reserved|non-reserved|
+|PROCEDURES|reserved|non-reserved|non-reserved|
|PROPERTIES|non-reserved|non-reserved|non-reserved|
|PURGE|non-reserved|non-reserved|non-reserved|
|QUARTER|non-reserved|non-reserved|non-reserved|
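Since the row above makes PROCEDURES reserved under ANSI mode, a name colliding with the keyword needs quoting there. A minimal sketch, assuming spark.sql.ansi.enabled=true (the table name is hypothetical):

// Under ANSI mode, PROCEDURES is now reserved, so a hypothetical table
// with that name must be backquoted to parse:
spark.sql("CREATE TABLE `PROCEDURES` (id INT)")   // parses: identifier is quoted
// spark.sql("CREATE TABLE PROCEDURES (id INT)")  // would fail to parse in ANSI mode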
SqlBaseLexer.g4
@@ -362,6 +362,7 @@ POSITION: 'POSITION';
PRECEDING: 'PRECEDING';
PRIMARY: 'PRIMARY';
PRINCIPALS: 'PRINCIPALS';
+PROCEDURES: 'PROCEDURES';
PROPERTIES: 'PROPERTIES';
PURGE: 'PURGE';
QUARTER: 'QUARTER';
SqlBaseParser.g4
@@ -304,6 +304,8 @@ statement
| SHOW PARTITIONS identifierReference partitionSpec? #showPartitions
| SHOW identifier? FUNCTIONS ((FROM | IN) ns=identifierReference)?
(LIKE? (legacy=multipartIdentifier | pattern=stringLit))? #showFunctions
+    | SHOW PROCEDURES ((FROM | IN) identifierReference)?
+        (LIKE? pattern=stringLit)? #showProcedures
| SHOW CREATE TABLE identifierReference (AS SERDE)? #showCreateTable
| SHOW CURRENT namespace #showCurrentNamespace
| SHOW CATALOGS (LIKE? pattern=stringLit)? #showCatalogs
@@ -2160,6 +2162,7 @@ nonReserved
| PRECEDING
| PRIMARY
| PRINCIPALS
+    | PROCEDURES
| PROPERTIES
| PURGE
| QUARTER
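The new rule mirrors SHOW FUNCTIONS: both the namespace and the LIKE pattern are optional. A minimal usage sketch of the accepted forms (catalog, namespace, and pattern names here are hypothetical):

spark.sql("SHOW PROCEDURES")                                   // current namespace
spark.sql("SHOW PROCEDURES IN my_catalog.my_ns")               // explicit namespace
spark.sql("SHOW PROCEDURES FROM my_catalog.my_ns LIKE 'sum*'") // filtered by pattern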
ProcedureCatalog.java
@@ -17,9 +17,16 @@

package org.apache.spark.sql.connector.catalog;

+import com.google.common.collect.Lists;
import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.catalyst.util.StringUtils;
import org.apache.spark.sql.connector.catalog.procedures.UnboundProcedure;
+
+import org.apache.spark.sql.errors.QueryExecutionErrors;
+import scala.jdk.javaapi.CollectionConverters;
+
+import java.util.Arrays;

/**
* A catalog API for working with procedures.
*
@@ -34,4 +41,24 @@ public interface ProcedureCatalog extends CatalogPlugin {
* @return the loaded unbound procedure
*/
UnboundProcedure loadProcedure(Identifier ident);

+  /**
+   * List all procedures in the specified namespace.
+   */
+  default Identifier[] listProcedures(String[] namespace) {
+    throw QueryExecutionErrors.unsupportedShowProceduresError();
+  }
+
+  /**
+   * List all procedures in the specified namespace that match the specified pattern.
+   */
+  default Identifier[] listProcedures(String[] namespace, String pattern) {
+    Identifier[] procedures = listProcedures(namespace);
+    return Arrays.stream(procedures).filter(proc ->
+        StringUtils
+            .filterPattern(
+                CollectionConverters.asScala(Lists.newArrayList(proc.name())).toSeq(), pattern)
+            .nonEmpty())
+        .toArray(Identifier[]::new);
+  }
}
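A connector only has to override the single-argument variant; the pattern-filtering overload falls back to it through the default implementation. A minimal Scala sketch, assuming a hypothetical catalog backed by an in-memory map:

import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.connector.catalog.{Identifier, ProcedureCatalog}
import org.apache.spark.sql.connector.catalog.procedures.UnboundProcedure
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical catalog that keeps its procedures in an in-memory map.
class MyProcedureCatalog extends ProcedureCatalog {
  private val procedures = new ConcurrentHashMap[Identifier, UnboundProcedure]()
  private var catalogName: String = _

  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    catalogName = name
  }

  override def name(): String = catalogName

  override def loadProcedure(ident: Identifier): UnboundProcedure =
    procedures.get(ident)

  // Overriding only this variant is enough: listProcedures(namespace, pattern)
  // delegates here via the default method added in this PR.
  override def listProcedures(namespace: Array[String]): Array[Identifier] =
    procedures.keySet.asScala.filter(_.namespace.sameElements(namespace)).toArray
}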
Analyzer.scala
@@ -28,13 +28,7 @@ import scala.util.{Failure, Random, Success, Try}
import org.apache.spark.{SparkException, SparkThrowable, SparkUnsupportedOperationException}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst._
-import org.apache.spark.sql.catalyst.analysis.resolver.{
-  AnalyzerBridgeState,
-  HybridAnalyzer,
-  Resolver => OperatorResolver,
-  ResolverExtension,
-  ResolverGuard
-}
+import org.apache.spark.sql.catalyst.analysis.resolver.{AnalyzerBridgeState, HybridAnalyzer, Resolver => OperatorResolver, ResolverExtension, ResolverGuard}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.OuterScopes
import org.apache.spark.sql.catalyst.expressions._
AstBuilder.scala
@@ -6307,6 +6307,15 @@ class AstBuilder extends DataTypeAstBuilder
}
}

+  override def visitShowProcedures(ctx: ShowProceduresContext): LogicalPlan = withOrigin(ctx) {
+    val ns = if (ctx.identifierReference() != null) {
+      withIdentClause(ctx.identifierReference, UnresolvedNamespace(_))
+    } else {
+      CurrentNamespace
+    }
+    ShowProcedures(ns, Option(ctx.pattern).map(x => string(visitStringLit(x))))
+  }

/**
* Check plan for any parameters.
* If it finds any throws UNSUPPORTED_FEATURE.PARAMETER_MARKER_IN_UNEXPECTED_STATEMENT.
v2Commands.scala
@@ -1664,3 +1664,22 @@ case class Call(
override protected def withNewChildInternal(newChild: LogicalPlan): Call =
copy(procedure = newChild)
}

+/**
+ * The logical plan of the SHOW PROCEDURES command.
+ */
+case class ShowProcedures(
+    namespace: LogicalPlan,
+    pattern: Option[String],
+    override val output: Seq[Attribute] = ShowProcedures.getOutputAttrs) extends UnaryCommand {
+  override def child: LogicalPlan = namespace
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): ShowProcedures =
+    copy(namespace = newChild)
+}
+
+object ShowProcedures {
+  def getOutputAttrs: Seq[Attribute] = Seq(
+    AttributeReference("namespace", StringType, nullable = false)(),
+    AttributeReference("name", StringType, nullable = false)())
+}
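To tie the grammar and the plan together, a hedged sketch of the unresolved plans AstBuilder now emits (the namespace and pattern values are illustrative):

import org.apache.spark.sql.catalyst.analysis.{CurrentNamespace, UnresolvedNamespace}

// SHOW PROCEDURES
val plain = ShowProcedures(CurrentNamespace, pattern = None)
// SHOW PROCEDURES IN my_catalog.my_ns LIKE 'sum*'
val filtered = ShowProcedures(
  UnresolvedNamespace(Seq("my_catalog", "my_ns")), pattern = Some("sum*"))
// Both expose the fixed two-column output: namespace (string), name (string).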
QueryExecutionErrors.scala
@@ -2737,6 +2737,10 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
new SparkUnsupportedOperationException("UNSUPPORTED_FEATURE.PURGE_TABLE")
}

+  def unsupportedShowProceduresError(): SparkUnsupportedOperationException = {
+    new SparkUnsupportedOperationException("UNSUPPORTED_FEATURE.SHOW_PROCEDURES")
+  }

def raiseError(
errorClass: UTF8String,
errorParms: MapData): RuntimeException = {
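For a ProcedureCatalog that never overrides listProcedures, the default implementation funnels into this new error class. Illustratively (the catalog name is hypothetical):

// Hypothetical: 'legacy_cat' implements ProcedureCatalog but not listProcedures.
// spark.sql("SHOW PROCEDURES IN legacy_cat")
// => SparkUnsupportedOperationException: UNSUPPORTED_FEATURE.SHOW_PROCEDURES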
InMemoryTableCatalog.scala
@@ -268,6 +268,11 @@ class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamesp
procedure
}

+  override def listProcedures(namespace: Array[String]): Array[Identifier] = {
+    procedures.asScala.filter { case (_, p) => !p.name().equals("dummy_increment") }
+      .keySet.toArray
+  }

object UnboundIncrement extends UnboundProcedure {
override def name: String = "dummy_increment"
override def description: String = "test method to increment an in-memory counter"
DataSourceV2Strategy.scala
@@ -566,6 +566,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case c: Call =>
ExplainOnlySparkPlan(c) :: Nil

+    case ShowProcedures(ResolvedNamespace(catalog, ns, _), pattern, output) =>
+      ShowProceduresExec(output, catalog.asTableCatalog, ns, pattern) :: Nil

case _ => Nil
}
}
ShowProceduresExec.scala (new file)
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.execution.LeafExecNode

/**
* Physical plan node for showing procedures.
*/
case class ShowProceduresExec(
    output: Seq[Attribute],
    catalog: TableCatalog,
    namespace: Seq[String],
    pattern: Option[String]) extends V2CommandExec with LeafExecNode {
  override protected def run(): Seq[InternalRow] = {
    val procedureCatalog = catalog.asProcedureCatalog
    val procedures = pattern match {
      case Some(p) => procedureCatalog.listProcedures(namespace.toArray, p)
      case _ => procedureCatalog.listProcedures(namespace.toArray)
    }
    procedures.toSeq.map(p => toCatalystRow(p.namespace().quoted, p.name()))
  }
}
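End to end, the exec node turns each returned identifier into a (namespace, name) row. A hedged sketch of what a session might print, assuming a hypothetical catalog cat exposing one procedure sum under namespace ns:

spark.sql("SHOW PROCEDURES IN cat.ns").show()
// +---------+----+
// |namespace|name|
// +---------+----+
// |       ns| sum|
// +---------+----+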
ansi/keywords.sql.out (golden file)
@@ -249,6 +249,7 @@ POSITION false
PRECEDING false
PRIMARY true
PRINCIPALS false
+PROCEDURES true
PROPERTIES false
PURGE false
QUARTER false
@@ -443,6 +444,7 @@ ORDER
OUTER
OVERLAPS
PRIMARY
+PROCEDURES
RECURSIVE
REFERENCES
RIGHT
keywords.sql.out (golden file, default mode)
@@ -249,6 +249,7 @@ POSITION false
PRECEDING false
PRIMARY false
PRINCIPALS false
+PROCEDURES false
PROPERTIES false
PURGE false
QUARTER false
nonansi/keywords.sql.out (golden file, default mode)
@@ -249,6 +249,7 @@ POSITION false
PRECEDING false
PRIMARY false
PRINCIPALS false
+PROCEDURES false
PROPERTIES false
PURGE false
QUARTER false