
[SPARK-21583][SQL] Create a ColumnarBatch from ArrowColumnVectors #18787

Conversation

BryanCutler
Member

What changes were proposed in this pull request?

This PR allows creating a ColumnarBatch from ReadOnlyColumnVectors, where previously a columnar batch could only allocate its vectors internally. This is useful for using ArrowColumnVectors in batch form to do row-based iteration. Also added ArrowConverters.fromPayloadIterator, which converts an ArrowPayload iterator to an InternalRow iterator and uses a ColumnarBatch internally.

How was this patch tested?

Added a new unit test for creating a ColumnarBatch with ReadOnlyColumnVectors and a test to verify the roundtrip of rows -> ArrowPayload -> rows, using toPayloadIterator and fromPayloadIterator.
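The rows -> ArrowPayload -> rows roundtrip that the test verifies can be modeled without Spark or Arrow. The sketch below uses plain lists as stand-in "payloads" (all names here are hypothetical, not Spark's API); it only illustrates the property being tested: batching and then flattening recovers the original rows.

```java
import java.util.ArrayList;
import java.util.List;

public class RoundTripSketch {
    // Stand-in for toPayloadIterator: split rows into fixed-size batches.
    static List<List<Integer>> toPayloads(List<Integer> rows, int batchSize) {
        List<List<Integer>> payloads = new ArrayList<>();
        for (int i = 0; i < rows.size(); i += batchSize) {
            payloads.add(new ArrayList<>(
                rows.subList(i, Math.min(i + batchSize, rows.size()))));
        }
        return payloads;
    }

    // Stand-in for fromPayloadIterator: flatten batches back into rows.
    static List<Integer> fromPayloads(List<List<Integer>> payloads) {
        List<Integer> rows = new ArrayList<>();
        for (List<Integer> payload : payloads) {
            rows.addAll(payload);
        }
        return rows;
    }
}
```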

@SparkQA

SparkQA commented Jul 31, 2017

Test build #80094 has finished for PR 18787 at commit f35b92c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 1, 2017

Test build #80099 has finished for PR 18787 at commit 43214b1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 1, 2017

Test build #80108 has finished for PR 18787 at commit f906156.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

ReadOnlyColumnVector[] columns,
int numRows) {
for (ReadOnlyColumnVector c: columns) {
assert(c.capacity >= numRows);
Member

Is there any good way to move this assert into another loop? I am afraid that in production, where assertions are disabled, this becomes a loop with no body that is still executed.

Member Author

Maybe this should throw an exception then?
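One way to realize that suggestion (a sketch, not the code that was merged): validate each column's capacity with an explicit exception, so the check also runs in production where Java `assert` statements are disabled by default. The method name and the `int[]` signature are simplifications of the real `ReadOnlyColumnVector[]` parameter.

```java
public class CapacityCheck {
    // Throws even when the JVM runs without -ea, unlike a Java assert.
    static void checkCapacities(int[] capacities, int numRows) {
        for (int c : capacities) {
            if (c < numRows) {
                throw new IllegalArgumentException(
                    "column capacity " + c + " is smaller than numRows " + numRows);
            }
        }
    }
}
```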


public static ColumnarBatch createReadOnly(
StructType schema,
ReadOnlyColumnVector[] columns,
Member

Do we need to restrict this to only ReadOnlyColumnVector?


Is it necessary? What impact will it cause?

Member Author

It doesn't need to be restricted, but if they are ReadOnlyColumnVectors then they are already populated and it is safe to call setNumRows(numRows) here. If this accepted any ColumnVector, someone could pass in unallocated vectors and cause issues.

return batch;
}

private static ColumnarBatch create(StructType schema, ColumnVector[] columns, int capacity) {
Member Author

@ueshin, if we want to allow creating a ColumnarBatch from any array of ColumnVectors, we could make this method public; it doesn't call setNumRows or assume the vectors are already allocated.

@BryanCutler
Member Author

@cloud-fan @icexelloss, this just adds the ability to create a ColumnarBatch with a row iterator from Arrow data. It should be usable for any vectorized UDF implementation, and I already tried it out in #18659 and it works quite well. Let me know if it works for you, thanks!

close()
}

private var _batch: ColumnarBatch = _
Member Author

TODO: not needed

@SparkQA

SparkQA commented Aug 9, 2017

Test build #80430 has finished for PR 18787 at commit 23d19df.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler
Member Author

jenkins retest this please

@SparkQA

SparkQA commented Aug 9, 2017

Test build #80438 has finished for PR 18787 at commit 23d19df.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

int numRows) {
assert(schema.length() == columns.length);
ColumnarBatch batch = new ColumnarBatch(schema, columns, numRows);
batch.setNumRows(numRows);
Contributor

Do we need to check each ReadOnlyColumnVector has numRows?

Member Author

The ArrowColumnVector.valueCount here would need to be moved to ReadOnlyColumnVector, where it could take the place of capacity. If @ueshin thinks that's ok to do here, I can add that.

ReadOnlyColumnVector[] columns,
int numRows) {
assert(schema.length() == columns.length);
ColumnarBatch batch = new ColumnarBatch(schema, columns, numRows);
Contributor

Why is the capacity set to numRows inside the constructor if we still need to call batch.setNumRows() manually?

Member Author

The max capacity only has meaning when allocating ColumnVectors, so it doesn't really do anything for read-only vectors. You need to call setNumRows to tell the batch how many rows there are for the given columns; it doesn't look at the capacity of the individual vectors.
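The distinction can be sketched with a toy batch class (a hypothetical stand-in, not Spark's ColumnarBatch): capacity is an allocation-time bound, while numRows must be declared separately because pre-populated, read-only vectors never go through the allocation path.

```java
public class ToyBatch {
    final int capacity;      // upper bound used when vectors are allocated internally
    private int numRows = 0; // how many rows are actually valid for reading

    ToyBatch(int capacity) {
        this.capacity = capacity;
    }

    // Must be called explicitly: the batch does not inspect the vectors
    // themselves to discover how many rows they hold.
    void setNumRows(int numRows) {
        if (numRows > capacity) {
            throw new IllegalArgumentException("numRows exceeds capacity");
        }
        this.numRows = numRows;
    }

    int numRows() {
        return numRows;
    }
}
```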

@BryanCutler
Member Author

@ueshin @cloud-fan , what are your thoughts on merging this to enable ArrowColumnVector to be used in a batch?

@cloud-fan
Contributor

Actually I think ReadOnlyColumnVector may not be a good abstraction. Ideally ColumnVector should be read only, and then we have a MutableColumnVector with write interfaces. @ueshin is working on it and will send the PR soon, can we hold this patch for a while? Thanks!

@BryanCutler
Member Author

Yes, I agree with changing the interfaces as you suggest @cloud-fan. Is there currently a JIRA open for that? I'm ok with holding off if it's planned soon, but I would like to get started on SPARK-20791, which will create a Spark DataFrame from Pandas with Arrow and also depends on this. I don't think the changes you are suggesting would affect this PR much, just the names of the classes used. Any chance we can merge this first?

@BryanCutler
Member Author

Updated to use the new API for ColumnarBatch, please take a look @ueshin @cloud-fan

@SparkQA

SparkQA commented Aug 25, 2017

Test build #81122 has finished for PR 18787 at commit a90a71b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

new ArrowRowIterator {
private var reader: ArrowFileReader = null
private var schemaRead = StructType(Seq.empty)
private var rowIter = if (payloadIter.hasNext) nextBatch() else Iterator.empty
Member

We can simply put Iterator.empty here.

Member Author

nextBatch() returns the row iterator, so rowIter needs to be initialized here to the rows of the first batch.

Member

Never mind, I thought the first call to hasNext would initialize it.
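The initialization discussed above can be sketched outside Spark: a row iterator built over an iterator of batches has to seed its inner iterator from the first batch. Here plain lists stand in for Arrow batches, and the class name is made up.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class BatchRowIterator implements Iterator<Integer> {
    private final Iterator<List<Integer>> batches;
    private Iterator<Integer> rowIter;

    BatchRowIterator(Iterator<List<Integer>> batches) {
        this.batches = batches;
        // Seed from the first batch, mirroring the Scala code above; starting
        // from an empty iterator alone would need extra advance logic.
        this.rowIter = batches.hasNext() ? nextBatch() : Collections.emptyIterator();
    }

    private Iterator<Integer> nextBatch() {
        return batches.next().iterator();
    }

    @Override
    public boolean hasNext() {
        // Skip any exhausted (or empty) batches before answering.
        while (!rowIter.hasNext() && batches.hasNext()) {
            rowIter = nextBatch();
        }
        return rowIter.hasNext();
    }

    @Override
    public Integer next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return rowIter.next();
    }
}
```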

@@ -1261,4 +1264,55 @@ class ColumnarBatchSuite extends SparkFunSuite {
s"vectorized reader"))
}
}

test("create read-only batch") {
Member

create a columnar batch from Arrow column vectors or something?

batch.getRow(100)
}

columnVectors.foreach(_.close())
Member

We can use batch.close() here.


val schema = StructType(Seq(StructField("int", IntegerType)))

val batch = new ColumnarBatch(schema, Array[ColumnVector](new ArrowColumnVector(vector)), 11)
Member

Btw, do we need to use ColumnarBatch for this test?
I guess we can simply create Iterator[InternalRow] and use it.

Member Author

You mean just calling something like new ColumnarBatch(..).rowIterator()? We still need to set the number of rows in the batch, I believe.

Member Author

Oh, you mean not using ArrowColumnVector at all and just making an Iterator[InternalRow] some other way? That would probably work, but I figured why not test out the columnar batch this way also.

Member

Yes, I meant your second comment.
We do test the columnar batch with ArrowColumnVector in ColumnarBatchSuite and also we use it in ArrowConverters.fromPayloadIterator(), so I thought we don't need to use it here.

test("roundtrip payloads") {
val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
val vector = ArrowUtils.toArrowField("int", IntegerType, nullable = true)
.createVector(allocator).asInstanceOf[NullableIntVector]
Member

Should the allocator and the vector be closed at the end of this test?

Member Author
@BryanCutler BryanCutler Aug 29, 2017

Yes, thanks for catching that. I'm closing them now.

@SparkQA

SparkQA commented Aug 29, 2017

Test build #81225 has finished for PR 18787 at commit 3fcdec5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler
Member Author

@ueshin I updated and had a couple of questions on your comments, please take a look, thanks!

@BryanCutler
Member Author

@ueshin I updated the test to use a seq of Rows now

@ueshin
Member

ueshin commented Aug 31, 2017

LGTM, pending Jenkins.

@SparkQA

SparkQA commented Aug 31, 2017

Test build #81270 has finished for PR 18787 at commit ffcbf75.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Member

ueshin commented Aug 31, 2017

Thanks! merging to master.

@asfgit asfgit closed this in 964b507 Aug 31, 2017
@BryanCutler
Member Author

Thanks @ueshin!

}

intercept[java.lang.AssertionError] {
batch.getRow(100)
Member

Member Author

Hmm, that is strange. I'll take a look, thanks.

Member

Member Author

It's probably because the assert is being compiled out. This should probably not be in the test then.

Member
@dongjoon-hyun dongjoon-hyun Aug 31, 2017

Then, please check the error message here. Please ignore this.

Member Author
@BryanCutler BryanCutler Aug 31, 2017

I think the problem is that if the Java assertion is compiled out, then no error is produced and the test fails.

Member Author

I just made #19098 to remove this check - it's not really testing the functionality added here anyway, but maybe another test should be added for checking index-out-of-bounds errors.
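Whether a Java `assert` runs is a JVM launch option (`-ea`/`-da`), not a compile-time setting; the probe below (standalone, unrelated to Spark's classes) detects it at runtime, which explains why an `intercept[AssertionError]` expectation can fail when tests run without `-ea`.

```java
public class AssertProbe {
    // Returns true only if `assert` statements in this class actually execute.
    static boolean assertExecuted() {
        boolean[] ran = {false};
        assert ran[0] = true; // side effect occurs only when assertions are enabled
        return ran[0];
    }

    public static void main(String[] args) {
        System.out.println("asserts run: " + assertExecuted());
        System.out.println("JVM setting: " + AssertProbe.class.desiredAssertionStatus());
    }
}
```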

@BryanCutler BryanCutler deleted the arrow-ColumnarBatch-support-SPARK-21583 branch March 6, 2018 23:36
8 participants