feat(java): support Lance Spark Batch Write #2500
Conversation
import java.util.Map;

public class LanceCatalog implements TableCatalog {
Please review
/**
 * A custom arrow reader that supports writing Spark internal rows while reading data in batches.
 */
public class InternalRowWriterArrowReader extends ArrowReader {
Please review
It's not clear what this class does. RowWriter vs ArrowReader? Can we have a better name?
Is this used by spark.write or spark.read?
Also, can we use package-private visibility for this class?
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class LanceDataWriter implements DataWriter<InternalRow> {
Please review
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class SparkWriteTest {
Please review
  }
  arrowReader.setFinished();
} catch (Exception e) {
  e.printStackTrace();
This one swallows the exception?
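For reference, a common fix is to rethrow so the Spark task fails visibly rather than silently continuing; a minimal sketch, assuming the surrounding method shape from the hunk above:

try {
  // ... finish writing any remaining data ...
  arrowReader.setFinished();
} catch (Exception e) {
  // Rethrow instead of only printing, so the task is marked as failed and Spark can retry or abort.
  throw new RuntimeException("Failed to finish writing to the Arrow reader", e);
}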
// If the following code impacts performance, it can be removed
VectorSchemaRoot root = this.getVectorSchemaRoot();
VectorUnloader unloader = new VectorUnloader(root);
try (ArrowRecordBatch recordBatch = unloader.getRecordBatch()) {
A RecordBatch is just consumed to update the totalBytesRead.
Yeah, there is no better way to compute bytes read. An alternative is to have bytesRead() throw UnsupportedOperationException; in my testing, bytesRead is never called.
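For readers following along, a sketch of how the byte accounting could look end to end. The totalBytesRead counter and the unloader usage follow the hunk above; the helper name recordBatchSize() and the use of computeBodyLength() are assumptions, not necessarily what the PR does:

private long totalBytesRead = 0;

private void recordBatchSize() throws IOException {
  VectorSchemaRoot root = this.getVectorSchemaRoot();
  VectorUnloader unloader = new VectorUnloader(root);
  try (ArrowRecordBatch recordBatch = unloader.getRecordBatch()) {
    // The unloaded batch exists only to measure the serialized body size of the current root.
    totalBytesRead += recordBatch.computeBodyLength();
  }
}

@Override
public long bytesRead() {
  // Alternative mentioned above: throw UnsupportedOperationException, since this method was
  // never observed to be called during testing.
  return totalBytesRead;
}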
}

@Override
public WriterCommitMessage commit() throws IOException {
Is this commit at the end of the full write? If so, we will accumulate too much data in arrowReader.
The current sequence is:
- create the arrow reader
- start the reader thread and execute Fragment.create(reader)
- write to the arrow reader
- reader thread (fragment.create) consumes data in batches
- commit, which signals that there is no more new data and waits for the reader thread (fragment.create) to finish
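A rough sketch of that last step on the writer side, using the FutureTask imported in the diff. Only setFinished() and the Fragment.create reader thread come from this thread; the field name fragmentCreationTask and the buildCommitMessage() helper are hypothetical placeholders:

@Override
public WriterCommitMessage commit() throws IOException {
  // Signal that no more rows will arrive, so the reader thread can drain what is left.
  arrowReader.setFinished();
  try {
    // Block until the FutureTask driving Fragment.create(reader) has consumed every batch.
    fragmentCreationTask.get();
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new IOException("Interrupted while waiting for fragment creation", e);
  } catch (ExecutionException e) {
    throw new IOException("Fragment creation failed", e);
  }
  // The commit message would carry the created fragment's metadata back to the driver.
  return buildCommitMessage();
}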
  return getSchema(config.getTablePath());
}

public static Optional<StructType> getSchema(String tablePath) {
This does not look right. So we have a table concept and a Dataset concept here. We need to be consistent.
Also, can we just call this class LanceDataset?
This class is for having a central place to hold the allocator and for keeping all methods that require the allocator in the same place.
LanceTable has been renamed to LanceDataset, which lets users read or write a single Lance dataset.
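To illustrate the design being described, a minimal sketch of a central-allocator utility; only the idea comes from this thread, and every name and method body below is an assumption rather than the actual implementation:

public final class LanceDatasetAdapter {
  // The single shared Arrow allocator; nothing outside this class creates its own.
  private static final BufferAllocator ALLOCATOR = new RootAllocator(Long.MAX_VALUE);

  private LanceDatasetAdapter() {}

  // Every operation that needs the allocator (schema lookup, dataset creation, writes)
  // lives here, so the allocator's lifetime is managed in one place.
  public static Optional<StructType> getSchema(String datasetUri) {
    // Open the dataset with ALLOCATOR, convert its Arrow schema to a Spark StructType,
    // and return Optional.empty() when the dataset does not exist yet.
    throw new UnsupportedOperationException("illustrative sketch only");
  }
}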
  }
}

public static void createTable(String datasetUri, StructType sparkSchema) {
Be consistent about dataset or table.
Changed to use dataset.
@eddyxu PTAL, thanks
Merge this one for now.
Add Lance Spark batch write support.
Add a simple Lance catalog to support creating tables.
Batch write writes chunks to the ArrowReader and zero-copies them to Lance core.
TODO: add test cases for InternalRowWriterArrowReader
TODO: larger-scale manual testing
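For readers landing here, a hypothetical end-to-end usage sketch of the batch write path this PR adds; the "lance" short format name, the write mode, and the dataset path are assumptions used purely for illustration, not confirmed by the PR:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class LanceWriteExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("lance-batch-write-example")
        .master("local[*]")
        .getOrCreate();

    // Each task streams its partition's rows through the Arrow reader into a Lance fragment.
    Dataset<Row> df = spark.range(0, 1000).toDF("id");
    df.write()
        .format("lance")              // assumed short name for the Lance data source
        .mode(SaveMode.Append)        // assumed write mode supported by the connector
        .save("/tmp/example.lance");  // hypothetical dataset URI

    spark.stop();
  }
}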