Skip to content

Commit

Permalink
[SPARK-29530][SQL] Make SQLConf in SQL parse process thread safe
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
As I have comment in  [SPARK-29516](#26172 (comment))
SparkSession.sql() method parse process not under current sparksession's conf, so some configuration about parser is not valid in multi-thread situation.

In this pr, we add a SQLConf parameter to AbstractSqlParser and initial it with SessionState's conf.
Then for each SparkSession's parser process. It will use's it's own SessionState's SQLConf and to be thread safe

### Why are the changes needed?
Fix bug

### Does this PR introduce any user-facing change?
NO

### How was this patch tested?
NO

Closes #26187 from AngersZhuuuu/SPARK-29530.

Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
AngersZhuuuu authored and cloud-fan committed Oct 22, 2019
1 parent 3d567a3 commit 484f93e
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.spark.sql.types.{DataType, StructType}
/**
* Base SQL parsing infrastructure.
*/
abstract class AbstractSqlParser extends ParserInterface with Logging {
abstract class AbstractSqlParser(conf: SQLConf) extends ParserInterface with Logging {

/** Creates/Resolves DataType for a given SQL string. */
override def parseDataType(sqlText: String): DataType = parse(sqlText) { parser =>
Expand Down Expand Up @@ -91,16 +91,16 @@ abstract class AbstractSqlParser extends ParserInterface with Logging {
val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
lexer.removeErrorListeners()
lexer.addErrorListener(ParseErrorListener)
lexer.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced
lexer.ansi = SQLConf.get.ansiEnabled
lexer.legacy_setops_precedence_enbled = conf.setOpsPrecedenceEnforced
lexer.ansi = conf.ansiEnabled

val tokenStream = new CommonTokenStream(lexer)
val parser = new SqlBaseParser(tokenStream)
parser.addParseListener(PostProcessor)
parser.removeErrorListeners()
parser.addErrorListener(ParseErrorListener)
parser.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced
parser.ansi = SQLConf.get.ansiEnabled
parser.legacy_setops_precedence_enbled = conf.setOpsPrecedenceEnforced
parser.ansi = conf.ansiEnabled

try {
try {
Expand Down Expand Up @@ -134,12 +134,12 @@ abstract class AbstractSqlParser extends ParserInterface with Logging {
/**
* Concrete SQL parser for Catalyst-only SQL statements.
*/
class CatalystSqlParser(conf: SQLConf) extends AbstractSqlParser {
class CatalystSqlParser(conf: SQLConf) extends AbstractSqlParser(conf) {
val astBuilder = new AstBuilder(conf)
}

/** For test-only. */
object CatalystSqlParser extends AbstractSqlParser {
object CatalystSqlParser extends AbstractSqlParser(SQLConf.get) {
val astBuilder = new AstBuilder(SQLConf.get)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import org.apache.spark.sql.types.StructType
/**
* Concrete parser for Spark SQL statements.
*/
class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser {
class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser(conf) {
val astBuilder = new SparkSqlAstBuilder(conf)

private val substitutor = new VariableSubstitution(conf)
Expand Down

0 comments on commit 484f93e

Please sign in to comment.