Skip to content

Commit

Permalink
[SPARK-29423][SS] lazily initialize StreamingQueryManager in SessionS…
Browse files Browse the repository at this point in the history
…tate

### What changes were proposed in this pull request?

This PR makes `SessionState` lazily initialize `StreamingQueryManager` to avoid constructing  `StreamingQueryManager` for each session when connecting to ThriftServer.

### Why are the changes needed?

Reduce memory usage.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?
manual test
1. Start thriftserver:
```
build/sbt clean package -Phive -Phadoop-3.2 -Phive-thriftserver
export SPARK_PREPEND_CLASSES=true
sbin/start-thriftserver.sh
```
2. Open a session:
```
bin/beeline -u jdbc:hive2://localhost:10000
```
3. Check `StreamingQueryManager` instance:
```
jcmd | grep HiveThriftServer2 | awk -F ' ' '{print $1}' | xargs jmap -histo | grep StreamingQueryManager
```

**Before this PR**:
```
[rootspark-3267648 spark]# jcmd | grep HiveThriftServer2 | awk -F ' ' '{print $1}' | xargs jmap -histo | grep StreamingQueryManager
1954:             2             96  org.apache.spark.sql.streaming.StreamingQueryManager
```

**After this PR**:
```
[rootspark-3267648 spark]# jcmd | grep HiveThriftServer2 | awk -F ' ' '{print $1}' | xargs jmap -histo | grep StreamingQueryManager
[rootspark-3267648 spark]#
```

Closes #26089 from wangyum/SPARK-29423.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
  • Loading branch information
wangyum authored and dongjoon-hyun committed Oct 16, 2019
1 parent 51f10ed commit e00344e
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ abstract class BaseSessionStateBuilder(
() => analyzer,
() => optimizer,
planner,
streamingQueryManager,
() => streamingQueryManager,
listenerManager,
() => resourceLoader,
createQueryExecution,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ import org.apache.spark.sql.util.{ExecutionListenerManager, QueryExecutionListen
* unresolved attributes and relations.
* @param optimizerBuilder a function to create the logical query plan optimizer.
* @param planner Planner that converts optimized logical plans to physical plans.
* @param streamingQueryManager Interface to start and stop streaming queries.
* @param streamingQueryManagerBuilder A function to create a streaming query manager to
* start and stop streaming queries.
* @param listenerManager Interface to register custom [[QueryExecutionListener]]s.
* @param resourceLoaderBuilder a function to create a session shared resource loader to load JARs,
* files, etc.
Expand All @@ -67,7 +68,7 @@ private[sql] class SessionState(
analyzerBuilder: () => Analyzer,
optimizerBuilder: () => Optimizer,
val planner: SparkPlanner,
val streamingQueryManager: StreamingQueryManager,
val streamingQueryManagerBuilder: () => StreamingQueryManager,
val listenerManager: ExecutionListenerManager,
resourceLoaderBuilder: () => SessionResourceLoader,
createQueryExecution: LogicalPlan => QueryExecution,
Expand All @@ -83,6 +84,10 @@ private[sql] class SessionState(

lazy val resourceLoader: SessionResourceLoader = resourceLoaderBuilder()

// The streamingQueryManager is lazy to avoid creating a StreamingQueryManager for each session
// when connecting to ThriftServer.
lazy val streamingQueryManager: StreamingQueryManager = streamingQueryManagerBuilder()

def catalogManager: CatalogManager = analyzer.catalogManager

def newHadoopConf(): Configuration = SessionState.newHadoopConf(
Expand Down

0 comments on commit e00344e

Please sign in to comment.