[SPARK-22143][SQL] Fix memory leak in OffHeapColumnVector #19367

Closed
wants to merge 3 commits into from

Conversation

hvanhovell
Contributor

What changes were proposed in this pull request?

WritableColumnVector does not close its child column vectors. This can create memory leaks for OffHeapColumnVector, where we do not clean up the memory allocated by a vector's children. This can be especially bad for string columns (which use a child byte column vector).
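
The leak described above can be illustrated with a small, self-contained sketch. It does not use Spark's classes: `FakeAllocator`, `SketchVector`, and the `closeChildren` flag are hypothetical names that only mimic a parent vector owning off-heap child vectors. With the flag off, closing the parent frees only its own buffer; with it on, `close()` closes the children first.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChildCloseSketch {
    /** Hypothetical stand-in for an off-heap allocator; it only tracks outstanding bytes. */
    static final class FakeAllocator {
        private final Map<Long, Long> live = new HashMap<>();
        private long nextAddress = 1L;

        long allocate(long bytes) {
            long address = nextAddress++;
            live.put(address, bytes);
            return address;
        }

        /** Freeing an unknown address (including 0) is a no-op. */
        void free(long address) {
            live.remove(address);
        }

        long outstandingBytes() {
            long total = 0L;
            for (long bytes : live.values()) {
                total += bytes;
            }
            return total;
        }
    }

    /** Hypothetical vector that owns one buffer and any number of child vectors. */
    static final class SketchVector implements AutoCloseable {
        private final FakeAllocator allocator;
        private final boolean closeChildren;
        private final List<SketchVector> children = new ArrayList<>();
        private long data;

        SketchVector(FakeAllocator allocator, long bytes, boolean closeChildren) {
            this.allocator = allocator;
            this.closeChildren = closeChildren;
            this.data = allocator.allocate(bytes);
        }

        SketchVector addChild(long bytes) {
            SketchVector child = new SketchVector(allocator, bytes, closeChildren);
            children.add(child);
            return child;
        }

        @Override
        public void close() {
            if (closeChildren) {
                // The behaviour the fix is after: release the children before the parent.
                for (SketchVector child : children) {
                    child.close();
                }
                children.clear();
            }
            allocator.free(data);
            data = 0L;
        }
    }

    /** Builds a "string column" with one child byte vector, closes it, and reports what is left. */
    public static long leakedBytes(boolean closeChildren) {
        FakeAllocator allocator = new FakeAllocator();
        SketchVector strings = new SketchVector(allocator, 64, closeChildren);
        strings.addChild(4096); // child byte vector holding the string data
        strings.close();
        return allocator.outstandingBytes();
    }

    public static void main(String[] args) {
        System.out.println("leaked without child close: " + leakedBytes(false));
        System.out.println("leaked with child close:    " + leakedBytes(true));
    }
}
```

In this sketch the child's 4096 bytes stay allocated when the parent does not close its children, and nothing is left outstanding when it does.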

How was this patch tested?

I have updated the existing tests to always use both on-heap and off-heap vectors. Testing and diagnosis were done locally.

@hvanhovell
Contributor Author

cc @ala @michal-databricks @ueshin

@@ -85,6 +85,7 @@ public long nullsNativeAddress() {

@Override
public void close() {
super.close();
Platform.freeMemory(nulls);
Member

This comment is not directly related to this fix. However, would it be better to check that each field is not 0 before calling Platform.freeMemory()? For example, data or lengthData/offsetData may be 0.

Contributor Author

Yeah, it is a bit weird. The doc for JDK 8's Unsafe.freeMemory() states the following: "The address passed to this method may be null, in which case no action is taken." See: http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/687fd7c7986d/src/share/classes/sun/misc/Unsafe.java
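
A minimal sketch of that documented behaviour, calling sun.misc.Unsafe directly rather than through Spark's Platform wrapper. The class name `FreeZeroSketch` and the method `roundTrip` are hypothetical, and the sketch assumes a JDK on which the `sun.misc.Unsafe` memory-access methods are still usable (they are deprecated for removal on recent releases and may print warnings).

```java
import java.lang.reflect.Field;
import sun.misc.Unsafe;

public class FreeZeroSketch {
    /** Obtains the Unsafe singleton via reflection on its private static field. */
    private static Unsafe loadUnsafe() {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            return (Unsafe) field.get(null);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("sun.misc.Unsafe is not available", e);
        }
    }

    /** Frees address 0, then allocates, writes, reads back, and frees a real 8-byte block. */
    public static long roundTrip(long value) {
        Unsafe unsafe = loadUnsafe();
        unsafe.freeMemory(0L); // documented as a no-op for a null (zero) address
        long address = unsafe.allocateMemory(8);
        try {
            unsafe.putLong(address, value);
            return unsafe.getLong(address);
        } finally {
            unsafe.freeMemory(address);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(42L));
    }
}
```

If the zero-address call were not a no-op, the method would fail before reaching the allocation, so a successful round trip is consistent with the quoted documentation.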

Member

@ueshin ueshin left a comment

LGTM except for some minor comments.

dt: DataType)(
block: WritableColumnVector => Unit): Unit = {
test(name) {
val c1 = new OnHeapColumnVector(size, dt)
Member

What's this c1 for?

}

test("toArray for primitive types") {
// (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
(MemoryMode.ON_HEAP :: Nil).foreach { memMode => {
(MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
Member

We can remove one pair of braces, as in .foreach { memMode => ....

// (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
(MemoryMode.ON_HEAP :: Nil).foreach { memMode => {
(MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
// (MemoryMode.ON_HEAP :: Nil).foreach { memMode => {
Member

Can we remove this line?

Contributor

@michal-databricks michal-databricks left a comment

LGTM

Contributor

@ala ala left a comment

LGTM. Just some nitpicks.

dt: DataType)(
block: WritableColumnVector => Unit): Unit = {
test(name) {
val c1 = new OnHeapColumnVector(size, dt)
Contributor

What is c1 for?

test(name) {
modes.foreach { mode =>
val vector = allocate(size, dt, mode)
try block(vector, mode) finally {
Contributor

The way you use {} here is weird.

}

test("Float APIs") {
(MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
testVector("Float Apis", 1024, FloatType, MemoryMode.ON_HEAP, MemoryMode.OFF_HEAP) {
Contributor

I think changing Apis to APIs would be better than the other way around.

testVector(
"Nest Struct in Array",
10,
new ArrayType(new StructType().add("int", IntegerType).add("long", LongType), true),
Contributor

Did you mean new ArrayType(structType)?

test("struct") {
val schema = new StructType().add("int", IntegerType).add("double", DoubleType)
testVector = allocate(10, schema)
val structType: StructType = new StructType().add("int", IntegerType).add("double", DoubleType)
Member

nit: Do we need : StructType?

Contributor Author

It is a class-level field now, so scalastyle wants me to type it...

name: String,
size: Int,
dt: DataType,
modes: MemoryMode*)(
Member

Can we do modes: Seq[MemoryMode] = Seq(MemoryMode.ON_HEAP, MemoryMode.OFF_HEAP))(? Then we can avoid repeating testVector(..., MemoryMode.ON_HEAP, MemoryMode.OFF_HEAP) in each suite.
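
The helper under discussion is Scala, where a default argument does this directly. As a rough Java analogy only (Java has no default arguments, so an overload supplies the default list), the pattern of running a test body once per memory mode and always closing the vector might look like the sketch below. `Mode`, `SketchVector`, `DEFAULT_MODES`, and both `testVector` overloads are hypothetical names, not the suite's actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;

public class ModeHelperSketch {
    /** Hypothetical stand-in for Spark's MemoryMode. */
    enum Mode { ON_HEAP, OFF_HEAP }

    /** Hypothetical vector; it only records whether it was closed. */
    static final class SketchVector implements AutoCloseable {
        final Mode mode;
        boolean closed;

        SketchVector(Mode mode) {
            this.mode = mode;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    static final List<Mode> DEFAULT_MODES = Arrays.asList(Mode.ON_HEAP, Mode.OFF_HEAP);

    /** Runs the body once per mode; try-with-resources closes the vector even if the body throws. */
    static List<SketchVector> testVector(List<Mode> modes, BiConsumer<SketchVector, Mode> body) {
        List<SketchVector> used = new ArrayList<>();
        for (Mode mode : modes) {
            SketchVector vector = new SketchVector(mode);
            used.add(vector);
            try (SketchVector v = vector) {
                body.accept(v, mode);
            }
        }
        return used;
    }

    /** Overload that plays the role of the default argument: both modes unless told otherwise. */
    static List<SketchVector> testVector(BiConsumer<SketchVector, Mode> body) {
        return testVector(DEFAULT_MODES, body);
    }

    /** Runs an empty body with the default modes and counts how many vectors ended up closed. */
    public static int closedAfterDefaultRun() {
        int closed = 0;
        for (SketchVector v : testVector((vector, mode) -> { })) {
            if (v.closed) {
                closed++;
            }
        }
        return closed;
    }

    public static void main(String[] args) {
        System.out.println("vectors closed: " + closedAfterDefaultRun());
    }
}
```

Callers that want a single mode can still pass an explicit list to the two-argument overload, which keeps the common case short without hiding the option.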

@SparkQA

SparkQA commented Sep 27, 2017

Test build #82243 has finished for PR 19367 at commit 4b494c5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 27, 2017

Test build #82247 has finished for PR 19367 at commit 6156758.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 27, 2017

Test build #82248 has finished for PR 19367 at commit 84caf03.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Contributor Author

retest this please

@SparkQA

SparkQA commented Sep 27, 2017

Test build #82251 has finished for PR 19367 at commit 84caf03.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Contributor Author

Merging to master.

@asfgit asfgit closed this in 02bb068 Sep 27, 2017
hvanhovell added a commit to hvanhovell/spark that referenced this pull request Sep 28, 2017
## What changes were proposed in this pull request?
`WritableColumnVector` does not close its child column vectors. This can create memory leaks for `OffHeapColumnVector`, where we do not clean up the memory allocated by a vector's children. This can be especially bad for string columns (which use a child byte column vector).

## How was this patch tested?
I have updated the existing tests to always use both on-heap and off-heap vectors. Testing and diagnosis were done locally.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes apache#19367 from hvanhovell/SPARK-22143.

# Conflicts:
#	sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
#	sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
#	sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala