rdar://84031512 Add stored procedure API (apache#1801)
* rdar://84031512 Add stored procedure API

* Try to fix scala 2.13 build

* Trigger build

Co-authored-by: Anton Okolnychyi <aokolnychyi@apple.com>
Co-authored-by: Szehon Ho <szehon.apache@gmail.com>
3 people authored and GitHub Enterprise committed Jun 22, 2023
1 parent dfecec2 commit 1be6df3
Showing 19 changed files with 720 additions and 6 deletions.
1 change: 1 addition & 0 deletions docs/sql-ref-ansi-compliance.md
@@ -375,6 +375,7 @@ Below is a list of all the keywords in Spark SQL.
|CACHE|non-reserved|non-reserved|non-reserved|
|CALL|reserved|non-reserved|reserved|
|CASCADE|non-reserved|non-reserved|non-reserved|
|CASE|reserved|non-reserved|reserved|
|CAST|reserved|non-reserved|reserved|
|CATALOG|non-reserved|non-reserved|non-reserved|
|CATALOGS|non-reserved|non-reserved|non-reserved|
@@ -109,6 +109,7 @@ BUCKET: 'BUCKET';
BUCKETS: 'BUCKETS';
BY: 'BY';
CACHE: 'CACHE';
CALL: 'CALL';
CASCADE: 'CASCADE';
CASE: 'CASE';
CAST: 'CAST';
@@ -415,6 +416,7 @@ COLON: ':';
ARROW: '->';
HENT_START: '/*+';
HENT_END: '*/';
ARG_ARROW: '=>';

STRING
: '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
@@ -184,6 +184,8 @@ statement
| SHOW CREATE TABLE multipartIdentifier (AS SERDE)? #showCreateTable
| SHOW CURRENT namespace #showCurrentNamespace
| SHOW CATALOGS (LIKE? pattern=stringLit)? #showCatalogs
| CALL multipartIdentifier
LEFT_PAREN (callArgument (COMMA callArgument)*)? RIGHT_PAREN #call
| (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName #describeFunction
| (DESC | DESCRIBE) namespace EXTENDED?
multipartIdentifier #describeNamespace
@@ -845,6 +847,11 @@ transformArgument
| constant
;

callArgument
: expression #positionalArgument
| identifier ARG_ARROW expression #namedArgument
;

expression
: booleanExpression
;
@@ -1465,6 +1472,7 @@ nonReserved
| BUCKETS
| BY
| CACHE
| CALL
| CASCADE
| CASE
| CAST
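
Taken together, the new CALL keyword, the ARG_ARROW ('=>') token, and the #call / callArgument parser rules define the SQL surface for invoking stored procedures with positional or named arguments. The sketch below is illustrative only: it assumes a catalog named cat that implements the new ProcedureCatalog interface has already been configured (the wiring is sketched at the end of this diff), and the procedure and parameter names are hypothetical.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CallSyntaxSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("call-syntax-sketch")
        .getOrCreate();

    // Positional arguments: each callArgument is a plain expression.
    Dataset<Row> byPosition =
        spark.sql("CALL cat.system.expire_older_than('db.events', 1641000000000)");

    // Named arguments: identifier => expression, built on the new ARG_ARROW token,
    // so arguments can be supplied in any order.
    Dataset<Row> byName = spark.sql(
        "CALL cat.system.expire_older_than(older_than_millis => 1641000000000, table => 'db.events')");

    // The returned rows summarize the execution; their schema comes from Procedure#outputType().
    byPosition.show();
    byName.show();
  }
}

Because CALL is also added to the nonReserved list, it remains usable as an identifier in default mode, while the keyword table above marks it reserved under ANSI mode.
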
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.connector.catalog;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;

/**
 * An interface representing a stored procedure available for execution.
 */
public interface Procedure {
  /**
   * Returns the input parameters of this procedure.
   */
  ProcedureParameter[] parameters();

  /**
   * Returns the type of rows produced by this procedure.
   */
  StructType outputType();

  /**
   * Executes this procedure.
   * <p>
   * Before execution, Spark aligns the provided arguments to the input parameters
   * defined in {@link #parameters()}, either by position or by name.
   * <p>
   * Implementations may return one or more rows summarizing the execution. The schema
   * of the output rows must match the output type defined in {@link #outputType()}.
   *
   * @param args input arguments
   * @return the result of executing this procedure with the given arguments
   */
  InternalRow[] call(InternalRow args);

  /**
   * Returns the description of this procedure.
   */
  default String description() {
    return this.getClass().toString();
  }
}
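
A minimal sketch of implementing this interface, assuming a hypothetical maintenance procedure; the procedure name, its parameters, and the one-row summary it returns are illustrative, not part of the API.

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.connector.catalog.Procedure;
import org.apache.spark.sql.connector.catalog.ProcedureParameter;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;

/** Hypothetical procedure that pretends to expire data older than a timestamp. */
public class ExpireOlderThanProcedure implements Procedure {

  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[] {
      ProcedureParameter.required("table", DataTypes.StringType),
      ProcedureParameter.optional("older_than_millis", DataTypes.LongType)
  };

  private static final StructType OUTPUT_TYPE = new StructType()
      .add("table", DataTypes.StringType)
      .add("removed_rows", DataTypes.LongType);

  @Override
  public ProcedureParameter[] parameters() {
    return PARAMETERS;
  }

  @Override
  public StructType outputType() {
    return OUTPUT_TYPE;
  }

  @Override
  public InternalRow[] call(InternalRow args) {
    // Arguments arrive as a single InternalRow aligned to parameters(),
    // so ordinal 0 is "table" and ordinal 1 is "older_than_millis".
    UTF8String table = args.getUTF8String(0);
    long olderThan = args.isNullAt(1) ? System.currentTimeMillis() : args.getLong(1);

    long removedRows = doExpire(table.toString(), olderThan);

    // Return a one-row summary matching outputType().
    return new InternalRow[] {
        new GenericInternalRow(new Object[] { table, removedRows })
    };
  }

  private long doExpire(String table, long olderThanMillis) {
    return 0L; // no-op in this sketch
  }

  @Override
  public String description() {
    return "expire_older_than(table, older_than_millis): illustrative example only";
  }
}

Note that string values inside an InternalRow are carried as UTF8String, hence the explicit conversion in the sketch.
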
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.connector.catalog;

import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;

/**
 * A catalog API for working with stored procedures.
 * <p>
 * Catalogs should implement this interface if they expose stored procedures that
 * can be invoked via CALL statements.
 */
public interface ProcedureCatalog extends CatalogPlugin {
  /**
   * Loads a {@link Procedure stored procedure} by {@link Identifier identifier}.
   *
   * @param ident a stored procedure identifier
   * @return the stored procedure
   * @throws NoSuchProcedureException if there is no matching stored procedure
   */
  Procedure loadProcedure(Identifier ident) throws NoSuchProcedureException;
}
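
A minimal sketch of a ProcedureCatalog backed by an in-memory registry; the class name, the registered identifier, and the reuse of the hypothetical procedure sketched above are assumptions for illustration.

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Procedure;
import org.apache.spark.sql.connector.catalog.ProcedureCatalog;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/** Hypothetical catalog that serves procedures from an in-memory registry. */
public class InMemoryProcedureCatalog implements ProcedureCatalog {

  private final Map<Identifier, Procedure> procedures = new HashMap<>();
  private String name;

  @Override
  public void initialize(String name, CaseInsensitiveStringMap options) {
    this.name = name;
    // Register the procedures this catalog exposes; the identifier below is illustrative.
    procedures.put(
        Identifier.of(new String[] {"system"}, "expire_older_than"),
        new ExpireOlderThanProcedure());
  }

  @Override
  public String name() {
    return name;
  }

  @Override
  public Procedure loadProcedure(Identifier ident) throws NoSuchProcedureException {
    Procedure procedure = procedures.get(ident);
    if (procedure == null) {
      throw new NoSuchProcedureException(ident);
    }
    return procedure;
  }
}

Like other catalog plugins, such an implementation would be registered under a spark.sql.catalog.<name> configuration entry; an end-to-end sketch follows the new exception class below.
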
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.connector.catalog;

import org.apache.spark.sql.types.DataType;

/**
 * An input parameter of a {@link Procedure stored procedure}.
 */
public interface ProcedureParameter {

  /**
   * Creates a required input parameter.
   *
   * @param name the name of the parameter
   * @param dataType the type of the parameter
   * @return the constructed required stored procedure parameter
   */
  static ProcedureParameter required(String name, DataType dataType) {
    return new ProcedureParameterImpl(name, dataType, true);
  }

  /**
   * Creates an optional input parameter.
   *
   * @param name the name of the parameter
   * @param dataType the type of the parameter
   * @return the constructed optional stored procedure parameter
   */
  static ProcedureParameter optional(String name, DataType dataType) {
    return new ProcedureParameterImpl(name, dataType, false);
  }

  /**
   * Returns the name of this parameter.
   */
  String name();

  /**
   * Returns the type of this parameter.
   */
  DataType dataType();

  /**
   * Returns true if this parameter is required.
   */
  boolean required();
}
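
Because a procedure declares its signature up front, that signature is simply an ordered array of required and optional parameters. A tiny sketch with hypothetical names and types:

import org.apache.spark.sql.connector.catalog.ProcedureParameter;
import org.apache.spark.sql.types.DataTypes;

public class ParameterSignatureSketch {
  // A required parameter must be supplied in every CALL; an optional one may be omitted.
  static final ProcedureParameter[] SIGNATURE = new ProcedureParameter[] {
      ProcedureParameter.required("table", DataTypes.StringType),
      ProcedureParameter.optional("dry_run", DataTypes.BooleanType)
  };
}

Named arguments (for example dry_run => true) let a caller target the optional parameter directly instead of supplying every preceding positional argument, since arguments are aligned by name as well as by position.
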
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.connector.catalog;

import java.util.Objects;

import org.apache.spark.sql.types.DataType;

/**
 * A {@link ProcedureParameter} implementation.
 */
class ProcedureParameterImpl implements ProcedureParameter {
  private final String name;
  private final DataType dataType;
  private final boolean required;

  ProcedureParameterImpl(String name, DataType dataType, boolean required) {
    this.name = name;
    this.dataType = dataType;
    this.required = required;
  }

  @Override
  public String name() {
    return name;
  }

  @Override
  public DataType dataType() {
    return dataType;
  }

  @Override
  public boolean required() {
    return required;
  }

  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    } else if (other == null || getClass() != other.getClass()) {
      return false;
    }

    ProcedureParameterImpl that = (ProcedureParameterImpl) other;
    return required == that.required &&
        Objects.equals(name, that.name) &&
        Objects.equals(dataType, that.dataType);
  }

  @Override
  public int hashCode() {
    return Objects.hash(name, dataType, required);
  }

  @Override
  public String toString() {
    return String.format(
        "ProcedureParameter(name='%s', type=%s, required=%b)",
        name, dataType, required);
  }
}
@@ -280,6 +280,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
KeepLegacyOutputs),
Batch("Resolution", fixedPoint,
new ResolveCatalogs(catalogManager) ::
ResolveProcedures(catalogManager) ::
ResolveUserSpecifiedColumns ::
ResolveInsertInto ::
ResolveRelations ::
@@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.analysis.TypeCoercion.numericPrecedence
import org.apache.spark.sql.catalyst.analysis.TypeCoercion.{numericPrecedence, ProcedureArgumentCoercion}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
@@ -76,6 +76,7 @@ object AnsiTypeCoercion extends TypeCoercionBase {
override def typeCoercionRules: List[Rule[LogicalPlan]] =
UnpivotCoercion ::
WidenSetOperationTypes ::
ProcedureArgumentCoercion ::
new AnsiCombinedTypeCoercionRule(
InConversion ::
PromoteStrings ::
@@ -251,9 +251,9 @@ class NoSuchIndexException private(
messageParameters = messageParameters) {

def this(
errorClass: String,
messageParameters: Map[String, String],
cause: Option[Throwable]) = {
errorClass: String,
messageParameters: Map[String, String],
cause: Option[Throwable]) = {
this(
SparkThrowableHelper.getMessage(errorClass, messageParameters),
cause,
@@ -269,3 +269,11 @@
this(message, cause, errorClass = None, messageParameters = Map.empty[String, String])
}
}

class NoSuchProcedureException(message: String, cause: Option[Throwable] = None)
  extends AnalysisException(message, cause = cause) {

  def this(ident: Identifier) = {
    this(s"Procedure ${ident.quoted} not found")
  }
}
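
Putting the pieces together, an end-to-end sketch: it registers the hypothetical in-memory catalog from the earlier sketch under the name cat via the standard spark.sql.catalog.<name> configuration and issues CALL statements against it. Class, catalog, and procedure names are illustrative only.

import org.apache.spark.sql.SparkSession;

public class EndToEndCallSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("procedure-call-sketch")
        // Plug the hypothetical catalog in under the name "cat" using the
        // standard spark.sql.catalog.<name> mechanism for catalog plugins.
        .config("spark.sql.catalog.cat", InMemoryProcedureCatalog.class.getName())
        .getOrCreate();

    // Resolves "cat" as a ProcedureCatalog, loads system.expire_older_than,
    // aligns and coerces the arguments, and runs Procedure#call.
    spark.sql("CALL cat.system.expire_older_than(table => 'db.events')").show();

    // Calling a procedure the catalog does not expose fails analysis,
    // backed by the new NoSuchProcedureException.
    try {
      spark.sql("CALL cat.system.does_not_exist()");
    } catch (Exception e) {
      System.out.println("Unknown procedure: " + e.getMessage());
    }
  }
}
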