-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-21745][SQL] Refactor ColumnVector hierarchy to make ColumnVector read-only and to introduce WritableColumnVector. #18958
Conversation
Test build #80720 has finished for PR 18958 at commit
|
Jenkins, retest this please. |
Test build #80723 has finished for PR 18958 at commit
|
assert (!columns[ordinal].isConstant); | ||
columns[ordinal].putNotNull(rowId); | ||
columns[ordinal].putBoolean(rowId, value); | ||
((MutableColumnVector) columns[ordinal]).putNotNull(rowId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe move the assertion and the cast in a private getter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I'll add a private getter and update these.
" to false."; | ||
|
||
if (cause != null) { | ||
throw new RuntimeException(message, cause); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are allowed to pass null
as a cause to the RuntimeException
constructor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. I'll update it.
public OffHeapColumnVector(int capacity, DataType type) { | ||
super(capacity, type); | ||
|
||
if (type instanceof ArrayType || type instanceof BinaryType || type instanceof StringType |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you try to move this initialization logic into the parent class? We should be able to factor out the on/off-heap specific initialization logic into a separate method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I'll try it.
* Reserve a integer column for ids of dictionary. | ||
*/ | ||
@Override | ||
public OffHeapColumnVector reserveDictionaryIds(int capacity) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment as in the constructor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I'll try it.
| aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, capacity); | ||
| for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) { | ||
| aggregateBufferBatch.setColumn(i, batch.column(i+${groupingKeys.length})); | ||
| batchVectors = new org.apache.spark.sql.execution.vectorized |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This happens quite a few times. It might be better to create a static util method that creates the vectors for you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I'll try it.
| } | ||
| // TODO: Possibly generate this projection in HashAggregate directly |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you elaborate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm sorry but I'm not sure because this is from original code.
* elements. This means that the put() APIs do not check as in common cases (i.e. flat schemas), | ||
* the lengths are known up front. | ||
* | ||
* A ColumnVector should be considered immutable once originally created. In other words, it is not |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This contradicts the name of this class. Maybe reuseable is a better way of describing what is going on here. Also cc @michal-databricks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about WritableColumnVector
?
On a more generic level. We could also choose to make |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for doing this @ueshin! From the perspective of using this for ArrowColumnVector batches, LGTM. I just had one question about removing the capacity
var from ColumnarBatch
, I think we can get away with just using numRows
.
this.capacity = maxRows; | ||
this.columns = new ColumnVector[schema.size()]; | ||
this.columns = columns; | ||
this.capacity = capacity; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does capacity
really mean anything in here anymore since the ColumnVectors
are allocated and populated outside now? Could we just initialize this.numRows = 0
and delay initializing of this.filteredRows
until setNumRows()
is called?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found some places referring ColumnarBatch.capacity()
, so I'd be a little conservative to do that for now.
also cc @kiszk for another column vector pr. |
Test build #80766 has finished for PR 18958 at commit
|
Jenkins, retest this please. |
Test build #80775 has finished for PR 18958 at commit
|
@@ -450,14 +450,13 @@ class CodegenContext { | |||
/** | |||
* Returns the specialized code to set a given value in a column vector for a given `DataType`. | |||
*/ | |||
def setValue(batch: String, row: String, dataType: DataType, ordinal: Int, | |||
value: String): String = { | |||
def setValue(vector: String, row: String, dataType: DataType, value: String): String = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: row
-> rowId
@@ -433,7 +434,8 @@ private void readBinaryBatch(int rowId, int num, ColumnVector column) throws IOE | |||
} | |||
|
|||
private void readFixedLenByteArrayBatch(int rowId, int num, | |||
ColumnVector column, int arrayLen) throws IOException { | |||
MutableColumnVector column, | |||
int arrayLen) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
private void xxx(
para1: XX,
para2: XX)
anyNullsSet = false; | ||
} | ||
} | ||
public abstract void reset(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if ColumnVector
is read-only, why we need a reset
API?
this.resultArray = null; | ||
this.resultStruct = null; | ||
} | ||
this.isConstant = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think isConstant
should belong to MutableColumnVector
, because it's used to indicate that this column vector should not be updated.
assert (!columns[ordinal].isConstant); | ||
columns[ordinal].putNotNull(rowId); | ||
columns[ordinal].putByte(rowId, value); | ||
MutableColumnVector column = getColumnAsMutable(ordinal); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a little afraid about this per-call type cast, but JVM should be able to optimize it perfectly, cc @kiszk
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my understanding, cast still occurs at runtime. The cast operation may consist compare and branch.
I am thinking about how we can reduce the cost of operations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ueshin @cloud-fan
Since MutableColumnVector
in each column in ColumnarBatch
is immutable, we can create an array of MutableColumnVector
by applying cast from ColumnVector
at initialization. If an cast exception occurs, we can ignore it since the column will not call setter APIs. Then, each setter in refers to an element of the array without a cast.
What do you think?
*/ | ||
protected int elementsAppended; | ||
|
||
/** | ||
* If this is a nested type (array or struct), the column for the child data. | ||
*/ | ||
protected ColumnVector[] childColumns; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we move this to WritableColumnVector
? I think ColumnVector
only need ColumnVector getChildColumn(int ordinal)
, and WritableColumnVector
can overwrite it to WritableColumnVector getChildColumn(int ordinal)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need this field for ArrowColumnVector
to store its child columns for now, too.
Do you want to make the method getChildColumn(int ordinal)
abstract and move the field to more concrete classes to manage by themselves?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea, because mostly the child columns are of the same type of concrete column vector type.
@@ -307,64 +293,70 @@ public void update(int ordinal, Object value) { | |||
|
|||
@Override | |||
public void setNullAt(int ordinal) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
one question, does the rows returned by ColumnarBatch.rowIterator
have to be mutable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like the rows returned by ColumnarBatch.rowIterator
doesn't need to be mutable with our current tests, but ColumnarBatch.Row
still needs to be mutable, the write apis of which are used in HashAggregateExec
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, then we really need to think about how to eliminate the per-call type cast...
Test build #80918 has finished for PR 18958 at commit
|
Jenkins, retest this please. |
Test build #80923 has finished for PR 18958 at commit
|
/** | ||
* Initialize child columns. | ||
*/ | ||
protected void initialize() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could move this method into the constructor.
Test build #80962 has finished for PR 18958 at commit
|
retest this please |
Test build #80968 has finished for PR 18958 at commit
|
Test build #80979 has finished for PR 18958 at commit
|
Jenkins, retest this please. |
Test build #80987 has finished for PR 18958 at commit
|
column.putDecimal(rowId, value, precision); | ||
} | ||
|
||
private WritableColumnVector getColumnAsWritable(int ordinal) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: getWritableColumn
LGTM except some minor comments |
Test build #81038 has finished for PR 18958 at commit
|
LGTM |
thanks, merging to master! |
What changes were proposed in this pull request?
This is a refactoring of
ColumnVector
hierarchy and related classes.ColumnVector
read-onlyWritableColumnVector
with write interfaceReadOnlyColumnVector
How was this patch tested?
Existing tests.