Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-32802][SQL] Avoid using SpecificInternalRow in RunLengthEncoding#Encoder #29654

Closed
wants to merge 5 commits into from
Closed
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import java.nio.ByteOrder
import scala.collection.mutable

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.execution.columnar._
import org.apache.spark.sql.execution.vectorized.WritableColumnVector
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -182,8 +181,7 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme {
private var _uncompressedSize = 0
private var _compressedSize = 0

// Using `MutableRow` to store the last value to avoid boxing/unboxing cost.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you mean the overhead of boxing is smaller than the overhead of indirections introduced by SpecificInternalRow?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The benchmark result does. Please let us know if we need other validation, @cloud-fan .

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with it, but it's better to spell out this key finding in the PR description, so that people don't need to infer it from the benchmark results by themselves.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I also think this finding looks interesting. I'm not sure that this rule (the overhead of boxing is smaller than the overhead of indirections) always hold on any JVM implementations, but I think its importatnt to notice this performance property for future developement.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comments @cloud-fan @dongjoon-hyun and @maropu , and yes my bad for not putting enough info in the PR description. The comment is pretty old and I'm not sure what has changed over the years. By examine flamegraph from a test run I observed boxing both before and after the change:

Before:

Screen Shot 2020-09-14 at 10 30 05 AM

After:

Screen Shot 2020-09-14 at 10 37 05 AM

Even though the time spent on the boxing part is noticeably less after the change (it could have sth to do with my sample size though).

Perhaps we can open a JIRA to investigate this further? we could have more room for improvement if we can completely eliminate the boxing/unboxing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sunchao It is hard to guess the difference of boxing between two flamegraphs, can you show the percentage of boxing in each flamegraph?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya Sure. Here's complete graphs

Before:

Screen Shot 2020-09-14 at 11 45 43 AM

After:

Screen Shot 2020-09-14 at 11 45 52 AM

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah sorry @viirya just noticed you were asking for percentage. I calculated. Before the change it was 2.76% and after it is 1.31%.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is nice :-)

private val lastValue = new SpecificInternalRow(Seq(columnType.dataType))
private var lastValue: T#InternalType = _
private var lastRun = 0

override def uncompressedSize: Int = _uncompressedSize
Expand All @@ -195,16 +193,16 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme {
val actualSize = columnType.actualSize(row, ordinal)
_uncompressedSize += actualSize

if (lastValue.isNullAt(0)) {
columnType.copyField(row, ordinal, lastValue, 0)
if (lastValue == null) {
lastValue = columnType.clone(columnType.getField(row, ordinal))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need clone() here? While I guess that this is for UTF8String, lastValue is used only for keeping a value. Thus, it is ok with just keeping the reference for UTF8String.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kiszk Yes it is because UTF8String. We reuse InternalRow during each iteration and since UTF8String is backed by the same memory region in the InternalRow, it will be updated each time as we load a new row, which is not correct.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it, thank you for your clarification.

lastRun = 1
_compressedSize += actualSize + 4
} else {
if (columnType.getField(lastValue, 0) == value) {
if (lastValue == value) {
lastRun += 1
} else {
_compressedSize += actualSize + 4
columnType.copyField(row, ordinal, lastValue, 0)
lastValue = columnType.clone(columnType.getField(row, ordinal))
lastRun = 1
}
}
Expand All @@ -214,30 +212,27 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme {
to.putInt(RunLengthEncoding.typeId)

if (from.hasRemaining) {
val currentValue = new SpecificInternalRow(Seq(columnType.dataType))
var currentRun = 1
val value = new SpecificInternalRow(Seq(columnType.dataType))

columnType.extract(from, currentValue, 0)
var currentValue = columnType.extract(from)

while (from.hasRemaining) {
columnType.extract(from, value, 0)
val value = columnType.extract(from)

if (value.get(0, columnType.dataType) == currentValue.get(0, columnType.dataType)) {
if (value == currentValue) {
currentRun += 1
} else {
// Writes current run
columnType.append(currentValue, 0, to)
columnType.append(currentValue, to)
to.putInt(currentRun)

// Resets current run
columnType.copyField(value, 0, currentValue, 0)
currentValue = value
currentRun = 1
}
}

// Writes the last run
columnType.append(currentValue, 0, to)
columnType.append(currentValue, to)
to.putInt(currentRun)
}

Expand Down